# -*- coding: utf-8 -*-
from slixmpp.test import *
import slixmpp.plugins.xep_0323 as xep_0323
namespace='sn'
class TestSensorDataStanzas(SlixTest):
def setUp(self):
pass
#register_stanza_plugin(Iq, xep_0323.stanza.Request)
#register_stanza_plugin(Iq, xep_0323.stanza.Accepted)
#register_stanza_plugin(Message, xep_0323.stanza.Failure)
#register_stanza_plugin(xep_0323.stanza.Failure, xep_0323.stanza.Error)
#register_stanza_plugin(Iq, xep_0323.stanza.Rejected)
#register_stanza_plugin(Message, xep_0323.stanza.Fields)
#register_stanza_plugin(Message, xep_0323.stanza.Request)
#register_stanza_plugin(Message, xep_0323.stanza.Accepted)
#register_stanza_plugin(Message, xep_0323.stanza.Failure)
# register_stanza_plugin(Message, xep_0323.stanza.Result)
# register_stanza_plugin(Message, xep_0323.stanza.Gone)
# register_stanza_plugin(Message, xep_0323.stanza.Inactive)
# register_stanza_plugin(Message, xep_0323.stanza.Paused)
def testRequest(self):
"""
test of request stanza
"""
iq = self.Iq()
iq['type'] = 'get'
iq['from'] = 'master@clayster.com/amr'
iq['to'] = 'device@clayster.com'
iq['id'] = '1'
iq['req']['seqnr'] = '1'
iq['req']['momentary'] = 'true'
self.check(iq,"""
"""
)
def testRequestNodes(self):
"""
test of request nodes stanza
"""
iq = self.Iq()
iq['type'] = 'get'
iq['from'] = 'master@clayster.com/amr'
iq['to'] = 'device@clayster.com'
iq['id'] = '1'
iq['req']['seqnr'] = '1'
iq['req']['momentary'] = 'true'
iq['req'].add_node("Device02", "Source02", "CacheType");
iq['req'].add_node("Device44");
self.check(iq,"""
"""
)
iq['req'].del_node("Device02");
self.check(iq,"""
"""
)
iq['req'].del_nodes();
self.check(iq,"""
"""
)
def testRequestField(self):
"""
test of request field stanza
"""
iq = self.Iq()
iq['type'] = 'get'
iq['from'] = 'master@clayster.com/amr'
iq['to'] = 'device@clayster.com'
iq['id'] = '1'
iq['req']['seqnr'] = '1'
iq['req']['momentary'] = 'true'
iq['req'].add_field("Top temperature");
iq['req'].add_field("Bottom temperature");
self.check(iq,"""
"""
)
iq['req'].del_field("Top temperature")
self.check(iq,"""
"""
)
iq['req'].del_fields()
self.check(iq,"""
"""
)
def testAccepted(self):
"""
test of request stanza
"""
iq = self.Iq()
iq['type'] = 'result'
iq['from'] = 'device@clayster.com'
iq['to'] = 'master@clayster.com/amr'
iq['id'] = '2'
iq['accepted']['seqnr'] = '2'
self.check(iq,"""
"""
)
def testRejected(self):
"""
test of request stanza
"""
iq = self.Iq()
iq['type'] = 'error'
iq['from'] = 'device@clayster.com'
iq['to'] = 'master@clayster.com/amr'
iq['id'] = '4'
iq['rejected']['seqnr'] = '4'
iq['rejected']['error'] = 'Access denied.'
self.check(iq,"""
Access denied.
"""
)
def testFailure(self):
"""
test of failure stanza
"""
msg = self.Message()
msg['from'] = 'device@clayster.com'
msg['to'] = 'master@clayster.com/amr'
msg['failure']['seqnr'] = '3'
msg['failure']['done'] = 'true'
msg['failure']['error']['nodeId'] = 'Device01'
msg['failure']['error']['timestamp'] = '2013-03-07T17:13:30'
msg['failure']['error']['text'] = 'Timeout.'
self.check(msg,"""
Timeout.
"""
)
def testFields(self):
"""
test of fields stanza
"""
msg = self.Message()
msg['from'] = 'device@clayster.com'
msg['to'] = 'master@clayster.com/amr'
msg['fields']['seqnr'] = '1'
node = msg['fields'].add_node("Device02");
ts = node.add_timestamp("2013-03-07T16:24:30");
data = ts.add_data(typename="numeric", name="Temperature", value="-12.42", unit='K');
data['momentary'] = 'true';
data['automaticReadout'] = 'true';
self.check(msg,"""
"""
)
node = msg['fields'].add_node("EmptyDevice");
node = msg['fields'].add_node("Device04");
ts = node.add_timestamp("EmptyTimestamp");
self.check(msg,"""
"""
)
node = msg['fields'].add_node("Device77");
ts = node.add_timestamp("2013-05-03T12:00:01");
data = ts.add_data(typename="numeric", name="Temperature", value="-12.42", unit='K');
data['historicalDay'] = 'true';
data = ts.add_data(typename="numeric", name="Speed", value="312.42", unit='km/h');
data['historicalWeek'] = 'false';
data = ts.add_data(typename="string", name="Temperature name", value="Bottom oil");
data['historicalMonth'] = 'true';
data = ts.add_data(typename="string", name="Speed name", value="Top speed");
data['historicalQuarter'] = 'false';
data = ts.add_data(typename="dateTime", name="T1", value="1979-01-01T00:00:00");
data['historicalYear'] = 'true';
data = ts.add_data(typename="dateTime", name="T2", value="2000-01-01T01:02:03");
data['historicalOther'] = 'false';
data = ts.add_data(typename="timeSpan", name="TS1", value="P5Y");
data['missing'] = 'true';
data = ts.add_data(typename="timeSpan", name="TS2", value="PT2M1S");
data['manualEstimate'] = 'false';
data = ts.add_data(typename="enum", name="top color", value="red", dataType="string");
data['invoiced'] = 'true';
data = ts.add_data(typename="enum", name="bottom color", value="black", dataType="string");
data['powerFailure'] = 'false';
data = ts.add_data(typename="boolean", name="Temperature real", value="false");
data['historicalDay'] = 'true';
data = ts.add_data(typename="boolean", name="Speed real", value="true");
data['historicalWeek'] = 'false';
self.check(msg,"""
"""
)
def testTimestamp(self):
msg = self.Message();
msg['from'] = 'device@clayster.com'
msg['to'] = 'master@clayster.com/amr'
msg['fields']['seqnr'] = '1'
node = msg['fields'].add_node("Device02");
node = msg['fields'].add_node("Device03");
ts = node.add_timestamp("2013-03-07T16:24:30");
ts = node.add_timestamp("2013-03-07T16:24:31");
self.check(msg,"""
"""
)
def testStringIdsMatcher(self):
"""
test of StringIds follow spec
"""
emptyStringIdXML=''
msg = self.Message()
msg['fields']['stringIds'] = "Nisse"
self.check(msg,emptyStringIdXML)
msg['fields']['stringIds'] = "Nisse___nje#"
self.check(msg,emptyStringIdXML)
msg['fields']['stringIds'] = "1"
self.check(msg,emptyStringIdXML)
suite = unittest.TestLoader().loadTestsFromTestCase(TestSensorDataStanzas)