# -*- coding: utf-8 -*-

from slixmpp.test import *
import slixmpp.plugins.xep_0323 as xep_0323

namespace = 'sn'


class TestSensorDataStanzas(SlixTest):
    """Serialization checks for stanzas built through the xep_0323 plugin.

    Each test builds an Iq or Message, fills in the plugin sub-stanzas and
    hands the result to ``self.check`` together with an expected XML string.

    NOTE(review): every expected-XML literal in this copy of the file is
    blank (or holds only the error text) -- it looks like the markup was
    stripped at some point. Confirm against the upstream test file before
    relying on these checks.
    """

    def setUp(self):
        """No per-test setup; the explicit registrations below are disabled."""
        pass
        #register_stanza_plugin(Iq, xep_0323.stanza.Request)
        #register_stanza_plugin(Iq, xep_0323.stanza.Accepted)
        #register_stanza_plugin(Message, xep_0323.stanza.Failure)
        #register_stanza_plugin(xep_0323.stanza.Failure, xep_0323.stanza.Error)
        #register_stanza_plugin(Iq, xep_0323.stanza.Rejected)
        #register_stanza_plugin(Message, xep_0323.stanza.Fields)
        #register_stanza_plugin(Message, xep_0323.stanza.Request)
        #register_stanza_plugin(Message, xep_0323.stanza.Accepted)
        #register_stanza_plugin(Message, xep_0323.stanza.Failure)
        # register_stanza_plugin(Message, xep_0323.stanza.Result)
        # register_stanza_plugin(Message, xep_0323.stanza.Gone)
        # register_stanza_plugin(Message, xep_0323.stanza.Inactive)
        # register_stanza_plugin(Message, xep_0323.stanza.Paused)

    def testRequest(self):
        """Check a 'get' Iq carrying a momentary 'req' payload."""
        iq = self.Iq()
        iq['type'] = 'get'
        iq['from'] = 'master@clayster.com/amr'
        iq['to'] = 'device@clayster.com'
        iq['id'] = '1'
        iq['req']['seqnr'] = '1'
        iq['req']['momentary'] = 'true'

        self.check(iq, """ """)

    def testRequestNodes(self):
        """Check a 'req' payload while nodes are added, then removed."""
        iq = self.Iq()
        iq['type'] = 'get'
        iq['from'] = 'master@clayster.com/amr'
        iq['to'] = 'device@clayster.com'
        iq['id'] = '1'
        iq['req']['seqnr'] = '1'
        iq['req']['momentary'] = 'true'

        # Two nodes: one with all three positional arguments, one with only
        # the first.
        iq['req'].add_node("Device02", "Source02", "CacheType")
        iq['req'].add_node("Device44")
        self.check(iq, """ """)

        # Remove a single node by its first argument.
        iq['req'].del_node("Device02")
        self.check(iq, """ """)

        # Remove whatever nodes are left.
        iq['req'].del_nodes()
        self.check(iq, """ """)

    def testRequestField(self):
        """Check a 'req' payload while fields are added, then removed."""
        iq = self.Iq()
        iq['type'] = 'get'
        iq['from'] = 'master@clayster.com/amr'
        iq['to'] = 'device@clayster.com'
        iq['id'] = '1'
        iq['req']['seqnr'] = '1'
        iq['req']['momentary'] = 'true'

        iq['req'].add_field("Top temperature")
        iq['req'].add_field("Bottom temperature")
        self.check(iq, """ """)

        # Remove a single field by name.
        iq['req'].del_field("Top temperature")
        self.check(iq, """ """)

        # Remove whatever fields are left.
        iq['req'].del_fields()
        self.check(iq, """ """)

    def testAccepted(self):
        """Check a 'result' Iq carrying an 'accepted' payload."""
        iq = self.Iq()
        iq['type'] = 'result'
        iq['from'] = 'device@clayster.com'
        iq['to'] = 'master@clayster.com/amr'
        iq['id'] = '2'
        iq['accepted']['seqnr'] = '2'

        self.check(iq, """ """)

    def testRejected(self):
        """Check an 'error' Iq carrying a 'rejected' payload with error text."""
        iq = self.Iq()
        iq['type'] = 'error'
        iq['from'] = 'device@clayster.com'
        iq['to'] = 'master@clayster.com/amr'
        iq['id'] = '4'
        iq['rejected']['seqnr'] = '4'
        iq['rejected']['error'] = 'Access denied.'

        self.check(iq, """ Access denied. """)

    def testFailure(self):
        """Check a Message carrying a 'failure' payload with a nested error."""
        msg = self.Message()
        msg['from'] = 'device@clayster.com'
        msg['to'] = 'master@clayster.com/amr'
        msg['failure']['seqnr'] = '3'
        msg['failure']['done'] = 'true'
        msg['failure']['error']['nodeId'] = 'Device01'
        msg['failure']['error']['timestamp'] = '2013-03-07T17:13:30'
        msg['failure']['error']['text'] = 'Timeout.'

        # The expected string ends in a line break; it is written as an
        # escape so the trailing whitespace stays visible.
        self.check(msg, """ Timeout. \n""")

    def testFields(self):
        """Check a 'fields' payload as nodes, timestamps and data are added."""
        msg = self.Message()
        msg['from'] = 'device@clayster.com'
        msg['to'] = 'master@clayster.com/amr'
        msg['fields']['seqnr'] = '1'

        # One node, one timestamp, one numeric reading with two flags set.
        stamp = msg['fields'].add_node("Device02").add_timestamp(
            "2013-03-07T16:24:30")
        reading = stamp.add_data(typename="numeric", name="Temperature",
                                 value="-12.42", unit='K')
        reading['momentary'] = 'true'
        reading['automaticReadout'] = 'true'
        self.check(msg, """ """)

        # A node with nothing under it, and a node whose only timestamp has
        # no data.
        msg['fields'].add_node("EmptyDevice")
        msg['fields'].add_node("Device04").add_timestamp("EmptyTimestamp")
        self.check(msg, """ """)

        # A node whose single timestamp holds twelve readings; each entry is
        # (add_data keyword arguments, flag to set on the result, flag value).
        stamp = msg['fields'].add_node("Device77").add_timestamp(
            "2013-05-03T12:00:01")
        readings = (
            ({'typename': "numeric", 'name': "Temperature",
              'value': "-12.42", 'unit': 'K'},
             'historicalDay', 'true'),
            ({'typename': "numeric", 'name': "Speed",
              'value': "312.42", 'unit': 'km/h'},
             'historicalWeek', 'false'),
            ({'typename': "string", 'name': "Temperature name",
              'value': "Bottom oil"},
             'historicalMonth', 'true'),
            ({'typename': "string", 'name': "Speed name",
              'value': "Top speed"},
             'historicalQuarter', 'false'),
            ({'typename': "dateTime", 'name': "T1",
              'value': "1979-01-01T00:00:00"},
             'historicalYear', 'true'),
            ({'typename': "dateTime", 'name': "T2",
              'value': "2000-01-01T01:02:03"},
             'historicalOther', 'false'),
            ({'typename': "timeSpan", 'name': "TS1", 'value': "P5Y"},
             'missing', 'true'),
            ({'typename': "timeSpan", 'name': "TS2", 'value': "PT2M1S"},
             'manualEstimate', 'false'),
            ({'typename': "enum", 'name': "top color",
              'value': "red", 'dataType': "string"},
             'invoiced', 'true'),
            ({'typename': "enum", 'name': "bottom color",
              'value': "black", 'dataType': "string"},
             'powerFailure', 'false'),
            ({'typename': "boolean", 'name': "Temperature real",
              'value': "false"},
             'historicalDay', 'true'),
            ({'typename': "boolean", 'name': "Speed real",
              'value': "true"},
             'historicalWeek', 'false'),
        )
        for data_kwargs, flag, state in readings:
            stamp.add_data(**data_kwargs)[flag] = state

        self.check(msg, """ """)

    def testTimestamp(self):
        """Check a 'fields' payload with two timestamps on the second node."""
        msg = self.Message()
        msg['from'] = 'device@clayster.com'
        msg['to'] = 'master@clayster.com/amr'
        msg['fields']['seqnr'] = '1'

        # The first node gets no timestamps; both go on the second one.
        msg['fields'].add_node("Device02")
        second = msg['fields'].add_node("Device03")
        second.add_timestamp("2013-03-07T16:24:30")
        second.add_timestamp("2013-03-07T16:24:31")

        self.check(msg, """ """)

    def testStringIdsMatcher(self):
        """Check the message after assigning several 'stringIds' values.

        All three assignments are compared against the same expected string.
        """
        expected_xml = ''
        msg = self.Message()

        msg['fields']['stringIds'] = "Nisse"
        self.check(msg, expected_xml)

        msg['fields']['stringIds'] = "Nisse___nje#"
        self.check(msg, expected_xml)

        msg['fields']['stringIds'] = "1"
        self.check(msg, expected_xml)


suite = unittest.TestLoader().loadTestsFromTestCase(TestSensorDataStanzas)