diff options
author | Mike Taylor <bear42@gmail.com> | 2015-04-11 20:12:19 -0400 |
---|---|---|
committer | Mike Taylor <bear42@gmail.com> | 2015-04-11 20:12:19 -0400 |
commit | 86e85f98357b40941fddb7a2dbf8fbe16f96032e (patch) | |
tree | 65a86c0378c5e59f1822dfba8ed484f77ff5ebeb /sleekxmpp/plugins/xep_0325/control.py | |
parent | cc145d20b077442b7d4118d4d2f4c27e1b0faf0c (diff) | |
parent | 073e85381a86069e931369bb5353cab2a2e3682d (diff) | |
download | slixmpp-86e85f98357b40941fddb7a2dbf8fbe16f96032e.tar.gz slixmpp-86e85f98357b40941fddb7a2dbf8fbe16f96032e.tar.bz2 slixmpp-86e85f98357b40941fddb7a2dbf8fbe16f96032e.tar.xz slixmpp-86e85f98357b40941fddb7a2dbf8fbe16f96032e.zip |
Merge pull request #313 from mayflower/develop
Proposing #310 again in fixed version
Diffstat (limited to 'sleekxmpp/plugins/xep_0325/control.py')
-rw-r--r-- | sleekxmpp/plugins/xep_0325/control.py | 377 |
1 files changed, 186 insertions, 191 deletions
diff --git a/sleekxmpp/plugins/xep_0325/control.py b/sleekxmpp/plugins/xep_0325/control.py index e34eb2c2..11e7a045 100644 --- a/sleekxmpp/plugins/xep_0325/control.py +++ b/sleekxmpp/plugins/xep_0325/control.py @@ -12,7 +12,6 @@ import logging import time from threading import Thread, Timer, Lock -from sleekxmpp.xmlstream import JID from sleekxmpp.xmlstream.handler import Callback from sleekxmpp.xmlstream.matcher import StanzaPath from sleekxmpp.plugins.base import BasePlugin @@ -26,16 +25,16 @@ log = logging.getLogger(__name__) class XEP_0325(BasePlugin): """ - XEP-0325: IoT Control + XEP-0325: IoT Control - Actuators are devices in sensor networks that can be controlled through - the network and act with the outside world. In sensor networks and - Internet of Things applications, actuators make it possible to automate - real-world processes. - This plugin implements a mechanism whereby actuators can be controlled - in XMPP-based sensor networks, making it possible to integrate sensors - and actuators of different brands, makes and models into larger + Actuators are devices in sensor networks that can be controlled through + the network and act with the outside world. In sensor networks and + Internet of Things applications, actuators make it possible to automate + real-world processes. + This plugin implements a mechanism whereby actuators can be controlled + in XMPP-based sensor networks, making it possible to integrate sensors + and actuators of different brands, makes and models into larger Internet of Things applications. Also see <http://xmpp.org/extensions/xep-0325.html> @@ -52,9 +51,9 @@ class XEP_0325(BasePlugin): Client side ----------- - Control Event:SetResponse -- Received a response to a + Control Event:SetResponse -- Received a response to a control request, type result - Control Event:SetResponseError -- Received a response to a + Control Event:SetResponseError -- Received a response to a control request, type error Attributes: @@ -65,7 +64,7 @@ class XEP_0325(BasePlugin): relevant to a request's session. This dictionary is used both by the client and sensor side. On client side, seqnr is used as key, while on sensor side, a session_id is used - as key. This ensures that the two will not collide, so + as key. This ensures that the two will not collide, so one instance can be both client and sensor. Sensor side ----------- @@ -85,15 +84,15 @@ class XEP_0325(BasePlugin): Sensor side ----------- - register_node -- Register a sensor as available from this XMPP + register_node -- Register a sensor as available from this XMPP instance. Client side ----------- - set_request -- Initiates a control request to modify data in + set_request -- Initiates a control request to modify data in sensor(s). Non-blocking, a callback function will be called when the sensor has responded. - set_command -- Initiates a control command to modify data in + set_command -- Initiates a control command to modify data in sensor(s). Non-blocking. The sensor(s) will not respond regardless of the result of the command, so no callback is made. @@ -102,7 +101,7 @@ class XEP_0325(BasePlugin): name = 'xep_0325' description = 'XEP-0325 Internet of Things - Control' - dependencies = set(['xep_0030']) + dependencies = set(['xep_0030']) stanza = stanza @@ -135,11 +134,11 @@ class XEP_0325(BasePlugin): self._handle_set_response)) # Server side dicts - self.nodes = {}; - self.sessions = {}; + self.nodes = {} + self.sessions = {} - self.last_seqnr = 0; - self.seqnr_lock = Lock(); + self.last_seqnr = 0 + self.seqnr_lock = Lock() ## For testning only self.test_authenticated_from = "" @@ -156,13 +155,13 @@ class XEP_0325(BasePlugin): def plugin_end(self): """ Stop the XEP-0325 plugin """ - self.sessions.clear(); + self.sessions.clear() self.xmpp.remove_handler('Control Event:DirectSet') self.xmpp.remove_handler('Control Event:SetReq') self.xmpp.remove_handler('Control Event:SetResponse') self.xmpp.remove_handler('Control Event:SetResponseError') self.xmpp['xep_0030'].del_feature(feature=Control.namespace) - self.xmpp['xep_0030'].set_items(node=Control.namespace, items=tuple()); + self.xmpp['xep_0030'].set_items(node=Control.namespace, items=tuple()) # ================================================================= @@ -170,10 +169,10 @@ class XEP_0325(BasePlugin): def register_node(self, nodeId, device, commTimeout, sourceId=None, cacheType=None): """ - Register a sensor/device as available for control requests/commands - through this XMPP instance. + Register a sensor/device as available for control requests/commands + through this XMPP instance. - The device object may by any custom implementation to support + The device object may by any custom implementation to support specific devices, but it must implement the functions: has_control_field set_control_fields @@ -185,30 +184,30 @@ class XEP_0325(BasePlugin): commTimeout -- Time in seconds to wait between each callback from device during a data readout. Float. sourceId -- [optional] identifying the data source controlling the device - cacheType -- [optional] narrowing down the search to a specific kind of node + cacheType -- [optional] narrowing down the search to a specific kind of node """ - self.nodes[nodeId] = {"device": device, + self.nodes[nodeId] = {"device": device, "commTimeout": commTimeout, - "sourceId": sourceId, - "cacheType": cacheType}; + "sourceId": sourceId, + "cacheType": cacheType} def _set_authenticated(self, auth=''): """ Internal testing function """ - self.test_authenticated_from = auth; + self.test_authenticated_from = auth def _get_new_seqnr(self): """ Returns a unique sequence number (unique across threads) """ - self.seqnr_lock.acquire(); - self.last_seqnr = self.last_seqnr + 1; - self.seqnr_lock.release(); - return str(self.last_seqnr); + self.seqnr_lock.acquire() + self.last_seqnr += 1 + self.seqnr_lock.release() + return str(self.last_seqnr) def _handle_set_req(self, iq): """ - Event handler for reception of an Iq with set req - this is a + Event handler for reception of an Iq with set req - this is a control request. - Verifies that + Verifies that - all the requested nodes are available (if no nodes are specified in the request, assume all nodes) - all the control fields are available from all requested nodes @@ -216,80 +215,79 @@ class XEP_0325(BasePlugin): If the request passes verification, the control request is passed to the devices (in a separate thread). - If the verification fails, a setResponse with error indication + If the verification fails, a setResponse with error indication is sent. """ - error_msg = ''; - req_ok = True; - missing_node = None; - missing_field = None; + error_msg = '' + req_ok = True + missing_node = None + missing_field = None # Authentication if len(self.test_authenticated_from) > 0 and not iq['from'] == self.test_authenticated_from: # Invalid authentication - req_ok = False; - error_msg = "Access denied"; + req_ok = False + error_msg = "Access denied" # Nodes - process_nodes = []; if len(iq['set']['nodes']) > 0: for n in iq['set']['nodes']: if not n['nodeId'] in self.nodes: - req_ok = False; - missing_node = n['nodeId']; - error_msg = "Invalid nodeId " + n['nodeId']; - process_nodes = [n['nodeId'] for n in iq['set']['nodes']]; + req_ok = False + missing_node = n['nodeId'] + error_msg = "Invalid nodeId " + n['nodeId'] + process_nodes = [n['nodeId'] for n in iq['set']['nodes']] else: - process_nodes = self.nodes.keys(); + process_nodes = self.nodes.keys() # Fields - for control we need to find all in all devices, otherwise we reject - process_fields = []; + process_fields = [] if len(iq['set']['datas']) > 0: for f in iq['set']['datas']: for node in self.nodes: if not self.nodes[node]["device"].has_control_field(f['name'], f._get_typename()): - req_ok = False; - missing_field = f['name']; - error_msg = "Invalid field " + f['name']; - break; - process_fields = [(f['name'], f._get_typename(), f['value']) for f in iq['set']['datas']]; + req_ok = False + missing_field = f['name'] + error_msg = "Invalid field " + f['name'] + break + process_fields = [(f['name'], f._get_typename(), f['value']) for f in iq['set']['datas']] if req_ok: - session = self._new_session(); - self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": iq['id']}; - self.sessions[session]["commTimers"] = {}; - self.sessions[session]["nodeDone"] = {}; + session = self._new_session() + self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": iq['id']} + self.sessions[session]["commTimers"] = {} + self.sessions[session]["nodeDone"] = {} # Flag that a reply is exected when we are done - self.sessions[session]["reply"] = True; + self.sessions[session]["reply"] = True - self.sessions[session]["node_list"] = process_nodes; + self.sessions[session]["node_list"] = process_nodes if self.threaded: #print("starting thread") tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields)) tr_req.start() #print("started thread") else: - self._threaded_node_request(session, process_fields); + self._threaded_node_request(session, process_fields) else: - iq.reply(); - iq['type'] = 'error'; - iq['setResponse']['responseCode'] = "NotFound"; + iq.reply() + iq['type'] = 'error' + iq['setResponse']['responseCode'] = "NotFound" if missing_node is not None: - iq['setResponse'].add_node(missing_node); + iq['setResponse'].add_node(missing_node) if missing_field is not None: - iq['setResponse'].add_data(missing_field); - iq['setResponse']['error']['var'] = "Output"; - iq['setResponse']['error']['text'] = error_msg; - iq.send(block=False); + iq['setResponse'].add_data(missing_field) + iq['setResponse']['error']['var'] = "Output" + iq['setResponse']['error']['text'] = error_msg + iq.send(block=False) def _handle_direct_set(self, msg): """ - Event handler for reception of a Message with set command - this is a + Event handler for reception of a Message with set command - this is a direct control command. - Verifies that + Verifies that - all the requested nodes are available (if no nodes are specified in the request, assume all nodes) - all the control fields are available from all requested nodes @@ -299,73 +297,72 @@ class XEP_0325(BasePlugin): to the devices (in a separate thread). If the verification fails, do nothing. """ - req_ok = True; + req_ok = True # Nodes - process_nodes = []; if len(msg['set']['nodes']) > 0: for n in msg['set']['nodes']: if not n['nodeId'] in self.nodes: - req_ok = False; - error_msg = "Invalid nodeId " + n['nodeId']; - process_nodes = [n['nodeId'] for n in msg['set']['nodes']]; + req_ok = False + error_msg = "Invalid nodeId " + n['nodeId'] + process_nodes = [n['nodeId'] for n in msg['set']['nodes']] else: - process_nodes = self.nodes.keys(); + process_nodes = self.nodes.keys() # Fields - for control we need to find all in all devices, otherwise we reject - process_fields = []; + process_fields = [] if len(msg['set']['datas']) > 0: for f in msg['set']['datas']: for node in self.nodes: if not self.nodes[node]["device"].has_control_field(f['name'], f._get_typename()): - req_ok = False; - missing_field = f['name']; - error_msg = "Invalid field " + f['name']; - break; - process_fields = [(f['name'], f._get_typename(), f['value']) for f in msg['set']['datas']]; + req_ok = False + missing_field = f['name'] + error_msg = "Invalid field " + f['name'] + break + process_fields = [(f['name'], f._get_typename(), f['value']) for f in msg['set']['datas']] if req_ok: - session = self._new_session(); - self.sessions[session] = {"from": msg['from'], "to": msg['to']}; - self.sessions[session]["commTimers"] = {}; - self.sessions[session]["nodeDone"] = {}; - self.sessions[session]["reply"] = False; + session = self._new_session() + self.sessions[session] = {"from": msg['from'], "to": msg['to']} + self.sessions[session]["commTimers"] = {} + self.sessions[session]["nodeDone"] = {} + self.sessions[session]["reply"] = False - self.sessions[session]["node_list"] = process_nodes; + self.sessions[session]["node_list"] = process_nodes if self.threaded: #print("starting thread") tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields)) tr_req.start() #print("started thread") else: - self._threaded_node_request(session, process_fields); + self._threaded_node_request(session, process_fields) def _threaded_node_request(self, session, process_fields): - """ + """ Helper function to handle the device control in a separate thread. - + Arguments: session -- The request session id process_fields -- The fields to set in the devices. List of tuple format: (name, datatype, value) """ for node in self.sessions[session]["node_list"]: - self.sessions[session]["nodeDone"][node] = False; + self.sessions[session]["nodeDone"][node] = False for node in self.sessions[session]["node_list"]: - timer = Timer(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node)); - self.sessions[session]["commTimers"][node] = timer; - timer.start(); - self.nodes[node]['device'].set_control_fields(process_fields, session=session, callback=self._device_set_command_callback); + timer = Timer(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node)) + self.sessions[session]["commTimers"][node] = timer + timer.start() + self.nodes[node]['device'].set_control_fields(process_fields, session=session, callback=self._device_set_command_callback) def _event_comm_timeout(self, session, nodeId): - """ + """ Triggered if any of the control operations timeout. Stop communicating with the failing device. - If the control command was an Iq request, sends a failure - message back to the client. - + If the control command was an Iq request, sends a failure + message back to the client. + Arguments: session -- The request session id nodeId -- The id of the device which timed out @@ -373,51 +370,51 @@ class XEP_0325(BasePlugin): if self.sessions[session]["reply"]: # Reply is exected when we are done - iq = self.xmpp.Iq(); - iq['from'] = self.sessions[session]['to']; - iq['to'] = self.sessions[session]['from']; - iq['type'] = "error"; - iq['id'] = self.sessions[session]['seqnr']; - iq['setResponse']['responseCode'] = "OtherError"; - iq['setResponse'].add_node(nodeId); - iq['setResponse']['error']['var'] = "Output"; - iq['setResponse']['error']['text'] = "Timeout."; - iq.send(block=False); + iq = self.xmpp.Iq() + iq['from'] = self.sessions[session]['to'] + iq['to'] = self.sessions[session]['from'] + iq['type'] = "error" + iq['id'] = self.sessions[session]['seqnr'] + iq['setResponse']['responseCode'] = "OtherError" + iq['setResponse'].add_node(nodeId) + iq['setResponse']['error']['var'] = "Output" + iq['setResponse']['error']['text'] = "Timeout." + iq.send(block=False) ## TODO - should we send one timeout per node?? # Drop communication with this device and check if we are done - self.sessions[session]["nodeDone"][nodeId] = True; + self.sessions[session]["nodeDone"][nodeId] = True if (self._all_nodes_done(session)): # The session is complete, delete it - del self.sessions[session]; + del self.sessions[session] def _all_nodes_done(self, session): - """ + """ Checks wheter all devices are done replying to the control command. - + Arguments: session -- The request session id """ for n in self.sessions[session]["nodeDone"]: if not self.sessions[session]["nodeDone"][n]: - return False; - return True; + return False + return True def _device_set_command_callback(self, session, nodeId, result, error_field=None, error_msg=None): - """ - Callback function called by the devices when the control command is + """ + Callback function called by the devices when the control command is complete or failed. - If needed, composes a message with the result and sends it back to the + If needed, composes a message with the result and sends it back to the client. - + Arguments: session -- The request session id nodeId -- The device id which initiated the callback result -- The current result status of the control command. Valid values are: "error" - Set fields failed. "ok" - All fields were set. - error_field -- [optional] Only applies when result == "error" + error_field -- [optional] Only applies when result == "error" The field name that failed (usually means it is missing) error_msg -- [optional] Only applies when result == "error". Error details when a request failed. @@ -428,62 +425,62 @@ class XEP_0325(BasePlugin): return if result == "error": - self.sessions[session]["commTimers"][nodeId].cancel(); + self.sessions[session]["commTimers"][nodeId].cancel() if self.sessions[session]["reply"]: # Reply is exected when we are done - iq = self.xmpp.Iq(); - iq['from'] = self.sessions[session]['to']; - iq['to'] = self.sessions[session]['from']; - iq['type'] = "error"; - iq['id'] = self.sessions[session]['seqnr']; - iq['setResponse']['responseCode'] = "OtherError"; - iq['setResponse'].add_node(nodeId); + iq = self.xmpp.Iq() + iq['from'] = self.sessions[session]['to'] + iq['to'] = self.sessions[session]['from'] + iq['type'] = "error" + iq['id'] = self.sessions[session]['seqnr'] + iq['setResponse']['responseCode'] = "OtherError" + iq['setResponse'].add_node(nodeId) if error_field is not None: - iq['setResponse'].add_data(error_field); - iq['setResponse']['error']['var'] = error_field; - iq['setResponse']['error']['text'] = error_msg; - iq.send(block=False); + iq['setResponse'].add_data(error_field) + iq['setResponse']['error']['var'] = error_field + iq['setResponse']['error']['text'] = error_msg + iq.send(block=False) # Drop communication with this device and check if we are done - self.sessions[session]["nodeDone"][nodeId] = True; + self.sessions[session]["nodeDone"][nodeId] = True if (self._all_nodes_done(session)): # The session is complete, delete it - del self.sessions[session]; + del self.sessions[session] else: - self.sessions[session]["commTimers"][nodeId].cancel(); + self.sessions[session]["commTimers"][nodeId].cancel() - self.sessions[session]["nodeDone"][nodeId] = True; + self.sessions[session]["nodeDone"][nodeId] = True if (self._all_nodes_done(session)): if self.sessions[session]["reply"]: # Reply is exected when we are done - iq = self.xmpp.Iq(); - iq['from'] = self.sessions[session]['to']; - iq['to'] = self.sessions[session]['from']; - iq['type'] = "result"; - iq['id'] = self.sessions[session]['seqnr']; - iq['setResponse']['responseCode'] = "OK"; - iq.send(block=False); + iq = self.xmpp.Iq() + iq['from'] = self.sessions[session]['to'] + iq['to'] = self.sessions[session]['from'] + iq['type'] = "result" + iq['id'] = self.sessions[session]['seqnr'] + iq['setResponse']['responseCode'] = "OK" + iq.send(block=False) # The session is complete, delete it - del self.sessions[session]; + del self.sessions[session] # ================================================================= # Client side (data controller) API def set_request(self, from_jid, to_jid, callback, fields, nodeIds=None): - """ + """ Called on the client side to initiade a control request. Composes a message with the request and sends it to the device(s). - Does not block, the callback will be called when the device(s) + Does not block, the callback will be called when the device(s) has responded. - + Arguments: from_jid -- The jid of the requester to_jid -- The jid of the device(s) - callback -- The callback function to call when data is availble. - + callback -- The callback function to call when data is availble. + The callback function must support the following arguments: from_jid -- The jid of the responding device(s) @@ -494,46 +491,46 @@ class XEP_0325(BasePlugin): "Locked" - Field(s) is locked and cannot be changed at the moment. "NotImplemented" - Request feature not implemented. - "FormError" - Error while setting with + "FormError" - Error while setting with a form (not implemented). - "OtherError" - Indicates other types of - errors, such as timeout. + "OtherError" - Indicates other types of + errors, such as timeout. Details in the error_msg. - - nodeId -- [optional] Only applicable when result == "error" - List of node Ids of failing device(s). - fields -- [optional] Only applicable when result == "error" + nodeId -- [optional] Only applicable when result == "error" + List of node Ids of failing device(s). + + fields -- [optional] Only applicable when result == "error" List of fields that failed.[optional] Mandatory when result == "rejected" or "failure". - - error_msg -- Details about why the request failed. + + error_msg -- Details about why the request failed. fields -- Fields to set. List of tuple format: (name, typename, value). nodeIds -- [optional] Limits the request to the node Ids in this list. """ - iq = self.xmpp.Iq(); - iq['from'] = from_jid; - iq['to'] = to_jid; - seqnr = self._get_new_seqnr(); - iq['id'] = seqnr; - iq['type'] = "set"; + iq = self.xmpp.Iq() + iq['from'] = from_jid + iq['to'] = to_jid + seqnr = self._get_new_seqnr() + iq['id'] = seqnr + iq['type'] = "set" if nodeIds is not None: for nodeId in nodeIds: - iq['set'].add_node(nodeId); + iq['set'].add_node(nodeId) if fields is not None: for name, typename, value in fields: - iq['set'].add_data(name=name, typename=typename, value=value); + iq['set'].add_data(name=name, typename=typename, value=value) - self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "callback": callback}; - iq.send(block=False); + self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "callback": callback} + iq.send(block=False) def set_command(self, from_jid, to_jid, fields, nodeIds=None): - """ + """ Called on the client side to initiade a control command. Composes a message with the set commandand sends it to the device(s). Does not block. Device(s) will not respond, regardless of result. - + Arguments: from_jid -- The jid of the requester to_jid -- The jid of the device(s) @@ -541,34 +538,32 @@ class XEP_0325(BasePlugin): fields -- Fields to set. List of tuple format: (name, typename, value). nodeIds -- [optional] Limits the request to the node Ids in this list. """ - msg = self.xmpp.Message(); - msg['from'] = from_jid; - msg['to'] = to_jid; - msg['type'] = "set"; + msg = self.xmpp.Message() + msg['from'] = from_jid + msg['to'] = to_jid + msg['type'] = "set" if nodeIds is not None: for nodeId in nodeIds: - msg['set'].add_node(nodeId); + msg['set'].add_node(nodeId) if fields is not None: for name, typename, value in fields: - msg['set'].add_data(name, typename, value); + msg['set'].add_data(name, typename, value) # We won't get any reply, so don't create a session - msg.send(); + msg.send() def _handle_set_response(self, iq): """ Received response from device(s) """ #print("ooh") - seqnr = iq['id']; - from_jid = str(iq['from']); - result = iq['setResponse']['responseCode']; - nodeIds = [n['name'] for n in iq['setResponse']['nodes']]; - fields = [f['name'] for f in iq['setResponse']['datas']]; - error_msg = None; + seqnr = iq['id'] + from_jid = str(iq['from']) + result = iq['setResponse']['responseCode'] + nodeIds = [n['name'] for n in iq['setResponse']['nodes']] + fields = [f['name'] for f in iq['setResponse']['datas']] + error_msg = None if not iq['setResponse'].find('error') is None and not iq['setResponse']['error']['text'] == "": - error_msg = iq['setResponse']['error']['text']; - - callback = self.sessions[seqnr]["callback"]; - callback(from_jid=from_jid, result=result, nodeIds=nodeIds, fields=fields, error_msg=error_msg); + error_msg = iq['setResponse']['error']['text'] - + callback = self.sessions[seqnr]["callback"] + callback(from_jid=from_jid, result=result, nodeIds=nodeIds, fields=fields, error_msg=error_msg) |