summaryrefslogtreecommitdiff
path: root/sleekxmpp/plugins/xep_0325/control.py
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/plugins/xep_0325/control.py')
-rw-r--r--sleekxmpp/plugins/xep_0325/control.py377
1 files changed, 191 insertions, 186 deletions
diff --git a/sleekxmpp/plugins/xep_0325/control.py b/sleekxmpp/plugins/xep_0325/control.py
index 11e7a045..e34eb2c2 100644
--- a/sleekxmpp/plugins/xep_0325/control.py
+++ b/sleekxmpp/plugins/xep_0325/control.py
@@ -12,6 +12,7 @@ import logging
import time
from threading import Thread, Timer, Lock
+from sleekxmpp.xmlstream import JID
from sleekxmpp.xmlstream.handler import Callback
from sleekxmpp.xmlstream.matcher import StanzaPath
from sleekxmpp.plugins.base import BasePlugin
@@ -25,16 +26,16 @@ log = logging.getLogger(__name__)
class XEP_0325(BasePlugin):
"""
- XEP-0325: IoT Control
+ XEP-0325: IoT Control
- Actuators are devices in sensor networks that can be controlled through
- the network and act with the outside world. In sensor networks and
- Internet of Things applications, actuators make it possible to automate
- real-world processes.
- This plugin implements a mechanism whereby actuators can be controlled
- in XMPP-based sensor networks, making it possible to integrate sensors
- and actuators of different brands, makes and models into larger
+ Actuators are devices in sensor networks that can be controlled through
+ the network and act with the outside world. In sensor networks and
+ Internet of Things applications, actuators make it possible to automate
+ real-world processes.
+ This plugin implements a mechanism whereby actuators can be controlled
+ in XMPP-based sensor networks, making it possible to integrate sensors
+ and actuators of different brands, makes and models into larger
Internet of Things applications.
Also see <http://xmpp.org/extensions/xep-0325.html>
@@ -51,9 +52,9 @@ class XEP_0325(BasePlugin):
Client side
-----------
- Control Event:SetResponse -- Received a response to a
+ Control Event:SetResponse -- Received a response to a
control request, type result
- Control Event:SetResponseError -- Received a response to a
+ Control Event:SetResponseError -- Received a response to a
control request, type error
Attributes:
@@ -64,7 +65,7 @@ class XEP_0325(BasePlugin):
relevant to a request's session. This dictionary is used
both by the client and sensor side. On client side, seqnr
is used as key, while on sensor side, a session_id is used
- as key. This ensures that the two will not collide, so
+ as key. This ensures that the two will not collide, so
one instance can be both client and sensor.
Sensor side
-----------
@@ -84,15 +85,15 @@ class XEP_0325(BasePlugin):
Sensor side
-----------
- register_node -- Register a sensor as available from this XMPP
+ register_node -- Register a sensor as available from this XMPP
instance.
Client side
-----------
- set_request -- Initiates a control request to modify data in
+ set_request -- Initiates a control request to modify data in
sensor(s). Non-blocking, a callback function will
be called when the sensor has responded.
- set_command -- Initiates a control command to modify data in
+ set_command -- Initiates a control command to modify data in
sensor(s). Non-blocking. The sensor(s) will not
respond regardless of the result of the command,
so no callback is made.
@@ -101,7 +102,7 @@ class XEP_0325(BasePlugin):
name = 'xep_0325'
description = 'XEP-0325 Internet of Things - Control'
- dependencies = set(['xep_0030'])
+ dependencies = set(['xep_0030'])
stanza = stanza
@@ -134,11 +135,11 @@ class XEP_0325(BasePlugin):
self._handle_set_response))
# Server side dicts
- self.nodes = {}
- self.sessions = {}
+ self.nodes = {};
+ self.sessions = {};
- self.last_seqnr = 0
- self.seqnr_lock = Lock()
+ self.last_seqnr = 0;
+ self.seqnr_lock = Lock();
## For testning only
self.test_authenticated_from = ""
@@ -155,13 +156,13 @@ class XEP_0325(BasePlugin):
def plugin_end(self):
""" Stop the XEP-0325 plugin """
- self.sessions.clear()
+ self.sessions.clear();
self.xmpp.remove_handler('Control Event:DirectSet')
self.xmpp.remove_handler('Control Event:SetReq')
self.xmpp.remove_handler('Control Event:SetResponse')
self.xmpp.remove_handler('Control Event:SetResponseError')
self.xmpp['xep_0030'].del_feature(feature=Control.namespace)
- self.xmpp['xep_0030'].set_items(node=Control.namespace, items=tuple())
+ self.xmpp['xep_0030'].set_items(node=Control.namespace, items=tuple());
# =================================================================
@@ -169,10 +170,10 @@ class XEP_0325(BasePlugin):
def register_node(self, nodeId, device, commTimeout, sourceId=None, cacheType=None):
"""
- Register a sensor/device as available for control requests/commands
- through this XMPP instance.
+ Register a sensor/device as available for control requests/commands
+ through this XMPP instance.
- The device object may by any custom implementation to support
+ The device object may by any custom implementation to support
specific devices, but it must implement the functions:
has_control_field
set_control_fields
@@ -184,30 +185,30 @@ class XEP_0325(BasePlugin):
commTimeout -- Time in seconds to wait between each callback from device during
a data readout. Float.
sourceId -- [optional] identifying the data source controlling the device
- cacheType -- [optional] narrowing down the search to a specific kind of node
+ cacheType -- [optional] narrowing down the search to a specific kind of node
"""
- self.nodes[nodeId] = {"device": device,
+ self.nodes[nodeId] = {"device": device,
"commTimeout": commTimeout,
- "sourceId": sourceId,
- "cacheType": cacheType}
+ "sourceId": sourceId,
+ "cacheType": cacheType};
def _set_authenticated(self, auth=''):
""" Internal testing function """
- self.test_authenticated_from = auth
+ self.test_authenticated_from = auth;
def _get_new_seqnr(self):
""" Returns a unique sequence number (unique across threads) """
- self.seqnr_lock.acquire()
- self.last_seqnr += 1
- self.seqnr_lock.release()
- return str(self.last_seqnr)
+ self.seqnr_lock.acquire();
+ self.last_seqnr = self.last_seqnr + 1;
+ self.seqnr_lock.release();
+ return str(self.last_seqnr);
def _handle_set_req(self, iq):
"""
- Event handler for reception of an Iq with set req - this is a
+ Event handler for reception of an Iq with set req - this is a
control request.
- Verifies that
+ Verifies that
- all the requested nodes are available
(if no nodes are specified in the request, assume all nodes)
- all the control fields are available from all requested nodes
@@ -215,79 +216,80 @@ class XEP_0325(BasePlugin):
If the request passes verification, the control request is passed
to the devices (in a separate thread).
- If the verification fails, a setResponse with error indication
+ If the verification fails, a setResponse with error indication
is sent.
"""
- error_msg = ''
- req_ok = True
- missing_node = None
- missing_field = None
+ error_msg = '';
+ req_ok = True;
+ missing_node = None;
+ missing_field = None;
# Authentication
if len(self.test_authenticated_from) > 0 and not iq['from'] == self.test_authenticated_from:
# Invalid authentication
- req_ok = False
- error_msg = "Access denied"
+ req_ok = False;
+ error_msg = "Access denied";
# Nodes
+ process_nodes = [];
if len(iq['set']['nodes']) > 0:
for n in iq['set']['nodes']:
if not n['nodeId'] in self.nodes:
- req_ok = False
- missing_node = n['nodeId']
- error_msg = "Invalid nodeId " + n['nodeId']
- process_nodes = [n['nodeId'] for n in iq['set']['nodes']]
+ req_ok = False;
+ missing_node = n['nodeId'];
+ error_msg = "Invalid nodeId " + n['nodeId'];
+ process_nodes = [n['nodeId'] for n in iq['set']['nodes']];
else:
- process_nodes = self.nodes.keys()
+ process_nodes = self.nodes.keys();
# Fields - for control we need to find all in all devices, otherwise we reject
- process_fields = []
+ process_fields = [];
if len(iq['set']['datas']) > 0:
for f in iq['set']['datas']:
for node in self.nodes:
if not self.nodes[node]["device"].has_control_field(f['name'], f._get_typename()):
- req_ok = False
- missing_field = f['name']
- error_msg = "Invalid field " + f['name']
- break
- process_fields = [(f['name'], f._get_typename(), f['value']) for f in iq['set']['datas']]
+ req_ok = False;
+ missing_field = f['name'];
+ error_msg = "Invalid field " + f['name'];
+ break;
+ process_fields = [(f['name'], f._get_typename(), f['value']) for f in iq['set']['datas']];
if req_ok:
- session = self._new_session()
- self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": iq['id']}
- self.sessions[session]["commTimers"] = {}
- self.sessions[session]["nodeDone"] = {}
+ session = self._new_session();
+ self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": iq['id']};
+ self.sessions[session]["commTimers"] = {};
+ self.sessions[session]["nodeDone"] = {};
# Flag that a reply is exected when we are done
- self.sessions[session]["reply"] = True
+ self.sessions[session]["reply"] = True;
- self.sessions[session]["node_list"] = process_nodes
+ self.sessions[session]["node_list"] = process_nodes;
if self.threaded:
#print("starting thread")
tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields))
tr_req.start()
#print("started thread")
else:
- self._threaded_node_request(session, process_fields)
+ self._threaded_node_request(session, process_fields);
else:
- iq.reply()
- iq['type'] = 'error'
- iq['setResponse']['responseCode'] = "NotFound"
+ iq.reply();
+ iq['type'] = 'error';
+ iq['setResponse']['responseCode'] = "NotFound";
if missing_node is not None:
- iq['setResponse'].add_node(missing_node)
+ iq['setResponse'].add_node(missing_node);
if missing_field is not None:
- iq['setResponse'].add_data(missing_field)
- iq['setResponse']['error']['var'] = "Output"
- iq['setResponse']['error']['text'] = error_msg
- iq.send(block=False)
+ iq['setResponse'].add_data(missing_field);
+ iq['setResponse']['error']['var'] = "Output";
+ iq['setResponse']['error']['text'] = error_msg;
+ iq.send(block=False);
def _handle_direct_set(self, msg):
"""
- Event handler for reception of a Message with set command - this is a
+ Event handler for reception of a Message with set command - this is a
direct control command.
- Verifies that
+ Verifies that
- all the requested nodes are available
(if no nodes are specified in the request, assume all nodes)
- all the control fields are available from all requested nodes
@@ -297,72 +299,73 @@ class XEP_0325(BasePlugin):
to the devices (in a separate thread).
If the verification fails, do nothing.
"""
- req_ok = True
+ req_ok = True;
# Nodes
+ process_nodes = [];
if len(msg['set']['nodes']) > 0:
for n in msg['set']['nodes']:
if not n['nodeId'] in self.nodes:
- req_ok = False
- error_msg = "Invalid nodeId " + n['nodeId']
- process_nodes = [n['nodeId'] for n in msg['set']['nodes']]
+ req_ok = False;
+ error_msg = "Invalid nodeId " + n['nodeId'];
+ process_nodes = [n['nodeId'] for n in msg['set']['nodes']];
else:
- process_nodes = self.nodes.keys()
+ process_nodes = self.nodes.keys();
# Fields - for control we need to find all in all devices, otherwise we reject
- process_fields = []
+ process_fields = [];
if len(msg['set']['datas']) > 0:
for f in msg['set']['datas']:
for node in self.nodes:
if not self.nodes[node]["device"].has_control_field(f['name'], f._get_typename()):
- req_ok = False
- missing_field = f['name']
- error_msg = "Invalid field " + f['name']
- break
- process_fields = [(f['name'], f._get_typename(), f['value']) for f in msg['set']['datas']]
+ req_ok = False;
+ missing_field = f['name'];
+ error_msg = "Invalid field " + f['name'];
+ break;
+ process_fields = [(f['name'], f._get_typename(), f['value']) for f in msg['set']['datas']];
if req_ok:
- session = self._new_session()
- self.sessions[session] = {"from": msg['from'], "to": msg['to']}
- self.sessions[session]["commTimers"] = {}
- self.sessions[session]["nodeDone"] = {}
- self.sessions[session]["reply"] = False
+ session = self._new_session();
+ self.sessions[session] = {"from": msg['from'], "to": msg['to']};
+ self.sessions[session]["commTimers"] = {};
+ self.sessions[session]["nodeDone"] = {};
+ self.sessions[session]["reply"] = False;
- self.sessions[session]["node_list"] = process_nodes
+ self.sessions[session]["node_list"] = process_nodes;
if self.threaded:
#print("starting thread")
tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields))
tr_req.start()
#print("started thread")
else:
- self._threaded_node_request(session, process_fields)
+ self._threaded_node_request(session, process_fields);
def _threaded_node_request(self, session, process_fields):
- """
+ """
Helper function to handle the device control in a separate thread.
-
+
Arguments:
session -- The request session id
process_fields -- The fields to set in the devices. List of tuple format:
(name, datatype, value)
"""
for node in self.sessions[session]["node_list"]:
- self.sessions[session]["nodeDone"][node] = False
+ self.sessions[session]["nodeDone"][node] = False;
for node in self.sessions[session]["node_list"]:
- timer = Timer(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node))
- self.sessions[session]["commTimers"][node] = timer
- timer.start()
- self.nodes[node]['device'].set_control_fields(process_fields, session=session, callback=self._device_set_command_callback)
+ timer = Timer(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node));
+ self.sessions[session]["commTimers"][node] = timer;
+ timer.start();
+ self.nodes[node]['device'].set_control_fields(process_fields, session=session, callback=self._device_set_command_callback);
def _event_comm_timeout(self, session, nodeId):
- """
+ """
Triggered if any of the control operations timeout.
Stop communicating with the failing device.
- If the control command was an Iq request, sends a failure
- message back to the client.
-
+ If the control command was an Iq request, sends a failure
+ message back to the client.
+
Arguments:
session -- The request session id
nodeId -- The id of the device which timed out
@@ -370,51 +373,51 @@ class XEP_0325(BasePlugin):
if self.sessions[session]["reply"]:
# Reply is exected when we are done
- iq = self.xmpp.Iq()
- iq['from'] = self.sessions[session]['to']
- iq['to'] = self.sessions[session]['from']
- iq['type'] = "error"
- iq['id'] = self.sessions[session]['seqnr']
- iq['setResponse']['responseCode'] = "OtherError"
- iq['setResponse'].add_node(nodeId)
- iq['setResponse']['error']['var'] = "Output"
- iq['setResponse']['error']['text'] = "Timeout."
- iq.send(block=False)
+ iq = self.xmpp.Iq();
+ iq['from'] = self.sessions[session]['to'];
+ iq['to'] = self.sessions[session]['from'];
+ iq['type'] = "error";
+ iq['id'] = self.sessions[session]['seqnr'];
+ iq['setResponse']['responseCode'] = "OtherError";
+ iq['setResponse'].add_node(nodeId);
+ iq['setResponse']['error']['var'] = "Output";
+ iq['setResponse']['error']['text'] = "Timeout.";
+ iq.send(block=False);
## TODO - should we send one timeout per node??
# Drop communication with this device and check if we are done
- self.sessions[session]["nodeDone"][nodeId] = True
+ self.sessions[session]["nodeDone"][nodeId] = True;
if (self._all_nodes_done(session)):
# The session is complete, delete it
- del self.sessions[session]
+ del self.sessions[session];
def _all_nodes_done(self, session):
- """
+ """
Checks wheter all devices are done replying to the control command.
-
+
Arguments:
session -- The request session id
"""
for n in self.sessions[session]["nodeDone"]:
if not self.sessions[session]["nodeDone"][n]:
- return False
- return True
+ return False;
+ return True;
def _device_set_command_callback(self, session, nodeId, result, error_field=None, error_msg=None):
- """
- Callback function called by the devices when the control command is
+ """
+ Callback function called by the devices when the control command is
complete or failed.
- If needed, composes a message with the result and sends it back to the
+ If needed, composes a message with the result and sends it back to the
client.
-
+
Arguments:
session -- The request session id
nodeId -- The device id which initiated the callback
result -- The current result status of the control command. Valid values are:
"error" - Set fields failed.
"ok" - All fields were set.
- error_field -- [optional] Only applies when result == "error"
+ error_field -- [optional] Only applies when result == "error"
The field name that failed (usually means it is missing)
error_msg -- [optional] Only applies when result == "error".
Error details when a request failed.
@@ -425,62 +428,62 @@ class XEP_0325(BasePlugin):
return
if result == "error":
- self.sessions[session]["commTimers"][nodeId].cancel()
+ self.sessions[session]["commTimers"][nodeId].cancel();
if self.sessions[session]["reply"]:
# Reply is exected when we are done
- iq = self.xmpp.Iq()
- iq['from'] = self.sessions[session]['to']
- iq['to'] = self.sessions[session]['from']
- iq['type'] = "error"
- iq['id'] = self.sessions[session]['seqnr']
- iq['setResponse']['responseCode'] = "OtherError"
- iq['setResponse'].add_node(nodeId)
+ iq = self.xmpp.Iq();
+ iq['from'] = self.sessions[session]['to'];
+ iq['to'] = self.sessions[session]['from'];
+ iq['type'] = "error";
+ iq['id'] = self.sessions[session]['seqnr'];
+ iq['setResponse']['responseCode'] = "OtherError";
+ iq['setResponse'].add_node(nodeId);
if error_field is not None:
- iq['setResponse'].add_data(error_field)
- iq['setResponse']['error']['var'] = error_field
- iq['setResponse']['error']['text'] = error_msg
- iq.send(block=False)
+ iq['setResponse'].add_data(error_field);
+ iq['setResponse']['error']['var'] = error_field;
+ iq['setResponse']['error']['text'] = error_msg;
+ iq.send(block=False);
# Drop communication with this device and check if we are done
- self.sessions[session]["nodeDone"][nodeId] = True
+ self.sessions[session]["nodeDone"][nodeId] = True;
if (self._all_nodes_done(session)):
# The session is complete, delete it
- del self.sessions[session]
+ del self.sessions[session];
else:
- self.sessions[session]["commTimers"][nodeId].cancel()
+ self.sessions[session]["commTimers"][nodeId].cancel();
- self.sessions[session]["nodeDone"][nodeId] = True
+ self.sessions[session]["nodeDone"][nodeId] = True;
if (self._all_nodes_done(session)):
if self.sessions[session]["reply"]:
# Reply is exected when we are done
- iq = self.xmpp.Iq()
- iq['from'] = self.sessions[session]['to']
- iq['to'] = self.sessions[session]['from']
- iq['type'] = "result"
- iq['id'] = self.sessions[session]['seqnr']
- iq['setResponse']['responseCode'] = "OK"
- iq.send(block=False)
+ iq = self.xmpp.Iq();
+ iq['from'] = self.sessions[session]['to'];
+ iq['to'] = self.sessions[session]['from'];
+ iq['type'] = "result";
+ iq['id'] = self.sessions[session]['seqnr'];
+ iq['setResponse']['responseCode'] = "OK";
+ iq.send(block=False);
# The session is complete, delete it
- del self.sessions[session]
+ del self.sessions[session];
# =================================================================
# Client side (data controller) API
def set_request(self, from_jid, to_jid, callback, fields, nodeIds=None):
- """
+ """
Called on the client side to initiade a control request.
Composes a message with the request and sends it to the device(s).
- Does not block, the callback will be called when the device(s)
+ Does not block, the callback will be called when the device(s)
has responded.
-
+
Arguments:
from_jid -- The jid of the requester
to_jid -- The jid of the device(s)
- callback -- The callback function to call when data is availble.
-
+ callback -- The callback function to call when data is availble.
+
The callback function must support the following arguments:
from_jid -- The jid of the responding device(s)
@@ -491,46 +494,46 @@ class XEP_0325(BasePlugin):
"Locked" - Field(s) is locked and cannot
be changed at the moment.
"NotImplemented" - Request feature not implemented.
- "FormError" - Error while setting with
+ "FormError" - Error while setting with
a form (not implemented).
- "OtherError" - Indicates other types of
- errors, such as timeout.
+ "OtherError" - Indicates other types of
+ errors, such as timeout.
Details in the error_msg.
+
+ nodeId -- [optional] Only applicable when result == "error"
+ List of node Ids of failing device(s).
- nodeId -- [optional] Only applicable when result == "error"
- List of node Ids of failing device(s).
-
- fields -- [optional] Only applicable when result == "error"
+ fields -- [optional] Only applicable when result == "error"
List of fields that failed.[optional] Mandatory when result == "rejected" or "failure".
-
- error_msg -- Details about why the request failed.
+
+ error_msg -- Details about why the request failed.
fields -- Fields to set. List of tuple format: (name, typename, value).
nodeIds -- [optional] Limits the request to the node Ids in this list.
"""
- iq = self.xmpp.Iq()
- iq['from'] = from_jid
- iq['to'] = to_jid
- seqnr = self._get_new_seqnr()
- iq['id'] = seqnr
- iq['type'] = "set"
+ iq = self.xmpp.Iq();
+ iq['from'] = from_jid;
+ iq['to'] = to_jid;
+ seqnr = self._get_new_seqnr();
+ iq['id'] = seqnr;
+ iq['type'] = "set";
if nodeIds is not None:
for nodeId in nodeIds:
- iq['set'].add_node(nodeId)
+ iq['set'].add_node(nodeId);
if fields is not None:
for name, typename, value in fields:
- iq['set'].add_data(name=name, typename=typename, value=value)
+ iq['set'].add_data(name=name, typename=typename, value=value);
- self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "callback": callback}
- iq.send(block=False)
+ self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "callback": callback};
+ iq.send(block=False);
def set_command(self, from_jid, to_jid, fields, nodeIds=None):
- """
+ """
Called on the client side to initiade a control command.
Composes a message with the set commandand sends it to the device(s).
Does not block. Device(s) will not respond, regardless of result.
-
+
Arguments:
from_jid -- The jid of the requester
to_jid -- The jid of the device(s)
@@ -538,32 +541,34 @@ class XEP_0325(BasePlugin):
fields -- Fields to set. List of tuple format: (name, typename, value).
nodeIds -- [optional] Limits the request to the node Ids in this list.
"""
- msg = self.xmpp.Message()
- msg['from'] = from_jid
- msg['to'] = to_jid
- msg['type'] = "set"
+ msg = self.xmpp.Message();
+ msg['from'] = from_jid;
+ msg['to'] = to_jid;
+ msg['type'] = "set";
if nodeIds is not None:
for nodeId in nodeIds:
- msg['set'].add_node(nodeId)
+ msg['set'].add_node(nodeId);
if fields is not None:
for name, typename, value in fields:
- msg['set'].add_data(name, typename, value)
+ msg['set'].add_data(name, typename, value);
# We won't get any reply, so don't create a session
- msg.send()
+ msg.send();
def _handle_set_response(self, iq):
""" Received response from device(s) """
#print("ooh")
- seqnr = iq['id']
- from_jid = str(iq['from'])
- result = iq['setResponse']['responseCode']
- nodeIds = [n['name'] for n in iq['setResponse']['nodes']]
- fields = [f['name'] for f in iq['setResponse']['datas']]
- error_msg = None
+ seqnr = iq['id'];
+ from_jid = str(iq['from']);
+ result = iq['setResponse']['responseCode'];
+ nodeIds = [n['name'] for n in iq['setResponse']['nodes']];
+ fields = [f['name'] for f in iq['setResponse']['datas']];
+ error_msg = None;
if not iq['setResponse'].find('error') is None and not iq['setResponse']['error']['text'] == "":
- error_msg = iq['setResponse']['error']['text']
+ error_msg = iq['setResponse']['error']['text'];
+
+ callback = self.sessions[seqnr]["callback"];
+ callback(from_jid=from_jid, result=result, nodeIds=nodeIds, fields=fields, error_msg=error_msg);
- callback = self.sessions[seqnr]["callback"]
- callback(from_jid=from_jid, result=result, nodeIds=nodeIds, fields=fields, error_msg=error_msg)
+