summaryrefslogtreecommitdiff
path: root/sleekxmpp
diff options
context:
space:
mode:
authorJoachim Lindborg <Joachim.Lindborg@lsys.se>2013-08-30 02:29:52 +0200
committerJoachim Lindborg <Joachim.Lindborg@lsys.se>2013-08-30 02:29:52 +0200
commit45689fd8799186fd6be0b308745aef428ab50dcc (patch)
tree0dd4e5f41684ad8f4b31920f1b8b9d3e27fd1527 /sleekxmpp
parentb7adaafb3ecb0a615c93fbb1830e66357b081fe3 (diff)
downloadslixmpp-45689fd8799186fd6be0b308745aef428ab50dcc.tar.gz
slixmpp-45689fd8799186fd6be0b308745aef428ab50dcc.tar.bz2
slixmpp-45689fd8799186fd6be0b308745aef428ab50dcc.tar.xz
slixmpp-45689fd8799186fd6be0b308745aef428ab50dcc.zip
First implementation of the xep_0323 and xep_325 used in IoT systems. Tests are added for stanza and streams
Diffstat (limited to 'sleekxmpp')
-rw-r--r--sleekxmpp/plugins/__init__.py3
-rw-r--r--sleekxmpp/plugins/xep_0323/__init__.py2
-rw-r--r--sleekxmpp/plugins/xep_0323/device.py243
-rw-r--r--sleekxmpp/plugins/xep_0323/sensordata.py699
-rw-r--r--sleekxmpp/plugins/xep_0323/stanza/__init__.py2
-rw-r--r--sleekxmpp/plugins/xep_0323/stanza/base.py5
-rw-r--r--sleekxmpp/plugins/xep_0323/stanza/sensordata.py796
-rw-r--r--sleekxmpp/plugins/xep_0323/timerreset.py64
-rw-r--r--sleekxmpp/plugins/xep_0325/__init__.py18
-rw-r--r--sleekxmpp/plugins/xep_0325/control.py574
-rw-r--r--sleekxmpp/plugins/xep_0325/device.py125
-rw-r--r--sleekxmpp/plugins/xep_0325/stanza/__init__.py12
-rw-r--r--sleekxmpp/plugins/xep_0325/stanza/base.py13
-rw-r--r--sleekxmpp/plugins/xep_0325/stanza/control.py526
14 files changed, 3020 insertions, 62 deletions
diff --git a/sleekxmpp/plugins/__init__.py b/sleekxmpp/plugins/__init__.py
index 10a79439..bdd06d50 100644
--- a/sleekxmpp/plugins/__init__.py
+++ b/sleekxmpp/plugins/__init__.py
@@ -79,5 +79,6 @@ __all__ = [
'xep_0302', # XMPP Compliance Suites 2012
'xep_0308', # Last Message Correction
'xep_0313', # Message Archive Management
- 'xep_0323', # IoT Sensor Data
+ 'xep_0323', # IoT Systems Sensor Data
+ 'xep_0325', # IoT Systems Control
]
diff --git a/sleekxmpp/plugins/xep_0323/__init__.py b/sleekxmpp/plugins/xep_0323/__init__.py
index e4e2d0ee..10779ada 100644
--- a/sleekxmpp/plugins/xep_0323/__init__.py
+++ b/sleekxmpp/plugins/xep_0323/__init__.py
@@ -2,7 +2,7 @@
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
- Copyright (C) 2013 Joachim Lindborg, Joachim.lindborg@lsys.se
+ Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
diff --git a/sleekxmpp/plugins/xep_0323/device.py b/sleekxmpp/plugins/xep_0323/device.py
new file mode 100644
index 00000000..8414f5b4
--- /dev/null
+++ b/sleekxmpp/plugins/xep_0323/device.py
@@ -0,0 +1,243 @@
+"""
+ SleekXMPP: The Sleek XMPP Library
+ Implementation of xeps for Internet of Things
+ http://wiki.xmpp.org/web/Tech_pages/IoT_systems
+ Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
+ This file is part of SleekXMPP.
+
+ See the file LICENSE for copying permission.
+"""
+
+import datetime
+
+class Device(object):
+ """
+ Example implementation of a device readout object.
+ Is registered in the XEP_0323.register_node call
+ The device object may be any custom implementation to support
+ specific devices, but it must implement the functions:
+ has_field
+ request_fields
+ """
+
+ def __init__(self, nodeId):
+ self.nodeId = nodeId;
+ self.fields = {};
+ # {'type':'numeric',
+ # 'name':'myname',
+ # 'value': 42,
+ # 'unit':'Z'}];
+ self.timestamp_data = {};
+ self.momentary_data = {};
+ self.momentary_timestamp = "";
+
+ def has_field(self, field):
+ """
+ Returns true if the supplied field name exists in this device.
+
+ Arguments:
+ field -- The field name
+ """
+ if field in self.fields:
+ return True;
+ return False;
+
+ def request_fields(self, fields, flags, session, callback):
+ """
+ Starts a data readout. Verifies the requested fields,
+ refreshes the data (if needed) and calls the callback
+ with requested data.
+
+
+ Arguments:
+ fields -- List of field names to readout
+ flags -- [optional] data classifier flags for the field, e.g. momentary
+ Formatted as a dictionary like { "flag name": "flag value" ... }
+ session -- Session id, only used in the callback as identifier
+ callback -- Callback function to call when data is available.
+
+ The callback function must support the following arguments:
+
+ session -- Session id, as supplied in the request_fields call
+ nodeId -- Identifier for this device
+ result -- The current result status of the readout. Valid values are:
+ "error" - Readout failed.
+ "fields" - Contains readout data.
+ "done" - Indicates that the readout is complete. May contain
+ readout data.
+ timestamp_block -- [optional] Only applies when result != "error"
+ The readout data. Structured as a dictionary:
+ {
+ timestamp: timestamp for this datablock,
+ fields: list of field dictionary (one per readout field).
+ readout field dictionary format:
+ {
+ type: The field type (numeric, boolean, dateTime, timeSpan, string, enum)
+ name: The field name
+ value: The field value
+ unit: The unit of the field. Only applies to type numeric.
+ dataType: The datatype of the field. Only applies to type enum.
+ flags: [optional] data classifier flags for the field, e.g. momentary
+ Formatted as a dictionary like { "flag name": "flag value" ... }
+ }
+ }
+ error_msg -- [optional] Only applies when result == "error".
+ Error details when a request failed.
+
+ """
+
+ if len(fields) > 0:
+ # Check availiability
+ for f in fields:
+ if f not in self.fields:
+ self._send_reject(session, callback)
+ return False;
+ else:
+ # Request all fields
+ fields = self.fields;
+
+
+ # Refresh data from device
+ # ...
+
+ if "momentary" in flags and flags['momentary'] == "true" or \
+ "all" in flags and flags['all'] == "true":
+ ts_block = {};
+ timestamp = "";
+
+ if len(self.momentary_timestamp) > 0:
+ timestamp = self.momentary_timestamp;
+ else:
+ timestamp = self._get_timestamp();
+
+ field_block = [];
+ for f in self.momentary_data:
+ if f in fields:
+ field_block.append({"name": f,
+ "type": self.fields[f]["type"],
+ "unit": self.fields[f]["unit"],
+ "dataType": self.fields[f]["dataType"],
+ "value": self.momentary_data[f]["value"],
+ "flags": self.momentary_data[f]["flags"]});
+ ts_block["timestamp"] = timestamp;
+ ts_block["fields"] = field_block;
+
+ callback(session, result="done", nodeId=self.nodeId, timestamp_block=ts_block);
+ return
+
+ from_flag = self._datetime_flag_parser(flags, 'from')
+ to_flag = self._datetime_flag_parser(flags, 'to')
+
+ for ts in sorted(self.timestamp_data.keys()):
+ tsdt = datetime.datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S")
+ if not from_flag is None:
+ if tsdt < from_flag:
+ #print (str(tsdt) + " < " + str(from_flag))
+ continue
+ if not to_flag is None:
+ if tsdt > to_flag:
+ #print (str(tsdt) + " > " + str(to_flag))
+ continue
+
+ ts_block = {};
+ field_block = [];
+
+ for f in self.timestamp_data[ts]:
+ if f in fields:
+ field_block.append({"name": f,
+ "type": self.fields[f]["type"],
+ "unit": self.fields[f]["unit"],
+ "dataType": self.fields[f]["dataType"],
+ "value": self.timestamp_data[ts][f]["value"],
+ "flags": self.timestamp_data[ts][f]["flags"]});
+
+ ts_block["timestamp"] = ts;
+ ts_block["fields"] = field_block;
+ callback(session, result="fields", nodeId=self.nodeId, timestamp_block=ts_block);
+ callback(session, result="done", nodeId=self.nodeId, timestamp_block=None);
+
+ def _datetime_flag_parser(self, flags, flagname):
+ if not flagname in flags:
+ return None
+
+ dt = None
+ try:
+ dt = datetime.datetime.strptime(flags[flagname], "%Y-%m-%dT%H:%M:%S")
+ except ValueError:
+ # Badly formatted datetime, ignore it
+ pass
+ return dt
+
+
+ def _get_timestamp(self):
+ """
+ Generates a properly formatted timestamp of current time
+ """
+ return datetime.datetime.now().replace(microsecond=0).isoformat()
+
+ def _send_reject(self, session, callback):
+ """
+ Sends a reject to the caller
+
+ Arguments:
+ session -- Session id, see definition in request_fields function
+ callback -- Callback function, see definition in request_fields function
+ """
+ callback(session, result="error", nodeId=self.nodeId, timestamp_block=None, error_msg="Reject");
+
+ def _add_field(self, name, typename, unit=None, dataType=None):
+ """
+ Adds a field to the device
+
+ Arguments:
+ name -- Name of the field
+ typename -- Type of the field (numeric, boolean, dateTime, timeSpan, string, enum)
+ unit -- [optional] only applies to "numeric". Unit for the field.
+ dataType -- [optional] only applies to "enum". Datatype for the field.
+ """
+ self.fields[name] = {"type": typename, "unit": unit, "dataType": dataType};
+
+ def _add_field_timestamp_data(self, name, timestamp, value, flags=None):
+ """
+ Adds timestamped data to a field
+
+ Arguments:
+ name -- Name of the field
+ timestamp -- Timestamp for the data (string)
+ value -- Field value at the timestamp
+ flags -- [optional] data classifier flags for the field, e.g. momentary
+ Formatted as a dictionary like { "flag name": "flag value" ... }
+ """
+ if not name in self.fields:
+ return False;
+ if not timestamp in self.timestamp_data:
+ self.timestamp_data[timestamp] = {};
+
+ self.timestamp_data[timestamp][name] = {"value": value, "flags": flags};
+ return True;
+
+ def _add_field_momentary_data(self, name, value, flags=None):
+ """
+ Sets momentary data to a field
+
+ Arguments:
+ name -- Name of the field
+ value -- Field value at the timestamp
+ flags -- [optional] data classifier flags for the field, e.g. momentary
+ Formatted as a dictionary like { "flag name": "flag value" ... }
+ """
+ if self.fields[name] is None:
+ return False;
+ if flags is None:
+ flags = {};
+
+ flags["momentary"] = "true"
+ self.momentary_data[name] = {"value": value, "flags": flags};
+ return True;
+
+ def _set_momentary_timestamp(self, timestamp):
+ """
+ This function is only for unit testing to produce predictable results.
+ """
+ self.momentary_timestamp = timestamp;
+
diff --git a/sleekxmpp/plugins/xep_0323/sensordata.py b/sleekxmpp/plugins/xep_0323/sensordata.py
index 1343069d..ff671663 100644
--- a/sleekxmpp/plugins/xep_0323/sensordata.py
+++ b/sleekxmpp/plugins/xep_0323/sensordata.py
@@ -2,19 +2,25 @@
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
- Copyright (C) 2013 Joachim Lindborg, Joachim.lindborg@lsys.se
+ Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
import logging
+import time
+import datetime
+from threading import Thread, Lock, Timer
+
+from sleekxmpp.plugins.xep_0323.timerreset import TimerReset
from sleekxmpp.xmlstream import JID
from sleekxmpp.xmlstream.handler import Callback
from sleekxmpp.xmlstream.matcher import StanzaPath
from sleekxmpp.plugins.base import BasePlugin
from sleekxmpp.plugins.xep_0323 import stanza
+from sleekxmpp.plugins.xep_0323.stanza import Sensordata
log = logging.getLogger(__name__)
@@ -23,43 +29,690 @@ log = logging.getLogger(__name__)
class XEP_0323(BasePlugin):
"""
- XEP-0323 IoT Sensor Data
+ XEP-0323: IoT Sensor Data
+
+
+ This XEP provides the underlying architecture, basic operations and data
+ structures for sensor data communication over XMPP networks. It includes
+ a hardware abstraction model, removing any technical detail implemented
+ in underlying technologies.
+
+ Also see <http://xmpp.org/extensions/xep-0323.html>
+
+ Configuration Values:
+ threaded -- Indicates if communication with sensors should be threaded.
+ Defaults to True.
+
+ Events:
+ Sensor side
+ -----------
+ Sensordata Event:Req -- Received a request for data
+ Sensordata Event:Cancel -- Received a cancellation for a request
+
+ Client side
+ -----------
+ Sensordata Event:Accepted -- Received a accept from sensor for a request
+ Sensordata Event:Rejected -- Received a reject from sensor for a request
+ Sensordata Event:Cancelled -- Received a cancel confirm from sensor
+ Sensordata Event:Fields -- Received fields from sensor for a request
+ This may be triggered multiple times since
+ the sensor can split up its response in
+ multiple messages.
+ Sensordata Event:Failure -- Received a failure indication from sensor
+ for a request. Typically a comm timeout.
+
+ Attributes:
+ threaded -- Indicates if command events should be threaded.
+ Defaults to True.
+ sessions -- A dictionary or equivalent backend mapping
+ session IDs to dictionaries containing data
+ relevant to a request's session. This dictionary is used
+ both by the client and sensor side. On client side, seqnr
+ is used as key, while on sensor side, a session_id is used
+ as key. This ensures that the two will not collide, so
+ one instance can be both client and sensor.
+ Sensor side
+ -----------
+ nodes -- A dictionary mapping sensor nodes that are serviced through
+ this XMPP instance to their device handlers ("drivers").
+ Client side
+ -----------
+ last_seqnr -- The last used sequence number (integer). One sequence of
+ communication (e.g. -->request, <--accept, <--fields)
+ between client and sensor is identified by a unique
+ sequence number (unique between the client/sensor pair)
+
+ Methods:
+ plugin_init -- Overrides base_plugin.plugin_init
+ post_init -- Overrides base_plugin.post_init
+ plugin_end -- Overrides base_plugin.plugin_end
+
+ Sensor side
+ -----------
+ register_node -- Register a sensor as available from this XMPP
+ instance.
+
+ Client side
+ -----------
+ request_data -- Initiates a request for data from one or more
+ sensors. Non-blocking, a callback function will
+ be called when data is available.
+
"""
name = 'xep_0323'
description = 'XEP-0323 Internet of Things - Sensor Data'
- dependencies = set(['xep_0030']) # set(['xep_0030', 'xep_0004', 'xep_0082', 'xep_0131'])
+ dependencies = set(['xep_0030'])
stanza = stanza
+
+ default_config = {
+ 'threaded': True
+# 'session_db': None
+ }
+
def plugin_init(self):
- pass
- # self.node_event_map = {}
+ """ Start the XEP-0323 plugin """
+
+ self.xmpp.register_handler(
+ Callback('Sensordata Event:Req',
+ StanzaPath('iq@type=get/req'),
+ self._handle_event_req))
+
+ self.xmpp.register_handler(
+ Callback('Sensordata Event:Accepted',
+ StanzaPath('iq@type=result/accepted'),
+ self._handle_event_accepted))
+
+ self.xmpp.register_handler(
+ Callback('Sensordata Event:Rejected',
+ StanzaPath('iq@type=error/rejected'),
+ self._handle_event_rejected))
+
+ self.xmpp.register_handler(
+ Callback('Sensordata Event:Cancel',
+ StanzaPath('iq@type=get/cancel'),
+ self._handle_event_cancel))
+
+ self.xmpp.register_handler(
+ Callback('Sensordata Event:Cancelled',
+ StanzaPath('iq@type=result/cancelled'),
+ self._handle_event_cancelled))
+
+ self.xmpp.register_handler(
+ Callback('Sensordata Event:Fields',
+ StanzaPath('message/fields'),
+ self._handle_event_fields))
+
+ self.xmpp.register_handler(
+ Callback('Sensordata Event:Failure',
+ StanzaPath('message/failure'),
+ self._handle_event_failure))
+
+ self.xmpp.register_handler(
+ Callback('Sensordata Event:Started',
+ StanzaPath('message/started'),
+ self._handle_event_started))
+
+ # Server side dicts
+ self.nodes = {};
+ self.sessions = {};
+
+ self.last_seqnr = 0;
+ self.seqnr_lock = Lock();
+
+ ## For testning only
+ self.test_authenticated_from = ""
+
+ def post_init(self):
+ """ Init complete. Register our features in Serivce discovery. """
+ BasePlugin.post_init(self)
+ self.xmpp['xep_0030'].add_feature(Sensordata.namespace)
+ self.xmpp['xep_0030'].set_items(node=Sensordata.namespace, items=tuple())
- # self.xmpp.register_handler(
- # Callback('Sensordata Event: Get',
- # StanzaPath('message/sensordata_event/get'),
- # self._handle_event_get))
+ def _new_session(self):
+ """ Return a new session ID. """
+ return str(time.time()) + '-' + self.xmpp.new_id()
def plugin_end(self):
- # self.xmpp.remove_handler('Sensordata Event: Get')
- pass
-
- def get_value(self, jid, msg):
+ """ Stop the XEP-0323 plugin """
+ self.sessions.clear();
+ self.xmpp.remove_handler('Sensordata Event:Req')
+ self.xmpp.remove_handler('Sensordata Event:Accepted')
+ self.xmpp.remove_handler('Sensordata Event:Rejected')
+ self.xmpp.remove_handler('Sensordata Event:Cancel')
+ self.xmpp.remove_handler('Sensordata Event:Cancelled')
+ self.xmpp.remove_handler('Sensordata Event:Fields')
+ self.xmpp['xep_0030'].del_feature(feature=Sensordata.namespace)
+ self.xmpp['xep_0030'].set_items(node=Sensordata.namespace, items=tuple());
+
+
+ # =================================================================
+ # Sensor side (data provider) API
+
+ def register_node(self, nodeId, device, commTimeout, sourceId=None, cacheType=None):
+ """
+ Register a sensor/device as available for serving of data through this XMPP
+ instance.
+
+ The device object may by any custom implementation to support
+ specific devices, but it must implement the functions:
+ has_field
+ request_fields
+ according to the interfaces shown in the example device.py file.
+
+ Arguments:
+ nodeId -- The identifier for the device
+ device -- The device object
+ commTimeout -- Time in seconds to wait between each callback from device during
+ a data readout. Float.
+ sourceId -- [optional] identifying the data source controlling the device
+ cacheType -- [optional] narrowing down the search to a specific kind of node
+ """
+ self.nodes[nodeId] = {"device": device,
+ "commTimeout": commTimeout,
+ "sourceId": sourceId,
+ "cacheType": cacheType};
+
+ def _set_authenticated(self, auth=''):
+ """ Internal testing function """
+ self.test_authenticated_from = auth;
+
+
+ def _handle_event_req(self, iq):
+ """
+ Event handler for reception of an Iq with req - this is a request.
+
+ Verifies that
+ - all the requested nodes are available
+ - at least one of the requested fields is available from at least
+ one of the nodes
+
+ If the request passes verification, an accept response is sent, and
+ the readout process is started in a separate thread.
+ If the verification fails, a reject message is sent.
+ """
+
+ seqnr = iq['req']['seqnr'];
+ error_msg = '';
+ req_ok = True;
+
+ # Authentication
+ if len(self.test_authenticated_from) > 0 and not iq['from'] == self.test_authenticated_from:
+ # Invalid authentication
+ req_ok = False;
+ error_msg = "Access denied";
+
+ # Nodes
+ process_nodes = [];
+ if len(iq['req']['nodes']) > 0:
+ for n in iq['req']['nodes']:
+ if not n['nodeId'] in self.nodes:
+ req_ok = False;
+ error_msg = "Invalid nodeId " + n['nodeId'];
+ process_nodes = [n['nodeId'] for n in iq['req']['nodes']];
+ else:
+ process_nodes = self.nodes.keys();
+
+ # Fields - if we just find one we are happy, otherwise we reject
+ process_fields = [];
+ if len(iq['req']['fields']) > 0:
+ found = False
+ for f in iq['req']['fields']:
+ for node in self.nodes:
+ if self.nodes[node]["device"].has_field(f['name']):
+ found = True;
+ break;
+ if not found:
+ req_ok = False;
+ error_msg = "Invalid field " + f['name'];
+ process_fields = [f['name'] for n in iq['req']['fields']];
+
+ req_flags = iq['req']._get_flags();
+
+ request_delay_sec = None
+ if 'when' in req_flags:
+ # Timed request - requires datetime string in iso format
+ # ex. 2013-04-05T15:00:03
+ dt = None
+ try:
+ dt = datetime.datetime.strptime(req_flags['when'], "%Y-%m-%dT%H:%M:%S")
+ except ValueError:
+ req_ok = False;
+ error_msg = "Invalid datetime in 'when' flag, please use ISO format (i.e. 2013-04-05T15:00:03)."
+
+ if not dt is None:
+ # Datetime properly formatted
+ dtnow = datetime.datetime.now()
+ dtdiff = dt - dtnow
+ request_delay_sec = dtdiff.seconds + dtdiff.days * 24 * 3600
+ if request_delay_sec <= 0:
+ req_ok = False;
+ error_msg = "Invalid datetime in 'when' flag, cannot set a time in the past. Current time: " + dtnow.isoformat();
+
+ if req_ok:
+ session = self._new_session();
+ self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr};
+ self.sessions[session]["commTimers"] = {};
+ self.sessions[session]["nodeDone"] = {};
+
+ #print("added session: " + str(self.sessions))
+
+ iq.reply();
+ iq['accepted']['seqnr'] = seqnr;
+ if not request_delay_sec is None:
+ iq['accepted']['queued'] = "true"
+ iq.send(block=False);
+
+ self.sessions[session]["node_list"] = process_nodes;
+
+ if not request_delay_sec is None:
+ # Delay request to requested time
+ timer = Timer(request_delay_sec, self._event_delayed_req, args=(session, process_fields, req_flags))
+ self.sessions[session]["commTimers"]["delaytimer"] = timer;
+ timer.start();
+ return
+
+ if self.threaded:
+ #print("starting thread")
+ tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields, req_flags))
+ tr_req.start()
+ #print("started thread")
+ else:
+ self._threaded_node_request(session, process_fields, req_flags);
+
+ else:
+ iq.reply();
+ iq['type'] = 'error';
+ iq['rejected']['seqnr'] = seqnr;
+ iq['rejected']['error'] = error_msg;
+ iq.send(block=False);
+
+ def _threaded_node_request(self, session, process_fields, flags):
+ """
+ Helper function to handle the device readouts in a separate thread.
+
+ Arguments:
+ session -- The request session id
+ process_fields -- The fields to request from the devices
+ flags -- [optional] flags to pass to the devices, e.g. momentary
+ Formatted as a dictionary like { "flag name": "flag value" ... }
+ """
+ for node in self.sessions[session]["node_list"]:
+ self.sessions[session]["nodeDone"][node] = False;
+
+ for node in self.sessions[session]["node_list"]:
+ timer = TimerReset(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node));
+ self.sessions[session]["commTimers"][node] = timer;
+ #print("Starting timer " + str(timer) + ", timeout: " + str(self.nodes[node]['commTimeout']))
+ timer.start();
+ self.nodes[node]['device'].request_fields(process_fields, flags=flags, session=session, callback=self._device_field_request_callback);
+
+ def _event_comm_timeout(self, session, nodeId):
+ """
+ Triggered if any of the readout operations timeout.
+ Sends a failure message back to the client, stops communicating
+ with the failing device.
+
+ Arguments:
+ session -- The request session id
+ nodeId -- The id of the device which timed out
+ """
+ msg = self.xmpp.Message();
+ msg['from'] = self.sessions[session]['to'];
+ msg['to'] = self.sessions[session]['from'];
+ msg['failure']['seqnr'] = self.sessions[session]['seqnr'];
+ msg['failure']['error']['text'] = "Timeout";
+ msg['failure']['error']['nodeId'] = nodeId;
+ msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat();
+
+ # Drop communication with this device and check if we are done
+ self.sessions[session]["nodeDone"][nodeId] = True;
+ if (self._all_nodes_done(session)):
+ msg['failure']['done'] = 'true';
+ msg.send();
+ # The session is complete, delete it
+ #print("del session " + session + " due to timeout")
+ del self.sessions[session];
+
+ def _event_delayed_req(self, session, process_fields, req_flags):
+ """
+ Triggered when the timer from a delayed request fires.
+
+ Arguments:
+ session -- The request session id
+ process_fields -- The fields to request from the devices
+ flags -- [optional] flags to pass to the devices, e.g. momentary
+ Formatted as a dictionary like { "flag name": "flag value" ... }
"""
- Recieving a stanza for erading values
- # verify provisioning
+ msg = self.xmpp.Message();
+ msg['from'] = self.sessions[session]['to'];
+ msg['to'] = self.sessions[session]['from'];
+ msg['started']['seqnr'] = self.sessions[session]['seqnr'];
+ msg.send();
+
+ if self.threaded:
+ tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields, req_flags))
+ tr_req.start()
+ else:
+ self._threaded_node_request(session, process_fields, req_flags);
+
+ def _all_nodes_done(self, session):
+ """
+ Checks wheter all devices are done replying to the readout.
+
+ Arguments:
+ session -- The request session id
+ """
+ for n in self.sessions[session]["nodeDone"]:
+ if not self.sessions[session]["nodeDone"][n]:
+ return False;
+ return True;
+
+ def _device_field_request_callback(self, session, nodeId, result, timestamp_block, error_msg=None):
+ """
+ Callback function called by the devices when they have any additional data.
+ Composes a message with the data and sends it back to the client, and resets
+ the timeout timer for the device.
+
+ Arguments:
+ session -- The request session id
+ nodeId -- The device id which initiated the callback
+ result -- The current result status of the readout. Valid values are:
+ "error" - Readout failed.
+ "fields" - Contains readout data.
+ "done" - Indicates that the readout is complete. May contain
+ readout data.
+ timestamp_block -- [optional] Only applies when result != "error"
+ The readout data. Structured as a dictionary:
+ {
+ timestamp: timestamp for this datablock,
+ fields: list of field dictionary (one per readout field).
+ readout field dictionary format:
+ {
+ type: The field type (numeric, boolean, dateTime, timeSpan, string, enum)
+ name: The field name
+ value: The field value
+ unit: The unit of the field. Only applies to type numeric.
+ dataType: The datatype of the field. Only applies to type enum.
+ flags: [optional] data classifier flags for the field, e.g. momentary
+ Formatted as a dictionary like { "flag name": "flag value" ... }
+ }
+ }
+ error_msg -- [optional] Only applies when result == "error".
+ Error details when a request failed.
+ """
+ if not session in self.sessions:
+ # This can happend if a session was deleted, like in a cancellation. Just drop the data.
+ return
+
+ if result == "error":
+ self.sessions[session]["commTimers"][nodeId].cancel();
+
+ msg = self.xmpp.Message();
+ msg['from'] = self.sessions[session]['to'];
+ msg['to'] = self.sessions[session]['from'];
+ msg['failure']['seqnr'] = self.sessions[session]['seqnr'];
+ msg['failure']['error']['text'] = error_msg;
+ msg['failure']['error']['nodeId'] = nodeId;
+ msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat();
+
+ # Drop communication with this device and check if we are done
+ self.sessions[session]["nodeDone"][nodeId] = True;
+ if (self._all_nodes_done(session)):
+ msg['failure']['done'] = 'true';
+ # The session is complete, delete it
+ # print("del session " + session + " due to error")
+ del self.sessions[session];
+ msg.send();
+ else:
+ msg = self.xmpp.Message();
+ msg['from'] = self.sessions[session]['to'];
+ msg['to'] = self.sessions[session]['from'];
+ msg['fields']['seqnr'] = self.sessions[session]['seqnr'];
+
+ if timestamp_block is not None and len(timestamp_block) > 0:
+ node = msg['fields'].add_node(nodeId);
+ ts = node.add_timestamp(timestamp_block["timestamp"]);
+
+ for f in timestamp_block["fields"]:
+ data = ts.add_data( typename=f['type'],
+ name=f['name'],
+ value=f['value'],
+ unit=f['unit'],
+ dataType=f['dataType'],
+ flags=f['flags']);
+
+ if result == "done":
+ self.sessions[session]["commTimers"][nodeId].cancel();
+ self.sessions[session]["nodeDone"][nodeId] = True;
+ msg['fields']['done'] = 'true';
+ if (self._all_nodes_done(session)):
+ # The session is complete, delete it
+ # print("del session " + session + " due to complete")
+ del self.sessions[session];
+ else:
+ # Restart comm timer
+ self.sessions[session]["commTimers"][nodeId].reset();
+
+ msg.send();
+
+ def _handle_event_cancel(self, iq):
+ """ Received Iq with cancel - this is a cancel request.
+ Delete the session and confirm. """
- # verify requested values and categories
+ seqnr = iq['cancel']['seqnr'];
+ # Find the session
+ for s in self.sessions:
+ if self.sessions[s]['from'] == iq['from'] and self.sessions[s]['to'] == iq['to'] and self.sessions[s]['seqnr'] == seqnr:
+ # found it. Cancel all timers
+ for n in self.sessions[s]["commTimers"]:
+ self.sessions[s]["commTimers"][n].cancel();
- # Send accepted
- # Thread of the readout
+ # Confirm
+ iq.reply();
+ iq['type'] = 'result';
+ iq['cancelled']['seqnr'] = seqnr;
+ iq.send(block=False);
+
+ # Delete session
+ del self.sessions[s]
+ return
- # send started
+ # Could not find session, send reject
+ iq.reply();
+ iq['type'] = 'error';
+ iq['rejected']['seqnr'] = seqnr;
+ iq['rejected']['error'] = "Cancel request received, no matching request is active.";
+ iq.send(block=False);
- # send data messages
+ # =================================================================
+ # Client side (data retriever) API
- # send done
- """
- pass
+ def request_data(self, from_jid, to_jid, callback, nodeIds=None, fields=None, flags=None):
+ """
+ Called on the client side to initiade a data readout.
+ Composes a message with the request and sends it to the device(s).
+ Does not block, the callback will be called when data is available.
+
+ Arguments:
+ from_jid -- The jid of the requester
+ to_jid -- The jid of the device(s)
+ callback -- The callback function to call when data is availble.
+
+ The callback function must support the following arguments:
+
+ from_jid -- The jid of the responding device(s)
+ result -- The current result status of the readout. Valid values are:
+ "accepted" - Readout request accepted
+ "queued" - Readout request accepted and queued
+ "rejected" - Readout request rejected
+ "failure" - Readout failed.
+ "cancelled" - Confirmation of request cancellation.
+ "started" - Previously queued request is now started
+ "fields" - Contains readout data.
+ "done" - Indicates that the readout is complete.
+
+ nodeId -- [optional] Mandatory when result == "fields" or "failure".
+ The node Id of the responding device. One callback will only
+ contain data from one device.
+ timestamp -- [optional] Mandatory when result == "fields".
+ The timestamp of data in this callback. One callback will only
+ contain data from one timestamp.
+ fields -- [optional] Mandatory when result == "fields".
+ List of field dictionaries representing the readout data.
+ Dictionary format:
+ {
+ typename: The field type (numeric, boolean, dateTime, timeSpan, string, enum)
+ name: The field name
+ value: The field value
+ unit: The unit of the field. Only applies to type numeric.
+ dataType: The datatype of the field. Only applies to type enum.
+ flags: [optional] data classifier flags for the field, e.g. momentary.
+ Formatted as a dictionary like { "flag name": "flag value" ... }
+ }
+
+ error_msg -- [optional] Mandatory when result == "rejected" or "failure".
+ Details about why the request is rejected or failed.
+ "rejected" means that the request is stopped, but note that the
+ request will continue even after a "failure". "failure" only means
+ that communication was stopped to that specific device, other
+ device(s) (if any) will continue their readout.
+
+ nodeIds -- [optional] Limits the request to the node Ids in this list.
+ fields -- [optional] Limits the request to the field names in this list.
+ flags -- [optional] Limits the request according to the flags, or sets
+ readout conditions such as timing.
+
+ Return value:
+ session -- Session identifier. Client can use this as a reference to cancel
+ the request.
+ """
+ iq = self.xmpp.Iq();
+ iq['from'] = from_jid;
+ iq['to'] = to_jid;
+ iq['type'] = "get";
+ seqnr = self._get_new_seqnr();
+ iq['id'] = seqnr;
+ iq['req']['seqnr'] = seqnr;
+ if nodeIds is not None:
+ for nodeId in nodeIds:
+ iq['req'].add_node(nodeId);
+ if fields is not None:
+ for field in fields:
+ iq['req'].add_field(field);
+
+ iq['req']._set_flags(flags);
+
+ self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr, "callback": callback};
+ iq.send(block=False);
+
+ return seqnr;
+
+ def cancel_request(self, session):
+ """
+ Called on the client side to cancel a request for data readout.
+ Composes a message with the cancellation and sends it to the device(s).
+ Does not block, the callback will be called when cancellation is
+ confirmed.
+
+ Arguments:
+ session -- The session id of the request to cancel
+ """
+ seqnr = session
+ iq = self.xmpp.Iq();
+ iq['from'] = self.sessions[seqnr]['from']
+ iq['to'] = self.sessions[seqnr]['to'];
+ iq['type'] = "get";
+ iq['id'] = seqnr;
+ iq['cancel']['seqnr'] = seqnr;
+ iq.send(block=False);
+
+ def _get_new_seqnr(self):
+ """ Returns a unique sequence number (unique across threads) """
+ self.seqnr_lock.acquire();
+ self.last_seqnr = self.last_seqnr + 1;
+ self.seqnr_lock.release();
+ return str(self.last_seqnr);
+
+ def _handle_event_accepted(self, iq):
+ """ Received Iq with accepted - request was accepted """
+ seqnr = iq['accepted']['seqnr'];
+ result = "accepted"
+ if iq['accepted']['queued'] == 'true':
+ result = "queued"
+
+ callback = self.sessions[seqnr]["callback"];
+ callback(from_jid=iq['from'], result=result);
+
+ def _handle_event_rejected(self, iq):
+ """ Received Iq with rejected - this is a reject.
+ Delete the session. """
+ seqnr = iq['rejected']['seqnr'];
+ callback = self.sessions[seqnr]["callback"];
+ callback(from_jid=iq['from'], result="rejected", error_msg=iq['rejected']['error']);
+ # Session terminated
+ del self.sessions[seqnr];
+
+ def _handle_event_cancelled(self, iq):
+ """
+ Received Iq with cancelled - this is a cancel confirm.
+ Delete the session.
+ """
+ #print("Got cancelled")
+ seqnr = iq['cancelled']['seqnr'];
+ callback = self.sessions[seqnr]["callback"];
+ callback(from_jid=iq['from'], result="cancelled");
+ # Session cancelled
+ del self.sessions[seqnr];
+
+ def _handle_event_fields(self, msg):
+ """
+ Received Msg with fields - this is a data reponse to a request.
+ If this is the last data block, issue a "done" callback.
+ """
+ seqnr = msg['fields']['seqnr'];
+ callback = self.sessions[seqnr]["callback"];
+ for node in msg['fields']['nodes']:
+ for ts in node['timestamps']:
+ fields = [];
+ for d in ts['datas']:
+ field_block = {};
+ field_block["name"] = d['name'];
+ field_block["typename"] = d._get_typename();
+ field_block["value"] = d['value'];
+ if not d['unit'] == "": field_block["unit"] = d['unit'];
+ if not d['dataType'] == "": field_block["dataType"] = d['dataType'];
+ flags = d._get_flags();
+ if not len(flags) == 0:
+ field_block["flags"] = flags;
+ fields.append(field_block);
+
+ callback(from_jid=msg['from'], result="fields", nodeId=node['nodeId'], timestamp=ts['value'], fields=fields);
+
+ if msg['fields']['done'] == "true":
+ callback(from_jid=msg['from'], result="done");
+ # Session done
+ del self.sessions[seqnr];
+
+ def _handle_event_failure(self, msg):
+ """
+ Received Msg with failure - our request failed
+ Delete the session.
+ """
+ seqnr = msg['failure']['seqnr'];
+ callback = self.sessions[seqnr]["callback"];
+ callback(from_jid=msg['from'], result="failure", nodeId=msg['failure']['error']['nodeId'], timestamp=msg['failure']['error']['timestamp'], error_msg=msg['failure']['error']['text']);
+
+ # Session failed
+ del self.sessions[seqnr];
+
+ def _handle_event_started(self, msg):
+ """
+ Received Msg with started - our request was queued and is now started.
+ """
+ seqnr = msg['started']['seqnr'];
+ callback = self.sessions[seqnr]["callback"];
+ callback(from_jid=msg['from'], result="started");
diff --git a/sleekxmpp/plugins/xep_0323/stanza/__init__.py b/sleekxmpp/plugins/xep_0323/stanza/__init__.py
index 82dc9eea..c039cefa 100644
--- a/sleekxmpp/plugins/xep_0323/stanza/__init__.py
+++ b/sleekxmpp/plugins/xep_0323/stanza/__init__.py
@@ -2,7 +2,7 @@
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
- Copyright (C) 2013 Joachim Lindborg, Joachim.lindborg@lsys.se
+ Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
diff --git a/sleekxmpp/plugins/xep_0323/stanza/base.py b/sleekxmpp/plugins/xep_0323/stanza/base.py
index 4caf07b2..1dadcf46 100644
--- a/sleekxmpp/plugins/xep_0323/stanza/base.py
+++ b/sleekxmpp/plugins/xep_0323/stanza/base.py
@@ -1,9 +1,10 @@
"""
SleekXMPP: The Sleek XMPP Library
-
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
- Copyright (C) 2013 Joachim Lindborg, Joachim.lindborg@lsys.se
+ Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
+ This file is part of SleekXMPP.
+
See the file LICENSE for copying permission.
"""
diff --git a/sleekxmpp/plugins/xep_0323/stanza/sensordata.py b/sleekxmpp/plugins/xep_0323/stanza/sensordata.py
index 9567ef87..55e5060f 100644
--- a/sleekxmpp/plugins/xep_0323/stanza/sensordata.py
+++ b/sleekxmpp/plugins/xep_0323/stanza/sensordata.py
@@ -2,7 +2,7 @@
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
- Copyright (C) 2013 Joachim Lindborg, Joachim.lindborg@lsys.se
+ Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
@@ -10,55 +10,783 @@
from sleekxmpp import Iq, Message
from sleekxmpp.xmlstream import register_stanza_plugin, ElementBase, ET, JID
-
+from re import match
class Sensordata(ElementBase):
- namespace = 'http://xmpp.org/iot/sensordata'
+ """ Placeholder for the namespace, not used as a stanza """
+ namespace = 'urn:xmpp:iot:sensordata'
name = 'sensordata'
plugin_attrib = name
interfaces = set(tuple())
+class FieldTypes():
+ """
+ All field types are optional booleans that default to False
+ """
+ field_types = set([ 'momentary','peak','status','computed','identity','historicalSecond','historicalMinute','historicalHour', \
+ 'historicalDay','historicalWeek','historicalMonth','historicalQuarter','historicalYear','historicalOther'])
+
+class FieldStatus():
+ """
+ All field statuses are optional booleans that default to False
+ """
+ field_status = set([ 'missing','automaticEstimate','manualEstimate','manualReadout','automaticReadout','timeOffset','warning','error', \
+ 'signed','invoiced','endOfSeries','powerFailure','invoiceConfirmed'])
+
class Request(ElementBase):
- namespace = 'http://xmpp.org/iot/sensordata'
+ namespace = 'urn:xmpp:iot:sensordata'
name = 'req'
plugin_attrib = name
- interfaces = set(('seqnr','momentary'))
+ interfaces = set(['seqnr','nodes','fields','serviceToken','deviceToken','userToken','from','to','when','historical','all'])
+ interfaces.update(FieldTypes.field_types);
+ _flags = set(['serviceToken','deviceToken','userToken','from','to','when','historical','all']);
+ _flags.update(FieldTypes.field_types);
+
+ def __init__(self, xml=None, parent=None):
+ ElementBase.__init__(self, xml, parent);
+ self._nodes = set()
+ self._fields = set()
+
+ def setup(self, xml=None):
+ """
+ Populate the stanza object using an optional XML object.
+
+ Overrides ElementBase.setup
+
+ Caches item information.
+
+ Arguments:
+ xml -- Use an existing XML object for the stanza's values.
+ """
+ ElementBase.setup(self, xml)
+ self._nodes = set([node['nodeId'] for node in self['nodes']])
+ self._fields = set([field['name'] for field in self['fields']])
+
+ def _get_flags(self):
+ """
+ Helper function for getting of flags. Returns all flags in
+ dictionary format: { "flag name": "flag value" ... }
+ """
+ flags = {};
+ for f in self._flags:
+ if not self[f] == "":
+ flags[f] = self[f];
+ return flags;
+
+ def _set_flags(self, flags):
+ """
+ Helper function for setting of flags.
+
+ Arguments:
+ flags -- Flags in dictionary format: { "flag name": "flag value" ... }
+ """
+ for f in self._flags:
+ if flags is not None and f in flags:
+ self[f] = flags[f];
+ else:
+ self[f] = None;
+
+ def add_node(self, nodeId, sourceId=None, cacheType=None):
+ """
+ Add a new node element. Each item is required to have a
+ nodeId, but may also specify a sourceId value and cacheType.
+
+ Arguments:
+ nodeId -- The ID for the node.
+ sourceId -- [optional] identifying the data source controlling the device
+ cacheType -- [optional] narrowing down the search to a specific kind of node
+ """
+ if nodeId not in self._nodes:
+ self._nodes.add((nodeId))
+ node = RequestNode(parent=self)
+ node['nodeId'] = nodeId
+ node['sourceId'] = sourceId
+ node['cacheType'] = cacheType
+ self.iterables.append(node)
+ return node
+ return None
+
+ def del_node(self, nodeId):
+ """
+ Remove a single node.
+
+ Arguments:
+ nodeId -- Node ID of the item to remove.
+ """
+ if nodeId in self._nodes:
+ nodes = [i for i in self.iterables if isinstance(i, RequestNode)]
+ for node in nodes:
+ if node['nodeId'] == nodeId:
+ self.xml.remove(node.xml)
+ self.iterables.remove(node)
+ return True
+ return False
+
+ def get_nodes(self):
+ """Return all nodes."""
+ nodes = set()
+ for node in self['substanzas']:
+ if isinstance(node, RequestNode):
+ nodes.add(node)
+ return nodes
+
+ def set_nodes(self, nodes):
+ """
+ Set or replace all nodes. The given nodes must be in a
+ list or set where each item is a tuple of the form:
+ (nodeId, sourceId, cacheType)
+
+ Arguments:
+ nodes -- A series of nodes in tuple format.
+ """
+ self.del_nodes()
+ for node in nodes:
+ if isinstance(node, RequestNode):
+ self.add_node(node['nodeId'], node['sourceId'], node['cacheType'])
+ else:
+ nodeId, sourceId, cacheType = node
+ self.add_node(nodeId, sourceId, cacheType)
+
+ def del_nodes(self):
+ """Remove all nodes."""
+ self._nodes = set()
+ nodes = [i for i in self.iterables if isinstance(i, RequestNode)]
+ for node in nodes:
+ self.xml.remove(node.xml)
+ self.iterables.remove(node)
+
+
+ def add_field(self, name):
+ """
+ Add a new field element. Each item is required to have a
+ name.
+
+ Arguments:
+ name -- The name of the field.
+ """
+ if name not in self._fields:
+ self._fields.add((name))
+ field = RequestField(parent=self)
+ field['name'] = name
+ self.iterables.append(field)
+ return field
+ return None
+
+ def del_field(self, name):
+ """
+ Remove a single field.
+
+ Arguments:
+ name -- name of field to remove.
+ """
+ if name in self._fields:
+ fields = [i for i in self.iterables if isinstance(i, RequestField)]
+ for field in fields:
+ if field['name'] == name:
+ self.xml.remove(field.xml)
+ self.iterables.remove(field)
+ return True
+ return False
+
+ def get_fields(self):
+ """Return all fields."""
+ fields = set()
+ for field in self['substanzas']:
+ if isinstance(field, RequestField):
+ fields.add(field)
+ return fields
+
+ def set_fields(self, fields):
+ """
+ Set or replace all fields. The given fields must be in a
+ list or set where each item is RequestField or string
+
+ Arguments:
+ fields -- A series of fields in RequestField or string format.
+ """
+ self.del_fields()
+ for field in fields:
+ if isinstance(field, RequestField):
+ self.add_field(field['name'])
+ else:
+ self.add_field(field)
+
+ def del_fields(self):
+ """Remove all fields."""
+ self._fields = set()
+ fields = [i for i in self.iterables if isinstance(i, RequestField)]
+ for field in fields:
+ self.xml.remove(field.xml)
+ self.iterables.remove(field)
+
+
+class RequestNode(ElementBase):
+ """ Node element in a request """
+ namespace = 'urn:xmpp:iot:sensordata'
+ name = 'node'
+ plugin_attrib = name
+ interfaces = set(['nodeId','sourceId','cacheType'])
+class RequestField(ElementBase):
+ """ Field element in a request """
+ namespace = 'urn:xmpp:iot:sensordata'
+ name = 'field'
+ plugin_attrib = name
+ interfaces = set(['name'])
class Accepted(ElementBase):
- namespace = 'http://xmpp.org/iot/sensordata'
+ namespace = 'urn:xmpp:iot:sensordata'
name = 'accepted'
plugin_attrib = name
- interfaces = set(('seqnr'))
+ interfaces = set(['seqnr','queued'])
+class Started(ElementBase):
+ namespace = 'urn:xmpp:iot:sensordata'
+ name = 'started'
+ plugin_attrib = name
+ interfaces = set(['seqnr'])
class Failure(ElementBase):
- namespace = 'http://xmpp.org/iot/sensordata'
+ namespace = 'urn:xmpp:iot:sensordata'
name = 'failure'
plugin_attrib = name
- interfaces = set(('seqnr','done'))
-
-
-
-register_stanza_plugin(Iq, Sensordata)
-register_stanza_plugin(Sensordata, Request)
-register_stanza_plugin(Sensordata, Accepted)
-register_stanza_plugin(Sensordata, Failure)
-# register_stanza_plugin(Pubsub, Default)
-# register_stanza_plugin(Pubsub, Items)
-# register_stanza_plugin(Pubsub, Options)
-# register_stanza_plugin(Pubsub, Publish)
-# register_stanza_plugin(Pubsub, PublishOptions)
-# register_stanza_plugin(Pubsub, Retract)
-# register_stanza_plugin(Pubsub, Subscribe)
-# register_stanza_plugin(Pubsub, Subscription)
-# register_stanza_plugin(Pubsub, Subscriptions)
-# register_stanza_plugin(Pubsub, Unsubscribe)
-# register_stanza_plugin(Affiliations, Affiliation, iterable=True)
-# register_stanza_plugin(Configure, xep_0004.Form)
-# register_stanza_plugin(Items, Item, iterable=True)
-# register_stanza_plugin(Publish, Item, iterable=True)
-# register_stanza_plugin(Retract, Item)
-# register_stanza_plugin(Subscribe, Options)
-# register_stanza_plugin(Subscription, SubscribeOptions)
-# register_stanza_plugin(Subscriptions, Subscription, iterable=True)
+ interfaces = set(['seqnr','done'])
+
+class Error(ElementBase):
+ """ Error element in a request failure """
+ namespace = 'urn:xmpp:iot:sensordata'
+ name = 'error'
+ plugin_attrib = name
+ interfaces = set(['nodeId','timestamp','sourceId','cacheType','text'])
+
+ def get_text(self):
+ """Return then contents inside the XML tag."""
+ return self.xml.text
+
+ def set_text(self, value):
+ """Set then contents inside the XML tag.
+
+ :param value: string
+ """
+
+ self.xml.text = value;
+ return self
+
+ def del_text(self):
+ """Remove the contents inside the XML tag."""
+ self.xml.text = ""
+ return self
+
+class Rejected(ElementBase):
+ namespace = 'urn:xmpp:iot:sensordata'
+ name = 'rejected'
+ plugin_attrib = name
+ interfaces = set(['seqnr','error'])
+ sub_interfaces = set(['error'])
+
+class Fields(ElementBase):
+ """ Fields element, top level in a response message with data """
+ namespace = 'urn:xmpp:iot:sensordata'
+ name = 'fields'
+ plugin_attrib = name
+ interfaces = set(['seqnr','done','nodes'])
+
+ def __init__(self, xml=None, parent=None):
+ ElementBase.__init__(self, xml, parent);
+ self._nodes = set()
+
+ def setup(self, xml=None):
+ """
+ Populate the stanza object using an optional XML object.
+
+ Overrides ElementBase.setup
+
+ Caches item information.
+
+ Arguments:
+ xml -- Use an existing XML object for the stanza's values.
+ """
+ ElementBase.setup(self, xml)
+ self._nodes = set([node['nodeId'] for node in self['nodes']])
+
+
+ def add_node(self, nodeId, sourceId=None, cacheType=None, substanzas=None):
+ """
+ Add a new node element. Each item is required to have a
+ nodeId, but may also specify a sourceId value and cacheType.
+
+ Arguments:
+ nodeId -- The ID for the node.
+ sourceId -- [optional] identifying the data source controlling the device
+ cacheType -- [optional] narrowing down the search to a specific kind of node
+ """
+ if nodeId not in self._nodes:
+ self._nodes.add((nodeId))
+ node = FieldsNode(parent=self)
+ node['nodeId'] = nodeId
+ node['sourceId'] = sourceId
+ node['cacheType'] = cacheType
+ if substanzas is not None:
+ node.set_timestamps(substanzas)
+
+ self.iterables.append(node)
+ return node
+ return None
+
+ def del_node(self, nodeId):
+ """
+ Remove a single node.
+
+ Arguments:
+ nodeId -- Node ID of the item to remove.
+ """
+ if nodeId in self._nodes:
+ nodes = [i for i in self.iterables if isinstance(i, FieldsNode)]
+ for node in nodes:
+ if node['nodeId'] == nodeId:
+ self.xml.remove(node.xml)
+ self.iterables.remove(node)
+ return True
+ return False
+
+ def get_nodes(self):
+ """Return all nodes."""
+ nodes = set()
+ for node in self['substanzas']:
+ if isinstance(node, FieldsNode):
+ nodes.add(node)
+ return nodes
+
+ def set_nodes(self, nodes):
+ """
+ Set or replace all nodes. The given nodes must be in a
+ list or set where each item is a tuple of the form:
+ (nodeId, sourceId, cacheType)
+
+ Arguments:
+ nodes -- A series of nodes in tuple format.
+ """
+ #print(str(id(self)) + " set_nodes: got " + str(nodes))
+ self.del_nodes()
+ for node in nodes:
+ if isinstance(node, FieldsNode):
+ self.add_node(node['nodeId'], node['sourceId'], node['cacheType'], substanzas=node['substanzas'])
+ else:
+ nodeId, sourceId, cacheType = node
+ self.add_node(nodeId, sourceId, cacheType)
+
+ def del_nodes(self):
+ """Remove all nodes."""
+ self._nodes = set()
+ nodes = [i for i in self.iterables if isinstance(i, FieldsNode)]
+ for node in nodes:
+ self.xml.remove(node.xml)
+ self.iterables.remove(node)
+
+
+class FieldsNode(ElementBase):
+ """ Node element in response fields """
+ namespace = 'urn:xmpp:iot:sensordata'
+ name = 'node'
+ plugin_attrib = name
+ interfaces = set(['nodeId','sourceId','cacheType','timestamps'])
+
+ def __init__(self, xml=None, parent=None):
+ ElementBase.__init__(self, xml, parent);
+ self._timestamps = set()
+
+ def setup(self, xml=None):
+ """
+ Populate the stanza object using an optional XML object.
+
+ Overrides ElementBase.setup
+
+ Caches item information.
+
+ Arguments:
+ xml -- Use an existing XML object for the stanza's values.
+ """
+ ElementBase.setup(self, xml)
+ self._timestamps = set([ts['value'] for ts in self['timestamps']])
+
+ def add_timestamp(self, timestamp, substanzas=None):
+ """
+ Add a new timestamp element.
+
+ Arguments:
+ timestamp -- The timestamp in ISO format.
+ """
+ #print(str(id(self)) + " add_timestamp: " + str(timestamp))
+
+ if timestamp not in self._timestamps:
+ self._timestamps.add((timestamp))
+ ts = Timestamp(parent=self)
+ ts['value'] = timestamp
+ if not substanzas is None:
+ ts.set_datas(substanzas);
+ #print("add_timestamp with substanzas: " + str(substanzas))
+ self.iterables.append(ts)
+ #print(str(id(self)) + " added_timestamp: " + str(id(ts)))
+ return ts
+ return None
+
+ def del_timestamp(self, timestamp):
+ """
+ Remove a single timestamp.
+
+ Arguments:
+ timestamp -- timestamp (in ISO format) of the item to remove.
+ """
+ #print("del_timestamp: ")
+ if timestamp in self._timestamps:
+ timestamps = [i for i in self.iterables if isinstance(i, Timestamp)]
+ for ts in timestamps:
+ if ts['value'] == timestamp:
+ self.xml.remove(ts.xml)
+ self.iterables.remove(ts)
+ return True
+ return False
+
+ def get_timestamps(self):
+ """Return all timestamps."""
+ #print(str(id(self)) + " get_timestamps: ")
+ timestamps = set()
+ for timestamp in self['substanzas']:
+ if isinstance(timestamp, Timestamp):
+ timestamps.add(timestamp)
+ return timestamps
+
+ def set_timestamps(self, timestamps):
+ """
+ Set or replace all timestamps. The given timestamps must be in a
+ list or set where each item is a timestamp
+
+ Arguments:
+ timestamps -- A series of timestamps.
+ """
+ #print(str(id(self)) + " set_timestamps: got " + str(timestamps))
+ self.del_timestamps()
+ for timestamp in timestamps:
+ #print("set_timestamps: subset " + str(timestamp))
+ #print("set_timestamps: subset.substanzas " + str(timestamp['substanzas']))
+ if isinstance(timestamp, Timestamp):
+ self.add_timestamp(timestamp['value'], substanzas=timestamp['substanzas'])
+ else:
+ #print("set_timestamps: got " + str(timestamp))
+ self.add_timestamp(timestamp)
+
+ def del_timestamps(self):
+ """Remove all timestamps."""
+ #print(str(id(self)) + " del_timestamps: ")
+ self._timestamps = set()
+ timestamps = [i for i in self.iterables if isinstance(i, Timestamp)]
+ for timestamp in timestamps:
+ self.xml.remove(timestamp.xml)
+ self.iterables.remove(timestamp)
+
+class Field(ElementBase):
+ """
+ Field element in response Timestamp. This is a base class,
+ all instances of fields added to Timestamp must be of types:
+ DataNumeric
+ DataString
+ DataBoolean
+ DataDateTime
+ DataTimeSpan
+ DataEnum
+ """
+ namespace = 'urn:xmpp:iot:sensordata'
+ name = 'field'
+ plugin_attrib = name
+ interfaces = set(['name','module','stringIds']);
+ interfaces.update(FieldTypes.field_types);
+ interfaces.update(FieldStatus.field_status);
+
+ _flags = set();
+ _flags.update(FieldTypes.field_types);
+ _flags.update(FieldStatus.field_status);
+
+ def set_stringIds(self, value):
+ """Verifies stringIds according to regexp from specification XMPP-0323.
+
+ :param value: string
+ """
+
+ pattern = re.compile("^\d+([|]\w+([.]\w+)*([|][^,]*)?)?(,\d+([|]\w+([.]\w+)*([|][^,]*)?)?)*$")
+ if pattern.match(value) is not None:
+ self.xml.stringIds = value;
+ else:
+ # Bad content, add nothing
+ pass
+
+ return self
+
+ def _get_flags(self):
+ """
+ Helper function for getting of flags. Returns all flags in
+ dictionary format: { "flag name": "flag value" ... }
+ """
+ flags = {};
+ for f in self._flags:
+ if not self[f] == "":
+ flags[f] = self[f];
+ return flags;
+
+ def _set_flags(self, flags):
+ """
+ Helper function for setting of flags.
+
+ Arguments:
+ flags -- Flags in dictionary format: { "flag name": "flag value" ... }
+ """
+ for f in self._flags:
+ if flags is not None and f in flags:
+ self[f] = flags[f];
+ else:
+ self[f] = None;
+
+ def _get_typename(self):
+ return "invalid type, use subclasses!";
+
+
+class Timestamp(ElementBase):
+ """ Timestamp element in response Node """
+ namespace = 'urn:xmpp:iot:sensordata'
+ name = 'timestamp'
+ plugin_attrib = name
+ interfaces = set(['value','datas'])
+
+ def __init__(self, xml=None, parent=None):
+ ElementBase.__init__(self, xml, parent);
+ self._datas = set()
+
+ def setup(self, xml=None):
+ """
+ Populate the stanza object using an optional XML object.
+
+ Overrides ElementBase.setup
+
+ Caches item information.
+
+ Arguments:
+ xml -- Use an existing XML object for the stanza's values.
+ """
+ ElementBase.setup(self, xml)
+ self._datas = set([data['name'] for data in self['datas']])
+
+ def add_data(self, typename, name, value, module=None, stringIds=None, unit=None, dataType=None, flags=None):
+ """
+ Add a new data element.
+
+ Arguments:
+ typename -- The type of data element (numeric, string, boolean, dateTime, timeSpan or enum)
+ value -- The value of the data element
+ module -- [optional] language module to use for the data element
+ stringIds -- [optional] The stringIds used to find associated text in the language module
+ unit -- [optional] The unit. Only applicable for type numeric
+ dataType -- [optional] The dataType. Only applicable for type enum
+ """
+ if name not in self._datas:
+ dataObj = None;
+ if typename == "numeric":
+ dataObj = DataNumeric(parent=self);
+ dataObj['unit'] = unit;
+ elif typename == "string":
+ dataObj = DataString(parent=self);
+ elif typename == "boolean":
+ dataObj = DataBoolean(parent=self);
+ elif typename == "dateTime":
+ dataObj = DataDateTime(parent=self);
+ elif typename == "timeSpan":
+ dataObj = DataTimeSpan(parent=self);
+ elif typename == "enum":
+ dataObj = DataEnum(parent=self);
+ dataObj['dataType'] = dataType;
+
+ dataObj['name'] = name;
+ dataObj['value'] = value;
+ dataObj['module'] = module;
+ dataObj['stringIds'] = stringIds;
+
+ if flags is not None:
+ dataObj._set_flags(flags);
+
+ self._datas.add(name)
+ self.iterables.append(dataObj)
+ return dataObj
+ return None
+
+ def del_data(self, name):
+ """
+ Remove a single data element.
+
+ Arguments:
+ data_name -- The data element name to remove.
+ """
+ if name in self._datas:
+ datas = [i for i in self.iterables if isinstance(i, Field)]
+ for data in datas:
+ if data['name'] == name:
+ self.xml.remove(data.xml)
+ self.iterables.remove(data)
+ return True
+ return False
+
+ def get_datas(self):
+ """ Return all data elements. """
+ datas = set()
+ for data in self['substanzas']:
+ if isinstance(data, Field):
+ datas.add(data)
+ return datas
+
+ def set_datas(self, datas):
+ """
+ Set or replace all data elements. The given elements must be in a
+ list or set where each item is a data element (numeric, string, boolean, dateTime, timeSpan or enum)
+
+ Arguments:
+ datas -- A series of data elements.
+ """
+ self.del_datas()
+ for data in datas:
+ self.add_data(typename=data._get_typename(), name=data['name'], value=data['value'], module=data['module'], stringIds=data['stringIds'], unit=data['unit'], dataType=data['dataType'], flags=data._get_flags())
+
+ def del_datas(self):
+ """Remove all data elements."""
+ self._datas = set()
+ datas = [i for i in self.iterables if isinstance(i, Field)]
+ for data in datas:
+ self.xml.remove(data.xml)
+ self.iterables.remove(data)
+
+class DataNumeric(Field):
+ """
+ Field data of type numeric.
+ Note that the value is expressed as a string.
+ """
+ namespace = 'urn:xmpp:iot:sensordata'
+ name = 'numeric'
+ plugin_attrib = name
+ interfaces = set(['value', 'unit']);
+ interfaces.update(Field.interfaces);
+
+ def _get_typename(self):
+ return "numeric"
+
+class DataString(Field):
+ """
+ Field data of type string
+ """
+ namespace = 'urn:xmpp:iot:sensordata'
+ name = 'string'
+ plugin_attrib = name
+ interfaces = set(['value']);
+ interfaces.update(Field.interfaces);
+
+ def _get_typename(self):
+ return "string"
+
+class DataBoolean(Field):
+ """
+ Field data of type boolean.
+ Note that the value is expressed as a string.
+ """
+ namespace = 'urn:xmpp:iot:sensordata'
+ name = 'boolean'
+ plugin_attrib = name
+ interfaces = set(['value']);
+ interfaces.update(Field.interfaces);
+
+ def _get_typename(self):
+ return "boolean"
+
+class DataDateTime(Field):
+ """
+ Field data of type dateTime.
+ Note that the value is expressed as a string.
+ """
+ namespace = 'urn:xmpp:iot:sensordata'
+ name = 'dateTime'
+ plugin_attrib = name
+ interfaces = set(['value']);
+ interfaces.update(Field.interfaces);
+
+ def _get_typename(self):
+ return "dateTime"
+
+class DataTimeSpan(Field):
+ """
+ Field data of type timeSpan.
+ Note that the value is expressed as a string.
+ """
+ namespace = 'urn:xmpp:iot:sensordata'
+ name = 'timeSpan'
+ plugin_attrib = name
+ interfaces = set(['value']);
+ interfaces.update(Field.interfaces);
+
+ def _get_typename(self):
+ return "timeSpan"
+
+class DataEnum(Field):
+ """
+ Field data of type enum.
+ Note that the value is expressed as a string.
+ """
+ namespace = 'urn:xmpp:iot:sensordata'
+ name = 'enum'
+ plugin_attrib = name
+ interfaces = set(['value', 'dataType']);
+ interfaces.update(Field.interfaces);
+
+ def _get_typename(self):
+ return "enum"
+
+class Done(ElementBase):
+ """ Done element used to signal that all data has been transferred """
+ namespace = 'urn:xmpp:iot:sensordata'
+ name = 'done'
+ plugin_attrib = name
+ interfaces = set(['seqnr'])
+
+class Cancel(ElementBase):
+ """ Cancel element used to signal that a request shall be cancelled """
+ namespace = 'urn:xmpp:iot:sensordata'
+ name = 'cancel'
+ plugin_attrib = name
+ interfaces = set(['seqnr'])
+
+class Cancelled(ElementBase):
+ """ Cancelled element used to signal that cancellation is confirmed """
+ namespace = 'urn:xmpp:iot:sensordata'
+ name = 'cancelled'
+ plugin_attrib = name
+ interfaces = set(['seqnr'])
+
+
+register_stanza_plugin(Iq, Request)
+register_stanza_plugin(Request, RequestNode, iterable=True)
+register_stanza_plugin(Request, RequestField, iterable=True)
+
+register_stanza_plugin(Iq, Accepted)
+register_stanza_plugin(Message, Failure)
+register_stanza_plugin(Failure, Error)
+
+register_stanza_plugin(Iq, Rejected)
+
+register_stanza_plugin(Message, Fields)
+register_stanza_plugin(Fields, FieldsNode, iterable=True)
+register_stanza_plugin(FieldsNode, Timestamp, iterable=True)
+register_stanza_plugin(Timestamp, Field, iterable=True)
+register_stanza_plugin(Timestamp, DataNumeric, iterable=True)
+register_stanza_plugin(Timestamp, DataString, iterable=True)
+register_stanza_plugin(Timestamp, DataBoolean, iterable=True)
+register_stanza_plugin(Timestamp, DataDateTime, iterable=True)
+register_stanza_plugin(Timestamp, DataTimeSpan, iterable=True)
+register_stanza_plugin(Timestamp, DataEnum, iterable=True)
+
+register_stanza_plugin(Message, Started)
+
+register_stanza_plugin(Iq, Cancel)
+register_stanza_plugin(Iq, Cancelled)
diff --git a/sleekxmpp/plugins/xep_0323/timerreset.py b/sleekxmpp/plugins/xep_0323/timerreset.py
new file mode 100644
index 00000000..578f1efe
--- /dev/null
+++ b/sleekxmpp/plugins/xep_0323/timerreset.py
@@ -0,0 +1,64 @@
+"""
+ SleekXMPP: The Sleek XMPP Library
+ Implementation of xeps for Internet of Things
+ http://wiki.xmpp.org/web/Tech_pages/IoT_systems
+ Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
+ This file is part of SleekXMPP.
+
+ See the file LICENSE for copying permission.
+"""
+from threading import Thread, Event, Timer
+import time
+
+def TimerReset(*args, **kwargs):
+ """ Global function for Timer """
+ return _TimerReset(*args, **kwargs)
+
+
+class _TimerReset(Thread):
+ """Call a function after a specified number of seconds:
+
+ t = TimerReset(30.0, f, args=[], kwargs={})
+ t.start()
+ t.cancel() # stop the timer's action if it's still waiting
+ """
+
+ def __init__(self, interval, function, args=[], kwargs={}):
+ Thread.__init__(self)
+ self.interval = interval
+ self.function = function
+ self.args = args
+ self.kwargs = kwargs
+ self.finished = Event()
+ self.resetted = True
+
+ def cancel(self):
+ """Stop the timer if it hasn't finished yet"""
+ self.finished.set()
+
+ def run(self):
+ #print "Time: %s - timer running..." % time.asctime()
+
+ while self.resetted:
+ #print "Time: %s - timer waiting for timeout in %.2f..." % (time.asctime(), self.interval)
+ self.resetted = False
+ self.finished.wait(self.interval)
+
+ if not self.finished.isSet():
+ self.function(*self.args, **self.kwargs)
+ self.finished.set()
+ #print "Time: %s - timer finished!" % time.asctime()
+
+ def reset(self, interval=None):
+ """ Reset the timer """
+
+ if interval:
+ #print "Time: %s - timer resetting to %.2f..." % (time.asctime(), interval)
+ self.interval = interval
+ else:
+ #print "Time: %s - timer resetting..." % time.asctime()
+ pass
+
+ self.resetted = True
+ self.finished.set()
+ self.finished.clear()
diff --git a/sleekxmpp/plugins/xep_0325/__init__.py b/sleekxmpp/plugins/xep_0325/__init__.py
new file mode 100644
index 00000000..01c38dce
--- /dev/null
+++ b/sleekxmpp/plugins/xep_0325/__init__.py
@@ -0,0 +1,18 @@
+"""
+ SleekXMPP: The Sleek XMPP Library
+ Implementation of xeps for Internet of Things
+ http://wiki.xmpp.org/web/Tech_pages/IoT_systems
+ Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
+ This file is part of SleekXMPP.
+
+ See the file LICENSE for copying permission.
+"""
+
+from sleekxmpp.plugins.base import register_plugin
+
+from sleekxmpp.plugins.xep_0325.control import XEP_0325
+from sleekxmpp.plugins.xep_0325 import stanza
+
+register_plugin(XEP_0325)
+
+xep_0325=XEP_0325
diff --git a/sleekxmpp/plugins/xep_0325/control.py b/sleekxmpp/plugins/xep_0325/control.py
new file mode 100644
index 00000000..e34eb2c2
--- /dev/null
+++ b/sleekxmpp/plugins/xep_0325/control.py
@@ -0,0 +1,574 @@
+"""
+ SleekXMPP: The Sleek XMPP Library
+ Implementation of xeps for Internet of Things
+ http://wiki.xmpp.org/web/Tech_pages/IoT_systems
+ Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
+ This file is part of SleekXMPP.
+
+ See the file LICENSE for copying permission.
+"""
+
+import logging
+import time
+from threading import Thread, Timer, Lock
+
+from sleekxmpp.xmlstream import JID
+from sleekxmpp.xmlstream.handler import Callback
+from sleekxmpp.xmlstream.matcher import StanzaPath
+from sleekxmpp.plugins.base import BasePlugin
+from sleekxmpp.plugins.xep_0325 import stanza
+from sleekxmpp.plugins.xep_0325.stanza import Control
+
+
+log = logging.getLogger(__name__)
+
+
+class XEP_0325(BasePlugin):
+
+ """
+ XEP-0325: IoT Control
+
+
+ Actuators are devices in sensor networks that can be controlled through
+ the network and act with the outside world. In sensor networks and
+ Internet of Things applications, actuators make it possible to automate
+ real-world processes.
+ This plugin implements a mechanism whereby actuators can be controlled
+ in XMPP-based sensor networks, making it possible to integrate sensors
+ and actuators of different brands, makes and models into larger
+ Internet of Things applications.
+
+ Also see <http://xmpp.org/extensions/xep-0325.html>
+
+ Configuration Values:
+ threaded -- Indicates if communication with sensors should be threaded.
+ Defaults to True.
+
+ Events:
+ Sensor side
+ -----------
+ Control Event:DirectSet -- Received a control message
+ Control Event:SetReq -- Received a control request
+
+ Client side
+ -----------
+ Control Event:SetResponse -- Received a response to a
+ control request, type result
+ Control Event:SetResponseError -- Received a response to a
+ control request, type error
+
+ Attributes:
+ threaded -- Indicates if command events should be threaded.
+ Defaults to True.
+ sessions -- A dictionary or equivalent backend mapping
+ session IDs to dictionaries containing data
+ relevant to a request's session. This dictionary is used
+ both by the client and sensor side. On client side, seqnr
+ is used as key, while on sensor side, a session_id is used
+ as key. This ensures that the two will not collide, so
+ one instance can be both client and sensor.
+ Sensor side
+ -----------
+ nodes -- A dictionary mapping sensor nodes that are serviced through
+ this XMPP instance to their device handlers ("drivers").
+ Client side
+ -----------
+ last_seqnr -- The last used sequence number (integer). One sequence of
+ communication (e.g. -->request, <--accept, <--fields)
+ between client and sensor is identified by a unique
+ sequence number (unique between the client/sensor pair)
+
+ Methods:
+ plugin_init -- Overrides base_plugin.plugin_init
+ post_init -- Overrides base_plugin.post_init
+ plugin_end -- Overrides base_plugin.plugin_end
+
+ Sensor side
+ -----------
+ register_node -- Register a sensor as available from this XMPP
+ instance.
+
+ Client side
+ -----------
+ set_request -- Initiates a control request to modify data in
+ sensor(s). Non-blocking, a callback function will
+ be called when the sensor has responded.
+ set_command -- Initiates a control command to modify data in
+ sensor(s). Non-blocking. The sensor(s) will not
+ respond regardless of the result of the command,
+ so no callback is made.
+
+ """
+
+ name = 'xep_0325'
+ description = 'XEP-0325 Internet of Things - Control'
+ dependencies = set(['xep_0030'])
+ stanza = stanza
+
+
+ default_config = {
+ 'threaded': True
+# 'session_db': None
+ }
+
+ def plugin_init(self):
+ """ Start the XEP-0325 plugin """
+
+ self.xmpp.register_handler(
+ Callback('Control Event:DirectSet',
+ StanzaPath('message/set'),
+ self._handle_direct_set))
+
+ self.xmpp.register_handler(
+ Callback('Control Event:SetReq',
+ StanzaPath('iq@type=set/set'),
+ self._handle_set_req))
+
+ self.xmpp.register_handler(
+ Callback('Control Event:SetResponse',
+ StanzaPath('iq@type=result/setResponse'),
+ self._handle_set_response))
+
+ self.xmpp.register_handler(
+ Callback('Control Event:SetResponseError',
+ StanzaPath('iq@type=error/setResponse'),
+ self._handle_set_response))
+
+ # Server side dicts
+ self.nodes = {};
+ self.sessions = {};
+
+ self.last_seqnr = 0;
+ self.seqnr_lock = Lock();
+
+ ## For testning only
+ self.test_authenticated_from = ""
+
+ def post_init(self):
+ """ Init complete. Register our features in Serivce discovery. """
+ BasePlugin.post_init(self)
+ self.xmpp['xep_0030'].add_feature(Control.namespace)
+ self.xmpp['xep_0030'].set_items(node=Control.namespace, items=tuple())
+
+ def _new_session(self):
+ """ Return a new session ID. """
+ return str(time.time()) + '-' + self.xmpp.new_id()
+
+ def plugin_end(self):
+ """ Stop the XEP-0325 plugin """
+ self.sessions.clear();
+ self.xmpp.remove_handler('Control Event:DirectSet')
+ self.xmpp.remove_handler('Control Event:SetReq')
+ self.xmpp.remove_handler('Control Event:SetResponse')
+ self.xmpp.remove_handler('Control Event:SetResponseError')
+ self.xmpp['xep_0030'].del_feature(feature=Control.namespace)
+ self.xmpp['xep_0030'].set_items(node=Control.namespace, items=tuple());
+
+
+ # =================================================================
+ # Sensor side (data provider) API
+
+ def register_node(self, nodeId, device, commTimeout, sourceId=None, cacheType=None):
+ """
+ Register a sensor/device as available for control requests/commands
+ through this XMPP instance.
+
+ The device object may by any custom implementation to support
+ specific devices, but it must implement the functions:
+ has_control_field
+ set_control_fields
+ according to the interfaces shown in the example device.py file.
+
+ Arguments:
+ nodeId -- The identifier for the device
+ device -- The device object
+ commTimeout -- Time in seconds to wait between each callback from device during
+ a data readout. Float.
+ sourceId -- [optional] identifying the data source controlling the device
+ cacheType -- [optional] narrowing down the search to a specific kind of node
+ """
+ self.nodes[nodeId] = {"device": device,
+ "commTimeout": commTimeout,
+ "sourceId": sourceId,
+ "cacheType": cacheType};
+
+ def _set_authenticated(self, auth=''):
+ """ Internal testing function """
+ self.test_authenticated_from = auth;
+
+ def _get_new_seqnr(self):
+ """ Returns a unique sequence number (unique across threads) """
+ self.seqnr_lock.acquire();
+ self.last_seqnr = self.last_seqnr + 1;
+ self.seqnr_lock.release();
+ return str(self.last_seqnr);
+
+ def _handle_set_req(self, iq):
+ """
+ Event handler for reception of an Iq with set req - this is a
+ control request.
+
+ Verifies that
+ - all the requested nodes are available
+ (if no nodes are specified in the request, assume all nodes)
+ - all the control fields are available from all requested nodes
+ (if no nodes are specified in the request, assume all nodes)
+
+ If the request passes verification, the control request is passed
+ to the devices (in a separate thread).
+ If the verification fails, a setResponse with error indication
+ is sent.
+ """
+
+ error_msg = '';
+ req_ok = True;
+ missing_node = None;
+ missing_field = None;
+
+ # Authentication
+ if len(self.test_authenticated_from) > 0 and not iq['from'] == self.test_authenticated_from:
+ # Invalid authentication
+ req_ok = False;
+ error_msg = "Access denied";
+
+ # Nodes
+ process_nodes = [];
+ if len(iq['set']['nodes']) > 0:
+ for n in iq['set']['nodes']:
+ if not n['nodeId'] in self.nodes:
+ req_ok = False;
+ missing_node = n['nodeId'];
+ error_msg = "Invalid nodeId " + n['nodeId'];
+ process_nodes = [n['nodeId'] for n in iq['set']['nodes']];
+ else:
+ process_nodes = self.nodes.keys();
+
+ # Fields - for control we need to find all in all devices, otherwise we reject
+ process_fields = [];
+ if len(iq['set']['datas']) > 0:
+ for f in iq['set']['datas']:
+ for node in self.nodes:
+ if not self.nodes[node]["device"].has_control_field(f['name'], f._get_typename()):
+ req_ok = False;
+ missing_field = f['name'];
+ error_msg = "Invalid field " + f['name'];
+ break;
+ process_fields = [(f['name'], f._get_typename(), f['value']) for f in iq['set']['datas']];
+
+ if req_ok:
+ session = self._new_session();
+ self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": iq['id']};
+ self.sessions[session]["commTimers"] = {};
+ self.sessions[session]["nodeDone"] = {};
+ # Flag that a reply is exected when we are done
+ self.sessions[session]["reply"] = True;
+
+ self.sessions[session]["node_list"] = process_nodes;
+ if self.threaded:
+ #print("starting thread")
+ tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields))
+ tr_req.start()
+ #print("started thread")
+ else:
+ self._threaded_node_request(session, process_fields);
+
+ else:
+ iq.reply();
+ iq['type'] = 'error';
+ iq['setResponse']['responseCode'] = "NotFound";
+ if missing_node is not None:
+ iq['setResponse'].add_node(missing_node);
+ if missing_field is not None:
+ iq['setResponse'].add_data(missing_field);
+ iq['setResponse']['error']['var'] = "Output";
+ iq['setResponse']['error']['text'] = error_msg;
+ iq.send(block=False);
+
+ def _handle_direct_set(self, msg):
+ """
+ Event handler for reception of a Message with set command - this is a
+ direct control command.
+
+ Verifies that
+ - all the requested nodes are available
+ (if no nodes are specified in the request, assume all nodes)
+ - all the control fields are available from all requested nodes
+ (if no nodes are specified in the request, assume all nodes)
+
+ If the request passes verification, the control request is passed
+ to the devices (in a separate thread).
+ If the verification fails, do nothing.
+ """
+ req_ok = True;
+
+ # Nodes
+ process_nodes = [];
+ if len(msg['set']['nodes']) > 0:
+ for n in msg['set']['nodes']:
+ if not n['nodeId'] in self.nodes:
+ req_ok = False;
+ error_msg = "Invalid nodeId " + n['nodeId'];
+ process_nodes = [n['nodeId'] for n in msg['set']['nodes']];
+ else:
+ process_nodes = self.nodes.keys();
+
+ # Fields - for control we need to find all in all devices, otherwise we reject
+ process_fields = [];
+ if len(msg['set']['datas']) > 0:
+ for f in msg['set']['datas']:
+ for node in self.nodes:
+ if not self.nodes[node]["device"].has_control_field(f['name'], f._get_typename()):
+ req_ok = False;
+ missing_field = f['name'];
+ error_msg = "Invalid field " + f['name'];
+ break;
+ process_fields = [(f['name'], f._get_typename(), f['value']) for f in msg['set']['datas']];
+
+ if req_ok:
+ session = self._new_session();
+ self.sessions[session] = {"from": msg['from'], "to": msg['to']};
+ self.sessions[session]["commTimers"] = {};
+ self.sessions[session]["nodeDone"] = {};
+ self.sessions[session]["reply"] = False;
+
+ self.sessions[session]["node_list"] = process_nodes;
+ if self.threaded:
+ #print("starting thread")
+ tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields))
+ tr_req.start()
+ #print("started thread")
+ else:
+ self._threaded_node_request(session, process_fields);
+
+
+ def _threaded_node_request(self, session, process_fields):
+ """
+ Helper function to handle the device control in a separate thread.
+
+ Arguments:
+ session -- The request session id
+ process_fields -- The fields to set in the devices. List of tuple format:
+ (name, datatype, value)
+ """
+ for node in self.sessions[session]["node_list"]:
+ self.sessions[session]["nodeDone"][node] = False;
+
+ for node in self.sessions[session]["node_list"]:
+ timer = Timer(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node));
+ self.sessions[session]["commTimers"][node] = timer;
+ timer.start();
+ self.nodes[node]['device'].set_control_fields(process_fields, session=session, callback=self._device_set_command_callback);
+
+ def _event_comm_timeout(self, session, nodeId):
+ """
+ Triggered if any of the control operations timeout.
+ Stop communicating with the failing device.
+ If the control command was an Iq request, sends a failure
+ message back to the client.
+
+ Arguments:
+ session -- The request session id
+ nodeId -- The id of the device which timed out
+ """
+
+ if self.sessions[session]["reply"]:
+ # Reply is exected when we are done
+ iq = self.xmpp.Iq();
+ iq['from'] = self.sessions[session]['to'];
+ iq['to'] = self.sessions[session]['from'];
+ iq['type'] = "error";
+ iq['id'] = self.sessions[session]['seqnr'];
+ iq['setResponse']['responseCode'] = "OtherError";
+ iq['setResponse'].add_node(nodeId);
+ iq['setResponse']['error']['var'] = "Output";
+ iq['setResponse']['error']['text'] = "Timeout.";
+ iq.send(block=False);
+
+ ## TODO - should we send one timeout per node??
+
+ # Drop communication with this device and check if we are done
+ self.sessions[session]["nodeDone"][nodeId] = True;
+ if (self._all_nodes_done(session)):
+ # The session is complete, delete it
+ del self.sessions[session];
+
+ def _all_nodes_done(self, session):
+ """
+ Checks wheter all devices are done replying to the control command.
+
+ Arguments:
+ session -- The request session id
+ """
+ for n in self.sessions[session]["nodeDone"]:
+ if not self.sessions[session]["nodeDone"][n]:
+ return False;
+ return True;
+
+ def _device_set_command_callback(self, session, nodeId, result, error_field=None, error_msg=None):
+ """
+ Callback function called by the devices when the control command is
+ complete or failed.
+ If needed, composes a message with the result and sends it back to the
+ client.
+
+ Arguments:
+ session -- The request session id
+ nodeId -- The device id which initiated the callback
+ result -- The current result status of the control command. Valid values are:
+ "error" - Set fields failed.
+ "ok" - All fields were set.
+ error_field -- [optional] Only applies when result == "error"
+ The field name that failed (usually means it is missing)
+ error_msg -- [optional] Only applies when result == "error".
+ Error details when a request failed.
+ """
+
+ if not session in self.sessions:
+ # This can happend if a session was deleted, like in a timeout. Just drop the data.
+ return
+
+ if result == "error":
+ self.sessions[session]["commTimers"][nodeId].cancel();
+
+ if self.sessions[session]["reply"]:
+ # Reply is exected when we are done
+ iq = self.xmpp.Iq();
+ iq['from'] = self.sessions[session]['to'];
+ iq['to'] = self.sessions[session]['from'];
+ iq['type'] = "error";
+ iq['id'] = self.sessions[session]['seqnr'];
+ iq['setResponse']['responseCode'] = "OtherError";
+ iq['setResponse'].add_node(nodeId);
+ if error_field is not None:
+ iq['setResponse'].add_data(error_field);
+ iq['setResponse']['error']['var'] = error_field;
+ iq['setResponse']['error']['text'] = error_msg;
+ iq.send(block=False);
+
+ # Drop communication with this device and check if we are done
+ self.sessions[session]["nodeDone"][nodeId] = True;
+ if (self._all_nodes_done(session)):
+ # The session is complete, delete it
+ del self.sessions[session];
+ else:
+ self.sessions[session]["commTimers"][nodeId].cancel();
+
+ self.sessions[session]["nodeDone"][nodeId] = True;
+ if (self._all_nodes_done(session)):
+ if self.sessions[session]["reply"]:
+ # Reply is exected when we are done
+ iq = self.xmpp.Iq();
+ iq['from'] = self.sessions[session]['to'];
+ iq['to'] = self.sessions[session]['from'];
+ iq['type'] = "result";
+ iq['id'] = self.sessions[session]['seqnr'];
+ iq['setResponse']['responseCode'] = "OK";
+ iq.send(block=False);
+
+ # The session is complete, delete it
+ del self.sessions[session];
+
+
+ # =================================================================
+ # Client side (data controller) API
+
+ def set_request(self, from_jid, to_jid, callback, fields, nodeIds=None):
+ """
+ Called on the client side to initiade a control request.
+ Composes a message with the request and sends it to the device(s).
+ Does not block, the callback will be called when the device(s)
+ has responded.
+
+ Arguments:
+ from_jid -- The jid of the requester
+ to_jid -- The jid of the device(s)
+ callback -- The callback function to call when data is availble.
+
+ The callback function must support the following arguments:
+
+ from_jid -- The jid of the responding device(s)
+ result -- The result of the control request. Valid values are:
+ "OK" - Control request completed successfully
+ "NotFound" - One or more nodes or fields are missing
+ "InsufficientPrivileges" - Not authorized.
+ "Locked" - Field(s) is locked and cannot
+ be changed at the moment.
+ "NotImplemented" - Request feature not implemented.
+ "FormError" - Error while setting with
+ a form (not implemented).
+ "OtherError" - Indicates other types of
+ errors, such as timeout.
+ Details in the error_msg.
+
+
+ nodeId -- [optional] Only applicable when result == "error"
+ List of node Ids of failing device(s).
+
+ fields -- [optional] Only applicable when result == "error"
+ List of fields that failed.[optional] Mandatory when result == "rejected" or "failure".
+
+ error_msg -- Details about why the request failed.
+
+ fields -- Fields to set. List of tuple format: (name, typename, value).
+ nodeIds -- [optional] Limits the request to the node Ids in this list.
+ """
+ iq = self.xmpp.Iq();
+ iq['from'] = from_jid;
+ iq['to'] = to_jid;
+ seqnr = self._get_new_seqnr();
+ iq['id'] = seqnr;
+ iq['type'] = "set";
+ if nodeIds is not None:
+ for nodeId in nodeIds:
+ iq['set'].add_node(nodeId);
+ if fields is not None:
+ for name, typename, value in fields:
+ iq['set'].add_data(name=name, typename=typename, value=value);
+
+ self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "callback": callback};
+ iq.send(block=False);
+
+ def set_command(self, from_jid, to_jid, fields, nodeIds=None):
+ """
+ Called on the client side to initiade a control command.
+ Composes a message with the set commandand sends it to the device(s).
+ Does not block. Device(s) will not respond, regardless of result.
+
+ Arguments:
+ from_jid -- The jid of the requester
+ to_jid -- The jid of the device(s)
+
+ fields -- Fields to set. List of tuple format: (name, typename, value).
+ nodeIds -- [optional] Limits the request to the node Ids in this list.
+ """
+ msg = self.xmpp.Message();
+ msg['from'] = from_jid;
+ msg['to'] = to_jid;
+ msg['type'] = "set";
+ if nodeIds is not None:
+ for nodeId in nodeIds:
+ msg['set'].add_node(nodeId);
+ if fields is not None:
+ for name, typename, value in fields:
+ msg['set'].add_data(name, typename, value);
+
+ # We won't get any reply, so don't create a session
+ msg.send();
+
+ def _handle_set_response(self, iq):
+ """ Received response from device(s) """
+ #print("ooh")
+ seqnr = iq['id'];
+ from_jid = str(iq['from']);
+ result = iq['setResponse']['responseCode'];
+ nodeIds = [n['name'] for n in iq['setResponse']['nodes']];
+ fields = [f['name'] for f in iq['setResponse']['datas']];
+ error_msg = None;
+
+ if not iq['setResponse'].find('error') is None and not iq['setResponse']['error']['text'] == "":
+ error_msg = iq['setResponse']['error']['text'];
+
+ callback = self.sessions[seqnr]["callback"];
+ callback(from_jid=from_jid, result=result, nodeIds=nodeIds, fields=fields, error_msg=error_msg);
+
+
diff --git a/sleekxmpp/plugins/xep_0325/device.py b/sleekxmpp/plugins/xep_0325/device.py
new file mode 100644
index 00000000..1cb99510
--- /dev/null
+++ b/sleekxmpp/plugins/xep_0325/device.py
@@ -0,0 +1,125 @@
+"""
+ SleekXMPP: The Sleek XMPP Library
+ Implementation of xeps for Internet of Things
+ http://wiki.xmpp.org/web/Tech_pages/IoT_systems
+ Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
+ This file is part of SleekXMPP.
+
+ See the file LICENSE for copying permission.
+"""
+
+import datetime
+
+class Device(object):
+ """
+ Example implementation of a device control object.
+
+ The device object may by any custom implementation to support
+ specific devices, but it must implement the functions:
+ has_control_field
+ set_control_fields
+ """
+
+ def __init__(self, nodeId):
+ self.nodeId = nodeId;
+ self.control_fields = {};
+
+ def has_control_field(self, field, typename):
+ """
+ Returns true if the supplied field name exists
+ and the type matches for control in this device.
+
+ Arguments:
+ field -- The field name
+ typename -- The expected type
+ """
+ if field in self.control_fields and self.control_fields[field]["type"] == typename:
+ return True;
+ return False;
+
+ def set_control_fields(self, fields, session, callback):
+ """
+ Starts a control setting procedure. Verifies the fields,
+ sets the data and (if needed) and calls the callback.
+
+ Arguments:
+ fields -- List of control fields in tuple format:
+ (name, typename, value)
+ session -- Session id, only used in the callback as identifier
+ callback -- Callback function to call when control set is complete.
+
+ The callback function must support the following arguments:
+
+ session -- Session id, as supplied in the
+ request_fields call
+ nodeId -- Identifier for this device
+ result -- The current result status of the readout.
+ Valid values are:
+ "error" - Set fields failed.
+ "ok" - All fields were set.
+ error_field -- [optional] Only applies when result == "error"
+ The field name that failed
+ (usually means it is missing)
+ error_msg -- [optional] Only applies when result == "error".
+ Error details when a request failed.
+ """
+
+ if len(fields) > 0:
+ # Check availiability
+ for name, typename, value in fields:
+ if not self.has_control_field(name, typename):
+ self._send_control_reject(session, name, "NotFound", callback)
+ return False;
+
+ for name, typename, value in fields:
+ self._set_field_value(name, value)
+
+ callback(session, result="ok", nodeId=self.nodeId);
+ return True
+
+ def _send_control_reject(self, session, field, message, callback):
+ """
+ Sends a reject to the caller
+
+ Arguments:
+ session -- Session id, see definition in
+ set_control_fields function
+ callback -- Callback function, see definition in
+ set_control_fields function
+ """
+ callback(session, result="error", nodeId=self.nodeId, error_field=field, error_msg=message);
+
+ def _add_control_field(self, name, typename, value):
+ """
+ Adds a control field to the device
+
+ Arguments:
+ name -- Name of the field
+ typename -- Type of the field, one of:
+ (boolean, color, string, date, dateTime,
+ double, duration, int, long, time)
+ value -- Field value
+ """
+ self.control_fields[name] = {"type": typename, "value": value};
+
+ def _set_field_value(self, name, value):
+ """
+ Set the value of a control field
+
+ Arguments:
+ name -- Name of the field
+ value -- New value for the field
+ """
+ if name in self.control_fields:
+ self.control_fields[name]["value"] = value;
+
+ def _get_field_value(self, name):
+ """
+ Get the value of a control field. Only used for unit testing.
+
+ Arguments:
+ name -- Name of the field
+ """
+ if name in self.control_fields:
+ return self.control_fields[name]["value"];
+ return None;
diff --git a/sleekxmpp/plugins/xep_0325/stanza/__init__.py b/sleekxmpp/plugins/xep_0325/stanza/__init__.py
new file mode 100644
index 00000000..746c2033
--- /dev/null
+++ b/sleekxmpp/plugins/xep_0325/stanza/__init__.py
@@ -0,0 +1,12 @@
+"""
+ SleekXMPP: The Sleek XMPP Library
+ Implementation of xeps for Internet of Things
+ http://wiki.xmpp.org/web/Tech_pages/IoT_systems
+ Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
+ This file is part of SleekXMPP.
+
+ See the file LICENSE for copying permission.
+"""
+
+from sleekxmpp.plugins.xep_0325.stanza.control import *
+
diff --git a/sleekxmpp/plugins/xep_0325/stanza/base.py b/sleekxmpp/plugins/xep_0325/stanza/base.py
new file mode 100644
index 00000000..1dadcf46
--- /dev/null
+++ b/sleekxmpp/plugins/xep_0325/stanza/base.py
@@ -0,0 +1,13 @@
+"""
+ SleekXMPP: The Sleek XMPP Library
+ Implementation of xeps for Internet of Things
+ http://wiki.xmpp.org/web/Tech_pages/IoT_systems
+ Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
+ This file is part of SleekXMPP.
+
+ See the file LICENSE for copying permission.
+"""
+
+from sleekxmpp.xmlstream import ET
+
+pass
diff --git a/sleekxmpp/plugins/xep_0325/stanza/control.py b/sleekxmpp/plugins/xep_0325/stanza/control.py
new file mode 100644
index 00000000..2707191f
--- /dev/null
+++ b/sleekxmpp/plugins/xep_0325/stanza/control.py
@@ -0,0 +1,526 @@
+"""
+ SleekXMPP: The Sleek XMPP Library
+ Implementation of xeps for Internet of Things
+ http://wiki.xmpp.org/web/Tech_pages/IoT_systems
+ Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
+ This file is part of SleekXMPP.
+
+ See the file LICENSE for copying permission.
+"""
+
+from sleekxmpp import Iq, Message
+from sleekxmpp.xmlstream import register_stanza_plugin, ElementBase, ET, JID
+from re import match
+
+class Control(ElementBase):
+ """ Placeholder for the namespace, not used as a stanza """
+ namespace = 'urn:xmpp:iot:control'
+ name = 'control'
+ plugin_attrib = name
+ interfaces = set(tuple())
+
+class ControlSet(ElementBase):
+ namespace = 'urn:xmpp:iot:control'
+ name = 'set'
+ plugin_attrib = name
+ interfaces = set(['nodes','datas'])
+
+ def __init__(self, xml=None, parent=None):
+ ElementBase.__init__(self, xml, parent);
+ self._nodes = set()
+ self._datas = set()
+
+ def setup(self, xml=None):
+ """
+ Populate the stanza object using an optional XML object.
+
+ Overrides ElementBase.setup
+
+ Caches item information.
+
+ Arguments:
+ xml -- Use an existing XML object for the stanza's values.
+ """
+ ElementBase.setup(self, xml)
+ self._nodes = set([node['nodeId'] for node in self['nodes']])
+ self._datas = set([data['name'] for data in self['datas']])
+
+ def add_node(self, nodeId, sourceId=None, cacheType=None):
+ """
+ Add a new node element. Each item is required to have a
+ nodeId, but may also specify a sourceId value and cacheType.
+
+ Arguments:
+ nodeId -- The ID for the node.
+ sourceId -- [optional] identifying the data source controlling the device
+ cacheType -- [optional] narrowing down the search to a specific kind of node
+ """
+ if nodeId not in self._nodes:
+ self._nodes.add((nodeId))
+ node = RequestNode(parent=self)
+ node['nodeId'] = nodeId
+ node['sourceId'] = sourceId
+ node['cacheType'] = cacheType
+ self.iterables.append(node)
+ return node
+ return None
+
+ def del_node(self, nodeId):
+ """
+ Remove a single node.
+
+ Arguments:
+ nodeId -- Node ID of the item to remove.
+ """
+ if nodeId in self._nodes:
+ nodes = [i for i in self.iterables if isinstance(i, RequestNode)]
+ for node in nodes:
+ if node['nodeId'] == nodeId:
+ self.xml.remove(node.xml)
+ self.iterables.remove(node)
+ return True
+ return False
+
+ def get_nodes(self):
+ """Return all nodes."""
+ nodes = set()
+ for node in self['substanzas']:
+ if isinstance(node, RequestNode):
+ nodes.add(node)
+ return nodes
+
+ def set_nodes(self, nodes):
+ """
+ Set or replace all nodes. The given nodes must be in a
+ list or set where each item is a tuple of the form:
+ (nodeId, sourceId, cacheType)
+
+ Arguments:
+ nodes -- A series of nodes in tuple format.
+ """
+ self.del_nodes()
+ for node in nodes:
+ if isinstance(node, RequestNode):
+ self.add_node(node['nodeId'], node['sourceId'], node['cacheType'])
+ else:
+ nodeId, sourceId, cacheType = node
+ self.add_node(nodeId, sourceId, cacheType)
+
+ def del_nodes(self):
+ """Remove all nodes."""
+ self._nodes = set()
+ nodes = [i for i in self.iterables if isinstance(i, RequestNode)]
+ for node in nodes:
+ self.xml.remove(node.xml)
+ self.iterables.remove(node)
+
+
+ def add_data(self, name, typename, value):
+ """
+ Add a new data element.
+
+ Arguments:
+ name -- The name of the data element
+ typename -- The type of data element
+ (boolean, color, string, date, dateTime,
+ double, duration, int, long, time)
+ value -- The value of the data element
+ """
+ if name not in self._datas:
+ dataObj = None;
+ if typename == "boolean":
+ dataObj = BooleanParameter(parent=self);
+ elif typename == "color":
+ dataObj = ColorParameter(parent=self);
+ elif typename == "string":
+ dataObj = StringParameter(parent=self);
+ elif typename == "date":
+ dataObj = DateParameter(parent=self);
+ elif typename == "dateTime":
+ dataObj = DateTimeParameter(parent=self);
+ elif typename == "double":
+ dataObj = DoubleParameter(parent=self);
+ elif typename == "duration":
+ dataObj = DurationParameter(parent=self);
+ elif typename == "int":
+ dataObj = IntParameter(parent=self);
+ elif typename == "long":
+ dataObj = LongParameter(parent=self);
+ elif typename == "time":
+ dataObj = TimeParameter(parent=self);
+
+ dataObj['name'] = name;
+ dataObj['value'] = value;
+
+ self._datas.add(name)
+ self.iterables.append(dataObj)
+ return dataObj
+ return None
+
+ def del_data(self, name):
+ """
+ Remove a single data element.
+
+ Arguments:
+ data_name -- The data element name to remove.
+ """
+ if name in self._datas:
+ datas = [i for i in self.iterables if isinstance(i, BaseParameter)]
+ for data in datas:
+ if data['name'] == name:
+ self.xml.remove(data.xml)
+ self.iterables.remove(data)
+ return True
+ return False
+
+ def get_datas(self):
+ """ Return all data elements. """
+ datas = set()
+ for data in self['substanzas']:
+ if isinstance(data, BaseParameter):
+ datas.add(data)
+ return datas
+
+ def set_datas(self, datas):
+ """
+ Set or replace all data elements. The given elements must be in a
+ list or set where each item is a data element (numeric, string, boolean, dateTime, timeSpan or enum)
+
+ Arguments:
+ datas -- A series of data elements.
+ """
+ self.del_datas()
+ for data in datas:
+ self.add_data(name=data['name'], typename=data._get_typename(), value=data['value'])
+
+ def del_datas(self):
+ """Remove all data elements."""
+ self._datas = set()
+ datas = [i for i in self.iterables if isinstance(i, BaseParameter)]
+ for data in datas:
+ self.xml.remove(data.xml)
+ self.iterables.remove(data)
+
+
+class RequestNode(ElementBase):
+ """ Node element in a request """
+ namespace = 'urn:xmpp:iot:control'
+ name = 'node'
+ plugin_attrib = name
+ interfaces = set(['nodeId','sourceId','cacheType'])
+
+
+class ControlSetResponse(ElementBase):
+ namespace = 'urn:xmpp:iot:control'
+ name = 'setResponse'
+ plugin_attrib = name
+ interfaces = set(['responseCode'])
+
+ def __init__(self, xml=None, parent=None):
+ ElementBase.__init__(self, xml, parent);
+ self._nodes = set()
+ self._datas = set()
+
+ def setup(self, xml=None):
+ """
+ Populate the stanza object using an optional XML object.
+
+ Overrides ElementBase.setup
+
+ Caches item information.
+
+ Arguments:
+ xml -- Use an existing XML object for the stanza's values.
+ """
+ ElementBase.setup(self, xml)
+ self._nodes = set([node['nodeId'] for node in self['nodes']])
+ self._datas = set([data['name'] for data in self['datas']])
+
+ def add_node(self, nodeId, sourceId=None, cacheType=None):
+ """
+ Add a new node element. Each item is required to have a
+ nodeId, but may also specify a sourceId value and cacheType.
+
+ Arguments:
+ nodeId -- The ID for the node.
+ sourceId -- [optional] identifying the data source controlling the device
+ cacheType -- [optional] narrowing down the search to a specific kind of node
+ """
+ if nodeId not in self._nodes:
+ self._nodes.add(nodeId)
+ node = RequestNode(parent=self)
+ node['nodeId'] = nodeId
+ node['sourceId'] = sourceId
+ node['cacheType'] = cacheType
+ self.iterables.append(node)
+ return node
+ return None
+
+ def del_node(self, nodeId):
+ """
+ Remove a single node.
+
+ Arguments:
+ nodeId -- Node ID of the item to remove.
+ """
+ if nodeId in self._nodes:
+ nodes = [i for i in self.iterables if isinstance(i, RequestNode)]
+ for node in nodes:
+ if node['nodeId'] == nodeId:
+ self.xml.remove(node.xml)
+ self.iterables.remove(node)
+ return True
+ return False
+
+ def get_nodes(self):
+ """Return all nodes."""
+ nodes = set()
+ for node in self['substanzas']:
+ if isinstance(node, RequestNode):
+ nodes.add(node)
+ return nodes
+
+ def set_nodes(self, nodes):
+ """
+ Set or replace all nodes. The given nodes must be in a
+ list or set where each item is a tuple of the form:
+ (nodeId, sourceId, cacheType)
+
+ Arguments:
+ nodes -- A series of nodes in tuple format.
+ """
+ self.del_nodes()
+ for node in nodes:
+ if isinstance(node, RequestNode):
+ self.add_node(node['nodeId'], node['sourceId'], node['cacheType'])
+ else:
+ nodeId, sourceId, cacheType = node
+ self.add_node(nodeId, sourceId, cacheType)
+
+ def del_nodes(self):
+ """Remove all nodes."""
+ self._nodes = set()
+ nodes = [i for i in self.iterables if isinstance(i, RequestNode)]
+ for node in nodes:
+ self.xml.remove(node.xml)
+ self.iterables.remove(node)
+
+
+ def add_data(self, name):
+ """
+ Add a new ResponseParameter element.
+
+ Arguments:
+ name -- Name of the parameter
+ """
+ if name not in self._datas:
+ self._datas.add(name)
+ data = ResponseParameter(parent=self)
+ data['name'] = name;
+ self.iterables.append(data)
+ return data
+ return None
+
+ def del_data(self, name):
+ """
+ Remove a single ResponseParameter element.
+
+ Arguments:
+ name -- The data element name to remove.
+ """
+ if name in self._datas:
+ datas = [i for i in self.iterables if isinstance(i, ResponseParameter)]
+ for data in datas:
+ if data['name'] == name:
+ self.xml.remove(data.xml)
+ self.iterables.remove(data)
+ return True
+ return False
+
+ def get_datas(self):
+ """ Return all ResponseParameter elements. """
+ datas = set()
+ for data in self['substanzas']:
+ if isinstance(data, ResponseParameter):
+ datas.add(data)
+ return datas
+
+ def set_datas(self, datas):
+ """
+ Set or replace all data elements. The given elements must be in a
+ list or set of ResponseParameter elements
+
+ Arguments:
+ datas -- A series of data element names.
+ """
+ self.del_datas()
+ for data in datas:
+ self.add_data(name=data['name'])
+
+ def del_datas(self):
+ """Remove all ResponseParameter elements."""
+ self._datas = set()
+ datas = [i for i in self.iterables if isinstance(i, ResponseParameter)]
+ for data in datas:
+ self.xml.remove(data.xml)
+ self.iterables.remove(data)
+
+
+class Error(ElementBase):
+ namespace = 'urn:xmpp:iot:control'
+ name = 'error'
+ plugin_attrib = name
+ interfaces = set(['var','text'])
+
+ def get_text(self):
+ """Return then contents inside the XML tag."""
+ return self.xml.text
+
+ def set_text(self, value):
+ """Set then contents inside the XML tag.
+
+ Arguments:
+ value -- string
+ """
+
+ self.xml.text = value;
+ return self
+
+ def del_text(self):
+ """Remove the contents inside the XML tag."""
+ self.xml.text = ""
+ return self
+
+class ResponseParameter(ElementBase):
+ """
+ Parameter element in ControlSetResponse.
+ """
+ namespace = 'urn:xmpp:iot:control'
+ name = 'parameter'
+ plugin_attrib = name
+ interfaces = set(['name']);
+
+
+class BaseParameter(ElementBase):
+ """
+ Parameter element in SetCommand. This is a base class,
+ all instances of parameters added to SetCommand must be of types:
+ BooleanParameter
+ ColorParameter
+ StringParameter
+ DateParameter
+ DateTimeParameter
+ DoubleParameter
+ DurationParameter
+ IntParameter
+ LongParameter
+ TimeParameter
+ """
+ namespace = 'urn:xmpp:iot:control'
+ name = 'baseParameter'
+ plugin_attrib = name
+ interfaces = set(['name','value']);
+
+ def _get_typename(self):
+ return self.name;
+
+class BooleanParameter(BaseParameter):
+ """
+ Field data of type boolean.
+ Note that the value is expressed as a string.
+ """
+ name = 'boolean'
+ plugin_attrib = name
+
+class ColorParameter(BaseParameter):
+ """
+ Field data of type color.
+ Note that the value is expressed as a string.
+ """
+ name = 'color'
+ plugin_attrib = name
+
+class StringParameter(BaseParameter):
+ """
+ Field data of type string.
+ """
+ name = 'string'
+ plugin_attrib = name
+
+class DateParameter(BaseParameter):
+ """
+ Field data of type date.
+ Note that the value is expressed as a string.
+ """
+ name = 'date'
+ plugin_attrib = name
+
+class DateTimeParameter(BaseParameter):
+ """
+ Field data of type dateTime.
+ Note that the value is expressed as a string.
+ """
+ name = 'dateTime'
+ plugin_attrib = name
+
+class DoubleParameter(BaseParameter):
+ """
+ Field data of type double.
+ Note that the value is expressed as a string.
+ """
+ name = 'double'
+ plugin_attrib = name
+
+class DurationParameter(BaseParameter):
+ """
+ Field data of type duration.
+ Note that the value is expressed as a string.
+ """
+ name = 'duration'
+ plugin_attrib = name
+
+class IntParameter(BaseParameter):
+ """
+ Field data of type int.
+ Note that the value is expressed as a string.
+ """
+ name = 'int'
+ plugin_attrib = name
+
+class LongParameter(BaseParameter):
+ """
+ Field data of type long (64-bit int).
+ Note that the value is expressed as a string.
+ """
+ name = 'long'
+ plugin_attrib = name
+
+class TimeParameter(BaseParameter):
+ """
+ Field data of type time.
+ Note that the value is expressed as a string.
+ """
+ name = 'time'
+ plugin_attrib = name
+
+register_stanza_plugin(Iq, ControlSet)
+register_stanza_plugin(Message, ControlSet)
+
+register_stanza_plugin(ControlSet, RequestNode, iterable=True)
+
+register_stanza_plugin(ControlSet, BooleanParameter, iterable=True)
+register_stanza_plugin(ControlSet, ColorParameter, iterable=True)
+register_stanza_plugin(ControlSet, StringParameter, iterable=True)
+register_stanza_plugin(ControlSet, DateParameter, iterable=True)
+register_stanza_plugin(ControlSet, DateTimeParameter, iterable=True)
+register_stanza_plugin(ControlSet, DoubleParameter, iterable=True)
+register_stanza_plugin(ControlSet, DurationParameter, iterable=True)
+register_stanza_plugin(ControlSet, IntParameter, iterable=True)
+register_stanza_plugin(ControlSet, LongParameter, iterable=True)
+register_stanza_plugin(ControlSet, TimeParameter, iterable=True)
+
+register_stanza_plugin(Iq, ControlSetResponse)
+register_stanza_plugin(ControlSetResponse, Error)
+register_stanza_plugin(ControlSetResponse, RequestNode, iterable=True)
+register_stanza_plugin(ControlSetResponse, ResponseParameter, iterable=True)
+