summaryrefslogtreecommitdiff
path: root/sleekxmpp/plugins/xep_0323/sensordata.py
diff options
context:
space:
mode:
authorJoachim Lindborg <Joachim.Lindborg@lsys.se>2013-08-30 02:29:52 +0200
committerJoachim Lindborg <Joachim.Lindborg@lsys.se>2013-08-30 02:29:52 +0200
commit45689fd8799186fd6be0b308745aef428ab50dcc (patch)
tree0dd4e5f41684ad8f4b31920f1b8b9d3e27fd1527 /sleekxmpp/plugins/xep_0323/sensordata.py
parentb7adaafb3ecb0a615c93fbb1830e66357b081fe3 (diff)
downloadslixmpp-45689fd8799186fd6be0b308745aef428ab50dcc.tar.gz
slixmpp-45689fd8799186fd6be0b308745aef428ab50dcc.tar.bz2
slixmpp-45689fd8799186fd6be0b308745aef428ab50dcc.tar.xz
slixmpp-45689fd8799186fd6be0b308745aef428ab50dcc.zip
First implementation of the xep_0323 and xep_325 used in IoT systems. Tests are added for stanza and streams
Diffstat (limited to 'sleekxmpp/plugins/xep_0323/sensordata.py')
-rw-r--r--sleekxmpp/plugins/xep_0323/sensordata.py699
1 files changed, 676 insertions, 23 deletions
diff --git a/sleekxmpp/plugins/xep_0323/sensordata.py b/sleekxmpp/plugins/xep_0323/sensordata.py
index 1343069d..ff671663 100644
--- a/sleekxmpp/plugins/xep_0323/sensordata.py
+++ b/sleekxmpp/plugins/xep_0323/sensordata.py
@@ -2,19 +2,25 @@
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
- Copyright (C) 2013 Joachim Lindborg, Joachim.lindborg@lsys.se
+ Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
import logging
+import time
+import datetime
+from threading import Thread, Lock, Timer
+
+from sleekxmpp.plugins.xep_0323.timerreset import TimerReset
from sleekxmpp.xmlstream import JID
from sleekxmpp.xmlstream.handler import Callback
from sleekxmpp.xmlstream.matcher import StanzaPath
from sleekxmpp.plugins.base import BasePlugin
from sleekxmpp.plugins.xep_0323 import stanza
+from sleekxmpp.plugins.xep_0323.stanza import Sensordata
log = logging.getLogger(__name__)
@@ -23,43 +29,690 @@ log = logging.getLogger(__name__)
class XEP_0323(BasePlugin):
"""
- XEP-0323 IoT Sensor Data
+ XEP-0323: IoT Sensor Data
+
+
+ This XEP provides the underlying architecture, basic operations and data
+ structures for sensor data communication over XMPP networks. It includes
+ a hardware abstraction model, removing any technical detail implemented
+ in underlying technologies.
+
+ Also see <http://xmpp.org/extensions/xep-0323.html>
+
+ Configuration Values:
+ threaded -- Indicates if communication with sensors should be threaded.
+ Defaults to True.
+
+ Events:
+ Sensor side
+ -----------
+ Sensordata Event:Req -- Received a request for data
+ Sensordata Event:Cancel -- Received a cancellation for a request
+
+ Client side
+ -----------
+ Sensordata Event:Accepted -- Received a accept from sensor for a request
+ Sensordata Event:Rejected -- Received a reject from sensor for a request
+ Sensordata Event:Cancelled -- Received a cancel confirm from sensor
+ Sensordata Event:Fields -- Received fields from sensor for a request
+ This may be triggered multiple times since
+ the sensor can split up its response in
+ multiple messages.
+ Sensordata Event:Failure -- Received a failure indication from sensor
+ for a request. Typically a comm timeout.
+
+ Attributes:
+ threaded -- Indicates if command events should be threaded.
+ Defaults to True.
+ sessions -- A dictionary or equivalent backend mapping
+ session IDs to dictionaries containing data
+ relevant to a request's session. This dictionary is used
+ both by the client and sensor side. On client side, seqnr
+ is used as key, while on sensor side, a session_id is used
+ as key. This ensures that the two will not collide, so
+ one instance can be both client and sensor.
+ Sensor side
+ -----------
+ nodes -- A dictionary mapping sensor nodes that are serviced through
+ this XMPP instance to their device handlers ("drivers").
+ Client side
+ -----------
+ last_seqnr -- The last used sequence number (integer). One sequence of
+ communication (e.g. -->request, <--accept, <--fields)
+ between client and sensor is identified by a unique
+ sequence number (unique between the client/sensor pair)
+
+ Methods:
+ plugin_init -- Overrides base_plugin.plugin_init
+ post_init -- Overrides base_plugin.post_init
+ plugin_end -- Overrides base_plugin.plugin_end
+
+ Sensor side
+ -----------
+ register_node -- Register a sensor as available from this XMPP
+ instance.
+
+ Client side
+ -----------
+ request_data -- Initiates a request for data from one or more
+ sensors. Non-blocking, a callback function will
+ be called when data is available.
+
"""
name = 'xep_0323'
description = 'XEP-0323 Internet of Things - Sensor Data'
- dependencies = set(['xep_0030']) # set(['xep_0030', 'xep_0004', 'xep_0082', 'xep_0131'])
+ dependencies = set(['xep_0030'])
stanza = stanza
+
+ default_config = {
+ 'threaded': True
+# 'session_db': None
+ }
+
def plugin_init(self):
- pass
- # self.node_event_map = {}
+ """ Start the XEP-0323 plugin """
+
+ self.xmpp.register_handler(
+ Callback('Sensordata Event:Req',
+ StanzaPath('iq@type=get/req'),
+ self._handle_event_req))
+
+ self.xmpp.register_handler(
+ Callback('Sensordata Event:Accepted',
+ StanzaPath('iq@type=result/accepted'),
+ self._handle_event_accepted))
+
+ self.xmpp.register_handler(
+ Callback('Sensordata Event:Rejected',
+ StanzaPath('iq@type=error/rejected'),
+ self._handle_event_rejected))
+
+ self.xmpp.register_handler(
+ Callback('Sensordata Event:Cancel',
+ StanzaPath('iq@type=get/cancel'),
+ self._handle_event_cancel))
+
+ self.xmpp.register_handler(
+ Callback('Sensordata Event:Cancelled',
+ StanzaPath('iq@type=result/cancelled'),
+ self._handle_event_cancelled))
+
+ self.xmpp.register_handler(
+ Callback('Sensordata Event:Fields',
+ StanzaPath('message/fields'),
+ self._handle_event_fields))
+
+ self.xmpp.register_handler(
+ Callback('Sensordata Event:Failure',
+ StanzaPath('message/failure'),
+ self._handle_event_failure))
+
+ self.xmpp.register_handler(
+ Callback('Sensordata Event:Started',
+ StanzaPath('message/started'),
+ self._handle_event_started))
+
+ # Server side dicts
+ self.nodes = {};
+ self.sessions = {};
+
+ self.last_seqnr = 0;
+ self.seqnr_lock = Lock();
+
+ ## For testning only
+ self.test_authenticated_from = ""
+
+ def post_init(self):
+ """ Init complete. Register our features in Serivce discovery. """
+ BasePlugin.post_init(self)
+ self.xmpp['xep_0030'].add_feature(Sensordata.namespace)
+ self.xmpp['xep_0030'].set_items(node=Sensordata.namespace, items=tuple())
- # self.xmpp.register_handler(
- # Callback('Sensordata Event: Get',
- # StanzaPath('message/sensordata_event/get'),
- # self._handle_event_get))
+ def _new_session(self):
+ """ Return a new session ID. """
+ return str(time.time()) + '-' + self.xmpp.new_id()
def plugin_end(self):
- # self.xmpp.remove_handler('Sensordata Event: Get')
- pass
-
- def get_value(self, jid, msg):
+ """ Stop the XEP-0323 plugin """
+ self.sessions.clear();
+ self.xmpp.remove_handler('Sensordata Event:Req')
+ self.xmpp.remove_handler('Sensordata Event:Accepted')
+ self.xmpp.remove_handler('Sensordata Event:Rejected')
+ self.xmpp.remove_handler('Sensordata Event:Cancel')
+ self.xmpp.remove_handler('Sensordata Event:Cancelled')
+ self.xmpp.remove_handler('Sensordata Event:Fields')
+ self.xmpp['xep_0030'].del_feature(feature=Sensordata.namespace)
+ self.xmpp['xep_0030'].set_items(node=Sensordata.namespace, items=tuple());
+
+
+ # =================================================================
+ # Sensor side (data provider) API
+
+ def register_node(self, nodeId, device, commTimeout, sourceId=None, cacheType=None):
+ """
+ Register a sensor/device as available for serving of data through this XMPP
+ instance.
+
+ The device object may by any custom implementation to support
+ specific devices, but it must implement the functions:
+ has_field
+ request_fields
+ according to the interfaces shown in the example device.py file.
+
+ Arguments:
+ nodeId -- The identifier for the device
+ device -- The device object
+ commTimeout -- Time in seconds to wait between each callback from device during
+ a data readout. Float.
+ sourceId -- [optional] identifying the data source controlling the device
+ cacheType -- [optional] narrowing down the search to a specific kind of node
+ """
+ self.nodes[nodeId] = {"device": device,
+ "commTimeout": commTimeout,
+ "sourceId": sourceId,
+ "cacheType": cacheType};
+
+ def _set_authenticated(self, auth=''):
+ """ Internal testing function """
+ self.test_authenticated_from = auth;
+
+
+ def _handle_event_req(self, iq):
+ """
+ Event handler for reception of an Iq with req - this is a request.
+
+ Verifies that
+ - all the requested nodes are available
+ - at least one of the requested fields is available from at least
+ one of the nodes
+
+ If the request passes verification, an accept response is sent, and
+ the readout process is started in a separate thread.
+ If the verification fails, a reject message is sent.
+ """
+
+ seqnr = iq['req']['seqnr'];
+ error_msg = '';
+ req_ok = True;
+
+ # Authentication
+ if len(self.test_authenticated_from) > 0 and not iq['from'] == self.test_authenticated_from:
+ # Invalid authentication
+ req_ok = False;
+ error_msg = "Access denied";
+
+ # Nodes
+ process_nodes = [];
+ if len(iq['req']['nodes']) > 0:
+ for n in iq['req']['nodes']:
+ if not n['nodeId'] in self.nodes:
+ req_ok = False;
+ error_msg = "Invalid nodeId " + n['nodeId'];
+ process_nodes = [n['nodeId'] for n in iq['req']['nodes']];
+ else:
+ process_nodes = self.nodes.keys();
+
+ # Fields - if we just find one we are happy, otherwise we reject
+ process_fields = [];
+ if len(iq['req']['fields']) > 0:
+ found = False
+ for f in iq['req']['fields']:
+ for node in self.nodes:
+ if self.nodes[node]["device"].has_field(f['name']):
+ found = True;
+ break;
+ if not found:
+ req_ok = False;
+ error_msg = "Invalid field " + f['name'];
+ process_fields = [f['name'] for n in iq['req']['fields']];
+
+ req_flags = iq['req']._get_flags();
+
+ request_delay_sec = None
+ if 'when' in req_flags:
+ # Timed request - requires datetime string in iso format
+ # ex. 2013-04-05T15:00:03
+ dt = None
+ try:
+ dt = datetime.datetime.strptime(req_flags['when'], "%Y-%m-%dT%H:%M:%S")
+ except ValueError:
+ req_ok = False;
+ error_msg = "Invalid datetime in 'when' flag, please use ISO format (i.e. 2013-04-05T15:00:03)."
+
+ if not dt is None:
+ # Datetime properly formatted
+ dtnow = datetime.datetime.now()
+ dtdiff = dt - dtnow
+ request_delay_sec = dtdiff.seconds + dtdiff.days * 24 * 3600
+ if request_delay_sec <= 0:
+ req_ok = False;
+ error_msg = "Invalid datetime in 'when' flag, cannot set a time in the past. Current time: " + dtnow.isoformat();
+
+ if req_ok:
+ session = self._new_session();
+ self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr};
+ self.sessions[session]["commTimers"] = {};
+ self.sessions[session]["nodeDone"] = {};
+
+ #print("added session: " + str(self.sessions))
+
+ iq.reply();
+ iq['accepted']['seqnr'] = seqnr;
+ if not request_delay_sec is None:
+ iq['accepted']['queued'] = "true"
+ iq.send(block=False);
+
+ self.sessions[session]["node_list"] = process_nodes;
+
+ if not request_delay_sec is None:
+ # Delay request to requested time
+ timer = Timer(request_delay_sec, self._event_delayed_req, args=(session, process_fields, req_flags))
+ self.sessions[session]["commTimers"]["delaytimer"] = timer;
+ timer.start();
+ return
+
+ if self.threaded:
+ #print("starting thread")
+ tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields, req_flags))
+ tr_req.start()
+ #print("started thread")
+ else:
+ self._threaded_node_request(session, process_fields, req_flags);
+
+ else:
+ iq.reply();
+ iq['type'] = 'error';
+ iq['rejected']['seqnr'] = seqnr;
+ iq['rejected']['error'] = error_msg;
+ iq.send(block=False);
+
+ def _threaded_node_request(self, session, process_fields, flags):
+ """
+ Helper function to handle the device readouts in a separate thread.
+
+ Arguments:
+ session -- The request session id
+ process_fields -- The fields to request from the devices
+ flags -- [optional] flags to pass to the devices, e.g. momentary
+ Formatted as a dictionary like { "flag name": "flag value" ... }
+ """
+ for node in self.sessions[session]["node_list"]:
+ self.sessions[session]["nodeDone"][node] = False;
+
+ for node in self.sessions[session]["node_list"]:
+ timer = TimerReset(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node));
+ self.sessions[session]["commTimers"][node] = timer;
+ #print("Starting timer " + str(timer) + ", timeout: " + str(self.nodes[node]['commTimeout']))
+ timer.start();
+ self.nodes[node]['device'].request_fields(process_fields, flags=flags, session=session, callback=self._device_field_request_callback);
+
+ def _event_comm_timeout(self, session, nodeId):
+ """
+ Triggered if any of the readout operations timeout.
+ Sends a failure message back to the client, stops communicating
+ with the failing device.
+
+ Arguments:
+ session -- The request session id
+ nodeId -- The id of the device which timed out
+ """
+ msg = self.xmpp.Message();
+ msg['from'] = self.sessions[session]['to'];
+ msg['to'] = self.sessions[session]['from'];
+ msg['failure']['seqnr'] = self.sessions[session]['seqnr'];
+ msg['failure']['error']['text'] = "Timeout";
+ msg['failure']['error']['nodeId'] = nodeId;
+ msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat();
+
+ # Drop communication with this device and check if we are done
+ self.sessions[session]["nodeDone"][nodeId] = True;
+ if (self._all_nodes_done(session)):
+ msg['failure']['done'] = 'true';
+ msg.send();
+ # The session is complete, delete it
+ #print("del session " + session + " due to timeout")
+ del self.sessions[session];
+
+ def _event_delayed_req(self, session, process_fields, req_flags):
+ """
+ Triggered when the timer from a delayed request fires.
+
+ Arguments:
+ session -- The request session id
+ process_fields -- The fields to request from the devices
+ flags -- [optional] flags to pass to the devices, e.g. momentary
+ Formatted as a dictionary like { "flag name": "flag value" ... }
"""
- Recieving a stanza for erading values
- # verify provisioning
+ msg = self.xmpp.Message();
+ msg['from'] = self.sessions[session]['to'];
+ msg['to'] = self.sessions[session]['from'];
+ msg['started']['seqnr'] = self.sessions[session]['seqnr'];
+ msg.send();
+
+ if self.threaded:
+ tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields, req_flags))
+ tr_req.start()
+ else:
+ self._threaded_node_request(session, process_fields, req_flags);
+
+ def _all_nodes_done(self, session):
+ """
+ Checks wheter all devices are done replying to the readout.
+
+ Arguments:
+ session -- The request session id
+ """
+ for n in self.sessions[session]["nodeDone"]:
+ if not self.sessions[session]["nodeDone"][n]:
+ return False;
+ return True;
+
+ def _device_field_request_callback(self, session, nodeId, result, timestamp_block, error_msg=None):
+ """
+ Callback function called by the devices when they have any additional data.
+ Composes a message with the data and sends it back to the client, and resets
+ the timeout timer for the device.
+
+ Arguments:
+ session -- The request session id
+ nodeId -- The device id which initiated the callback
+ result -- The current result status of the readout. Valid values are:
+ "error" - Readout failed.
+ "fields" - Contains readout data.
+ "done" - Indicates that the readout is complete. May contain
+ readout data.
+ timestamp_block -- [optional] Only applies when result != "error"
+ The readout data. Structured as a dictionary:
+ {
+ timestamp: timestamp for this datablock,
+ fields: list of field dictionary (one per readout field).
+ readout field dictionary format:
+ {
+ type: The field type (numeric, boolean, dateTime, timeSpan, string, enum)
+ name: The field name
+ value: The field value
+ unit: The unit of the field. Only applies to type numeric.
+ dataType: The datatype of the field. Only applies to type enum.
+ flags: [optional] data classifier flags for the field, e.g. momentary
+ Formatted as a dictionary like { "flag name": "flag value" ... }
+ }
+ }
+ error_msg -- [optional] Only applies when result == "error".
+ Error details when a request failed.
+ """
+ if not session in self.sessions:
+ # This can happend if a session was deleted, like in a cancellation. Just drop the data.
+ return
+
+ if result == "error":
+ self.sessions[session]["commTimers"][nodeId].cancel();
+
+ msg = self.xmpp.Message();
+ msg['from'] = self.sessions[session]['to'];
+ msg['to'] = self.sessions[session]['from'];
+ msg['failure']['seqnr'] = self.sessions[session]['seqnr'];
+ msg['failure']['error']['text'] = error_msg;
+ msg['failure']['error']['nodeId'] = nodeId;
+ msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat();
+
+ # Drop communication with this device and check if we are done
+ self.sessions[session]["nodeDone"][nodeId] = True;
+ if (self._all_nodes_done(session)):
+ msg['failure']['done'] = 'true';
+ # The session is complete, delete it
+ # print("del session " + session + " due to error")
+ del self.sessions[session];
+ msg.send();
+ else:
+ msg = self.xmpp.Message();
+ msg['from'] = self.sessions[session]['to'];
+ msg['to'] = self.sessions[session]['from'];
+ msg['fields']['seqnr'] = self.sessions[session]['seqnr'];
+
+ if timestamp_block is not None and len(timestamp_block) > 0:
+ node = msg['fields'].add_node(nodeId);
+ ts = node.add_timestamp(timestamp_block["timestamp"]);
+
+ for f in timestamp_block["fields"]:
+ data = ts.add_data( typename=f['type'],
+ name=f['name'],
+ value=f['value'],
+ unit=f['unit'],
+ dataType=f['dataType'],
+ flags=f['flags']);
+
+ if result == "done":
+ self.sessions[session]["commTimers"][nodeId].cancel();
+ self.sessions[session]["nodeDone"][nodeId] = True;
+ msg['fields']['done'] = 'true';
+ if (self._all_nodes_done(session)):
+ # The session is complete, delete it
+ # print("del session " + session + " due to complete")
+ del self.sessions[session];
+ else:
+ # Restart comm timer
+ self.sessions[session]["commTimers"][nodeId].reset();
+
+ msg.send();
+
+ def _handle_event_cancel(self, iq):
+ """ Received Iq with cancel - this is a cancel request.
+ Delete the session and confirm. """
- # verify requested values and categories
+ seqnr = iq['cancel']['seqnr'];
+ # Find the session
+ for s in self.sessions:
+ if self.sessions[s]['from'] == iq['from'] and self.sessions[s]['to'] == iq['to'] and self.sessions[s]['seqnr'] == seqnr:
+ # found it. Cancel all timers
+ for n in self.sessions[s]["commTimers"]:
+ self.sessions[s]["commTimers"][n].cancel();
- # Send accepted
- # Thread of the readout
+ # Confirm
+ iq.reply();
+ iq['type'] = 'result';
+ iq['cancelled']['seqnr'] = seqnr;
+ iq.send(block=False);
+
+ # Delete session
+ del self.sessions[s]
+ return
- # send started
+ # Could not find session, send reject
+ iq.reply();
+ iq['type'] = 'error';
+ iq['rejected']['seqnr'] = seqnr;
+ iq['rejected']['error'] = "Cancel request received, no matching request is active.";
+ iq.send(block=False);
- # send data messages
+ # =================================================================
+ # Client side (data retriever) API
- # send done
- """
- pass
+ def request_data(self, from_jid, to_jid, callback, nodeIds=None, fields=None, flags=None):
+ """
+ Called on the client side to initiade a data readout.
+ Composes a message with the request and sends it to the device(s).
+ Does not block, the callback will be called when data is available.
+
+ Arguments:
+ from_jid -- The jid of the requester
+ to_jid -- The jid of the device(s)
+ callback -- The callback function to call when data is availble.
+
+ The callback function must support the following arguments:
+
+ from_jid -- The jid of the responding device(s)
+ result -- The current result status of the readout. Valid values are:
+ "accepted" - Readout request accepted
+ "queued" - Readout request accepted and queued
+ "rejected" - Readout request rejected
+ "failure" - Readout failed.
+ "cancelled" - Confirmation of request cancellation.
+ "started" - Previously queued request is now started
+ "fields" - Contains readout data.
+ "done" - Indicates that the readout is complete.
+
+ nodeId -- [optional] Mandatory when result == "fields" or "failure".
+ The node Id of the responding device. One callback will only
+ contain data from one device.
+ timestamp -- [optional] Mandatory when result == "fields".
+ The timestamp of data in this callback. One callback will only
+ contain data from one timestamp.
+ fields -- [optional] Mandatory when result == "fields".
+ List of field dictionaries representing the readout data.
+ Dictionary format:
+ {
+ typename: The field type (numeric, boolean, dateTime, timeSpan, string, enum)
+ name: The field name
+ value: The field value
+ unit: The unit of the field. Only applies to type numeric.
+ dataType: The datatype of the field. Only applies to type enum.
+ flags: [optional] data classifier flags for the field, e.g. momentary.
+ Formatted as a dictionary like { "flag name": "flag value" ... }
+ }
+
+ error_msg -- [optional] Mandatory when result == "rejected" or "failure".
+ Details about why the request is rejected or failed.
+ "rejected" means that the request is stopped, but note that the
+ request will continue even after a "failure". "failure" only means
+ that communication was stopped to that specific device, other
+ device(s) (if any) will continue their readout.
+
+ nodeIds -- [optional] Limits the request to the node Ids in this list.
+ fields -- [optional] Limits the request to the field names in this list.
+ flags -- [optional] Limits the request according to the flags, or sets
+ readout conditions such as timing.
+
+ Return value:
+ session -- Session identifier. Client can use this as a reference to cancel
+ the request.
+ """
+ iq = self.xmpp.Iq();
+ iq['from'] = from_jid;
+ iq['to'] = to_jid;
+ iq['type'] = "get";
+ seqnr = self._get_new_seqnr();
+ iq['id'] = seqnr;
+ iq['req']['seqnr'] = seqnr;
+ if nodeIds is not None:
+ for nodeId in nodeIds:
+ iq['req'].add_node(nodeId);
+ if fields is not None:
+ for field in fields:
+ iq['req'].add_field(field);
+
+ iq['req']._set_flags(flags);
+
+ self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr, "callback": callback};
+ iq.send(block=False);
+
+ return seqnr;
+
+ def cancel_request(self, session):
+ """
+ Called on the client side to cancel a request for data readout.
+ Composes a message with the cancellation and sends it to the device(s).
+ Does not block, the callback will be called when cancellation is
+ confirmed.
+
+ Arguments:
+ session -- The session id of the request to cancel
+ """
+ seqnr = session
+ iq = self.xmpp.Iq();
+ iq['from'] = self.sessions[seqnr]['from']
+ iq['to'] = self.sessions[seqnr]['to'];
+ iq['type'] = "get";
+ iq['id'] = seqnr;
+ iq['cancel']['seqnr'] = seqnr;
+ iq.send(block=False);
+
+ def _get_new_seqnr(self):
+ """ Returns a unique sequence number (unique across threads) """
+ self.seqnr_lock.acquire();
+ self.last_seqnr = self.last_seqnr + 1;
+ self.seqnr_lock.release();
+ return str(self.last_seqnr);
+
+ def _handle_event_accepted(self, iq):
+ """ Received Iq with accepted - request was accepted """
+ seqnr = iq['accepted']['seqnr'];
+ result = "accepted"
+ if iq['accepted']['queued'] == 'true':
+ result = "queued"
+
+ callback = self.sessions[seqnr]["callback"];
+ callback(from_jid=iq['from'], result=result);
+
+ def _handle_event_rejected(self, iq):
+ """ Received Iq with rejected - this is a reject.
+ Delete the session. """
+ seqnr = iq['rejected']['seqnr'];
+ callback = self.sessions[seqnr]["callback"];
+ callback(from_jid=iq['from'], result="rejected", error_msg=iq['rejected']['error']);
+ # Session terminated
+ del self.sessions[seqnr];
+
+ def _handle_event_cancelled(self, iq):
+ """
+ Received Iq with cancelled - this is a cancel confirm.
+ Delete the session.
+ """
+ #print("Got cancelled")
+ seqnr = iq['cancelled']['seqnr'];
+ callback = self.sessions[seqnr]["callback"];
+ callback(from_jid=iq['from'], result="cancelled");
+ # Session cancelled
+ del self.sessions[seqnr];
+
+ def _handle_event_fields(self, msg):
+ """
+ Received Msg with fields - this is a data reponse to a request.
+ If this is the last data block, issue a "done" callback.
+ """
+ seqnr = msg['fields']['seqnr'];
+ callback = self.sessions[seqnr]["callback"];
+ for node in msg['fields']['nodes']:
+ for ts in node['timestamps']:
+ fields = [];
+ for d in ts['datas']:
+ field_block = {};
+ field_block["name"] = d['name'];
+ field_block["typename"] = d._get_typename();
+ field_block["value"] = d['value'];
+ if not d['unit'] == "": field_block["unit"] = d['unit'];
+ if not d['dataType'] == "": field_block["dataType"] = d['dataType'];
+ flags = d._get_flags();
+ if not len(flags) == 0:
+ field_block["flags"] = flags;
+ fields.append(field_block);
+
+ callback(from_jid=msg['from'], result="fields", nodeId=node['nodeId'], timestamp=ts['value'], fields=fields);
+
+ if msg['fields']['done'] == "true":
+ callback(from_jid=msg['from'], result="done");
+ # Session done
+ del self.sessions[seqnr];
+
+ def _handle_event_failure(self, msg):
+ """
+ Received Msg with failure - our request failed
+ Delete the session.
+ """
+ seqnr = msg['failure']['seqnr'];
+ callback = self.sessions[seqnr]["callback"];
+ callback(from_jid=msg['from'], result="failure", nodeId=msg['failure']['error']['nodeId'], timestamp=msg['failure']['error']['timestamp'], error_msg=msg['failure']['error']['text']);
+
+ # Session failed
+ del self.sessions[seqnr];
+
+ def _handle_event_started(self, msg):
+ """
+ Received Msg with started - our request was queued and is now started.
+ """
+ seqnr = msg['started']['seqnr'];
+ callback = self.sessions[seqnr]["callback"];
+ callback(from_jid=msg['from'], result="started");