summaryrefslogtreecommitdiff
path: root/sleekxmpp/plugins/xep_0325/control.py
diff options
context:
space:
mode:
authorJoachim Lindborg <Joachim.Lindborg@lsys.se>2013-08-30 02:29:52 +0200
committerJoachim Lindborg <Joachim.Lindborg@lsys.se>2013-08-30 02:29:52 +0200
commit45689fd8799186fd6be0b308745aef428ab50dcc (patch)
tree0dd4e5f41684ad8f4b31920f1b8b9d3e27fd1527 /sleekxmpp/plugins/xep_0325/control.py
parentb7adaafb3ecb0a615c93fbb1830e66357b081fe3 (diff)
downloadslixmpp-45689fd8799186fd6be0b308745aef428ab50dcc.tar.gz
slixmpp-45689fd8799186fd6be0b308745aef428ab50dcc.tar.bz2
slixmpp-45689fd8799186fd6be0b308745aef428ab50dcc.tar.xz
slixmpp-45689fd8799186fd6be0b308745aef428ab50dcc.zip
First implementation of the xep_0323 and xep_325 used in IoT systems. Tests are added for stanza and streams
Diffstat (limited to 'sleekxmpp/plugins/xep_0325/control.py')
-rw-r--r--sleekxmpp/plugins/xep_0325/control.py574
1 files changed, 574 insertions, 0 deletions
diff --git a/sleekxmpp/plugins/xep_0325/control.py b/sleekxmpp/plugins/xep_0325/control.py
new file mode 100644
index 00000000..e34eb2c2
--- /dev/null
+++ b/sleekxmpp/plugins/xep_0325/control.py
@@ -0,0 +1,574 @@
+"""
+ SleekXMPP: The Sleek XMPP Library
+ Implementation of xeps for Internet of Things
+ http://wiki.xmpp.org/web/Tech_pages/IoT_systems
+ Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
+ This file is part of SleekXMPP.
+
+ See the file LICENSE for copying permission.
+"""
+
+import logging
+import time
+from threading import Thread, Timer, Lock
+
+from sleekxmpp.xmlstream import JID
+from sleekxmpp.xmlstream.handler import Callback
+from sleekxmpp.xmlstream.matcher import StanzaPath
+from sleekxmpp.plugins.base import BasePlugin
+from sleekxmpp.plugins.xep_0325 import stanza
+from sleekxmpp.plugins.xep_0325.stanza import Control
+
+
+log = logging.getLogger(__name__)
+
+
+class XEP_0325(BasePlugin):
+
+ """
+ XEP-0325: IoT Control
+
+
+ Actuators are devices in sensor networks that can be controlled through
+ the network and act with the outside world. In sensor networks and
+ Internet of Things applications, actuators make it possible to automate
+ real-world processes.
+ This plugin implements a mechanism whereby actuators can be controlled
+ in XMPP-based sensor networks, making it possible to integrate sensors
+ and actuators of different brands, makes and models into larger
+ Internet of Things applications.
+
+ Also see <http://xmpp.org/extensions/xep-0325.html>
+
+ Configuration Values:
+ threaded -- Indicates if communication with sensors should be threaded.
+ Defaults to True.
+
+ Events:
+ Sensor side
+ -----------
+ Control Event:DirectSet -- Received a control message
+ Control Event:SetReq -- Received a control request
+
+ Client side
+ -----------
+ Control Event:SetResponse -- Received a response to a
+ control request, type result
+ Control Event:SetResponseError -- Received a response to a
+ control request, type error
+
+ Attributes:
+ threaded -- Indicates if command events should be threaded.
+ Defaults to True.
+ sessions -- A dictionary or equivalent backend mapping
+ session IDs to dictionaries containing data
+ relevant to a request's session. This dictionary is used
+ both by the client and sensor side. On client side, seqnr
+ is used as key, while on sensor side, a session_id is used
+ as key. This ensures that the two will not collide, so
+ one instance can be both client and sensor.
+ Sensor side
+ -----------
+ nodes -- A dictionary mapping sensor nodes that are serviced through
+ this XMPP instance to their device handlers ("drivers").
+ Client side
+ -----------
+ last_seqnr -- The last used sequence number (integer). One sequence of
+ communication (e.g. -->request, <--accept, <--fields)
+ between client and sensor is identified by a unique
+ sequence number (unique between the client/sensor pair)
+
+ Methods:
+ plugin_init -- Overrides base_plugin.plugin_init
+ post_init -- Overrides base_plugin.post_init
+ plugin_end -- Overrides base_plugin.plugin_end
+
+ Sensor side
+ -----------
+ register_node -- Register a sensor as available from this XMPP
+ instance.
+
+ Client side
+ -----------
+ set_request -- Initiates a control request to modify data in
+ sensor(s). Non-blocking, a callback function will
+ be called when the sensor has responded.
+ set_command -- Initiates a control command to modify data in
+ sensor(s). Non-blocking. The sensor(s) will not
+ respond regardless of the result of the command,
+ so no callback is made.
+
+ """
+
+ name = 'xep_0325'
+ description = 'XEP-0325 Internet of Things - Control'
+ dependencies = set(['xep_0030'])
+ stanza = stanza
+
+
+ default_config = {
+ 'threaded': True
+# 'session_db': None
+ }
+
+ def plugin_init(self):
+ """ Start the XEP-0325 plugin """
+
+ self.xmpp.register_handler(
+ Callback('Control Event:DirectSet',
+ StanzaPath('message/set'),
+ self._handle_direct_set))
+
+ self.xmpp.register_handler(
+ Callback('Control Event:SetReq',
+ StanzaPath('iq@type=set/set'),
+ self._handle_set_req))
+
+ self.xmpp.register_handler(
+ Callback('Control Event:SetResponse',
+ StanzaPath('iq@type=result/setResponse'),
+ self._handle_set_response))
+
+ self.xmpp.register_handler(
+ Callback('Control Event:SetResponseError',
+ StanzaPath('iq@type=error/setResponse'),
+ self._handle_set_response))
+
+ # Server side dicts
+ self.nodes = {};
+ self.sessions = {};
+
+ self.last_seqnr = 0;
+ self.seqnr_lock = Lock();
+
+ ## For testning only
+ self.test_authenticated_from = ""
+
+ def post_init(self):
+ """ Init complete. Register our features in Serivce discovery. """
+ BasePlugin.post_init(self)
+ self.xmpp['xep_0030'].add_feature(Control.namespace)
+ self.xmpp['xep_0030'].set_items(node=Control.namespace, items=tuple())
+
+ def _new_session(self):
+ """ Return a new session ID. """
+ return str(time.time()) + '-' + self.xmpp.new_id()
+
+ def plugin_end(self):
+ """ Stop the XEP-0325 plugin """
+ self.sessions.clear();
+ self.xmpp.remove_handler('Control Event:DirectSet')
+ self.xmpp.remove_handler('Control Event:SetReq')
+ self.xmpp.remove_handler('Control Event:SetResponse')
+ self.xmpp.remove_handler('Control Event:SetResponseError')
+ self.xmpp['xep_0030'].del_feature(feature=Control.namespace)
+ self.xmpp['xep_0030'].set_items(node=Control.namespace, items=tuple());
+
+
+ # =================================================================
+ # Sensor side (data provider) API
+
+ def register_node(self, nodeId, device, commTimeout, sourceId=None, cacheType=None):
+ """
+ Register a sensor/device as available for control requests/commands
+ through this XMPP instance.
+
+ The device object may by any custom implementation to support
+ specific devices, but it must implement the functions:
+ has_control_field
+ set_control_fields
+ according to the interfaces shown in the example device.py file.
+
+ Arguments:
+ nodeId -- The identifier for the device
+ device -- The device object
+ commTimeout -- Time in seconds to wait between each callback from device during
+ a data readout. Float.
+ sourceId -- [optional] identifying the data source controlling the device
+ cacheType -- [optional] narrowing down the search to a specific kind of node
+ """
+ self.nodes[nodeId] = {"device": device,
+ "commTimeout": commTimeout,
+ "sourceId": sourceId,
+ "cacheType": cacheType};
+
+ def _set_authenticated(self, auth=''):
+ """ Internal testing function """
+ self.test_authenticated_from = auth;
+
+ def _get_new_seqnr(self):
+ """ Returns a unique sequence number (unique across threads) """
+ self.seqnr_lock.acquire();
+ self.last_seqnr = self.last_seqnr + 1;
+ self.seqnr_lock.release();
+ return str(self.last_seqnr);
+
+ def _handle_set_req(self, iq):
+ """
+ Event handler for reception of an Iq with set req - this is a
+ control request.
+
+ Verifies that
+ - all the requested nodes are available
+ (if no nodes are specified in the request, assume all nodes)
+ - all the control fields are available from all requested nodes
+ (if no nodes are specified in the request, assume all nodes)
+
+ If the request passes verification, the control request is passed
+ to the devices (in a separate thread).
+ If the verification fails, a setResponse with error indication
+ is sent.
+ """
+
+ error_msg = '';
+ req_ok = True;
+ missing_node = None;
+ missing_field = None;
+
+ # Authentication
+ if len(self.test_authenticated_from) > 0 and not iq['from'] == self.test_authenticated_from:
+ # Invalid authentication
+ req_ok = False;
+ error_msg = "Access denied";
+
+ # Nodes
+ process_nodes = [];
+ if len(iq['set']['nodes']) > 0:
+ for n in iq['set']['nodes']:
+ if not n['nodeId'] in self.nodes:
+ req_ok = False;
+ missing_node = n['nodeId'];
+ error_msg = "Invalid nodeId " + n['nodeId'];
+ process_nodes = [n['nodeId'] for n in iq['set']['nodes']];
+ else:
+ process_nodes = self.nodes.keys();
+
+ # Fields - for control we need to find all in all devices, otherwise we reject
+ process_fields = [];
+ if len(iq['set']['datas']) > 0:
+ for f in iq['set']['datas']:
+ for node in self.nodes:
+ if not self.nodes[node]["device"].has_control_field(f['name'], f._get_typename()):
+ req_ok = False;
+ missing_field = f['name'];
+ error_msg = "Invalid field " + f['name'];
+ break;
+ process_fields = [(f['name'], f._get_typename(), f['value']) for f in iq['set']['datas']];
+
+ if req_ok:
+ session = self._new_session();
+ self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": iq['id']};
+ self.sessions[session]["commTimers"] = {};
+ self.sessions[session]["nodeDone"] = {};
+ # Flag that a reply is exected when we are done
+ self.sessions[session]["reply"] = True;
+
+ self.sessions[session]["node_list"] = process_nodes;
+ if self.threaded:
+ #print("starting thread")
+ tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields))
+ tr_req.start()
+ #print("started thread")
+ else:
+ self._threaded_node_request(session, process_fields);
+
+ else:
+ iq.reply();
+ iq['type'] = 'error';
+ iq['setResponse']['responseCode'] = "NotFound";
+ if missing_node is not None:
+ iq['setResponse'].add_node(missing_node);
+ if missing_field is not None:
+ iq['setResponse'].add_data(missing_field);
+ iq['setResponse']['error']['var'] = "Output";
+ iq['setResponse']['error']['text'] = error_msg;
+ iq.send(block=False);
+
+ def _handle_direct_set(self, msg):
+ """
+ Event handler for reception of a Message with set command - this is a
+ direct control command.
+
+ Verifies that
+ - all the requested nodes are available
+ (if no nodes are specified in the request, assume all nodes)
+ - all the control fields are available from all requested nodes
+ (if no nodes are specified in the request, assume all nodes)
+
+ If the request passes verification, the control request is passed
+ to the devices (in a separate thread).
+ If the verification fails, do nothing.
+ """
+ req_ok = True;
+
+ # Nodes
+ process_nodes = [];
+ if len(msg['set']['nodes']) > 0:
+ for n in msg['set']['nodes']:
+ if not n['nodeId'] in self.nodes:
+ req_ok = False;
+ error_msg = "Invalid nodeId " + n['nodeId'];
+ process_nodes = [n['nodeId'] for n in msg['set']['nodes']];
+ else:
+ process_nodes = self.nodes.keys();
+
+ # Fields - for control we need to find all in all devices, otherwise we reject
+ process_fields = [];
+ if len(msg['set']['datas']) > 0:
+ for f in msg['set']['datas']:
+ for node in self.nodes:
+ if not self.nodes[node]["device"].has_control_field(f['name'], f._get_typename()):
+ req_ok = False;
+ missing_field = f['name'];
+ error_msg = "Invalid field " + f['name'];
+ break;
+ process_fields = [(f['name'], f._get_typename(), f['value']) for f in msg['set']['datas']];
+
+ if req_ok:
+ session = self._new_session();
+ self.sessions[session] = {"from": msg['from'], "to": msg['to']};
+ self.sessions[session]["commTimers"] = {};
+ self.sessions[session]["nodeDone"] = {};
+ self.sessions[session]["reply"] = False;
+
+ self.sessions[session]["node_list"] = process_nodes;
+ if self.threaded:
+ #print("starting thread")
+ tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields))
+ tr_req.start()
+ #print("started thread")
+ else:
+ self._threaded_node_request(session, process_fields);
+
+
+ def _threaded_node_request(self, session, process_fields):
+ """
+ Helper function to handle the device control in a separate thread.
+
+ Arguments:
+ session -- The request session id
+ process_fields -- The fields to set in the devices. List of tuple format:
+ (name, datatype, value)
+ """
+ for node in self.sessions[session]["node_list"]:
+ self.sessions[session]["nodeDone"][node] = False;
+
+ for node in self.sessions[session]["node_list"]:
+ timer = Timer(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node));
+ self.sessions[session]["commTimers"][node] = timer;
+ timer.start();
+ self.nodes[node]['device'].set_control_fields(process_fields, session=session, callback=self._device_set_command_callback);
+
+ def _event_comm_timeout(self, session, nodeId):
+ """
+ Triggered if any of the control operations timeout.
+ Stop communicating with the failing device.
+ If the control command was an Iq request, sends a failure
+ message back to the client.
+
+ Arguments:
+ session -- The request session id
+ nodeId -- The id of the device which timed out
+ """
+
+ if self.sessions[session]["reply"]:
+ # Reply is exected when we are done
+ iq = self.xmpp.Iq();
+ iq['from'] = self.sessions[session]['to'];
+ iq['to'] = self.sessions[session]['from'];
+ iq['type'] = "error";
+ iq['id'] = self.sessions[session]['seqnr'];
+ iq['setResponse']['responseCode'] = "OtherError";
+ iq['setResponse'].add_node(nodeId);
+ iq['setResponse']['error']['var'] = "Output";
+ iq['setResponse']['error']['text'] = "Timeout.";
+ iq.send(block=False);
+
+ ## TODO - should we send one timeout per node??
+
+ # Drop communication with this device and check if we are done
+ self.sessions[session]["nodeDone"][nodeId] = True;
+ if (self._all_nodes_done(session)):
+ # The session is complete, delete it
+ del self.sessions[session];
+
+ def _all_nodes_done(self, session):
+ """
+ Checks wheter all devices are done replying to the control command.
+
+ Arguments:
+ session -- The request session id
+ """
+ for n in self.sessions[session]["nodeDone"]:
+ if not self.sessions[session]["nodeDone"][n]:
+ return False;
+ return True;
+
+ def _device_set_command_callback(self, session, nodeId, result, error_field=None, error_msg=None):
+ """
+ Callback function called by the devices when the control command is
+ complete or failed.
+ If needed, composes a message with the result and sends it back to the
+ client.
+
+ Arguments:
+ session -- The request session id
+ nodeId -- The device id which initiated the callback
+ result -- The current result status of the control command. Valid values are:
+ "error" - Set fields failed.
+ "ok" - All fields were set.
+ error_field -- [optional] Only applies when result == "error"
+ The field name that failed (usually means it is missing)
+ error_msg -- [optional] Only applies when result == "error".
+ Error details when a request failed.
+ """
+
+ if not session in self.sessions:
+ # This can happend if a session was deleted, like in a timeout. Just drop the data.
+ return
+
+ if result == "error":
+ self.sessions[session]["commTimers"][nodeId].cancel();
+
+ if self.sessions[session]["reply"]:
+ # Reply is exected when we are done
+ iq = self.xmpp.Iq();
+ iq['from'] = self.sessions[session]['to'];
+ iq['to'] = self.sessions[session]['from'];
+ iq['type'] = "error";
+ iq['id'] = self.sessions[session]['seqnr'];
+ iq['setResponse']['responseCode'] = "OtherError";
+ iq['setResponse'].add_node(nodeId);
+ if error_field is not None:
+ iq['setResponse'].add_data(error_field);
+ iq['setResponse']['error']['var'] = error_field;
+ iq['setResponse']['error']['text'] = error_msg;
+ iq.send(block=False);
+
+ # Drop communication with this device and check if we are done
+ self.sessions[session]["nodeDone"][nodeId] = True;
+ if (self._all_nodes_done(session)):
+ # The session is complete, delete it
+ del self.sessions[session];
+ else:
+ self.sessions[session]["commTimers"][nodeId].cancel();
+
+ self.sessions[session]["nodeDone"][nodeId] = True;
+ if (self._all_nodes_done(session)):
+ if self.sessions[session]["reply"]:
+ # Reply is exected when we are done
+ iq = self.xmpp.Iq();
+ iq['from'] = self.sessions[session]['to'];
+ iq['to'] = self.sessions[session]['from'];
+ iq['type'] = "result";
+ iq['id'] = self.sessions[session]['seqnr'];
+ iq['setResponse']['responseCode'] = "OK";
+ iq.send(block=False);
+
+ # The session is complete, delete it
+ del self.sessions[session];
+
+
+ # =================================================================
+ # Client side (data controller) API
+
+ def set_request(self, from_jid, to_jid, callback, fields, nodeIds=None):
+ """
+ Called on the client side to initiade a control request.
+ Composes a message with the request and sends it to the device(s).
+ Does not block, the callback will be called when the device(s)
+ has responded.
+
+ Arguments:
+ from_jid -- The jid of the requester
+ to_jid -- The jid of the device(s)
+ callback -- The callback function to call when data is availble.
+
+ The callback function must support the following arguments:
+
+ from_jid -- The jid of the responding device(s)
+ result -- The result of the control request. Valid values are:
+ "OK" - Control request completed successfully
+ "NotFound" - One or more nodes or fields are missing
+ "InsufficientPrivileges" - Not authorized.
+ "Locked" - Field(s) is locked and cannot
+ be changed at the moment.
+ "NotImplemented" - Request feature not implemented.
+ "FormError" - Error while setting with
+ a form (not implemented).
+ "OtherError" - Indicates other types of
+ errors, such as timeout.
+ Details in the error_msg.
+
+
+ nodeId -- [optional] Only applicable when result == "error"
+ List of node Ids of failing device(s).
+
+ fields -- [optional] Only applicable when result == "error"
+ List of fields that failed.[optional] Mandatory when result == "rejected" or "failure".
+
+ error_msg -- Details about why the request failed.
+
+ fields -- Fields to set. List of tuple format: (name, typename, value).
+ nodeIds -- [optional] Limits the request to the node Ids in this list.
+ """
+ iq = self.xmpp.Iq();
+ iq['from'] = from_jid;
+ iq['to'] = to_jid;
+ seqnr = self._get_new_seqnr();
+ iq['id'] = seqnr;
+ iq['type'] = "set";
+ if nodeIds is not None:
+ for nodeId in nodeIds:
+ iq['set'].add_node(nodeId);
+ if fields is not None:
+ for name, typename, value in fields:
+ iq['set'].add_data(name=name, typename=typename, value=value);
+
+ self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "callback": callback};
+ iq.send(block=False);
+
+ def set_command(self, from_jid, to_jid, fields, nodeIds=None):
+ """
+ Called on the client side to initiade a control command.
+ Composes a message with the set commandand sends it to the device(s).
+ Does not block. Device(s) will not respond, regardless of result.
+
+ Arguments:
+ from_jid -- The jid of the requester
+ to_jid -- The jid of the device(s)
+
+ fields -- Fields to set. List of tuple format: (name, typename, value).
+ nodeIds -- [optional] Limits the request to the node Ids in this list.
+ """
+ msg = self.xmpp.Message();
+ msg['from'] = from_jid;
+ msg['to'] = to_jid;
+ msg['type'] = "set";
+ if nodeIds is not None:
+ for nodeId in nodeIds:
+ msg['set'].add_node(nodeId);
+ if fields is not None:
+ for name, typename, value in fields:
+ msg['set'].add_data(name, typename, value);
+
+ # We won't get any reply, so don't create a session
+ msg.send();
+
+ def _handle_set_response(self, iq):
+ """ Received response from device(s) """
+ #print("ooh")
+ seqnr = iq['id'];
+ from_jid = str(iq['from']);
+ result = iq['setResponse']['responseCode'];
+ nodeIds = [n['name'] for n in iq['setResponse']['nodes']];
+ fields = [f['name'] for f in iq['setResponse']['datas']];
+ error_msg = None;
+
+ if not iq['setResponse'].find('error') is None and not iq['setResponse']['error']['text'] == "":
+ error_msg = iq['setResponse']['error']['text'];
+
+ callback = self.sessions[seqnr]["callback"];
+ callback(from_jid=from_jid, result=result, nodeIds=nodeIds, fields=fields, error_msg=error_msg);
+
+