diff options
Diffstat (limited to 'sleekxmpp')
-rw-r--r-- | sleekxmpp/plugins/__init__.py | 3 | ||||
-rw-r--r-- | sleekxmpp/plugins/xep_0323/__init__.py | 2 | ||||
-rw-r--r-- | sleekxmpp/plugins/xep_0323/device.py | 243 | ||||
-rw-r--r-- | sleekxmpp/plugins/xep_0323/sensordata.py | 699 | ||||
-rw-r--r-- | sleekxmpp/plugins/xep_0323/stanza/__init__.py | 2 | ||||
-rw-r--r-- | sleekxmpp/plugins/xep_0323/stanza/base.py | 5 | ||||
-rw-r--r-- | sleekxmpp/plugins/xep_0323/stanza/sensordata.py | 796 | ||||
-rw-r--r-- | sleekxmpp/plugins/xep_0323/timerreset.py | 64 | ||||
-rw-r--r-- | sleekxmpp/plugins/xep_0325/__init__.py | 18 | ||||
-rw-r--r-- | sleekxmpp/plugins/xep_0325/control.py | 574 | ||||
-rw-r--r-- | sleekxmpp/plugins/xep_0325/device.py | 125 | ||||
-rw-r--r-- | sleekxmpp/plugins/xep_0325/stanza/__init__.py | 12 | ||||
-rw-r--r-- | sleekxmpp/plugins/xep_0325/stanza/base.py | 13 | ||||
-rw-r--r-- | sleekxmpp/plugins/xep_0325/stanza/control.py | 526 |
14 files changed, 3020 insertions, 62 deletions
diff --git a/sleekxmpp/plugins/__init__.py b/sleekxmpp/plugins/__init__.py index 10a79439..bdd06d50 100644 --- a/sleekxmpp/plugins/__init__.py +++ b/sleekxmpp/plugins/__init__.py @@ -79,5 +79,6 @@ __all__ = [ 'xep_0302', # XMPP Compliance Suites 2012 'xep_0308', # Last Message Correction 'xep_0313', # Message Archive Management - 'xep_0323', # IoT Sensor Data + 'xep_0323', # IoT Systems Sensor Data + 'xep_0325', # IoT Systems Control ] diff --git a/sleekxmpp/plugins/xep_0323/__init__.py b/sleekxmpp/plugins/xep_0323/__init__.py index e4e2d0ee..10779ada 100644 --- a/sleekxmpp/plugins/xep_0323/__init__.py +++ b/sleekxmpp/plugins/xep_0323/__init__.py @@ -2,7 +2,7 @@ SleekXMPP: The Sleek XMPP Library Implementation of xeps for Internet of Things http://wiki.xmpp.org/web/Tech_pages/IoT_systems - Copyright (C) 2013 Joachim Lindborg, Joachim.lindborg@lsys.se + Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se This file is part of SleekXMPP. See the file LICENSE for copying permission. diff --git a/sleekxmpp/plugins/xep_0323/device.py b/sleekxmpp/plugins/xep_0323/device.py new file mode 100644 index 00000000..8414f5b4 --- /dev/null +++ b/sleekxmpp/plugins/xep_0323/device.py @@ -0,0 +1,243 @@ +""" + SleekXMPP: The Sleek XMPP Library + Implementation of xeps for Internet of Things + http://wiki.xmpp.org/web/Tech_pages/IoT_systems + Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +import datetime + +class Device(object): + """ + Example implementation of a device readout object. + Is registered in the XEP_0323.register_node call + The device object may be any custom implementation to support + specific devices, but it must implement the functions: + has_field + request_fields + """ + + def __init__(self, nodeId): + self.nodeId = nodeId; + self.fields = {}; + # {'type':'numeric', + # 'name':'myname', + # 'value': 42, + # 'unit':'Z'}]; + self.timestamp_data = {}; + self.momentary_data = {}; + self.momentary_timestamp = ""; + + def has_field(self, field): + """ + Returns true if the supplied field name exists in this device. + + Arguments: + field -- The field name + """ + if field in self.fields: + return True; + return False; + + def request_fields(self, fields, flags, session, callback): + """ + Starts a data readout. Verifies the requested fields, + refreshes the data (if needed) and calls the callback + with requested data. + + + Arguments: + fields -- List of field names to readout + flags -- [optional] data classifier flags for the field, e.g. momentary + Formatted as a dictionary like { "flag name": "flag value" ... } + session -- Session id, only used in the callback as identifier + callback -- Callback function to call when data is available. + + The callback function must support the following arguments: + + session -- Session id, as supplied in the request_fields call + nodeId -- Identifier for this device + result -- The current result status of the readout. Valid values are: + "error" - Readout failed. + "fields" - Contains readout data. + "done" - Indicates that the readout is complete. May contain + readout data. + timestamp_block -- [optional] Only applies when result != "error" + The readout data. Structured as a dictionary: + { + timestamp: timestamp for this datablock, + fields: list of field dictionary (one per readout field). + readout field dictionary format: + { + type: The field type (numeric, boolean, dateTime, timeSpan, string, enum) + name: The field name + value: The field value + unit: The unit of the field. Only applies to type numeric. + dataType: The datatype of the field. Only applies to type enum. + flags: [optional] data classifier flags for the field, e.g. momentary + Formatted as a dictionary like { "flag name": "flag value" ... } + } + } + error_msg -- [optional] Only applies when result == "error". + Error details when a request failed. + + """ + + if len(fields) > 0: + # Check availiability + for f in fields: + if f not in self.fields: + self._send_reject(session, callback) + return False; + else: + # Request all fields + fields = self.fields; + + + # Refresh data from device + # ... + + if "momentary" in flags and flags['momentary'] == "true" or \ + "all" in flags and flags['all'] == "true": + ts_block = {}; + timestamp = ""; + + if len(self.momentary_timestamp) > 0: + timestamp = self.momentary_timestamp; + else: + timestamp = self._get_timestamp(); + + field_block = []; + for f in self.momentary_data: + if f in fields: + field_block.append({"name": f, + "type": self.fields[f]["type"], + "unit": self.fields[f]["unit"], + "dataType": self.fields[f]["dataType"], + "value": self.momentary_data[f]["value"], + "flags": self.momentary_data[f]["flags"]}); + ts_block["timestamp"] = timestamp; + ts_block["fields"] = field_block; + + callback(session, result="done", nodeId=self.nodeId, timestamp_block=ts_block); + return + + from_flag = self._datetime_flag_parser(flags, 'from') + to_flag = self._datetime_flag_parser(flags, 'to') + + for ts in sorted(self.timestamp_data.keys()): + tsdt = datetime.datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S") + if not from_flag is None: + if tsdt < from_flag: + #print (str(tsdt) + " < " + str(from_flag)) + continue + if not to_flag is None: + if tsdt > to_flag: + #print (str(tsdt) + " > " + str(to_flag)) + continue + + ts_block = {}; + field_block = []; + + for f in self.timestamp_data[ts]: + if f in fields: + field_block.append({"name": f, + "type": self.fields[f]["type"], + "unit": self.fields[f]["unit"], + "dataType": self.fields[f]["dataType"], + "value": self.timestamp_data[ts][f]["value"], + "flags": self.timestamp_data[ts][f]["flags"]}); + + ts_block["timestamp"] = ts; + ts_block["fields"] = field_block; + callback(session, result="fields", nodeId=self.nodeId, timestamp_block=ts_block); + callback(session, result="done", nodeId=self.nodeId, timestamp_block=None); + + def _datetime_flag_parser(self, flags, flagname): + if not flagname in flags: + return None + + dt = None + try: + dt = datetime.datetime.strptime(flags[flagname], "%Y-%m-%dT%H:%M:%S") + except ValueError: + # Badly formatted datetime, ignore it + pass + return dt + + + def _get_timestamp(self): + """ + Generates a properly formatted timestamp of current time + """ + return datetime.datetime.now().replace(microsecond=0).isoformat() + + def _send_reject(self, session, callback): + """ + Sends a reject to the caller + + Arguments: + session -- Session id, see definition in request_fields function + callback -- Callback function, see definition in request_fields function + """ + callback(session, result="error", nodeId=self.nodeId, timestamp_block=None, error_msg="Reject"); + + def _add_field(self, name, typename, unit=None, dataType=None): + """ + Adds a field to the device + + Arguments: + name -- Name of the field + typename -- Type of the field (numeric, boolean, dateTime, timeSpan, string, enum) + unit -- [optional] only applies to "numeric". Unit for the field. + dataType -- [optional] only applies to "enum". Datatype for the field. + """ + self.fields[name] = {"type": typename, "unit": unit, "dataType": dataType}; + + def _add_field_timestamp_data(self, name, timestamp, value, flags=None): + """ + Adds timestamped data to a field + + Arguments: + name -- Name of the field + timestamp -- Timestamp for the data (string) + value -- Field value at the timestamp + flags -- [optional] data classifier flags for the field, e.g. momentary + Formatted as a dictionary like { "flag name": "flag value" ... } + """ + if not name in self.fields: + return False; + if not timestamp in self.timestamp_data: + self.timestamp_data[timestamp] = {}; + + self.timestamp_data[timestamp][name] = {"value": value, "flags": flags}; + return True; + + def _add_field_momentary_data(self, name, value, flags=None): + """ + Sets momentary data to a field + + Arguments: + name -- Name of the field + value -- Field value at the timestamp + flags -- [optional] data classifier flags for the field, e.g. momentary + Formatted as a dictionary like { "flag name": "flag value" ... } + """ + if self.fields[name] is None: + return False; + if flags is None: + flags = {}; + + flags["momentary"] = "true" + self.momentary_data[name] = {"value": value, "flags": flags}; + return True; + + def _set_momentary_timestamp(self, timestamp): + """ + This function is only for unit testing to produce predictable results. + """ + self.momentary_timestamp = timestamp; + diff --git a/sleekxmpp/plugins/xep_0323/sensordata.py b/sleekxmpp/plugins/xep_0323/sensordata.py index 1343069d..ff671663 100644 --- a/sleekxmpp/plugins/xep_0323/sensordata.py +++ b/sleekxmpp/plugins/xep_0323/sensordata.py @@ -2,19 +2,25 @@ SleekXMPP: The Sleek XMPP Library Implementation of xeps for Internet of Things http://wiki.xmpp.org/web/Tech_pages/IoT_systems - Copyright (C) 2013 Joachim Lindborg, Joachim.lindborg@lsys.se + Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se This file is part of SleekXMPP. See the file LICENSE for copying permission. """ import logging +import time +import datetime +from threading import Thread, Lock, Timer + +from sleekxmpp.plugins.xep_0323.timerreset import TimerReset from sleekxmpp.xmlstream import JID from sleekxmpp.xmlstream.handler import Callback from sleekxmpp.xmlstream.matcher import StanzaPath from sleekxmpp.plugins.base import BasePlugin from sleekxmpp.plugins.xep_0323 import stanza +from sleekxmpp.plugins.xep_0323.stanza import Sensordata log = logging.getLogger(__name__) @@ -23,43 +29,690 @@ log = logging.getLogger(__name__) class XEP_0323(BasePlugin): """ - XEP-0323 IoT Sensor Data + XEP-0323: IoT Sensor Data + + + This XEP provides the underlying architecture, basic operations and data + structures for sensor data communication over XMPP networks. It includes + a hardware abstraction model, removing any technical detail implemented + in underlying technologies. + + Also see <http://xmpp.org/extensions/xep-0323.html> + + Configuration Values: + threaded -- Indicates if communication with sensors should be threaded. + Defaults to True. + + Events: + Sensor side + ----------- + Sensordata Event:Req -- Received a request for data + Sensordata Event:Cancel -- Received a cancellation for a request + + Client side + ----------- + Sensordata Event:Accepted -- Received a accept from sensor for a request + Sensordata Event:Rejected -- Received a reject from sensor for a request + Sensordata Event:Cancelled -- Received a cancel confirm from sensor + Sensordata Event:Fields -- Received fields from sensor for a request + This may be triggered multiple times since + the sensor can split up its response in + multiple messages. + Sensordata Event:Failure -- Received a failure indication from sensor + for a request. Typically a comm timeout. + + Attributes: + threaded -- Indicates if command events should be threaded. + Defaults to True. + sessions -- A dictionary or equivalent backend mapping + session IDs to dictionaries containing data + relevant to a request's session. This dictionary is used + both by the client and sensor side. On client side, seqnr + is used as key, while on sensor side, a session_id is used + as key. This ensures that the two will not collide, so + one instance can be both client and sensor. + Sensor side + ----------- + nodes -- A dictionary mapping sensor nodes that are serviced through + this XMPP instance to their device handlers ("drivers"). + Client side + ----------- + last_seqnr -- The last used sequence number (integer). One sequence of + communication (e.g. -->request, <--accept, <--fields) + between client and sensor is identified by a unique + sequence number (unique between the client/sensor pair) + + Methods: + plugin_init -- Overrides base_plugin.plugin_init + post_init -- Overrides base_plugin.post_init + plugin_end -- Overrides base_plugin.plugin_end + + Sensor side + ----------- + register_node -- Register a sensor as available from this XMPP + instance. + + Client side + ----------- + request_data -- Initiates a request for data from one or more + sensors. Non-blocking, a callback function will + be called when data is available. + """ name = 'xep_0323' description = 'XEP-0323 Internet of Things - Sensor Data' - dependencies = set(['xep_0030']) # set(['xep_0030', 'xep_0004', 'xep_0082', 'xep_0131']) + dependencies = set(['xep_0030']) stanza = stanza + + default_config = { + 'threaded': True +# 'session_db': None + } + def plugin_init(self): - pass - # self.node_event_map = {} + """ Start the XEP-0323 plugin """ + + self.xmpp.register_handler( + Callback('Sensordata Event:Req', + StanzaPath('iq@type=get/req'), + self._handle_event_req)) + + self.xmpp.register_handler( + Callback('Sensordata Event:Accepted', + StanzaPath('iq@type=result/accepted'), + self._handle_event_accepted)) + + self.xmpp.register_handler( + Callback('Sensordata Event:Rejected', + StanzaPath('iq@type=error/rejected'), + self._handle_event_rejected)) + + self.xmpp.register_handler( + Callback('Sensordata Event:Cancel', + StanzaPath('iq@type=get/cancel'), + self._handle_event_cancel)) + + self.xmpp.register_handler( + Callback('Sensordata Event:Cancelled', + StanzaPath('iq@type=result/cancelled'), + self._handle_event_cancelled)) + + self.xmpp.register_handler( + Callback('Sensordata Event:Fields', + StanzaPath('message/fields'), + self._handle_event_fields)) + + self.xmpp.register_handler( + Callback('Sensordata Event:Failure', + StanzaPath('message/failure'), + self._handle_event_failure)) + + self.xmpp.register_handler( + Callback('Sensordata Event:Started', + StanzaPath('message/started'), + self._handle_event_started)) + + # Server side dicts + self.nodes = {}; + self.sessions = {}; + + self.last_seqnr = 0; + self.seqnr_lock = Lock(); + + ## For testning only + self.test_authenticated_from = "" + + def post_init(self): + """ Init complete. Register our features in Serivce discovery. """ + BasePlugin.post_init(self) + self.xmpp['xep_0030'].add_feature(Sensordata.namespace) + self.xmpp['xep_0030'].set_items(node=Sensordata.namespace, items=tuple()) - # self.xmpp.register_handler( - # Callback('Sensordata Event: Get', - # StanzaPath('message/sensordata_event/get'), - # self._handle_event_get)) + def _new_session(self): + """ Return a new session ID. """ + return str(time.time()) + '-' + self.xmpp.new_id() def plugin_end(self): - # self.xmpp.remove_handler('Sensordata Event: Get') - pass - - def get_value(self, jid, msg): + """ Stop the XEP-0323 plugin """ + self.sessions.clear(); + self.xmpp.remove_handler('Sensordata Event:Req') + self.xmpp.remove_handler('Sensordata Event:Accepted') + self.xmpp.remove_handler('Sensordata Event:Rejected') + self.xmpp.remove_handler('Sensordata Event:Cancel') + self.xmpp.remove_handler('Sensordata Event:Cancelled') + self.xmpp.remove_handler('Sensordata Event:Fields') + self.xmpp['xep_0030'].del_feature(feature=Sensordata.namespace) + self.xmpp['xep_0030'].set_items(node=Sensordata.namespace, items=tuple()); + + + # ================================================================= + # Sensor side (data provider) API + + def register_node(self, nodeId, device, commTimeout, sourceId=None, cacheType=None): + """ + Register a sensor/device as available for serving of data through this XMPP + instance. + + The device object may by any custom implementation to support + specific devices, but it must implement the functions: + has_field + request_fields + according to the interfaces shown in the example device.py file. + + Arguments: + nodeId -- The identifier for the device + device -- The device object + commTimeout -- Time in seconds to wait between each callback from device during + a data readout. Float. + sourceId -- [optional] identifying the data source controlling the device + cacheType -- [optional] narrowing down the search to a specific kind of node + """ + self.nodes[nodeId] = {"device": device, + "commTimeout": commTimeout, + "sourceId": sourceId, + "cacheType": cacheType}; + + def _set_authenticated(self, auth=''): + """ Internal testing function """ + self.test_authenticated_from = auth; + + + def _handle_event_req(self, iq): + """ + Event handler for reception of an Iq with req - this is a request. + + Verifies that + - all the requested nodes are available + - at least one of the requested fields is available from at least + one of the nodes + + If the request passes verification, an accept response is sent, and + the readout process is started in a separate thread. + If the verification fails, a reject message is sent. + """ + + seqnr = iq['req']['seqnr']; + error_msg = ''; + req_ok = True; + + # Authentication + if len(self.test_authenticated_from) > 0 and not iq['from'] == self.test_authenticated_from: + # Invalid authentication + req_ok = False; + error_msg = "Access denied"; + + # Nodes + process_nodes = []; + if len(iq['req']['nodes']) > 0: + for n in iq['req']['nodes']: + if not n['nodeId'] in self.nodes: + req_ok = False; + error_msg = "Invalid nodeId " + n['nodeId']; + process_nodes = [n['nodeId'] for n in iq['req']['nodes']]; + else: + process_nodes = self.nodes.keys(); + + # Fields - if we just find one we are happy, otherwise we reject + process_fields = []; + if len(iq['req']['fields']) > 0: + found = False + for f in iq['req']['fields']: + for node in self.nodes: + if self.nodes[node]["device"].has_field(f['name']): + found = True; + break; + if not found: + req_ok = False; + error_msg = "Invalid field " + f['name']; + process_fields = [f['name'] for n in iq['req']['fields']]; + + req_flags = iq['req']._get_flags(); + + request_delay_sec = None + if 'when' in req_flags: + # Timed request - requires datetime string in iso format + # ex. 2013-04-05T15:00:03 + dt = None + try: + dt = datetime.datetime.strptime(req_flags['when'], "%Y-%m-%dT%H:%M:%S") + except ValueError: + req_ok = False; + error_msg = "Invalid datetime in 'when' flag, please use ISO format (i.e. 2013-04-05T15:00:03)." + + if not dt is None: + # Datetime properly formatted + dtnow = datetime.datetime.now() + dtdiff = dt - dtnow + request_delay_sec = dtdiff.seconds + dtdiff.days * 24 * 3600 + if request_delay_sec <= 0: + req_ok = False; + error_msg = "Invalid datetime in 'when' flag, cannot set a time in the past. Current time: " + dtnow.isoformat(); + + if req_ok: + session = self._new_session(); + self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr}; + self.sessions[session]["commTimers"] = {}; + self.sessions[session]["nodeDone"] = {}; + + #print("added session: " + str(self.sessions)) + + iq.reply(); + iq['accepted']['seqnr'] = seqnr; + if not request_delay_sec is None: + iq['accepted']['queued'] = "true" + iq.send(block=False); + + self.sessions[session]["node_list"] = process_nodes; + + if not request_delay_sec is None: + # Delay request to requested time + timer = Timer(request_delay_sec, self._event_delayed_req, args=(session, process_fields, req_flags)) + self.sessions[session]["commTimers"]["delaytimer"] = timer; + timer.start(); + return + + if self.threaded: + #print("starting thread") + tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields, req_flags)) + tr_req.start() + #print("started thread") + else: + self._threaded_node_request(session, process_fields, req_flags); + + else: + iq.reply(); + iq['type'] = 'error'; + iq['rejected']['seqnr'] = seqnr; + iq['rejected']['error'] = error_msg; + iq.send(block=False); + + def _threaded_node_request(self, session, process_fields, flags): + """ + Helper function to handle the device readouts in a separate thread. + + Arguments: + session -- The request session id + process_fields -- The fields to request from the devices + flags -- [optional] flags to pass to the devices, e.g. momentary + Formatted as a dictionary like { "flag name": "flag value" ... } + """ + for node in self.sessions[session]["node_list"]: + self.sessions[session]["nodeDone"][node] = False; + + for node in self.sessions[session]["node_list"]: + timer = TimerReset(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node)); + self.sessions[session]["commTimers"][node] = timer; + #print("Starting timer " + str(timer) + ", timeout: " + str(self.nodes[node]['commTimeout'])) + timer.start(); + self.nodes[node]['device'].request_fields(process_fields, flags=flags, session=session, callback=self._device_field_request_callback); + + def _event_comm_timeout(self, session, nodeId): + """ + Triggered if any of the readout operations timeout. + Sends a failure message back to the client, stops communicating + with the failing device. + + Arguments: + session -- The request session id + nodeId -- The id of the device which timed out + """ + msg = self.xmpp.Message(); + msg['from'] = self.sessions[session]['to']; + msg['to'] = self.sessions[session]['from']; + msg['failure']['seqnr'] = self.sessions[session]['seqnr']; + msg['failure']['error']['text'] = "Timeout"; + msg['failure']['error']['nodeId'] = nodeId; + msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat(); + + # Drop communication with this device and check if we are done + self.sessions[session]["nodeDone"][nodeId] = True; + if (self._all_nodes_done(session)): + msg['failure']['done'] = 'true'; + msg.send(); + # The session is complete, delete it + #print("del session " + session + " due to timeout") + del self.sessions[session]; + + def _event_delayed_req(self, session, process_fields, req_flags): + """ + Triggered when the timer from a delayed request fires. + + Arguments: + session -- The request session id + process_fields -- The fields to request from the devices + flags -- [optional] flags to pass to the devices, e.g. momentary + Formatted as a dictionary like { "flag name": "flag value" ... } """ - Recieving a stanza for erading values - # verify provisioning + msg = self.xmpp.Message(); + msg['from'] = self.sessions[session]['to']; + msg['to'] = self.sessions[session]['from']; + msg['started']['seqnr'] = self.sessions[session]['seqnr']; + msg.send(); + + if self.threaded: + tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields, req_flags)) + tr_req.start() + else: + self._threaded_node_request(session, process_fields, req_flags); + + def _all_nodes_done(self, session): + """ + Checks wheter all devices are done replying to the readout. + + Arguments: + session -- The request session id + """ + for n in self.sessions[session]["nodeDone"]: + if not self.sessions[session]["nodeDone"][n]: + return False; + return True; + + def _device_field_request_callback(self, session, nodeId, result, timestamp_block, error_msg=None): + """ + Callback function called by the devices when they have any additional data. + Composes a message with the data and sends it back to the client, and resets + the timeout timer for the device. + + Arguments: + session -- The request session id + nodeId -- The device id which initiated the callback + result -- The current result status of the readout. Valid values are: + "error" - Readout failed. + "fields" - Contains readout data. + "done" - Indicates that the readout is complete. May contain + readout data. + timestamp_block -- [optional] Only applies when result != "error" + The readout data. Structured as a dictionary: + { + timestamp: timestamp for this datablock, + fields: list of field dictionary (one per readout field). + readout field dictionary format: + { + type: The field type (numeric, boolean, dateTime, timeSpan, string, enum) + name: The field name + value: The field value + unit: The unit of the field. Only applies to type numeric. + dataType: The datatype of the field. Only applies to type enum. + flags: [optional] data classifier flags for the field, e.g. momentary + Formatted as a dictionary like { "flag name": "flag value" ... } + } + } + error_msg -- [optional] Only applies when result == "error". + Error details when a request failed. + """ + if not session in self.sessions: + # This can happend if a session was deleted, like in a cancellation. Just drop the data. + return + + if result == "error": + self.sessions[session]["commTimers"][nodeId].cancel(); + + msg = self.xmpp.Message(); + msg['from'] = self.sessions[session]['to']; + msg['to'] = self.sessions[session]['from']; + msg['failure']['seqnr'] = self.sessions[session]['seqnr']; + msg['failure']['error']['text'] = error_msg; + msg['failure']['error']['nodeId'] = nodeId; + msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat(); + + # Drop communication with this device and check if we are done + self.sessions[session]["nodeDone"][nodeId] = True; + if (self._all_nodes_done(session)): + msg['failure']['done'] = 'true'; + # The session is complete, delete it + # print("del session " + session + " due to error") + del self.sessions[session]; + msg.send(); + else: + msg = self.xmpp.Message(); + msg['from'] = self.sessions[session]['to']; + msg['to'] = self.sessions[session]['from']; + msg['fields']['seqnr'] = self.sessions[session]['seqnr']; + + if timestamp_block is not None and len(timestamp_block) > 0: + node = msg['fields'].add_node(nodeId); + ts = node.add_timestamp(timestamp_block["timestamp"]); + + for f in timestamp_block["fields"]: + data = ts.add_data( typename=f['type'], + name=f['name'], + value=f['value'], + unit=f['unit'], + dataType=f['dataType'], + flags=f['flags']); + + if result == "done": + self.sessions[session]["commTimers"][nodeId].cancel(); + self.sessions[session]["nodeDone"][nodeId] = True; + msg['fields']['done'] = 'true'; + if (self._all_nodes_done(session)): + # The session is complete, delete it + # print("del session " + session + " due to complete") + del self.sessions[session]; + else: + # Restart comm timer + self.sessions[session]["commTimers"][nodeId].reset(); + + msg.send(); + + def _handle_event_cancel(self, iq): + """ Received Iq with cancel - this is a cancel request. + Delete the session and confirm. """ - # verify requested values and categories + seqnr = iq['cancel']['seqnr']; + # Find the session + for s in self.sessions: + if self.sessions[s]['from'] == iq['from'] and self.sessions[s]['to'] == iq['to'] and self.sessions[s]['seqnr'] == seqnr: + # found it. Cancel all timers + for n in self.sessions[s]["commTimers"]: + self.sessions[s]["commTimers"][n].cancel(); - # Send accepted - # Thread of the readout + # Confirm + iq.reply(); + iq['type'] = 'result'; + iq['cancelled']['seqnr'] = seqnr; + iq.send(block=False); + + # Delete session + del self.sessions[s] + return - # send started + # Could not find session, send reject + iq.reply(); + iq['type'] = 'error'; + iq['rejected']['seqnr'] = seqnr; + iq['rejected']['error'] = "Cancel request received, no matching request is active."; + iq.send(block=False); - # send data messages + # ================================================================= + # Client side (data retriever) API - # send done - """ - pass + def request_data(self, from_jid, to_jid, callback, nodeIds=None, fields=None, flags=None): + """ + Called on the client side to initiade a data readout. + Composes a message with the request and sends it to the device(s). + Does not block, the callback will be called when data is available. + + Arguments: + from_jid -- The jid of the requester + to_jid -- The jid of the device(s) + callback -- The callback function to call when data is availble. + + The callback function must support the following arguments: + + from_jid -- The jid of the responding device(s) + result -- The current result status of the readout. Valid values are: + "accepted" - Readout request accepted + "queued" - Readout request accepted and queued + "rejected" - Readout request rejected + "failure" - Readout failed. + "cancelled" - Confirmation of request cancellation. + "started" - Previously queued request is now started + "fields" - Contains readout data. + "done" - Indicates that the readout is complete. + + nodeId -- [optional] Mandatory when result == "fields" or "failure". + The node Id of the responding device. One callback will only + contain data from one device. + timestamp -- [optional] Mandatory when result == "fields". + The timestamp of data in this callback. One callback will only + contain data from one timestamp. + fields -- [optional] Mandatory when result == "fields". + List of field dictionaries representing the readout data. + Dictionary format: + { + typename: The field type (numeric, boolean, dateTime, timeSpan, string, enum) + name: The field name + value: The field value + unit: The unit of the field. Only applies to type numeric. + dataType: The datatype of the field. Only applies to type enum. + flags: [optional] data classifier flags for the field, e.g. momentary. + Formatted as a dictionary like { "flag name": "flag value" ... } + } + + error_msg -- [optional] Mandatory when result == "rejected" or "failure". + Details about why the request is rejected or failed. + "rejected" means that the request is stopped, but note that the + request will continue even after a "failure". "failure" only means + that communication was stopped to that specific device, other + device(s) (if any) will continue their readout. + + nodeIds -- [optional] Limits the request to the node Ids in this list. + fields -- [optional] Limits the request to the field names in this list. + flags -- [optional] Limits the request according to the flags, or sets + readout conditions such as timing. + + Return value: + session -- Session identifier. Client can use this as a reference to cancel + the request. + """ + iq = self.xmpp.Iq(); + iq['from'] = from_jid; + iq['to'] = to_jid; + iq['type'] = "get"; + seqnr = self._get_new_seqnr(); + iq['id'] = seqnr; + iq['req']['seqnr'] = seqnr; + if nodeIds is not None: + for nodeId in nodeIds: + iq['req'].add_node(nodeId); + if fields is not None: + for field in fields: + iq['req'].add_field(field); + + iq['req']._set_flags(flags); + + self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr, "callback": callback}; + iq.send(block=False); + + return seqnr; + + def cancel_request(self, session): + """ + Called on the client side to cancel a request for data readout. + Composes a message with the cancellation and sends it to the device(s). + Does not block, the callback will be called when cancellation is + confirmed. + + Arguments: + session -- The session id of the request to cancel + """ + seqnr = session + iq = self.xmpp.Iq(); + iq['from'] = self.sessions[seqnr]['from'] + iq['to'] = self.sessions[seqnr]['to']; + iq['type'] = "get"; + iq['id'] = seqnr; + iq['cancel']['seqnr'] = seqnr; + iq.send(block=False); + + def _get_new_seqnr(self): + """ Returns a unique sequence number (unique across threads) """ + self.seqnr_lock.acquire(); + self.last_seqnr = self.last_seqnr + 1; + self.seqnr_lock.release(); + return str(self.last_seqnr); + + def _handle_event_accepted(self, iq): + """ Received Iq with accepted - request was accepted """ + seqnr = iq['accepted']['seqnr']; + result = "accepted" + if iq['accepted']['queued'] == 'true': + result = "queued" + + callback = self.sessions[seqnr]["callback"]; + callback(from_jid=iq['from'], result=result); + + def _handle_event_rejected(self, iq): + """ Received Iq with rejected - this is a reject. + Delete the session. """ + seqnr = iq['rejected']['seqnr']; + callback = self.sessions[seqnr]["callback"]; + callback(from_jid=iq['from'], result="rejected", error_msg=iq['rejected']['error']); + # Session terminated + del self.sessions[seqnr]; + + def _handle_event_cancelled(self, iq): + """ + Received Iq with cancelled - this is a cancel confirm. + Delete the session. + """ + #print("Got cancelled") + seqnr = iq['cancelled']['seqnr']; + callback = self.sessions[seqnr]["callback"]; + callback(from_jid=iq['from'], result="cancelled"); + # Session cancelled + del self.sessions[seqnr]; + + def _handle_event_fields(self, msg): + """ + Received Msg with fields - this is a data reponse to a request. + If this is the last data block, issue a "done" callback. + """ + seqnr = msg['fields']['seqnr']; + callback = self.sessions[seqnr]["callback"]; + for node in msg['fields']['nodes']: + for ts in node['timestamps']: + fields = []; + for d in ts['datas']: + field_block = {}; + field_block["name"] = d['name']; + field_block["typename"] = d._get_typename(); + field_block["value"] = d['value']; + if not d['unit'] == "": field_block["unit"] = d['unit']; + if not d['dataType'] == "": field_block["dataType"] = d['dataType']; + flags = d._get_flags(); + if not len(flags) == 0: + field_block["flags"] = flags; + fields.append(field_block); + + callback(from_jid=msg['from'], result="fields", nodeId=node['nodeId'], timestamp=ts['value'], fields=fields); + + if msg['fields']['done'] == "true": + callback(from_jid=msg['from'], result="done"); + # Session done + del self.sessions[seqnr]; + + def _handle_event_failure(self, msg): + """ + Received Msg with failure - our request failed + Delete the session. + """ + seqnr = msg['failure']['seqnr']; + callback = self.sessions[seqnr]["callback"]; + callback(from_jid=msg['from'], result="failure", nodeId=msg['failure']['error']['nodeId'], timestamp=msg['failure']['error']['timestamp'], error_msg=msg['failure']['error']['text']); + + # Session failed + del self.sessions[seqnr]; + + def _handle_event_started(self, msg): + """ + Received Msg with started - our request was queued and is now started. + """ + seqnr = msg['started']['seqnr']; + callback = self.sessions[seqnr]["callback"]; + callback(from_jid=msg['from'], result="started"); diff --git a/sleekxmpp/plugins/xep_0323/stanza/__init__.py b/sleekxmpp/plugins/xep_0323/stanza/__init__.py index 82dc9eea..c039cefa 100644 --- a/sleekxmpp/plugins/xep_0323/stanza/__init__.py +++ b/sleekxmpp/plugins/xep_0323/stanza/__init__.py @@ -2,7 +2,7 @@ SleekXMPP: The Sleek XMPP Library Implementation of xeps for Internet of Things http://wiki.xmpp.org/web/Tech_pages/IoT_systems - Copyright (C) 2013 Joachim Lindborg, Joachim.lindborg@lsys.se + Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se This file is part of SleekXMPP. See the file LICENSE for copying permission. diff --git a/sleekxmpp/plugins/xep_0323/stanza/base.py b/sleekxmpp/plugins/xep_0323/stanza/base.py index 4caf07b2..1dadcf46 100644 --- a/sleekxmpp/plugins/xep_0323/stanza/base.py +++ b/sleekxmpp/plugins/xep_0323/stanza/base.py @@ -1,9 +1,10 @@ """ SleekXMPP: The Sleek XMPP Library - Implementation of xeps for Internet of Things http://wiki.xmpp.org/web/Tech_pages/IoT_systems - Copyright (C) 2013 Joachim Lindborg, Joachim.lindborg@lsys.se + Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se + This file is part of SleekXMPP. + See the file LICENSE for copying permission. """ diff --git a/sleekxmpp/plugins/xep_0323/stanza/sensordata.py b/sleekxmpp/plugins/xep_0323/stanza/sensordata.py index 9567ef87..55e5060f 100644 --- a/sleekxmpp/plugins/xep_0323/stanza/sensordata.py +++ b/sleekxmpp/plugins/xep_0323/stanza/sensordata.py @@ -2,7 +2,7 @@ SleekXMPP: The Sleek XMPP Library Implementation of xeps for Internet of Things http://wiki.xmpp.org/web/Tech_pages/IoT_systems - Copyright (C) 2013 Joachim Lindborg, Joachim.lindborg@lsys.se + Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se This file is part of SleekXMPP. See the file LICENSE for copying permission. @@ -10,55 +10,783 @@ from sleekxmpp import Iq, Message from sleekxmpp.xmlstream import register_stanza_plugin, ElementBase, ET, JID - +from re import match class Sensordata(ElementBase): - namespace = 'http://xmpp.org/iot/sensordata' + """ Placeholder for the namespace, not used as a stanza """ + namespace = 'urn:xmpp:iot:sensordata' name = 'sensordata' plugin_attrib = name interfaces = set(tuple()) +class FieldTypes(): + """ + All field types are optional booleans that default to False + """ + field_types = set([ 'momentary','peak','status','computed','identity','historicalSecond','historicalMinute','historicalHour', \ + 'historicalDay','historicalWeek','historicalMonth','historicalQuarter','historicalYear','historicalOther']) + +class FieldStatus(): + """ + All field statuses are optional booleans that default to False + """ + field_status = set([ 'missing','automaticEstimate','manualEstimate','manualReadout','automaticReadout','timeOffset','warning','error', \ + 'signed','invoiced','endOfSeries','powerFailure','invoiceConfirmed']) + class Request(ElementBase): - namespace = 'http://xmpp.org/iot/sensordata' + namespace = 'urn:xmpp:iot:sensordata' name = 'req' plugin_attrib = name - interfaces = set(('seqnr','momentary')) + interfaces = set(['seqnr','nodes','fields','serviceToken','deviceToken','userToken','from','to','when','historical','all']) + interfaces.update(FieldTypes.field_types); + _flags = set(['serviceToken','deviceToken','userToken','from','to','when','historical','all']); + _flags.update(FieldTypes.field_types); + + def __init__(self, xml=None, parent=None): + ElementBase.__init__(self, xml, parent); + self._nodes = set() + self._fields = set() + + def setup(self, xml=None): + """ + Populate the stanza object using an optional XML object. + + Overrides ElementBase.setup + + Caches item information. + + Arguments: + xml -- Use an existing XML object for the stanza's values. + """ + ElementBase.setup(self, xml) + self._nodes = set([node['nodeId'] for node in self['nodes']]) + self._fields = set([field['name'] for field in self['fields']]) + + def _get_flags(self): + """ + Helper function for getting of flags. Returns all flags in + dictionary format: { "flag name": "flag value" ... } + """ + flags = {}; + for f in self._flags: + if not self[f] == "": + flags[f] = self[f]; + return flags; + + def _set_flags(self, flags): + """ + Helper function for setting of flags. + + Arguments: + flags -- Flags in dictionary format: { "flag name": "flag value" ... } + """ + for f in self._flags: + if flags is not None and f in flags: + self[f] = flags[f]; + else: + self[f] = None; + + def add_node(self, nodeId, sourceId=None, cacheType=None): + """ + Add a new node element. Each item is required to have a + nodeId, but may also specify a sourceId value and cacheType. + + Arguments: + nodeId -- The ID for the node. + sourceId -- [optional] identifying the data source controlling the device + cacheType -- [optional] narrowing down the search to a specific kind of node + """ + if nodeId not in self._nodes: + self._nodes.add((nodeId)) + node = RequestNode(parent=self) + node['nodeId'] = nodeId + node['sourceId'] = sourceId + node['cacheType'] = cacheType + self.iterables.append(node) + return node + return None + + def del_node(self, nodeId): + """ + Remove a single node. + + Arguments: + nodeId -- Node ID of the item to remove. + """ + if nodeId in self._nodes: + nodes = [i for i in self.iterables if isinstance(i, RequestNode)] + for node in nodes: + if node['nodeId'] == nodeId: + self.xml.remove(node.xml) + self.iterables.remove(node) + return True + return False + + def get_nodes(self): + """Return all nodes.""" + nodes = set() + for node in self['substanzas']: + if isinstance(node, RequestNode): + nodes.add(node) + return nodes + + def set_nodes(self, nodes): + """ + Set or replace all nodes. The given nodes must be in a + list or set where each item is a tuple of the form: + (nodeId, sourceId, cacheType) + + Arguments: + nodes -- A series of nodes in tuple format. + """ + self.del_nodes() + for node in nodes: + if isinstance(node, RequestNode): + self.add_node(node['nodeId'], node['sourceId'], node['cacheType']) + else: + nodeId, sourceId, cacheType = node + self.add_node(nodeId, sourceId, cacheType) + + def del_nodes(self): + """Remove all nodes.""" + self._nodes = set() + nodes = [i for i in self.iterables if isinstance(i, RequestNode)] + for node in nodes: + self.xml.remove(node.xml) + self.iterables.remove(node) + + + def add_field(self, name): + """ + Add a new field element. Each item is required to have a + name. + + Arguments: + name -- The name of the field. + """ + if name not in self._fields: + self._fields.add((name)) + field = RequestField(parent=self) + field['name'] = name + self.iterables.append(field) + return field + return None + + def del_field(self, name): + """ + Remove a single field. + + Arguments: + name -- name of field to remove. + """ + if name in self._fields: + fields = [i for i in self.iterables if isinstance(i, RequestField)] + for field in fields: + if field['name'] == name: + self.xml.remove(field.xml) + self.iterables.remove(field) + return True + return False + + def get_fields(self): + """Return all fields.""" + fields = set() + for field in self['substanzas']: + if isinstance(field, RequestField): + fields.add(field) + return fields + + def set_fields(self, fields): + """ + Set or replace all fields. The given fields must be in a + list or set where each item is RequestField or string + + Arguments: + fields -- A series of fields in RequestField or string format. + """ + self.del_fields() + for field in fields: + if isinstance(field, RequestField): + self.add_field(field['name']) + else: + self.add_field(field) + + def del_fields(self): + """Remove all fields.""" + self._fields = set() + fields = [i for i in self.iterables if isinstance(i, RequestField)] + for field in fields: + self.xml.remove(field.xml) + self.iterables.remove(field) + + +class RequestNode(ElementBase): + """ Node element in a request """ + namespace = 'urn:xmpp:iot:sensordata' + name = 'node' + plugin_attrib = name + interfaces = set(['nodeId','sourceId','cacheType']) +class RequestField(ElementBase): + """ Field element in a request """ + namespace = 'urn:xmpp:iot:sensordata' + name = 'field' + plugin_attrib = name + interfaces = set(['name']) class Accepted(ElementBase): - namespace = 'http://xmpp.org/iot/sensordata' + namespace = 'urn:xmpp:iot:sensordata' name = 'accepted' plugin_attrib = name - interfaces = set(('seqnr')) + interfaces = set(['seqnr','queued']) +class Started(ElementBase): + namespace = 'urn:xmpp:iot:sensordata' + name = 'started' + plugin_attrib = name + interfaces = set(['seqnr']) class Failure(ElementBase): - namespace = 'http://xmpp.org/iot/sensordata' + namespace = 'urn:xmpp:iot:sensordata' name = 'failure' plugin_attrib = name - interfaces = set(('seqnr','done')) - - - -register_stanza_plugin(Iq, Sensordata) -register_stanza_plugin(Sensordata, Request) -register_stanza_plugin(Sensordata, Accepted) -register_stanza_plugin(Sensordata, Failure) -# register_stanza_plugin(Pubsub, Default) -# register_stanza_plugin(Pubsub, Items) -# register_stanza_plugin(Pubsub, Options) -# register_stanza_plugin(Pubsub, Publish) -# register_stanza_plugin(Pubsub, PublishOptions) -# register_stanza_plugin(Pubsub, Retract) -# register_stanza_plugin(Pubsub, Subscribe) -# register_stanza_plugin(Pubsub, Subscription) -# register_stanza_plugin(Pubsub, Subscriptions) -# register_stanza_plugin(Pubsub, Unsubscribe) -# register_stanza_plugin(Affiliations, Affiliation, iterable=True) -# register_stanza_plugin(Configure, xep_0004.Form) -# register_stanza_plugin(Items, Item, iterable=True) -# register_stanza_plugin(Publish, Item, iterable=True) -# register_stanza_plugin(Retract, Item) -# register_stanza_plugin(Subscribe, Options) -# register_stanza_plugin(Subscription, SubscribeOptions) -# register_stanza_plugin(Subscriptions, Subscription, iterable=True) + interfaces = set(['seqnr','done']) + +class Error(ElementBase): + """ Error element in a request failure """ + namespace = 'urn:xmpp:iot:sensordata' + name = 'error' + plugin_attrib = name + interfaces = set(['nodeId','timestamp','sourceId','cacheType','text']) + + def get_text(self): + """Return then contents inside the XML tag.""" + return self.xml.text + + def set_text(self, value): + """Set then contents inside the XML tag. + + :param value: string + """ + + self.xml.text = value; + return self + + def del_text(self): + """Remove the contents inside the XML tag.""" + self.xml.text = "" + return self + +class Rejected(ElementBase): + namespace = 'urn:xmpp:iot:sensordata' + name = 'rejected' + plugin_attrib = name + interfaces = set(['seqnr','error']) + sub_interfaces = set(['error']) + +class Fields(ElementBase): + """ Fields element, top level in a response message with data """ + namespace = 'urn:xmpp:iot:sensordata' + name = 'fields' + plugin_attrib = name + interfaces = set(['seqnr','done','nodes']) + + def __init__(self, xml=None, parent=None): + ElementBase.__init__(self, xml, parent); + self._nodes = set() + + def setup(self, xml=None): + """ + Populate the stanza object using an optional XML object. + + Overrides ElementBase.setup + + Caches item information. + + Arguments: + xml -- Use an existing XML object for the stanza's values. + """ + ElementBase.setup(self, xml) + self._nodes = set([node['nodeId'] for node in self['nodes']]) + + + def add_node(self, nodeId, sourceId=None, cacheType=None, substanzas=None): + """ + Add a new node element. Each item is required to have a + nodeId, but may also specify a sourceId value and cacheType. + + Arguments: + nodeId -- The ID for the node. + sourceId -- [optional] identifying the data source controlling the device + cacheType -- [optional] narrowing down the search to a specific kind of node + """ + if nodeId not in self._nodes: + self._nodes.add((nodeId)) + node = FieldsNode(parent=self) + node['nodeId'] = nodeId + node['sourceId'] = sourceId + node['cacheType'] = cacheType + if substanzas is not None: + node.set_timestamps(substanzas) + + self.iterables.append(node) + return node + return None + + def del_node(self, nodeId): + """ + Remove a single node. + + Arguments: + nodeId -- Node ID of the item to remove. + """ + if nodeId in self._nodes: + nodes = [i for i in self.iterables if isinstance(i, FieldsNode)] + for node in nodes: + if node['nodeId'] == nodeId: + self.xml.remove(node.xml) + self.iterables.remove(node) + return True + return False + + def get_nodes(self): + """Return all nodes.""" + nodes = set() + for node in self['substanzas']: + if isinstance(node, FieldsNode): + nodes.add(node) + return nodes + + def set_nodes(self, nodes): + """ + Set or replace all nodes. The given nodes must be in a + list or set where each item is a tuple of the form: + (nodeId, sourceId, cacheType) + + Arguments: + nodes -- A series of nodes in tuple format. + """ + #print(str(id(self)) + " set_nodes: got " + str(nodes)) + self.del_nodes() + for node in nodes: + if isinstance(node, FieldsNode): + self.add_node(node['nodeId'], node['sourceId'], node['cacheType'], substanzas=node['substanzas']) + else: + nodeId, sourceId, cacheType = node + self.add_node(nodeId, sourceId, cacheType) + + def del_nodes(self): + """Remove all nodes.""" + self._nodes = set() + nodes = [i for i in self.iterables if isinstance(i, FieldsNode)] + for node in nodes: + self.xml.remove(node.xml) + self.iterables.remove(node) + + +class FieldsNode(ElementBase): + """ Node element in response fields """ + namespace = 'urn:xmpp:iot:sensordata' + name = 'node' + plugin_attrib = name + interfaces = set(['nodeId','sourceId','cacheType','timestamps']) + + def __init__(self, xml=None, parent=None): + ElementBase.__init__(self, xml, parent); + self._timestamps = set() + + def setup(self, xml=None): + """ + Populate the stanza object using an optional XML object. + + Overrides ElementBase.setup + + Caches item information. + + Arguments: + xml -- Use an existing XML object for the stanza's values. + """ + ElementBase.setup(self, xml) + self._timestamps = set([ts['value'] for ts in self['timestamps']]) + + def add_timestamp(self, timestamp, substanzas=None): + """ + Add a new timestamp element. + + Arguments: + timestamp -- The timestamp in ISO format. + """ + #print(str(id(self)) + " add_timestamp: " + str(timestamp)) + + if timestamp not in self._timestamps: + self._timestamps.add((timestamp)) + ts = Timestamp(parent=self) + ts['value'] = timestamp + if not substanzas is None: + ts.set_datas(substanzas); + #print("add_timestamp with substanzas: " + str(substanzas)) + self.iterables.append(ts) + #print(str(id(self)) + " added_timestamp: " + str(id(ts))) + return ts + return None + + def del_timestamp(self, timestamp): + """ + Remove a single timestamp. + + Arguments: + timestamp -- timestamp (in ISO format) of the item to remove. + """ + #print("del_timestamp: ") + if timestamp in self._timestamps: + timestamps = [i for i in self.iterables if isinstance(i, Timestamp)] + for ts in timestamps: + if ts['value'] == timestamp: + self.xml.remove(ts.xml) + self.iterables.remove(ts) + return True + return False + + def get_timestamps(self): + """Return all timestamps.""" + #print(str(id(self)) + " get_timestamps: ") + timestamps = set() + for timestamp in self['substanzas']: + if isinstance(timestamp, Timestamp): + timestamps.add(timestamp) + return timestamps + + def set_timestamps(self, timestamps): + """ + Set or replace all timestamps. The given timestamps must be in a + list or set where each item is a timestamp + + Arguments: + timestamps -- A series of timestamps. + """ + #print(str(id(self)) + " set_timestamps: got " + str(timestamps)) + self.del_timestamps() + for timestamp in timestamps: + #print("set_timestamps: subset " + str(timestamp)) + #print("set_timestamps: subset.substanzas " + str(timestamp['substanzas'])) + if isinstance(timestamp, Timestamp): + self.add_timestamp(timestamp['value'], substanzas=timestamp['substanzas']) + else: + #print("set_timestamps: got " + str(timestamp)) + self.add_timestamp(timestamp) + + def del_timestamps(self): + """Remove all timestamps.""" + #print(str(id(self)) + " del_timestamps: ") + self._timestamps = set() + timestamps = [i for i in self.iterables if isinstance(i, Timestamp)] + for timestamp in timestamps: + self.xml.remove(timestamp.xml) + self.iterables.remove(timestamp) + +class Field(ElementBase): + """ + Field element in response Timestamp. This is a base class, + all instances of fields added to Timestamp must be of types: + DataNumeric + DataString + DataBoolean + DataDateTime + DataTimeSpan + DataEnum + """ + namespace = 'urn:xmpp:iot:sensordata' + name = 'field' + plugin_attrib = name + interfaces = set(['name','module','stringIds']); + interfaces.update(FieldTypes.field_types); + interfaces.update(FieldStatus.field_status); + + _flags = set(); + _flags.update(FieldTypes.field_types); + _flags.update(FieldStatus.field_status); + + def set_stringIds(self, value): + """Verifies stringIds according to regexp from specification XMPP-0323. + + :param value: string + """ + + pattern = re.compile("^\d+([|]\w+([.]\w+)*([|][^,]*)?)?(,\d+([|]\w+([.]\w+)*([|][^,]*)?)?)*$") + if pattern.match(value) is not None: + self.xml.stringIds = value; + else: + # Bad content, add nothing + pass + + return self + + def _get_flags(self): + """ + Helper function for getting of flags. Returns all flags in + dictionary format: { "flag name": "flag value" ... } + """ + flags = {}; + for f in self._flags: + if not self[f] == "": + flags[f] = self[f]; + return flags; + + def _set_flags(self, flags): + """ + Helper function for setting of flags. + + Arguments: + flags -- Flags in dictionary format: { "flag name": "flag value" ... } + """ + for f in self._flags: + if flags is not None and f in flags: + self[f] = flags[f]; + else: + self[f] = None; + + def _get_typename(self): + return "invalid type, use subclasses!"; + + +class Timestamp(ElementBase): + """ Timestamp element in response Node """ + namespace = 'urn:xmpp:iot:sensordata' + name = 'timestamp' + plugin_attrib = name + interfaces = set(['value','datas']) + + def __init__(self, xml=None, parent=None): + ElementBase.__init__(self, xml, parent); + self._datas = set() + + def setup(self, xml=None): + """ + Populate the stanza object using an optional XML object. + + Overrides ElementBase.setup + + Caches item information. + + Arguments: + xml -- Use an existing XML object for the stanza's values. + """ + ElementBase.setup(self, xml) + self._datas = set([data['name'] for data in self['datas']]) + + def add_data(self, typename, name, value, module=None, stringIds=None, unit=None, dataType=None, flags=None): + """ + Add a new data element. + + Arguments: + typename -- The type of data element (numeric, string, boolean, dateTime, timeSpan or enum) + value -- The value of the data element + module -- [optional] language module to use for the data element + stringIds -- [optional] The stringIds used to find associated text in the language module + unit -- [optional] The unit. Only applicable for type numeric + dataType -- [optional] The dataType. Only applicable for type enum + """ + if name not in self._datas: + dataObj = None; + if typename == "numeric": + dataObj = DataNumeric(parent=self); + dataObj['unit'] = unit; + elif typename == "string": + dataObj = DataString(parent=self); + elif typename == "boolean": + dataObj = DataBoolean(parent=self); + elif typename == "dateTime": + dataObj = DataDateTime(parent=self); + elif typename == "timeSpan": + dataObj = DataTimeSpan(parent=self); + elif typename == "enum": + dataObj = DataEnum(parent=self); + dataObj['dataType'] = dataType; + + dataObj['name'] = name; + dataObj['value'] = value; + dataObj['module'] = module; + dataObj['stringIds'] = stringIds; + + if flags is not None: + dataObj._set_flags(flags); + + self._datas.add(name) + self.iterables.append(dataObj) + return dataObj + return None + + def del_data(self, name): + """ + Remove a single data element. + + Arguments: + data_name -- The data element name to remove. + """ + if name in self._datas: + datas = [i for i in self.iterables if isinstance(i, Field)] + for data in datas: + if data['name'] == name: + self.xml.remove(data.xml) + self.iterables.remove(data) + return True + return False + + def get_datas(self): + """ Return all data elements. """ + datas = set() + for data in self['substanzas']: + if isinstance(data, Field): + datas.add(data) + return datas + + def set_datas(self, datas): + """ + Set or replace all data elements. The given elements must be in a + list or set where each item is a data element (numeric, string, boolean, dateTime, timeSpan or enum) + + Arguments: + datas -- A series of data elements. + """ + self.del_datas() + for data in datas: + self.add_data(typename=data._get_typename(), name=data['name'], value=data['value'], module=data['module'], stringIds=data['stringIds'], unit=data['unit'], dataType=data['dataType'], flags=data._get_flags()) + + def del_datas(self): + """Remove all data elements.""" + self._datas = set() + datas = [i for i in self.iterables if isinstance(i, Field)] + for data in datas: + self.xml.remove(data.xml) + self.iterables.remove(data) + +class DataNumeric(Field): + """ + Field data of type numeric. + Note that the value is expressed as a string. + """ + namespace = 'urn:xmpp:iot:sensordata' + name = 'numeric' + plugin_attrib = name + interfaces = set(['value', 'unit']); + interfaces.update(Field.interfaces); + + def _get_typename(self): + return "numeric" + +class DataString(Field): + """ + Field data of type string + """ + namespace = 'urn:xmpp:iot:sensordata' + name = 'string' + plugin_attrib = name + interfaces = set(['value']); + interfaces.update(Field.interfaces); + + def _get_typename(self): + return "string" + +class DataBoolean(Field): + """ + Field data of type boolean. + Note that the value is expressed as a string. + """ + namespace = 'urn:xmpp:iot:sensordata' + name = 'boolean' + plugin_attrib = name + interfaces = set(['value']); + interfaces.update(Field.interfaces); + + def _get_typename(self): + return "boolean" + +class DataDateTime(Field): + """ + Field data of type dateTime. + Note that the value is expressed as a string. + """ + namespace = 'urn:xmpp:iot:sensordata' + name = 'dateTime' + plugin_attrib = name + interfaces = set(['value']); + interfaces.update(Field.interfaces); + + def _get_typename(self): + return "dateTime" + +class DataTimeSpan(Field): + """ + Field data of type timeSpan. + Note that the value is expressed as a string. + """ + namespace = 'urn:xmpp:iot:sensordata' + name = 'timeSpan' + plugin_attrib = name + interfaces = set(['value']); + interfaces.update(Field.interfaces); + + def _get_typename(self): + return "timeSpan" + +class DataEnum(Field): + """ + Field data of type enum. + Note that the value is expressed as a string. + """ + namespace = 'urn:xmpp:iot:sensordata' + name = 'enum' + plugin_attrib = name + interfaces = set(['value', 'dataType']); + interfaces.update(Field.interfaces); + + def _get_typename(self): + return "enum" + +class Done(ElementBase): + """ Done element used to signal that all data has been transferred """ + namespace = 'urn:xmpp:iot:sensordata' + name = 'done' + plugin_attrib = name + interfaces = set(['seqnr']) + +class Cancel(ElementBase): + """ Cancel element used to signal that a request shall be cancelled """ + namespace = 'urn:xmpp:iot:sensordata' + name = 'cancel' + plugin_attrib = name + interfaces = set(['seqnr']) + +class Cancelled(ElementBase): + """ Cancelled element used to signal that cancellation is confirmed """ + namespace = 'urn:xmpp:iot:sensordata' + name = 'cancelled' + plugin_attrib = name + interfaces = set(['seqnr']) + + +register_stanza_plugin(Iq, Request) +register_stanza_plugin(Request, RequestNode, iterable=True) +register_stanza_plugin(Request, RequestField, iterable=True) + +register_stanza_plugin(Iq, Accepted) +register_stanza_plugin(Message, Failure) +register_stanza_plugin(Failure, Error) + +register_stanza_plugin(Iq, Rejected) + +register_stanza_plugin(Message, Fields) +register_stanza_plugin(Fields, FieldsNode, iterable=True) +register_stanza_plugin(FieldsNode, Timestamp, iterable=True) +register_stanza_plugin(Timestamp, Field, iterable=True) +register_stanza_plugin(Timestamp, DataNumeric, iterable=True) +register_stanza_plugin(Timestamp, DataString, iterable=True) +register_stanza_plugin(Timestamp, DataBoolean, iterable=True) +register_stanza_plugin(Timestamp, DataDateTime, iterable=True) +register_stanza_plugin(Timestamp, DataTimeSpan, iterable=True) +register_stanza_plugin(Timestamp, DataEnum, iterable=True) + +register_stanza_plugin(Message, Started) + +register_stanza_plugin(Iq, Cancel) +register_stanza_plugin(Iq, Cancelled) diff --git a/sleekxmpp/plugins/xep_0323/timerreset.py b/sleekxmpp/plugins/xep_0323/timerreset.py new file mode 100644 index 00000000..578f1efe --- /dev/null +++ b/sleekxmpp/plugins/xep_0323/timerreset.py @@ -0,0 +1,64 @@ +""" + SleekXMPP: The Sleek XMPP Library + Implementation of xeps for Internet of Things + http://wiki.xmpp.org/web/Tech_pages/IoT_systems + Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" +from threading import Thread, Event, Timer +import time + +def TimerReset(*args, **kwargs): + """ Global function for Timer """ + return _TimerReset(*args, **kwargs) + + +class _TimerReset(Thread): + """Call a function after a specified number of seconds: + + t = TimerReset(30.0, f, args=[], kwargs={}) + t.start() + t.cancel() # stop the timer's action if it's still waiting + """ + + def __init__(self, interval, function, args=[], kwargs={}): + Thread.__init__(self) + self.interval = interval + self.function = function + self.args = args + self.kwargs = kwargs + self.finished = Event() + self.resetted = True + + def cancel(self): + """Stop the timer if it hasn't finished yet""" + self.finished.set() + + def run(self): + #print "Time: %s - timer running..." % time.asctime() + + while self.resetted: + #print "Time: %s - timer waiting for timeout in %.2f..." % (time.asctime(), self.interval) + self.resetted = False + self.finished.wait(self.interval) + + if not self.finished.isSet(): + self.function(*self.args, **self.kwargs) + self.finished.set() + #print "Time: %s - timer finished!" % time.asctime() + + def reset(self, interval=None): + """ Reset the timer """ + + if interval: + #print "Time: %s - timer resetting to %.2f..." % (time.asctime(), interval) + self.interval = interval + else: + #print "Time: %s - timer resetting..." % time.asctime() + pass + + self.resetted = True + self.finished.set() + self.finished.clear() diff --git a/sleekxmpp/plugins/xep_0325/__init__.py b/sleekxmpp/plugins/xep_0325/__init__.py new file mode 100644 index 00000000..01c38dce --- /dev/null +++ b/sleekxmpp/plugins/xep_0325/__init__.py @@ -0,0 +1,18 @@ +""" + SleekXMPP: The Sleek XMPP Library + Implementation of xeps for Internet of Things + http://wiki.xmpp.org/web/Tech_pages/IoT_systems + Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from sleekxmpp.plugins.base import register_plugin + +from sleekxmpp.plugins.xep_0325.control import XEP_0325 +from sleekxmpp.plugins.xep_0325 import stanza + +register_plugin(XEP_0325) + +xep_0325=XEP_0325 diff --git a/sleekxmpp/plugins/xep_0325/control.py b/sleekxmpp/plugins/xep_0325/control.py new file mode 100644 index 00000000..e34eb2c2 --- /dev/null +++ b/sleekxmpp/plugins/xep_0325/control.py @@ -0,0 +1,574 @@ +""" + SleekXMPP: The Sleek XMPP Library + Implementation of xeps for Internet of Things + http://wiki.xmpp.org/web/Tech_pages/IoT_systems + Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +import logging +import time +from threading import Thread, Timer, Lock + +from sleekxmpp.xmlstream import JID +from sleekxmpp.xmlstream.handler import Callback +from sleekxmpp.xmlstream.matcher import StanzaPath +from sleekxmpp.plugins.base import BasePlugin +from sleekxmpp.plugins.xep_0325 import stanza +from sleekxmpp.plugins.xep_0325.stanza import Control + + +log = logging.getLogger(__name__) + + +class XEP_0325(BasePlugin): + + """ + XEP-0325: IoT Control + + + Actuators are devices in sensor networks that can be controlled through + the network and act with the outside world. In sensor networks and + Internet of Things applications, actuators make it possible to automate + real-world processes. + This plugin implements a mechanism whereby actuators can be controlled + in XMPP-based sensor networks, making it possible to integrate sensors + and actuators of different brands, makes and models into larger + Internet of Things applications. + + Also see <http://xmpp.org/extensions/xep-0325.html> + + Configuration Values: + threaded -- Indicates if communication with sensors should be threaded. + Defaults to True. + + Events: + Sensor side + ----------- + Control Event:DirectSet -- Received a control message + Control Event:SetReq -- Received a control request + + Client side + ----------- + Control Event:SetResponse -- Received a response to a + control request, type result + Control Event:SetResponseError -- Received a response to a + control request, type error + + Attributes: + threaded -- Indicates if command events should be threaded. + Defaults to True. + sessions -- A dictionary or equivalent backend mapping + session IDs to dictionaries containing data + relevant to a request's session. This dictionary is used + both by the client and sensor side. On client side, seqnr + is used as key, while on sensor side, a session_id is used + as key. This ensures that the two will not collide, so + one instance can be both client and sensor. + Sensor side + ----------- + nodes -- A dictionary mapping sensor nodes that are serviced through + this XMPP instance to their device handlers ("drivers"). + Client side + ----------- + last_seqnr -- The last used sequence number (integer). One sequence of + communication (e.g. -->request, <--accept, <--fields) + between client and sensor is identified by a unique + sequence number (unique between the client/sensor pair) + + Methods: + plugin_init -- Overrides base_plugin.plugin_init + post_init -- Overrides base_plugin.post_init + plugin_end -- Overrides base_plugin.plugin_end + + Sensor side + ----------- + register_node -- Register a sensor as available from this XMPP + instance. + + Client side + ----------- + set_request -- Initiates a control request to modify data in + sensor(s). Non-blocking, a callback function will + be called when the sensor has responded. + set_command -- Initiates a control command to modify data in + sensor(s). Non-blocking. The sensor(s) will not + respond regardless of the result of the command, + so no callback is made. + + """ + + name = 'xep_0325' + description = 'XEP-0325 Internet of Things - Control' + dependencies = set(['xep_0030']) + stanza = stanza + + + default_config = { + 'threaded': True +# 'session_db': None + } + + def plugin_init(self): + """ Start the XEP-0325 plugin """ + + self.xmpp.register_handler( + Callback('Control Event:DirectSet', + StanzaPath('message/set'), + self._handle_direct_set)) + + self.xmpp.register_handler( + Callback('Control Event:SetReq', + StanzaPath('iq@type=set/set'), + self._handle_set_req)) + + self.xmpp.register_handler( + Callback('Control Event:SetResponse', + StanzaPath('iq@type=result/setResponse'), + self._handle_set_response)) + + self.xmpp.register_handler( + Callback('Control Event:SetResponseError', + StanzaPath('iq@type=error/setResponse'), + self._handle_set_response)) + + # Server side dicts + self.nodes = {}; + self.sessions = {}; + + self.last_seqnr = 0; + self.seqnr_lock = Lock(); + + ## For testning only + self.test_authenticated_from = "" + + def post_init(self): + """ Init complete. Register our features in Serivce discovery. """ + BasePlugin.post_init(self) + self.xmpp['xep_0030'].add_feature(Control.namespace) + self.xmpp['xep_0030'].set_items(node=Control.namespace, items=tuple()) + + def _new_session(self): + """ Return a new session ID. """ + return str(time.time()) + '-' + self.xmpp.new_id() + + def plugin_end(self): + """ Stop the XEP-0325 plugin """ + self.sessions.clear(); + self.xmpp.remove_handler('Control Event:DirectSet') + self.xmpp.remove_handler('Control Event:SetReq') + self.xmpp.remove_handler('Control Event:SetResponse') + self.xmpp.remove_handler('Control Event:SetResponseError') + self.xmpp['xep_0030'].del_feature(feature=Control.namespace) + self.xmpp['xep_0030'].set_items(node=Control.namespace, items=tuple()); + + + # ================================================================= + # Sensor side (data provider) API + + def register_node(self, nodeId, device, commTimeout, sourceId=None, cacheType=None): + """ + Register a sensor/device as available for control requests/commands + through this XMPP instance. + + The device object may by any custom implementation to support + specific devices, but it must implement the functions: + has_control_field + set_control_fields + according to the interfaces shown in the example device.py file. + + Arguments: + nodeId -- The identifier for the device + device -- The device object + commTimeout -- Time in seconds to wait between each callback from device during + a data readout. Float. + sourceId -- [optional] identifying the data source controlling the device + cacheType -- [optional] narrowing down the search to a specific kind of node + """ + self.nodes[nodeId] = {"device": device, + "commTimeout": commTimeout, + "sourceId": sourceId, + "cacheType": cacheType}; + + def _set_authenticated(self, auth=''): + """ Internal testing function """ + self.test_authenticated_from = auth; + + def _get_new_seqnr(self): + """ Returns a unique sequence number (unique across threads) """ + self.seqnr_lock.acquire(); + self.last_seqnr = self.last_seqnr + 1; + self.seqnr_lock.release(); + return str(self.last_seqnr); + + def _handle_set_req(self, iq): + """ + Event handler for reception of an Iq with set req - this is a + control request. + + Verifies that + - all the requested nodes are available + (if no nodes are specified in the request, assume all nodes) + - all the control fields are available from all requested nodes + (if no nodes are specified in the request, assume all nodes) + + If the request passes verification, the control request is passed + to the devices (in a separate thread). + If the verification fails, a setResponse with error indication + is sent. + """ + + error_msg = ''; + req_ok = True; + missing_node = None; + missing_field = None; + + # Authentication + if len(self.test_authenticated_from) > 0 and not iq['from'] == self.test_authenticated_from: + # Invalid authentication + req_ok = False; + error_msg = "Access denied"; + + # Nodes + process_nodes = []; + if len(iq['set']['nodes']) > 0: + for n in iq['set']['nodes']: + if not n['nodeId'] in self.nodes: + req_ok = False; + missing_node = n['nodeId']; + error_msg = "Invalid nodeId " + n['nodeId']; + process_nodes = [n['nodeId'] for n in iq['set']['nodes']]; + else: + process_nodes = self.nodes.keys(); + + # Fields - for control we need to find all in all devices, otherwise we reject + process_fields = []; + if len(iq['set']['datas']) > 0: + for f in iq['set']['datas']: + for node in self.nodes: + if not self.nodes[node]["device"].has_control_field(f['name'], f._get_typename()): + req_ok = False; + missing_field = f['name']; + error_msg = "Invalid field " + f['name']; + break; + process_fields = [(f['name'], f._get_typename(), f['value']) for f in iq['set']['datas']]; + + if req_ok: + session = self._new_session(); + self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": iq['id']}; + self.sessions[session]["commTimers"] = {}; + self.sessions[session]["nodeDone"] = {}; + # Flag that a reply is exected when we are done + self.sessions[session]["reply"] = True; + + self.sessions[session]["node_list"] = process_nodes; + if self.threaded: + #print("starting thread") + tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields)) + tr_req.start() + #print("started thread") + else: + self._threaded_node_request(session, process_fields); + + else: + iq.reply(); + iq['type'] = 'error'; + iq['setResponse']['responseCode'] = "NotFound"; + if missing_node is not None: + iq['setResponse'].add_node(missing_node); + if missing_field is not None: + iq['setResponse'].add_data(missing_field); + iq['setResponse']['error']['var'] = "Output"; + iq['setResponse']['error']['text'] = error_msg; + iq.send(block=False); + + def _handle_direct_set(self, msg): + """ + Event handler for reception of a Message with set command - this is a + direct control command. + + Verifies that + - all the requested nodes are available + (if no nodes are specified in the request, assume all nodes) + - all the control fields are available from all requested nodes + (if no nodes are specified in the request, assume all nodes) + + If the request passes verification, the control request is passed + to the devices (in a separate thread). + If the verification fails, do nothing. + """ + req_ok = True; + + # Nodes + process_nodes = []; + if len(msg['set']['nodes']) > 0: + for n in msg['set']['nodes']: + if not n['nodeId'] in self.nodes: + req_ok = False; + error_msg = "Invalid nodeId " + n['nodeId']; + process_nodes = [n['nodeId'] for n in msg['set']['nodes']]; + else: + process_nodes = self.nodes.keys(); + + # Fields - for control we need to find all in all devices, otherwise we reject + process_fields = []; + if len(msg['set']['datas']) > 0: + for f in msg['set']['datas']: + for node in self.nodes: + if not self.nodes[node]["device"].has_control_field(f['name'], f._get_typename()): + req_ok = False; + missing_field = f['name']; + error_msg = "Invalid field " + f['name']; + break; + process_fields = [(f['name'], f._get_typename(), f['value']) for f in msg['set']['datas']]; + + if req_ok: + session = self._new_session(); + self.sessions[session] = {"from": msg['from'], "to": msg['to']}; + self.sessions[session]["commTimers"] = {}; + self.sessions[session]["nodeDone"] = {}; + self.sessions[session]["reply"] = False; + + self.sessions[session]["node_list"] = process_nodes; + if self.threaded: + #print("starting thread") + tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields)) + tr_req.start() + #print("started thread") + else: + self._threaded_node_request(session, process_fields); + + + def _threaded_node_request(self, session, process_fields): + """ + Helper function to handle the device control in a separate thread. + + Arguments: + session -- The request session id + process_fields -- The fields to set in the devices. List of tuple format: + (name, datatype, value) + """ + for node in self.sessions[session]["node_list"]: + self.sessions[session]["nodeDone"][node] = False; + + for node in self.sessions[session]["node_list"]: + timer = Timer(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node)); + self.sessions[session]["commTimers"][node] = timer; + timer.start(); + self.nodes[node]['device'].set_control_fields(process_fields, session=session, callback=self._device_set_command_callback); + + def _event_comm_timeout(self, session, nodeId): + """ + Triggered if any of the control operations timeout. + Stop communicating with the failing device. + If the control command was an Iq request, sends a failure + message back to the client. + + Arguments: + session -- The request session id + nodeId -- The id of the device which timed out + """ + + if self.sessions[session]["reply"]: + # Reply is exected when we are done + iq = self.xmpp.Iq(); + iq['from'] = self.sessions[session]['to']; + iq['to'] = self.sessions[session]['from']; + iq['type'] = "error"; + iq['id'] = self.sessions[session]['seqnr']; + iq['setResponse']['responseCode'] = "OtherError"; + iq['setResponse'].add_node(nodeId); + iq['setResponse']['error']['var'] = "Output"; + iq['setResponse']['error']['text'] = "Timeout."; + iq.send(block=False); + + ## TODO - should we send one timeout per node?? + + # Drop communication with this device and check if we are done + self.sessions[session]["nodeDone"][nodeId] = True; + if (self._all_nodes_done(session)): + # The session is complete, delete it + del self.sessions[session]; + + def _all_nodes_done(self, session): + """ + Checks wheter all devices are done replying to the control command. + + Arguments: + session -- The request session id + """ + for n in self.sessions[session]["nodeDone"]: + if not self.sessions[session]["nodeDone"][n]: + return False; + return True; + + def _device_set_command_callback(self, session, nodeId, result, error_field=None, error_msg=None): + """ + Callback function called by the devices when the control command is + complete or failed. + If needed, composes a message with the result and sends it back to the + client. + + Arguments: + session -- The request session id + nodeId -- The device id which initiated the callback + result -- The current result status of the control command. Valid values are: + "error" - Set fields failed. + "ok" - All fields were set. + error_field -- [optional] Only applies when result == "error" + The field name that failed (usually means it is missing) + error_msg -- [optional] Only applies when result == "error". + Error details when a request failed. + """ + + if not session in self.sessions: + # This can happend if a session was deleted, like in a timeout. Just drop the data. + return + + if result == "error": + self.sessions[session]["commTimers"][nodeId].cancel(); + + if self.sessions[session]["reply"]: + # Reply is exected when we are done + iq = self.xmpp.Iq(); + iq['from'] = self.sessions[session]['to']; + iq['to'] = self.sessions[session]['from']; + iq['type'] = "error"; + iq['id'] = self.sessions[session]['seqnr']; + iq['setResponse']['responseCode'] = "OtherError"; + iq['setResponse'].add_node(nodeId); + if error_field is not None: + iq['setResponse'].add_data(error_field); + iq['setResponse']['error']['var'] = error_field; + iq['setResponse']['error']['text'] = error_msg; + iq.send(block=False); + + # Drop communication with this device and check if we are done + self.sessions[session]["nodeDone"][nodeId] = True; + if (self._all_nodes_done(session)): + # The session is complete, delete it + del self.sessions[session]; + else: + self.sessions[session]["commTimers"][nodeId].cancel(); + + self.sessions[session]["nodeDone"][nodeId] = True; + if (self._all_nodes_done(session)): + if self.sessions[session]["reply"]: + # Reply is exected when we are done + iq = self.xmpp.Iq(); + iq['from'] = self.sessions[session]['to']; + iq['to'] = self.sessions[session]['from']; + iq['type'] = "result"; + iq['id'] = self.sessions[session]['seqnr']; + iq['setResponse']['responseCode'] = "OK"; + iq.send(block=False); + + # The session is complete, delete it + del self.sessions[session]; + + + # ================================================================= + # Client side (data controller) API + + def set_request(self, from_jid, to_jid, callback, fields, nodeIds=None): + """ + Called on the client side to initiade a control request. + Composes a message with the request and sends it to the device(s). + Does not block, the callback will be called when the device(s) + has responded. + + Arguments: + from_jid -- The jid of the requester + to_jid -- The jid of the device(s) + callback -- The callback function to call when data is availble. + + The callback function must support the following arguments: + + from_jid -- The jid of the responding device(s) + result -- The result of the control request. Valid values are: + "OK" - Control request completed successfully + "NotFound" - One or more nodes or fields are missing + "InsufficientPrivileges" - Not authorized. + "Locked" - Field(s) is locked and cannot + be changed at the moment. + "NotImplemented" - Request feature not implemented. + "FormError" - Error while setting with + a form (not implemented). + "OtherError" - Indicates other types of + errors, such as timeout. + Details in the error_msg. + + + nodeId -- [optional] Only applicable when result == "error" + List of node Ids of failing device(s). + + fields -- [optional] Only applicable when result == "error" + List of fields that failed.[optional] Mandatory when result == "rejected" or "failure". + + error_msg -- Details about why the request failed. + + fields -- Fields to set. List of tuple format: (name, typename, value). + nodeIds -- [optional] Limits the request to the node Ids in this list. + """ + iq = self.xmpp.Iq(); + iq['from'] = from_jid; + iq['to'] = to_jid; + seqnr = self._get_new_seqnr(); + iq['id'] = seqnr; + iq['type'] = "set"; + if nodeIds is not None: + for nodeId in nodeIds: + iq['set'].add_node(nodeId); + if fields is not None: + for name, typename, value in fields: + iq['set'].add_data(name=name, typename=typename, value=value); + + self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "callback": callback}; + iq.send(block=False); + + def set_command(self, from_jid, to_jid, fields, nodeIds=None): + """ + Called on the client side to initiade a control command. + Composes a message with the set commandand sends it to the device(s). + Does not block. Device(s) will not respond, regardless of result. + + Arguments: + from_jid -- The jid of the requester + to_jid -- The jid of the device(s) + + fields -- Fields to set. List of tuple format: (name, typename, value). + nodeIds -- [optional] Limits the request to the node Ids in this list. + """ + msg = self.xmpp.Message(); + msg['from'] = from_jid; + msg['to'] = to_jid; + msg['type'] = "set"; + if nodeIds is not None: + for nodeId in nodeIds: + msg['set'].add_node(nodeId); + if fields is not None: + for name, typename, value in fields: + msg['set'].add_data(name, typename, value); + + # We won't get any reply, so don't create a session + msg.send(); + + def _handle_set_response(self, iq): + """ Received response from device(s) """ + #print("ooh") + seqnr = iq['id']; + from_jid = str(iq['from']); + result = iq['setResponse']['responseCode']; + nodeIds = [n['name'] for n in iq['setResponse']['nodes']]; + fields = [f['name'] for f in iq['setResponse']['datas']]; + error_msg = None; + + if not iq['setResponse'].find('error') is None and not iq['setResponse']['error']['text'] == "": + error_msg = iq['setResponse']['error']['text']; + + callback = self.sessions[seqnr]["callback"]; + callback(from_jid=from_jid, result=result, nodeIds=nodeIds, fields=fields, error_msg=error_msg); + + diff --git a/sleekxmpp/plugins/xep_0325/device.py b/sleekxmpp/plugins/xep_0325/device.py new file mode 100644 index 00000000..1cb99510 --- /dev/null +++ b/sleekxmpp/plugins/xep_0325/device.py @@ -0,0 +1,125 @@ +""" + SleekXMPP: The Sleek XMPP Library + Implementation of xeps for Internet of Things + http://wiki.xmpp.org/web/Tech_pages/IoT_systems + Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +import datetime + +class Device(object): + """ + Example implementation of a device control object. + + The device object may by any custom implementation to support + specific devices, but it must implement the functions: + has_control_field + set_control_fields + """ + + def __init__(self, nodeId): + self.nodeId = nodeId; + self.control_fields = {}; + + def has_control_field(self, field, typename): + """ + Returns true if the supplied field name exists + and the type matches for control in this device. + + Arguments: + field -- The field name + typename -- The expected type + """ + if field in self.control_fields and self.control_fields[field]["type"] == typename: + return True; + return False; + + def set_control_fields(self, fields, session, callback): + """ + Starts a control setting procedure. Verifies the fields, + sets the data and (if needed) and calls the callback. + + Arguments: + fields -- List of control fields in tuple format: + (name, typename, value) + session -- Session id, only used in the callback as identifier + callback -- Callback function to call when control set is complete. + + The callback function must support the following arguments: + + session -- Session id, as supplied in the + request_fields call + nodeId -- Identifier for this device + result -- The current result status of the readout. + Valid values are: + "error" - Set fields failed. + "ok" - All fields were set. + error_field -- [optional] Only applies when result == "error" + The field name that failed + (usually means it is missing) + error_msg -- [optional] Only applies when result == "error". + Error details when a request failed. + """ + + if len(fields) > 0: + # Check availiability + for name, typename, value in fields: + if not self.has_control_field(name, typename): + self._send_control_reject(session, name, "NotFound", callback) + return False; + + for name, typename, value in fields: + self._set_field_value(name, value) + + callback(session, result="ok", nodeId=self.nodeId); + return True + + def _send_control_reject(self, session, field, message, callback): + """ + Sends a reject to the caller + + Arguments: + session -- Session id, see definition in + set_control_fields function + callback -- Callback function, see definition in + set_control_fields function + """ + callback(session, result="error", nodeId=self.nodeId, error_field=field, error_msg=message); + + def _add_control_field(self, name, typename, value): + """ + Adds a control field to the device + + Arguments: + name -- Name of the field + typename -- Type of the field, one of: + (boolean, color, string, date, dateTime, + double, duration, int, long, time) + value -- Field value + """ + self.control_fields[name] = {"type": typename, "value": value}; + + def _set_field_value(self, name, value): + """ + Set the value of a control field + + Arguments: + name -- Name of the field + value -- New value for the field + """ + if name in self.control_fields: + self.control_fields[name]["value"] = value; + + def _get_field_value(self, name): + """ + Get the value of a control field. Only used for unit testing. + + Arguments: + name -- Name of the field + """ + if name in self.control_fields: + return self.control_fields[name]["value"]; + return None; diff --git a/sleekxmpp/plugins/xep_0325/stanza/__init__.py b/sleekxmpp/plugins/xep_0325/stanza/__init__.py new file mode 100644 index 00000000..746c2033 --- /dev/null +++ b/sleekxmpp/plugins/xep_0325/stanza/__init__.py @@ -0,0 +1,12 @@ +""" + SleekXMPP: The Sleek XMPP Library + Implementation of xeps for Internet of Things + http://wiki.xmpp.org/web/Tech_pages/IoT_systems + Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from sleekxmpp.plugins.xep_0325.stanza.control import * + diff --git a/sleekxmpp/plugins/xep_0325/stanza/base.py b/sleekxmpp/plugins/xep_0325/stanza/base.py new file mode 100644 index 00000000..1dadcf46 --- /dev/null +++ b/sleekxmpp/plugins/xep_0325/stanza/base.py @@ -0,0 +1,13 @@ +""" + SleekXMPP: The Sleek XMPP Library + Implementation of xeps for Internet of Things + http://wiki.xmpp.org/web/Tech_pages/IoT_systems + Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from sleekxmpp.xmlstream import ET + +pass diff --git a/sleekxmpp/plugins/xep_0325/stanza/control.py b/sleekxmpp/plugins/xep_0325/stanza/control.py new file mode 100644 index 00000000..2707191f --- /dev/null +++ b/sleekxmpp/plugins/xep_0325/stanza/control.py @@ -0,0 +1,526 @@ +""" + SleekXMPP: The Sleek XMPP Library + Implementation of xeps for Internet of Things + http://wiki.xmpp.org/web/Tech_pages/IoT_systems + Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +from sleekxmpp import Iq, Message +from sleekxmpp.xmlstream import register_stanza_plugin, ElementBase, ET, JID +from re import match + +class Control(ElementBase): + """ Placeholder for the namespace, not used as a stanza """ + namespace = 'urn:xmpp:iot:control' + name = 'control' + plugin_attrib = name + interfaces = set(tuple()) + +class ControlSet(ElementBase): + namespace = 'urn:xmpp:iot:control' + name = 'set' + plugin_attrib = name + interfaces = set(['nodes','datas']) + + def __init__(self, xml=None, parent=None): + ElementBase.__init__(self, xml, parent); + self._nodes = set() + self._datas = set() + + def setup(self, xml=None): + """ + Populate the stanza object using an optional XML object. + + Overrides ElementBase.setup + + Caches item information. + + Arguments: + xml -- Use an existing XML object for the stanza's values. + """ + ElementBase.setup(self, xml) + self._nodes = set([node['nodeId'] for node in self['nodes']]) + self._datas = set([data['name'] for data in self['datas']]) + + def add_node(self, nodeId, sourceId=None, cacheType=None): + """ + Add a new node element. Each item is required to have a + nodeId, but may also specify a sourceId value and cacheType. + + Arguments: + nodeId -- The ID for the node. + sourceId -- [optional] identifying the data source controlling the device + cacheType -- [optional] narrowing down the search to a specific kind of node + """ + if nodeId not in self._nodes: + self._nodes.add((nodeId)) + node = RequestNode(parent=self) + node['nodeId'] = nodeId + node['sourceId'] = sourceId + node['cacheType'] = cacheType + self.iterables.append(node) + return node + return None + + def del_node(self, nodeId): + """ + Remove a single node. + + Arguments: + nodeId -- Node ID of the item to remove. + """ + if nodeId in self._nodes: + nodes = [i for i in self.iterables if isinstance(i, RequestNode)] + for node in nodes: + if node['nodeId'] == nodeId: + self.xml.remove(node.xml) + self.iterables.remove(node) + return True + return False + + def get_nodes(self): + """Return all nodes.""" + nodes = set() + for node in self['substanzas']: + if isinstance(node, RequestNode): + nodes.add(node) + return nodes + + def set_nodes(self, nodes): + """ + Set or replace all nodes. The given nodes must be in a + list or set where each item is a tuple of the form: + (nodeId, sourceId, cacheType) + + Arguments: + nodes -- A series of nodes in tuple format. + """ + self.del_nodes() + for node in nodes: + if isinstance(node, RequestNode): + self.add_node(node['nodeId'], node['sourceId'], node['cacheType']) + else: + nodeId, sourceId, cacheType = node + self.add_node(nodeId, sourceId, cacheType) + + def del_nodes(self): + """Remove all nodes.""" + self._nodes = set() + nodes = [i for i in self.iterables if isinstance(i, RequestNode)] + for node in nodes: + self.xml.remove(node.xml) + self.iterables.remove(node) + + + def add_data(self, name, typename, value): + """ + Add a new data element. + + Arguments: + name -- The name of the data element + typename -- The type of data element + (boolean, color, string, date, dateTime, + double, duration, int, long, time) + value -- The value of the data element + """ + if name not in self._datas: + dataObj = None; + if typename == "boolean": + dataObj = BooleanParameter(parent=self); + elif typename == "color": + dataObj = ColorParameter(parent=self); + elif typename == "string": + dataObj = StringParameter(parent=self); + elif typename == "date": + dataObj = DateParameter(parent=self); + elif typename == "dateTime": + dataObj = DateTimeParameter(parent=self); + elif typename == "double": + dataObj = DoubleParameter(parent=self); + elif typename == "duration": + dataObj = DurationParameter(parent=self); + elif typename == "int": + dataObj = IntParameter(parent=self); + elif typename == "long": + dataObj = LongParameter(parent=self); + elif typename == "time": + dataObj = TimeParameter(parent=self); + + dataObj['name'] = name; + dataObj['value'] = value; + + self._datas.add(name) + self.iterables.append(dataObj) + return dataObj + return None + + def del_data(self, name): + """ + Remove a single data element. + + Arguments: + data_name -- The data element name to remove. + """ + if name in self._datas: + datas = [i for i in self.iterables if isinstance(i, BaseParameter)] + for data in datas: + if data['name'] == name: + self.xml.remove(data.xml) + self.iterables.remove(data) + return True + return False + + def get_datas(self): + """ Return all data elements. """ + datas = set() + for data in self['substanzas']: + if isinstance(data, BaseParameter): + datas.add(data) + return datas + + def set_datas(self, datas): + """ + Set or replace all data elements. The given elements must be in a + list or set where each item is a data element (numeric, string, boolean, dateTime, timeSpan or enum) + + Arguments: + datas -- A series of data elements. + """ + self.del_datas() + for data in datas: + self.add_data(name=data['name'], typename=data._get_typename(), value=data['value']) + + def del_datas(self): + """Remove all data elements.""" + self._datas = set() + datas = [i for i in self.iterables if isinstance(i, BaseParameter)] + for data in datas: + self.xml.remove(data.xml) + self.iterables.remove(data) + + +class RequestNode(ElementBase): + """ Node element in a request """ + namespace = 'urn:xmpp:iot:control' + name = 'node' + plugin_attrib = name + interfaces = set(['nodeId','sourceId','cacheType']) + + +class ControlSetResponse(ElementBase): + namespace = 'urn:xmpp:iot:control' + name = 'setResponse' + plugin_attrib = name + interfaces = set(['responseCode']) + + def __init__(self, xml=None, parent=None): + ElementBase.__init__(self, xml, parent); + self._nodes = set() + self._datas = set() + + def setup(self, xml=None): + """ + Populate the stanza object using an optional XML object. + + Overrides ElementBase.setup + + Caches item information. + + Arguments: + xml -- Use an existing XML object for the stanza's values. + """ + ElementBase.setup(self, xml) + self._nodes = set([node['nodeId'] for node in self['nodes']]) + self._datas = set([data['name'] for data in self['datas']]) + + def add_node(self, nodeId, sourceId=None, cacheType=None): + """ + Add a new node element. Each item is required to have a + nodeId, but may also specify a sourceId value and cacheType. + + Arguments: + nodeId -- The ID for the node. + sourceId -- [optional] identifying the data source controlling the device + cacheType -- [optional] narrowing down the search to a specific kind of node + """ + if nodeId not in self._nodes: + self._nodes.add(nodeId) + node = RequestNode(parent=self) + node['nodeId'] = nodeId + node['sourceId'] = sourceId + node['cacheType'] = cacheType + self.iterables.append(node) + return node + return None + + def del_node(self, nodeId): + """ + Remove a single node. + + Arguments: + nodeId -- Node ID of the item to remove. + """ + if nodeId in self._nodes: + nodes = [i for i in self.iterables if isinstance(i, RequestNode)] + for node in nodes: + if node['nodeId'] == nodeId: + self.xml.remove(node.xml) + self.iterables.remove(node) + return True + return False + + def get_nodes(self): + """Return all nodes.""" + nodes = set() + for node in self['substanzas']: + if isinstance(node, RequestNode): + nodes.add(node) + return nodes + + def set_nodes(self, nodes): + """ + Set or replace all nodes. The given nodes must be in a + list or set where each item is a tuple of the form: + (nodeId, sourceId, cacheType) + + Arguments: + nodes -- A series of nodes in tuple format. + """ + self.del_nodes() + for node in nodes: + if isinstance(node, RequestNode): + self.add_node(node['nodeId'], node['sourceId'], node['cacheType']) + else: + nodeId, sourceId, cacheType = node + self.add_node(nodeId, sourceId, cacheType) + + def del_nodes(self): + """Remove all nodes.""" + self._nodes = set() + nodes = [i for i in self.iterables if isinstance(i, RequestNode)] + for node in nodes: + self.xml.remove(node.xml) + self.iterables.remove(node) + + + def add_data(self, name): + """ + Add a new ResponseParameter element. + + Arguments: + name -- Name of the parameter + """ + if name not in self._datas: + self._datas.add(name) + data = ResponseParameter(parent=self) + data['name'] = name; + self.iterables.append(data) + return data + return None + + def del_data(self, name): + """ + Remove a single ResponseParameter element. + + Arguments: + name -- The data element name to remove. + """ + if name in self._datas: + datas = [i for i in self.iterables if isinstance(i, ResponseParameter)] + for data in datas: + if data['name'] == name: + self.xml.remove(data.xml) + self.iterables.remove(data) + return True + return False + + def get_datas(self): + """ Return all ResponseParameter elements. """ + datas = set() + for data in self['substanzas']: + if isinstance(data, ResponseParameter): + datas.add(data) + return datas + + def set_datas(self, datas): + """ + Set or replace all data elements. The given elements must be in a + list or set of ResponseParameter elements + + Arguments: + datas -- A series of data element names. + """ + self.del_datas() + for data in datas: + self.add_data(name=data['name']) + + def del_datas(self): + """Remove all ResponseParameter elements.""" + self._datas = set() + datas = [i for i in self.iterables if isinstance(i, ResponseParameter)] + for data in datas: + self.xml.remove(data.xml) + self.iterables.remove(data) + + +class Error(ElementBase): + namespace = 'urn:xmpp:iot:control' + name = 'error' + plugin_attrib = name + interfaces = set(['var','text']) + + def get_text(self): + """Return then contents inside the XML tag.""" + return self.xml.text + + def set_text(self, value): + """Set then contents inside the XML tag. + + Arguments: + value -- string + """ + + self.xml.text = value; + return self + + def del_text(self): + """Remove the contents inside the XML tag.""" + self.xml.text = "" + return self + +class ResponseParameter(ElementBase): + """ + Parameter element in ControlSetResponse. + """ + namespace = 'urn:xmpp:iot:control' + name = 'parameter' + plugin_attrib = name + interfaces = set(['name']); + + +class BaseParameter(ElementBase): + """ + Parameter element in SetCommand. This is a base class, + all instances of parameters added to SetCommand must be of types: + BooleanParameter + ColorParameter + StringParameter + DateParameter + DateTimeParameter + DoubleParameter + DurationParameter + IntParameter + LongParameter + TimeParameter + """ + namespace = 'urn:xmpp:iot:control' + name = 'baseParameter' + plugin_attrib = name + interfaces = set(['name','value']); + + def _get_typename(self): + return self.name; + +class BooleanParameter(BaseParameter): + """ + Field data of type boolean. + Note that the value is expressed as a string. + """ + name = 'boolean' + plugin_attrib = name + +class ColorParameter(BaseParameter): + """ + Field data of type color. + Note that the value is expressed as a string. + """ + name = 'color' + plugin_attrib = name + +class StringParameter(BaseParameter): + """ + Field data of type string. + """ + name = 'string' + plugin_attrib = name + +class DateParameter(BaseParameter): + """ + Field data of type date. + Note that the value is expressed as a string. + """ + name = 'date' + plugin_attrib = name + +class DateTimeParameter(BaseParameter): + """ + Field data of type dateTime. + Note that the value is expressed as a string. + """ + name = 'dateTime' + plugin_attrib = name + +class DoubleParameter(BaseParameter): + """ + Field data of type double. + Note that the value is expressed as a string. + """ + name = 'double' + plugin_attrib = name + +class DurationParameter(BaseParameter): + """ + Field data of type duration. + Note that the value is expressed as a string. + """ + name = 'duration' + plugin_attrib = name + +class IntParameter(BaseParameter): + """ + Field data of type int. + Note that the value is expressed as a string. + """ + name = 'int' + plugin_attrib = name + +class LongParameter(BaseParameter): + """ + Field data of type long (64-bit int). + Note that the value is expressed as a string. + """ + name = 'long' + plugin_attrib = name + +class TimeParameter(BaseParameter): + """ + Field data of type time. + Note that the value is expressed as a string. + """ + name = 'time' + plugin_attrib = name + +register_stanza_plugin(Iq, ControlSet) +register_stanza_plugin(Message, ControlSet) + +register_stanza_plugin(ControlSet, RequestNode, iterable=True) + +register_stanza_plugin(ControlSet, BooleanParameter, iterable=True) +register_stanza_plugin(ControlSet, ColorParameter, iterable=True) +register_stanza_plugin(ControlSet, StringParameter, iterable=True) +register_stanza_plugin(ControlSet, DateParameter, iterable=True) +register_stanza_plugin(ControlSet, DateTimeParameter, iterable=True) +register_stanza_plugin(ControlSet, DoubleParameter, iterable=True) +register_stanza_plugin(ControlSet, DurationParameter, iterable=True) +register_stanza_plugin(ControlSet, IntParameter, iterable=True) +register_stanza_plugin(ControlSet, LongParameter, iterable=True) +register_stanza_plugin(ControlSet, TimeParameter, iterable=True) + +register_stanza_plugin(Iq, ControlSetResponse) +register_stanza_plugin(ControlSetResponse, Error) +register_stanza_plugin(ControlSetResponse, RequestNode, iterable=True) +register_stanza_plugin(ControlSetResponse, ResponseParameter, iterable=True) + |