From 45689fd8799186fd6be0b308745aef428ab50dcc Mon Sep 17 00:00:00 2001 From: Joachim Lindborg Date: Fri, 30 Aug 2013 02:29:52 +0200 Subject: First implementation of the xep_0323 and xep_325 used in IoT systems. Tests are added for stanza and streams --- sleekxmpp/plugins/xep_0323/sensordata.py | 699 ++++++++++++++++++++++++++++++- 1 file changed, 676 insertions(+), 23 deletions(-) (limited to 'sleekxmpp/plugins/xep_0323/sensordata.py') diff --git a/sleekxmpp/plugins/xep_0323/sensordata.py b/sleekxmpp/plugins/xep_0323/sensordata.py index 1343069d..ff671663 100644 --- a/sleekxmpp/plugins/xep_0323/sensordata.py +++ b/sleekxmpp/plugins/xep_0323/sensordata.py @@ -2,19 +2,25 @@ SleekXMPP: The Sleek XMPP Library Implementation of xeps for Internet of Things http://wiki.xmpp.org/web/Tech_pages/IoT_systems - Copyright (C) 2013 Joachim Lindborg, Joachim.lindborg@lsys.se + Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se This file is part of SleekXMPP. See the file LICENSE for copying permission. """ import logging +import time +import datetime +from threading import Thread, Lock, Timer + +from sleekxmpp.plugins.xep_0323.timerreset import TimerReset from sleekxmpp.xmlstream import JID from sleekxmpp.xmlstream.handler import Callback from sleekxmpp.xmlstream.matcher import StanzaPath from sleekxmpp.plugins.base import BasePlugin from sleekxmpp.plugins.xep_0323 import stanza +from sleekxmpp.plugins.xep_0323.stanza import Sensordata log = logging.getLogger(__name__) @@ -23,43 +29,690 @@ log = logging.getLogger(__name__) class XEP_0323(BasePlugin): """ - XEP-0323 IoT Sensor Data + XEP-0323: IoT Sensor Data + + + This XEP provides the underlying architecture, basic operations and data + structures for sensor data communication over XMPP networks. It includes + a hardware abstraction model, removing any technical detail implemented + in underlying technologies. + + Also see + + Configuration Values: + threaded -- Indicates if communication with sensors should be threaded. + Defaults to True. + + Events: + Sensor side + ----------- + Sensordata Event:Req -- Received a request for data + Sensordata Event:Cancel -- Received a cancellation for a request + + Client side + ----------- + Sensordata Event:Accepted -- Received a accept from sensor for a request + Sensordata Event:Rejected -- Received a reject from sensor for a request + Sensordata Event:Cancelled -- Received a cancel confirm from sensor + Sensordata Event:Fields -- Received fields from sensor for a request + This may be triggered multiple times since + the sensor can split up its response in + multiple messages. + Sensordata Event:Failure -- Received a failure indication from sensor + for a request. Typically a comm timeout. + + Attributes: + threaded -- Indicates if command events should be threaded. + Defaults to True. + sessions -- A dictionary or equivalent backend mapping + session IDs to dictionaries containing data + relevant to a request's session. This dictionary is used + both by the client and sensor side. On client side, seqnr + is used as key, while on sensor side, a session_id is used + as key. This ensures that the two will not collide, so + one instance can be both client and sensor. + Sensor side + ----------- + nodes -- A dictionary mapping sensor nodes that are serviced through + this XMPP instance to their device handlers ("drivers"). + Client side + ----------- + last_seqnr -- The last used sequence number (integer). One sequence of + communication (e.g. -->request, <--accept, <--fields) + between client and sensor is identified by a unique + sequence number (unique between the client/sensor pair) + + Methods: + plugin_init -- Overrides base_plugin.plugin_init + post_init -- Overrides base_plugin.post_init + plugin_end -- Overrides base_plugin.plugin_end + + Sensor side + ----------- + register_node -- Register a sensor as available from this XMPP + instance. + + Client side + ----------- + request_data -- Initiates a request for data from one or more + sensors. Non-blocking, a callback function will + be called when data is available. + """ name = 'xep_0323' description = 'XEP-0323 Internet of Things - Sensor Data' - dependencies = set(['xep_0030']) # set(['xep_0030', 'xep_0004', 'xep_0082', 'xep_0131']) + dependencies = set(['xep_0030']) stanza = stanza + + default_config = { + 'threaded': True +# 'session_db': None + } + def plugin_init(self): - pass - # self.node_event_map = {} + """ Start the XEP-0323 plugin """ + + self.xmpp.register_handler( + Callback('Sensordata Event:Req', + StanzaPath('iq@type=get/req'), + self._handle_event_req)) + + self.xmpp.register_handler( + Callback('Sensordata Event:Accepted', + StanzaPath('iq@type=result/accepted'), + self._handle_event_accepted)) + + self.xmpp.register_handler( + Callback('Sensordata Event:Rejected', + StanzaPath('iq@type=error/rejected'), + self._handle_event_rejected)) + + self.xmpp.register_handler( + Callback('Sensordata Event:Cancel', + StanzaPath('iq@type=get/cancel'), + self._handle_event_cancel)) + + self.xmpp.register_handler( + Callback('Sensordata Event:Cancelled', + StanzaPath('iq@type=result/cancelled'), + self._handle_event_cancelled)) + + self.xmpp.register_handler( + Callback('Sensordata Event:Fields', + StanzaPath('message/fields'), + self._handle_event_fields)) + + self.xmpp.register_handler( + Callback('Sensordata Event:Failure', + StanzaPath('message/failure'), + self._handle_event_failure)) + + self.xmpp.register_handler( + Callback('Sensordata Event:Started', + StanzaPath('message/started'), + self._handle_event_started)) + + # Server side dicts + self.nodes = {}; + self.sessions = {}; + + self.last_seqnr = 0; + self.seqnr_lock = Lock(); + + ## For testning only + self.test_authenticated_from = "" + + def post_init(self): + """ Init complete. Register our features in Serivce discovery. """ + BasePlugin.post_init(self) + self.xmpp['xep_0030'].add_feature(Sensordata.namespace) + self.xmpp['xep_0030'].set_items(node=Sensordata.namespace, items=tuple()) - # self.xmpp.register_handler( - # Callback('Sensordata Event: Get', - # StanzaPath('message/sensordata_event/get'), - # self._handle_event_get)) + def _new_session(self): + """ Return a new session ID. """ + return str(time.time()) + '-' + self.xmpp.new_id() def plugin_end(self): - # self.xmpp.remove_handler('Sensordata Event: Get') - pass - - def get_value(self, jid, msg): + """ Stop the XEP-0323 plugin """ + self.sessions.clear(); + self.xmpp.remove_handler('Sensordata Event:Req') + self.xmpp.remove_handler('Sensordata Event:Accepted') + self.xmpp.remove_handler('Sensordata Event:Rejected') + self.xmpp.remove_handler('Sensordata Event:Cancel') + self.xmpp.remove_handler('Sensordata Event:Cancelled') + self.xmpp.remove_handler('Sensordata Event:Fields') + self.xmpp['xep_0030'].del_feature(feature=Sensordata.namespace) + self.xmpp['xep_0030'].set_items(node=Sensordata.namespace, items=tuple()); + + + # ================================================================= + # Sensor side (data provider) API + + def register_node(self, nodeId, device, commTimeout, sourceId=None, cacheType=None): + """ + Register a sensor/device as available for serving of data through this XMPP + instance. + + The device object may by any custom implementation to support + specific devices, but it must implement the functions: + has_field + request_fields + according to the interfaces shown in the example device.py file. + + Arguments: + nodeId -- The identifier for the device + device -- The device object + commTimeout -- Time in seconds to wait between each callback from device during + a data readout. Float. + sourceId -- [optional] identifying the data source controlling the device + cacheType -- [optional] narrowing down the search to a specific kind of node + """ + self.nodes[nodeId] = {"device": device, + "commTimeout": commTimeout, + "sourceId": sourceId, + "cacheType": cacheType}; + + def _set_authenticated(self, auth=''): + """ Internal testing function """ + self.test_authenticated_from = auth; + + + def _handle_event_req(self, iq): + """ + Event handler for reception of an Iq with req - this is a request. + + Verifies that + - all the requested nodes are available + - at least one of the requested fields is available from at least + one of the nodes + + If the request passes verification, an accept response is sent, and + the readout process is started in a separate thread. + If the verification fails, a reject message is sent. + """ + + seqnr = iq['req']['seqnr']; + error_msg = ''; + req_ok = True; + + # Authentication + if len(self.test_authenticated_from) > 0 and not iq['from'] == self.test_authenticated_from: + # Invalid authentication + req_ok = False; + error_msg = "Access denied"; + + # Nodes + process_nodes = []; + if len(iq['req']['nodes']) > 0: + for n in iq['req']['nodes']: + if not n['nodeId'] in self.nodes: + req_ok = False; + error_msg = "Invalid nodeId " + n['nodeId']; + process_nodes = [n['nodeId'] for n in iq['req']['nodes']]; + else: + process_nodes = self.nodes.keys(); + + # Fields - if we just find one we are happy, otherwise we reject + process_fields = []; + if len(iq['req']['fields']) > 0: + found = False + for f in iq['req']['fields']: + for node in self.nodes: + if self.nodes[node]["device"].has_field(f['name']): + found = True; + break; + if not found: + req_ok = False; + error_msg = "Invalid field " + f['name']; + process_fields = [f['name'] for n in iq['req']['fields']]; + + req_flags = iq['req']._get_flags(); + + request_delay_sec = None + if 'when' in req_flags: + # Timed request - requires datetime string in iso format + # ex. 2013-04-05T15:00:03 + dt = None + try: + dt = datetime.datetime.strptime(req_flags['when'], "%Y-%m-%dT%H:%M:%S") + except ValueError: + req_ok = False; + error_msg = "Invalid datetime in 'when' flag, please use ISO format (i.e. 2013-04-05T15:00:03)." + + if not dt is None: + # Datetime properly formatted + dtnow = datetime.datetime.now() + dtdiff = dt - dtnow + request_delay_sec = dtdiff.seconds + dtdiff.days * 24 * 3600 + if request_delay_sec <= 0: + req_ok = False; + error_msg = "Invalid datetime in 'when' flag, cannot set a time in the past. Current time: " + dtnow.isoformat(); + + if req_ok: + session = self._new_session(); + self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr}; + self.sessions[session]["commTimers"] = {}; + self.sessions[session]["nodeDone"] = {}; + + #print("added session: " + str(self.sessions)) + + iq.reply(); + iq['accepted']['seqnr'] = seqnr; + if not request_delay_sec is None: + iq['accepted']['queued'] = "true" + iq.send(block=False); + + self.sessions[session]["node_list"] = process_nodes; + + if not request_delay_sec is None: + # Delay request to requested time + timer = Timer(request_delay_sec, self._event_delayed_req, args=(session, process_fields, req_flags)) + self.sessions[session]["commTimers"]["delaytimer"] = timer; + timer.start(); + return + + if self.threaded: + #print("starting thread") + tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields, req_flags)) + tr_req.start() + #print("started thread") + else: + self._threaded_node_request(session, process_fields, req_flags); + + else: + iq.reply(); + iq['type'] = 'error'; + iq['rejected']['seqnr'] = seqnr; + iq['rejected']['error'] = error_msg; + iq.send(block=False); + + def _threaded_node_request(self, session, process_fields, flags): + """ + Helper function to handle the device readouts in a separate thread. + + Arguments: + session -- The request session id + process_fields -- The fields to request from the devices + flags -- [optional] flags to pass to the devices, e.g. momentary + Formatted as a dictionary like { "flag name": "flag value" ... } + """ + for node in self.sessions[session]["node_list"]: + self.sessions[session]["nodeDone"][node] = False; + + for node in self.sessions[session]["node_list"]: + timer = TimerReset(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node)); + self.sessions[session]["commTimers"][node] = timer; + #print("Starting timer " + str(timer) + ", timeout: " + str(self.nodes[node]['commTimeout'])) + timer.start(); + self.nodes[node]['device'].request_fields(process_fields, flags=flags, session=session, callback=self._device_field_request_callback); + + def _event_comm_timeout(self, session, nodeId): + """ + Triggered if any of the readout operations timeout. + Sends a failure message back to the client, stops communicating + with the failing device. + + Arguments: + session -- The request session id + nodeId -- The id of the device which timed out + """ + msg = self.xmpp.Message(); + msg['from'] = self.sessions[session]['to']; + msg['to'] = self.sessions[session]['from']; + msg['failure']['seqnr'] = self.sessions[session]['seqnr']; + msg['failure']['error']['text'] = "Timeout"; + msg['failure']['error']['nodeId'] = nodeId; + msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat(); + + # Drop communication with this device and check if we are done + self.sessions[session]["nodeDone"][nodeId] = True; + if (self._all_nodes_done(session)): + msg['failure']['done'] = 'true'; + msg.send(); + # The session is complete, delete it + #print("del session " + session + " due to timeout") + del self.sessions[session]; + + def _event_delayed_req(self, session, process_fields, req_flags): + """ + Triggered when the timer from a delayed request fires. + + Arguments: + session -- The request session id + process_fields -- The fields to request from the devices + flags -- [optional] flags to pass to the devices, e.g. momentary + Formatted as a dictionary like { "flag name": "flag value" ... } """ - Recieving a stanza for erading values - # verify provisioning + msg = self.xmpp.Message(); + msg['from'] = self.sessions[session]['to']; + msg['to'] = self.sessions[session]['from']; + msg['started']['seqnr'] = self.sessions[session]['seqnr']; + msg.send(); + + if self.threaded: + tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields, req_flags)) + tr_req.start() + else: + self._threaded_node_request(session, process_fields, req_flags); + + def _all_nodes_done(self, session): + """ + Checks wheter all devices are done replying to the readout. + + Arguments: + session -- The request session id + """ + for n in self.sessions[session]["nodeDone"]: + if not self.sessions[session]["nodeDone"][n]: + return False; + return True; + + def _device_field_request_callback(self, session, nodeId, result, timestamp_block, error_msg=None): + """ + Callback function called by the devices when they have any additional data. + Composes a message with the data and sends it back to the client, and resets + the timeout timer for the device. + + Arguments: + session -- The request session id + nodeId -- The device id which initiated the callback + result -- The current result status of the readout. Valid values are: + "error" - Readout failed. + "fields" - Contains readout data. + "done" - Indicates that the readout is complete. May contain + readout data. + timestamp_block -- [optional] Only applies when result != "error" + The readout data. Structured as a dictionary: + { + timestamp: timestamp for this datablock, + fields: list of field dictionary (one per readout field). + readout field dictionary format: + { + type: The field type (numeric, boolean, dateTime, timeSpan, string, enum) + name: The field name + value: The field value + unit: The unit of the field. Only applies to type numeric. + dataType: The datatype of the field. Only applies to type enum. + flags: [optional] data classifier flags for the field, e.g. momentary + Formatted as a dictionary like { "flag name": "flag value" ... } + } + } + error_msg -- [optional] Only applies when result == "error". + Error details when a request failed. + """ + if not session in self.sessions: + # This can happend if a session was deleted, like in a cancellation. Just drop the data. + return + + if result == "error": + self.sessions[session]["commTimers"][nodeId].cancel(); + + msg = self.xmpp.Message(); + msg['from'] = self.sessions[session]['to']; + msg['to'] = self.sessions[session]['from']; + msg['failure']['seqnr'] = self.sessions[session]['seqnr']; + msg['failure']['error']['text'] = error_msg; + msg['failure']['error']['nodeId'] = nodeId; + msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat(); + + # Drop communication with this device and check if we are done + self.sessions[session]["nodeDone"][nodeId] = True; + if (self._all_nodes_done(session)): + msg['failure']['done'] = 'true'; + # The session is complete, delete it + # print("del session " + session + " due to error") + del self.sessions[session]; + msg.send(); + else: + msg = self.xmpp.Message(); + msg['from'] = self.sessions[session]['to']; + msg['to'] = self.sessions[session]['from']; + msg['fields']['seqnr'] = self.sessions[session]['seqnr']; + + if timestamp_block is not None and len(timestamp_block) > 0: + node = msg['fields'].add_node(nodeId); + ts = node.add_timestamp(timestamp_block["timestamp"]); + + for f in timestamp_block["fields"]: + data = ts.add_data( typename=f['type'], + name=f['name'], + value=f['value'], + unit=f['unit'], + dataType=f['dataType'], + flags=f['flags']); + + if result == "done": + self.sessions[session]["commTimers"][nodeId].cancel(); + self.sessions[session]["nodeDone"][nodeId] = True; + msg['fields']['done'] = 'true'; + if (self._all_nodes_done(session)): + # The session is complete, delete it + # print("del session " + session + " due to complete") + del self.sessions[session]; + else: + # Restart comm timer + self.sessions[session]["commTimers"][nodeId].reset(); + + msg.send(); + + def _handle_event_cancel(self, iq): + """ Received Iq with cancel - this is a cancel request. + Delete the session and confirm. """ - # verify requested values and categories + seqnr = iq['cancel']['seqnr']; + # Find the session + for s in self.sessions: + if self.sessions[s]['from'] == iq['from'] and self.sessions[s]['to'] == iq['to'] and self.sessions[s]['seqnr'] == seqnr: + # found it. Cancel all timers + for n in self.sessions[s]["commTimers"]: + self.sessions[s]["commTimers"][n].cancel(); - # Send accepted - # Thread of the readout + # Confirm + iq.reply(); + iq['type'] = 'result'; + iq['cancelled']['seqnr'] = seqnr; + iq.send(block=False); + + # Delete session + del self.sessions[s] + return - # send started + # Could not find session, send reject + iq.reply(); + iq['type'] = 'error'; + iq['rejected']['seqnr'] = seqnr; + iq['rejected']['error'] = "Cancel request received, no matching request is active."; + iq.send(block=False); - # send data messages + # ================================================================= + # Client side (data retriever) API - # send done - """ - pass + def request_data(self, from_jid, to_jid, callback, nodeIds=None, fields=None, flags=None): + """ + Called on the client side to initiade a data readout. + Composes a message with the request and sends it to the device(s). + Does not block, the callback will be called when data is available. + + Arguments: + from_jid -- The jid of the requester + to_jid -- The jid of the device(s) + callback -- The callback function to call when data is availble. + + The callback function must support the following arguments: + + from_jid -- The jid of the responding device(s) + result -- The current result status of the readout. Valid values are: + "accepted" - Readout request accepted + "queued" - Readout request accepted and queued + "rejected" - Readout request rejected + "failure" - Readout failed. + "cancelled" - Confirmation of request cancellation. + "started" - Previously queued request is now started + "fields" - Contains readout data. + "done" - Indicates that the readout is complete. + + nodeId -- [optional] Mandatory when result == "fields" or "failure". + The node Id of the responding device. One callback will only + contain data from one device. + timestamp -- [optional] Mandatory when result == "fields". + The timestamp of data in this callback. One callback will only + contain data from one timestamp. + fields -- [optional] Mandatory when result == "fields". + List of field dictionaries representing the readout data. + Dictionary format: + { + typename: The field type (numeric, boolean, dateTime, timeSpan, string, enum) + name: The field name + value: The field value + unit: The unit of the field. Only applies to type numeric. + dataType: The datatype of the field. Only applies to type enum. + flags: [optional] data classifier flags for the field, e.g. momentary. + Formatted as a dictionary like { "flag name": "flag value" ... } + } + + error_msg -- [optional] Mandatory when result == "rejected" or "failure". + Details about why the request is rejected or failed. + "rejected" means that the request is stopped, but note that the + request will continue even after a "failure". "failure" only means + that communication was stopped to that specific device, other + device(s) (if any) will continue their readout. + + nodeIds -- [optional] Limits the request to the node Ids in this list. + fields -- [optional] Limits the request to the field names in this list. + flags -- [optional] Limits the request according to the flags, or sets + readout conditions such as timing. + + Return value: + session -- Session identifier. Client can use this as a reference to cancel + the request. + """ + iq = self.xmpp.Iq(); + iq['from'] = from_jid; + iq['to'] = to_jid; + iq['type'] = "get"; + seqnr = self._get_new_seqnr(); + iq['id'] = seqnr; + iq['req']['seqnr'] = seqnr; + if nodeIds is not None: + for nodeId in nodeIds: + iq['req'].add_node(nodeId); + if fields is not None: + for field in fields: + iq['req'].add_field(field); + + iq['req']._set_flags(flags); + + self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr, "callback": callback}; + iq.send(block=False); + + return seqnr; + + def cancel_request(self, session): + """ + Called on the client side to cancel a request for data readout. + Composes a message with the cancellation and sends it to the device(s). + Does not block, the callback will be called when cancellation is + confirmed. + + Arguments: + session -- The session id of the request to cancel + """ + seqnr = session + iq = self.xmpp.Iq(); + iq['from'] = self.sessions[seqnr]['from'] + iq['to'] = self.sessions[seqnr]['to']; + iq['type'] = "get"; + iq['id'] = seqnr; + iq['cancel']['seqnr'] = seqnr; + iq.send(block=False); + + def _get_new_seqnr(self): + """ Returns a unique sequence number (unique across threads) """ + self.seqnr_lock.acquire(); + self.last_seqnr = self.last_seqnr + 1; + self.seqnr_lock.release(); + return str(self.last_seqnr); + + def _handle_event_accepted(self, iq): + """ Received Iq with accepted - request was accepted """ + seqnr = iq['accepted']['seqnr']; + result = "accepted" + if iq['accepted']['queued'] == 'true': + result = "queued" + + callback = self.sessions[seqnr]["callback"]; + callback(from_jid=iq['from'], result=result); + + def _handle_event_rejected(self, iq): + """ Received Iq with rejected - this is a reject. + Delete the session. """ + seqnr = iq['rejected']['seqnr']; + callback = self.sessions[seqnr]["callback"]; + callback(from_jid=iq['from'], result="rejected", error_msg=iq['rejected']['error']); + # Session terminated + del self.sessions[seqnr]; + + def _handle_event_cancelled(self, iq): + """ + Received Iq with cancelled - this is a cancel confirm. + Delete the session. + """ + #print("Got cancelled") + seqnr = iq['cancelled']['seqnr']; + callback = self.sessions[seqnr]["callback"]; + callback(from_jid=iq['from'], result="cancelled"); + # Session cancelled + del self.sessions[seqnr]; + + def _handle_event_fields(self, msg): + """ + Received Msg with fields - this is a data reponse to a request. + If this is the last data block, issue a "done" callback. + """ + seqnr = msg['fields']['seqnr']; + callback = self.sessions[seqnr]["callback"]; + for node in msg['fields']['nodes']: + for ts in node['timestamps']: + fields = []; + for d in ts['datas']: + field_block = {}; + field_block["name"] = d['name']; + field_block["typename"] = d._get_typename(); + field_block["value"] = d['value']; + if not d['unit'] == "": field_block["unit"] = d['unit']; + if not d['dataType'] == "": field_block["dataType"] = d['dataType']; + flags = d._get_flags(); + if not len(flags) == 0: + field_block["flags"] = flags; + fields.append(field_block); + + callback(from_jid=msg['from'], result="fields", nodeId=node['nodeId'], timestamp=ts['value'], fields=fields); + + if msg['fields']['done'] == "true": + callback(from_jid=msg['from'], result="done"); + # Session done + del self.sessions[seqnr]; + + def _handle_event_failure(self, msg): + """ + Received Msg with failure - our request failed + Delete the session. + """ + seqnr = msg['failure']['seqnr']; + callback = self.sessions[seqnr]["callback"]; + callback(from_jid=msg['from'], result="failure", nodeId=msg['failure']['error']['nodeId'], timestamp=msg['failure']['error']['timestamp'], error_msg=msg['failure']['error']['text']); + + # Session failed + del self.sessions[seqnr]; + + def _handle_event_started(self, msg): + """ + Received Msg with started - our request was queued and is now started. + """ + seqnr = msg['started']['seqnr']; + callback = self.sessions[seqnr]["callback"]; + callback(from_jid=msg['from'], result="started"); -- cgit v1.2.3