# -*- coding: utf-8 -*-
import sys
import datetime
import time
import threading
from slixmpp.test import *
from slixmpp.xmlstream import ElementBase
from slixmpp.plugins.xep_0323.device import Device
class TestStreamSensorData(SlixTest):
"""
Test using the XEP-0323 plugin.
"""
def setUp(self):
pass
def _time_now(self):
return datetime.datetime.now().replace(microsecond=0).isoformat()
def tearDown(self):
self.stream_close()
def testRequestAccept(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0323'])
myDevice = Device("Device22")
myDevice._add_field(name="Temperature", typename="numeric", unit="°C")
myDevice._set_momentary_timestamp("2013-03-07T16:24:30")
myDevice._add_field_momentary_data("Temperature", "23.4", flags={"automaticReadout": "true"})
self.xmpp['xep_0323'].register_node(nodeId="Device22", device=myDevice, commTimeout=0.5)
self.recv("""
""")
self.send("""
""")
self.send("""
""")
def testRequestRejectAuth(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0323'])
self.xmpp['xep_0323']._set_authenticated("darth@deathstar.com")
self.recv("""
""")
self.send("""
Access denied
""")
def testRequestNode(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0323'])
myDevice = Device("Device44")
self.xmpp['xep_0323'].register_node('Device44', myDevice, commTimeout=0.5)
print("."),
self.recv("""
""")
self.send("""
Invalid nodeId Device33
""")
print("."),
self.recv("""
""")
self.send("""
""")
def testRequestField(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0323'])
myDevice = Device("Device44")
myDevice._add_field(name='Voltage', typename="numeric", unit="V")
myDevice._add_field_timestamp_data(name="Voltage", value="230.4", timestamp="2000-01-01T00:01:02", flags={"invoiced": "true"})
self.xmpp['xep_0323'].register_node('Device44', myDevice, commTimeout=0.5)
print("."),
self.recv("""
""")
self.send("""
Invalid field Current
""")
print("."),
self.recv("""
""")
self.send("""
""")
self.send("""
""")
self.send("""
""")
def testRequestMultiTimestampSingleField(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0323'])
myDevice = Device("Device44")
myDevice._add_field(name='Voltage', typename="numeric", unit="V")
myDevice._add_field_timestamp_data(name="Voltage", value="230.4", timestamp="2000-01-01T00:01:02", flags={"invoiced": "true"})
myDevice._add_field(name='Current', typename="numeric", unit="A")
myDevice._add_field(name='Height', typename="string")
myDevice._add_field_timestamp_data(name="Voltage", value="230.6", timestamp="2000-01-01T01:01:02")
myDevice._add_field_timestamp_data(name="Height", value="115 m", timestamp="2000-01-01T01:01:02", flags={"invoiced": "true"})
self.xmpp['xep_0323'].register_node('Device44', myDevice, commTimeout=0.5)
print("."),
self.recv("""
""")
self.send("""
""")
self.send("""
""")
self.send("""
""")
self.send("""
""")
def testRequestMultiTimestampAllFields(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0323'])
myDevice = Device("Device44")
myDevice._add_field(name='Voltage', typename="numeric", unit="V")
myDevice._add_field_timestamp_data(name="Voltage", value="230.4", timestamp="2000-01-01T00:01:02", flags={"invoiced": "true"})
myDevice._add_field(name='Current', typename="numeric", unit="A")
myDevice._add_field(name='Height', typename="string")
myDevice._add_field_timestamp_data(name="Voltage", value="230.6", timestamp="2000-01-01T01:01:02")
myDevice._add_field_timestamp_data(name="Height", value="115 m", timestamp="2000-01-01T01:01:02", flags={"invoiced": "true"})
self.xmpp['xep_0323'].register_node('Device44', myDevice, commTimeout=0.5)
print("."),
self.recv("""
""")
self.send("""
""")
self.send("""
""")
self.send("""
""")
self.send("""
""")
def testRequestAPI(self):
self.stream_start(mode='client',
plugins=['xep_0030',
'xep_0323'])
self.xmpp['xep_0323'].request_data(from_jid="tester@localhost", to_jid="you@google.com", callback=None)
self.send("""
""")
self.xmpp['xep_0323'].request_data(from_jid="tester@localhost", to_jid="you@google.com", nodeIds=['Device33', 'Device22'], callback=None)
self.send("""
""")
self.xmpp['xep_0323'].request_data(from_jid="tester@localhost", to_jid="you@google.com", fields=['Temperature', 'Voltage'], callback=None)
self.send("""
""")
def testRequestRejectAPI(self):
self.stream_start(mode='client',
plugins=['xep_0030',
'xep_0323'])
results = []
def my_callback(from_jid, result, nodeId=None, timestamp=None, fields=None, error_msg=None):
if (result == "rejected") and (error_msg == "Invalid device Device22"):
results.append("rejected")
self.xmpp['xep_0323'].request_data(from_jid="tester@localhost", to_jid="you@google.com", nodeIds=['Device33', 'Device22'], callback=my_callback)
self.send("""
""")
self.recv("""
Invalid device Device22
""")
self.failUnless(results == ["rejected"],
"Rejected callback was not properly executed")
def testRequestAcceptedAPI(self):
self.stream_start(mode='client',
plugins=['xep_0030',
'xep_0323'])
results = []
def my_callback(from_jid, result, nodeId=None, timestamp=None, fields=None, error_msg=None):
results.append(result)
self.xmpp['xep_0323'].request_data(from_jid="tester@localhost", to_jid="you@google.com", nodeIds=['Device33', 'Device22'], callback=my_callback)
self.send("""
""")
self.recv("""
""")
self.failUnless(results == ["accepted"],
"Accepted callback was not properly executed")
def testRequestFieldsAPI(self):
self.stream_start(mode='client',
plugins=['xep_0030',
'xep_0323'])
results = []
callback_data = {}
def my_callback(from_jid, result, nodeId=None, timestamp=None, fields=None, error_msg=None):
results.append(result)
if result == "fields":
callback_data["nodeId"] = nodeId
callback_data["timestamp"] = timestamp
callback_data["error_msg"] = error_msg
for f in fields:
callback_data["field_" + f['name']] = f
self.xmpp['xep_0323'].request_data(from_jid="tester@localhost",
to_jid="you@google.com",
nodeIds=['Device33'],
callback=my_callback)
#self.xmpp['xep_0323'].request_data(from_jid="tester@localhost", to_jid="you@google.com", nodeIds=['Device33'], callback=my_callback);
self.send("""
""")
self.recv("""
""")
self.recv("""
""")
self.recv("""
""")
self.failUnlessEqual(results, ["accepted","fields","done"])
# self.assertIn("nodeId", callback_data);
self.assertTrue("nodeId" in callback_data)
self.failUnlessEqual(callback_data["nodeId"], "Device33")
# self.assertIn("timestamp", callback_data);
self.assertTrue("timestamp" in callback_data)
self.failUnlessEqual(callback_data["timestamp"], "2000-01-01T00:01:02")
#self.assertIn("field_Voltage", callback_data);
self.assertTrue("field_Voltage" in callback_data)
self.failUnlessEqual(callback_data["field_Voltage"], {"name": "Voltage", "value": "230.4", "typename": "numeric", "unit": "V", "flags": {"invoiced": "true"}})
#self.assertIn("field_TestBool", callback_data);
self.assertTrue("field_TestBool" in callback_data)
self.failUnlessEqual(callback_data["field_TestBool"], {"name": "TestBool", "value": "true", "typename": "boolean" })
def testServiceDiscoveryClient(self):
self.stream_start(mode='client',
plugins=['xep_0030',
'xep_0323'])
self.recv("""
""")
self.send("""
""")
def testServiceDiscoveryComponent(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0323'])
self.recv("""
""")
self.send("""
""")
def testRequestTimeout(self):
self.stream_start(mode='client',
plugins=['xep_0030',
'xep_0323'])
results = []
callback_data = {}
def my_callback(from_jid, result, nodeId=None, timestamp=None, error_msg=None):
results.append(result)
if result == "failure":
callback_data["nodeId"] = nodeId
callback_data["timestamp"] = timestamp
callback_data["error_msg"] = error_msg
self.xmpp['xep_0323'].request_data(from_jid="tester@localhost",
to_jid="you@google.com",
nodeIds=['Device33'],
callback=my_callback)
self.send("""
""")
self.recv("""
""")
self.recv("""
Timeout.
""")
self.failUnlessEqual(results, ["accepted","failure"]);
# self.assertIn("nodeId", callback_data);
self.assertTrue("nodeId" in callback_data)
self.failUnlessEqual(callback_data["nodeId"], "Device33")
# self.assertIn("timestamp", callback_data);
self.assertTrue("timestamp" in callback_data)
self.failUnlessEqual(callback_data["timestamp"], "2013-03-07T17:13:30")
# self.assertIn("error_msg", callback_data);
self.assertTrue("error_msg" in callback_data)
self.failUnlessEqual(callback_data["error_msg"], "Timeout.")
def testDelayedRequest(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0323'])
myDevice = Device("Device22")
myDevice._add_field(name="Temperature", typename="numeric", unit="°C")
myDevice._set_momentary_timestamp("2013-03-07T16:24:30")
myDevice._add_field_momentary_data("Temperature", "23.4", flags={"automaticReadout": "true"})
self.xmpp['xep_0323'].register_node(nodeId="Device22", device=myDevice, commTimeout=0.5)
dtnow = datetime.datetime.now()
ts_2sec = datetime.timedelta(0,2)
dtnow_plus_2sec = dtnow + ts_2sec
when_flag = dtnow_plus_2sec.replace(microsecond=0).isoformat()
self.recv("""
""")
self.send("""
""")
time.sleep(1)
self.send("""
""")
self.send("""
""")
def testDelayedRequestFail(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0323'])
myDevice = Device("Device22")
myDevice._add_field(name="Temperature", typename="numeric", unit="°C")
myDevice._set_momentary_timestamp("2013-03-07T16:24:30")
myDevice._add_field_momentary_data("Temperature", "23.4", flags={"automaticReadout": "true"})
self.xmpp['xep_0323'].register_node(nodeId="Device22", device=myDevice, commTimeout=0.5)
dtnow = datetime.datetime.now()
ts_2sec = datetime.timedelta(0,2)
dtnow_minus_2sec = dtnow - ts_2sec
when_flag = dtnow_minus_2sec.replace(microsecond=0).isoformat()
self.recv("""
""")
# Remove the returned datetime to allow predictable test
xml_stanza = self._filtered_stanza_prepare()
error_text = xml_stanza['rejected']['error'] #['text']
error_text = error_text[:error_text.find(':')]
xml_stanza['rejected']['error'] = error_text
self._filtered_stanza_check("""
Invalid datetime in 'when' flag, cannot set a time in the past. Current time
""", xml_stanza)
def _filtered_stanza_prepare(self, timeout=.5):
sent = self.xmpp.socket.next_sent(timeout)
if sent is None:
self.fail("No stanza was sent.")
xml = self.parse_xml(sent)
self.fix_namespaces(xml, 'jabber:client')
sent = self.xmpp._build_stanza(xml, 'jabber:client')
return sent
def _filtered_stanza_check(self, data, filtered, defaults=None, use_values=True, method='exact'):
self.check(filtered, data,
method=method,
defaults=defaults,
use_values=use_values)
def testRequestFieldFrom(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0323'])
myDevice = Device("Device44")
myDevice._add_field(name='Voltage', typename="numeric", unit="V")
myDevice._add_field_timestamp_data(name="Voltage", value="230.1", timestamp="2000-01-01T00:01:02", flags={"invoiced": "true"})
myDevice._add_field_timestamp_data(name="Voltage", value="230.2", timestamp="2000-02-01T00:01:02", flags={"invoiced": "true"})
myDevice._add_field_timestamp_data(name="Voltage", value="230.3", timestamp="2000-03-01T00:01:02", flags={"invoiced": "true"})
self.xmpp['xep_0323'].register_node('Device44', myDevice, commTimeout=0.5)
print("."),
self.recv("""
""")
self.send("""
""")
self.send("""
""")
self.send("""
""")
self.send("""
""")
def testRequestFieldTo(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0323'])
myDevice = Device("Device44")
myDevice._add_field(name='Voltage', typename="numeric", unit="V")
myDevice._add_field_timestamp_data(name="Voltage", value="230.1", timestamp="2000-01-01T00:01:02", flags={"invoiced": "true"})
myDevice._add_field_timestamp_data(name="Voltage", value="230.2", timestamp="2000-02-01T00:01:02", flags={"invoiced": "true"})
myDevice._add_field_timestamp_data(name="Voltage", value="230.3", timestamp="2000-03-01T00:01:02", flags={"invoiced": "true"})
self.xmpp['xep_0323'].register_node('Device44', myDevice, commTimeout=0.5)
print("."),
self.recv("""
""")
self.send("""
""")
self.send("""
""")
self.send("""
""")
self.send("""
""")
def testRequestFieldFromTo(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0323'])
myDevice = Device("Device44")
myDevice._add_field(name='Voltage', typename="numeric", unit="V")
myDevice._add_field_timestamp_data(name="Voltage", value="230.1", timestamp="2000-01-01T00:01:02", flags={"invoiced": "true"})
myDevice._add_field_timestamp_data(name="Voltage", value="230.2", timestamp="2000-02-01T00:01:02", flags={"invoiced": "true"})
myDevice._add_field_timestamp_data(name="Voltage", value="230.3", timestamp="2000-03-01T00:01:02", flags={"invoiced": "true"})
self.xmpp['xep_0323'].register_node('Device44', myDevice, commTimeout=0.5)
print("."),
self.recv("""
""")
self.send("""
""")
self.send("""
""")
self.send("""
""")
def testDelayedRequestClient(self):
self.stream_start(mode='client',
plugins=['xep_0030',
'xep_0323'])
results = []
callback_data = {}
def my_callback(from_jid, result, nodeId=None, timestamp=None, fields=None, error_msg=None):
results.append(result)
if result == "fields":
callback_data["nodeId"] = nodeId
callback_data["timestamp"] = timestamp
callback_data["error_msg"] = error_msg
for f in fields:
callback_data["field_" + f['name']] = f
self.xmpp['xep_0323'].request_data(from_jid="tester@localhost",
to_jid="you@google.com",
nodeIds=['Device33'],
callback=my_callback)
#self.xmpp['xep_0323'].request_data(from_jid="tester@localhost", to_jid="you@google.com", nodeIds=['Device33'], callback=my_callback);
self.send("""
""")
self.recv("""
""")
self.recv("""
""")
self.recv("""
""")
self.recv("""
""")
self.failUnlessEqual(results, ["queued","started","fields","done"]);
# self.assertIn("nodeId", callback_data);
self.assertTrue("nodeId" in callback_data)
self.failUnlessEqual(callback_data["nodeId"], "Device33")
# self.assertIn("timestamp", callback_data);
self.assertTrue("timestamp" in callback_data)
self.failUnlessEqual(callback_data["timestamp"], "2000-01-01T00:01:02")
# self.assertIn("field_Voltage", callback_data);
self.assertTrue("field_Voltage" in callback_data)
self.failUnlessEqual(callback_data["field_Voltage"], {"name": "Voltage", "value": "230.4", "typename": "numeric", "unit": "V", "flags": {"invoiced": "true"}})
# self.assertIn("field_TestBool", callback_data);
self.assertTrue("field_TestBool" in callback_data)
self.failUnlessEqual(callback_data["field_TestBool"], {"name": "TestBool", "value": "true", "typename": "boolean" })
def testRequestFieldsCancelAPI(self):
self.stream_start(mode='client',
plugins=['xep_0030',
'xep_0323'])
results = []
def my_callback(from_jid, result, nodeId=None, timestamp=None, fields=None, error_msg=None):
results.append(result)
session = self.xmpp['xep_0323'].request_data(from_jid="tester@localhost", to_jid="you@google.com", nodeIds=['Device33'], callback=my_callback)
self.send("""
""")
self.recv("""
""")
self.xmpp['xep_0323'].cancel_request(session=session)
self.send("""
""")
self.recv("""
""")
self.failUnlessEqual(results, ["accepted","cancelled"])
def testDelayedRequestCancel(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0323'])
myDevice = Device("Device22")
myDevice._add_field(name="Temperature", typename="numeric", unit="°C")
myDevice._set_momentary_timestamp("2013-03-07T16:24:30")
myDevice._add_field_momentary_data("Temperature", "23.4", flags={"automaticReadout": "true"})
self.xmpp['xep_0323'].register_node(nodeId="Device22", device=myDevice, commTimeout=0.5)
dtnow = datetime.datetime.now()
ts_2sec = datetime.timedelta(0,2)
dtnow_plus_2sec = dtnow + ts_2sec
when_flag = dtnow_plus_2sec.replace(microsecond=0).isoformat()
self.recv("""
""")
self.send("""
""")
self.recv("""
""")
self.send("""
""")
# Test cancel of non-existing request
self.recv("""
""")
self.send("""
Cancel request received, no matching request is active.
""")
# Ensure we don't get anything after cancellation
self.send(None)
suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamSensorData)