summaryrefslogtreecommitdiff
path: root/slixmpp/plugins/xep_0323/sensordata.py
diff options
context:
space:
mode:
Diffstat (limited to 'slixmpp/plugins/xep_0323/sensordata.py')
-rw-r--r--slixmpp/plugins/xep_0323/sensordata.py318
1 files changed, 159 insertions, 159 deletions
diff --git a/slixmpp/plugins/xep_0323/sensordata.py b/slixmpp/plugins/xep_0323/sensordata.py
index 41a3b58d..0e7f7028 100644
--- a/slixmpp/plugins/xep_0323/sensordata.py
+++ b/slixmpp/plugins/xep_0323/sensordata.py
@@ -155,11 +155,11 @@ class XEP_0323(BasePlugin):
self._handle_event_started))
# Server side dicts
- self.nodes = {};
- self.sessions = {};
+ self.nodes = {}
+ self.sessions = {}
- self.last_seqnr = 0;
- self.seqnr_lock = Lock();
+ self.last_seqnr = 0
+ self.seqnr_lock = Lock()
## For testning only
self.test_authenticated_from = ""
@@ -182,7 +182,7 @@ class XEP_0323(BasePlugin):
def plugin_end(self):
""" Stop the XEP-0323 plugin """
- self.sessions.clear();
+ self.sessions.clear()
self.xmpp.remove_handler('Sensordata Event:Req')
self.xmpp.remove_handler('Sensordata Event:Accepted')
self.xmpp.remove_handler('Sensordata Event:Rejected')
@@ -217,11 +217,11 @@ class XEP_0323(BasePlugin):
self.nodes[nodeId] = {"device": device,
"commTimeout": commTimeout,
"sourceId": sourceId,
- "cacheType": cacheType};
+ "cacheType": cacheType}
def _set_authenticated(self, auth=''):
""" Internal testing function """
- self.test_authenticated_from = auth;
+ self.test_authenticated_from = auth
def _handle_event_req(self, iq):
@@ -238,42 +238,42 @@ class XEP_0323(BasePlugin):
If the verification fails, a reject message is sent.
"""
- seqnr = iq['req']['seqnr'];
- error_msg = '';
- req_ok = True;
+ seqnr = iq['req']['seqnr']
+ error_msg = ''
+ req_ok = True
# Authentication
if len(self.test_authenticated_from) > 0 and not iq['from'] == self.test_authenticated_from:
# Invalid authentication
- req_ok = False;
- error_msg = "Access denied";
+ req_ok = False
+ error_msg = "Access denied"
# Nodes
- process_nodes = [];
+ process_nodes = []
if len(iq['req']['nodes']) > 0:
for n in iq['req']['nodes']:
if not n['nodeId'] in self.nodes:
- req_ok = False;
- error_msg = "Invalid nodeId " + n['nodeId'];
- process_nodes = [n['nodeId'] for n in iq['req']['nodes']];
+ req_ok = False
+ error_msg = "Invalid nodeId " + n['nodeId']
+ process_nodes = [n['nodeId'] for n in iq['req']['nodes']]
else:
- process_nodes = self.nodes.keys();
+ process_nodes = self.nodes.keys()
# Fields - if we just find one we are happy, otherwise we reject
- process_fields = [];
+ process_fields = []
if len(iq['req']['fields']) > 0:
found = False
for f in iq['req']['fields']:
for node in self.nodes:
if self.nodes[node]["device"].has_field(f['name']):
- found = True;
- break;
+ found = True
+ break
if not found:
- req_ok = False;
- error_msg = "Invalid field " + f['name'];
- process_fields = [f['name'] for n in iq['req']['fields']];
+ req_ok = False
+ error_msg = "Invalid field " + f['name']
+ process_fields = [f['name'] for n in iq['req']['fields']]
- req_flags = iq['req']._get_flags();
+ req_flags = iq['req']._get_flags()
request_delay_sec = None
if 'when' in req_flags:
@@ -283,7 +283,7 @@ class XEP_0323(BasePlugin):
try:
dt = datetime.datetime.strptime(req_flags['when'], "%Y-%m-%dT%H:%M:%S")
except ValueError:
- req_ok = False;
+ req_ok = False
error_msg = "Invalid datetime in 'when' flag, please use ISO format (i.e. 2013-04-05T15:00:03)."
if not dt is None:
@@ -292,30 +292,30 @@ class XEP_0323(BasePlugin):
dtdiff = dt - dtnow
request_delay_sec = dtdiff.seconds + dtdiff.days * 24 * 3600
if request_delay_sec <= 0:
- req_ok = False;
- error_msg = "Invalid datetime in 'when' flag, cannot set a time in the past. Current time: " + dtnow.isoformat();
+ req_ok = False
+ error_msg = "Invalid datetime in 'when' flag, cannot set a time in the past. Current time: " + dtnow.isoformat()
if req_ok:
- session = self._new_session();
- self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr};
- self.sessions[session]["commTimers"] = {};
- self.sessions[session]["nodeDone"] = {};
+ session = self._new_session()
+ self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr}
+ self.sessions[session]["commTimers"] = {}
+ self.sessions[session]["nodeDone"] = {}
#print("added session: " + str(self.sessions))
- iq.reply();
- iq['accepted']['seqnr'] = seqnr;
+ iq.reply()
+ iq['accepted']['seqnr'] = seqnr
if not request_delay_sec is None:
iq['accepted']['queued'] = "true"
- iq.send(block=False);
+ iq.send(block=False)
- self.sessions[session]["node_list"] = process_nodes;
+ self.sessions[session]["node_list"] = process_nodes
if not request_delay_sec is None:
# Delay request to requested time
timer = Timer(request_delay_sec, self._event_delayed_req, args=(session, process_fields, req_flags))
- self.sessions[session]["commTimers"]["delaytimer"] = timer;
- timer.start();
+ self.sessions[session]["commTimers"]["delaytimer"] = timer
+ timer.start()
return
if self.threaded:
@@ -324,14 +324,14 @@ class XEP_0323(BasePlugin):
tr_req.start()
#print("started thread")
else:
- self._threaded_node_request(session, process_fields, req_flags);
+ self._threaded_node_request(session, process_fields, req_flags)
else:
- iq.reply();
- iq['type'] = 'error';
- iq['rejected']['seqnr'] = seqnr;
- iq['rejected']['error'] = error_msg;
- iq.send(block=False);
+ iq.reply()
+ iq['type'] = 'error'
+ iq['rejected']['seqnr'] = seqnr
+ iq['rejected']['error'] = error_msg
+ iq.send(block=False)
def _threaded_node_request(self, session, process_fields, flags):
"""
@@ -344,14 +344,14 @@ class XEP_0323(BasePlugin):
Formatted as a dictionary like { "flag name": "flag value" ... }
"""
for node in self.sessions[session]["node_list"]:
- self.sessions[session]["nodeDone"][node] = False;
+ self.sessions[session]["nodeDone"][node] = False
for node in self.sessions[session]["node_list"]:
- timer = TimerReset(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node));
- self.sessions[session]["commTimers"][node] = timer;
+ timer = TimerReset(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node))
+ self.sessions[session]["commTimers"][node] = timer
#print("Starting timer " + str(timer) + ", timeout: " + str(self.nodes[node]['commTimeout']))
- timer.start();
- self.nodes[node]['device'].request_fields(process_fields, flags=flags, session=session, callback=self._device_field_request_callback);
+ timer.start()
+ self.nodes[node]['device'].request_fields(process_fields, flags=flags, session=session, callback=self._device_field_request_callback)
def _event_comm_timeout(self, session, nodeId):
"""
@@ -363,22 +363,22 @@ class XEP_0323(BasePlugin):
session -- The request session id
nodeId -- The id of the device which timed out
"""
- msg = self.xmpp.Message();
- msg['from'] = self.sessions[session]['to'];
- msg['to'] = self.sessions[session]['from'];
- msg['failure']['seqnr'] = self.sessions[session]['seqnr'];
- msg['failure']['error']['text'] = "Timeout";
- msg['failure']['error']['nodeId'] = nodeId;
- msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat();
+ msg = self.xmpp.Message()
+ msg['from'] = self.sessions[session]['to']
+ msg['to'] = self.sessions[session]['from']
+ msg['failure']['seqnr'] = self.sessions[session]['seqnr']
+ msg['failure']['error']['text'] = "Timeout"
+ msg['failure']['error']['nodeId'] = nodeId
+ msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat()
# Drop communication with this device and check if we are done
- self.sessions[session]["nodeDone"][nodeId] = True;
+ self.sessions[session]["nodeDone"][nodeId] = True
if (self._all_nodes_done(session)):
- msg['failure']['done'] = 'true';
- msg.send();
+ msg['failure']['done'] = 'true'
+ msg.send()
# The session is complete, delete it
#print("del session " + session + " due to timeout")
- del self.sessions[session];
+ del self.sessions[session]
def _event_delayed_req(self, session, process_fields, req_flags):
"""
@@ -390,17 +390,17 @@ class XEP_0323(BasePlugin):
flags -- [optional] flags to pass to the devices, e.g. momentary
Formatted as a dictionary like { "flag name": "flag value" ... }
"""
- msg = self.xmpp.Message();
- msg['from'] = self.sessions[session]['to'];
- msg['to'] = self.sessions[session]['from'];
- msg['started']['seqnr'] = self.sessions[session]['seqnr'];
- msg.send();
+ msg = self.xmpp.Message()
+ msg['from'] = self.sessions[session]['to']
+ msg['to'] = self.sessions[session]['from']
+ msg['started']['seqnr'] = self.sessions[session]['seqnr']
+ msg.send()
if self.threaded:
tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields, req_flags))
tr_req.start()
else:
- self._threaded_node_request(session, process_fields, req_flags);
+ self._threaded_node_request(session, process_fields, req_flags)
def _all_nodes_done(self, session):
"""
@@ -411,8 +411,8 @@ class XEP_0323(BasePlugin):
"""
for n in self.sessions[session]["nodeDone"]:
if not self.sessions[session]["nodeDone"][n]:
- return False;
- return True;
+ return False
+ return True
def _device_field_request_callback(self, session, nodeId, result, timestamp_block, error_msg=None):
"""
@@ -452,33 +452,33 @@ class XEP_0323(BasePlugin):
return
if result == "error":
- self.sessions[session]["commTimers"][nodeId].cancel();
+ self.sessions[session]["commTimers"][nodeId].cancel()
- msg = self.xmpp.Message();
- msg['from'] = self.sessions[session]['to'];
- msg['to'] = self.sessions[session]['from'];
- msg['failure']['seqnr'] = self.sessions[session]['seqnr'];
- msg['failure']['error']['text'] = error_msg;
- msg['failure']['error']['nodeId'] = nodeId;
- msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat();
+ msg = self.xmpp.Message()
+ msg['from'] = self.sessions[session]['to']
+ msg['to'] = self.sessions[session]['from']
+ msg['failure']['seqnr'] = self.sessions[session]['seqnr']
+ msg['failure']['error']['text'] = error_msg
+ msg['failure']['error']['nodeId'] = nodeId
+ msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat()
# Drop communication with this device and check if we are done
- self.sessions[session]["nodeDone"][nodeId] = True;
+ self.sessions[session]["nodeDone"][nodeId] = True
if (self._all_nodes_done(session)):
- msg['failure']['done'] = 'true';
+ msg['failure']['done'] = 'true'
# The session is complete, delete it
# print("del session " + session + " due to error")
- del self.sessions[session];
- msg.send();
+ del self.sessions[session]
+ msg.send()
else:
- msg = self.xmpp.Message();
- msg['from'] = self.sessions[session]['to'];
- msg['to'] = self.sessions[session]['from'];
- msg['fields']['seqnr'] = self.sessions[session]['seqnr'];
+ msg = self.xmpp.Message()
+ msg['from'] = self.sessions[session]['to']
+ msg['to'] = self.sessions[session]['from']
+ msg['fields']['seqnr'] = self.sessions[session]['seqnr']
if timestamp_block is not None and len(timestamp_block) > 0:
- node = msg['fields'].add_node(nodeId);
- ts = node.add_timestamp(timestamp_block["timestamp"]);
+ node = msg['fields'].add_node(nodeId)
+ ts = node.add_timestamp(timestamp_block["timestamp"])
for f in timestamp_block["fields"]:
data = ts.add_data( typename=f['type'],
@@ -486,50 +486,50 @@ class XEP_0323(BasePlugin):
value=f['value'],
unit=f['unit'],
dataType=f['dataType'],
- flags=f['flags']);
+ flags=f['flags'])
if result == "done":
- self.sessions[session]["commTimers"][nodeId].cancel();
- self.sessions[session]["nodeDone"][nodeId] = True;
- msg['fields']['done'] = 'true';
+ self.sessions[session]["commTimers"][nodeId].cancel()
+ self.sessions[session]["nodeDone"][nodeId] = True
+ msg['fields']['done'] = 'true'
if (self._all_nodes_done(session)):
# The session is complete, delete it
# print("del session " + session + " due to complete")
- del self.sessions[session];
+ del self.sessions[session]
else:
# Restart comm timer
- self.sessions[session]["commTimers"][nodeId].reset();
+ self.sessions[session]["commTimers"][nodeId].reset()
- msg.send();
+ msg.send()
def _handle_event_cancel(self, iq):
""" Received Iq with cancel - this is a cancel request.
Delete the session and confirm. """
- seqnr = iq['cancel']['seqnr'];
+ seqnr = iq['cancel']['seqnr']
# Find the session
for s in self.sessions:
if self.sessions[s]['from'] == iq['from'] and self.sessions[s]['to'] == iq['to'] and self.sessions[s]['seqnr'] == seqnr:
# found it. Cancel all timers
for n in self.sessions[s]["commTimers"]:
- self.sessions[s]["commTimers"][n].cancel();
+ self.sessions[s]["commTimers"][n].cancel()
# Confirm
- iq.reply();
- iq['type'] = 'result';
- iq['cancelled']['seqnr'] = seqnr;
- iq.send(block=False);
+ iq.reply()
+ iq['type'] = 'result'
+ iq['cancelled']['seqnr'] = seqnr
+ iq.send(block=False)
# Delete session
del self.sessions[s]
return
# Could not find session, send reject
- iq.reply();
- iq['type'] = 'error';
- iq['rejected']['seqnr'] = seqnr;
- iq['rejected']['error'] = "Cancel request received, no matching request is active.";
- iq.send(block=False);
+ iq.reply()
+ iq['type'] = 'error'
+ iq['rejected']['seqnr'] = seqnr
+ iq['rejected']['error'] = "Cancel request received, no matching request is active."
+ iq.send(block=False)
# =================================================================
# Client side (data retriever) API
@@ -593,26 +593,26 @@ class XEP_0323(BasePlugin):
session -- Session identifier. Client can use this as a reference to cancel
the request.
"""
- iq = self.xmpp.Iq();
- iq['from'] = from_jid;
- iq['to'] = to_jid;
- iq['type'] = "get";
- seqnr = self._get_new_seqnr();
- iq['id'] = seqnr;
- iq['req']['seqnr'] = seqnr;
+ iq = self.xmpp.Iq()
+ iq['from'] = from_jid
+ iq['to'] = to_jid
+ iq['type'] = "get"
+ seqnr = self._get_new_seqnr()
+ iq['id'] = seqnr
+ iq['req']['seqnr'] = seqnr
if nodeIds is not None:
for nodeId in nodeIds:
- iq['req'].add_node(nodeId);
+ iq['req'].add_node(nodeId)
if fields is not None:
for field in fields:
- iq['req'].add_field(field);
+ iq['req'].add_field(field)
- iq['req']._set_flags(flags);
+ iq['req']._set_flags(flags)
- self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr, "callback": callback};
- iq.send(block=False);
+ self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr, "callback": callback}
+ iq.send(block=False)
- return seqnr;
+ return seqnr
def cancel_request(self, session):
"""
@@ -625,39 +625,39 @@ class XEP_0323(BasePlugin):
session -- The session id of the request to cancel
"""
seqnr = session
- iq = self.xmpp.Iq();
+ iq = self.xmpp.Iq()
iq['from'] = self.sessions[seqnr]['from']
- iq['to'] = self.sessions[seqnr]['to'];
- iq['type'] = "get";
- iq['id'] = seqnr;
- iq['cancel']['seqnr'] = seqnr;
- iq.send(block=False);
+ iq['to'] = self.sessions[seqnr]['to']
+ iq['type'] = "get"
+ iq['id'] = seqnr
+ iq['cancel']['seqnr'] = seqnr
+ iq.send(block=False)
def _get_new_seqnr(self):
""" Returns a unique sequence number (unique across threads) """
- self.seqnr_lock.acquire();
- self.last_seqnr = self.last_seqnr + 1;
- self.seqnr_lock.release();
- return str(self.last_seqnr);
+ self.seqnr_lock.acquire()
+ self.last_seqnr = self.last_seqnr + 1
+ self.seqnr_lock.release()
+ return str(self.last_seqnr)
def _handle_event_accepted(self, iq):
""" Received Iq with accepted - request was accepted """
- seqnr = iq['accepted']['seqnr'];
+ seqnr = iq['accepted']['seqnr']
result = "accepted"
if iq['accepted']['queued'] == 'true':
result = "queued"
- callback = self.sessions[seqnr]["callback"];
- callback(from_jid=iq['from'], result=result);
+ callback = self.sessions[seqnr]["callback"]
+ callback(from_jid=iq['from'], result=result)
def _handle_event_rejected(self, iq):
""" Received Iq with rejected - this is a reject.
Delete the session. """
- seqnr = iq['rejected']['seqnr'];
- callback = self.sessions[seqnr]["callback"];
- callback(from_jid=iq['from'], result="rejected", error_msg=iq['rejected']['error']);
+ seqnr = iq['rejected']['seqnr']
+ callback = self.sessions[seqnr]["callback"]
+ callback(from_jid=iq['from'], result="rejected", error_msg=iq['rejected']['error'])
# Session terminated
- del self.sessions[seqnr];
+ del self.sessions[seqnr]
def _handle_event_cancelled(self, iq):
"""
@@ -665,59 +665,59 @@ class XEP_0323(BasePlugin):
Delete the session.
"""
#print("Got cancelled")
- seqnr = iq['cancelled']['seqnr'];
- callback = self.sessions[seqnr]["callback"];
- callback(from_jid=iq['from'], result="cancelled");
+ seqnr = iq['cancelled']['seqnr']
+ callback = self.sessions[seqnr]["callback"]
+ callback(from_jid=iq['from'], result="cancelled")
# Session cancelled
- del self.sessions[seqnr];
+ del self.sessions[seqnr]
def _handle_event_fields(self, msg):
"""
Received Msg with fields - this is a data reponse to a request.
If this is the last data block, issue a "done" callback.
"""
- seqnr = msg['fields']['seqnr'];
- callback = self.sessions[seqnr]["callback"];
+ seqnr = msg['fields']['seqnr']
+ callback = self.sessions[seqnr]["callback"]
for node in msg['fields']['nodes']:
for ts in node['timestamps']:
- fields = [];
+ fields = []
for d in ts['datas']:
- field_block = {};
- field_block["name"] = d['name'];
- field_block["typename"] = d._get_typename();
- field_block["value"] = d['value'];
- if not d['unit'] == "": field_block["unit"] = d['unit'];
- if not d['dataType'] == "": field_block["dataType"] = d['dataType'];
- flags = d._get_flags();
+ field_block = {}
+ field_block["name"] = d['name']
+ field_block["typename"] = d._get_typename()
+ field_block["value"] = d['value']
+ if not d['unit'] == "": field_block["unit"] = d['unit']
+ if not d['dataType'] == "": field_block["dataType"] = d['dataType']
+ flags = d._get_flags()
if not len(flags) == 0:
- field_block["flags"] = flags;
- fields.append(field_block);
+ field_block["flags"] = flags
+ fields.append(field_block)
- callback(from_jid=msg['from'], result="fields", nodeId=node['nodeId'], timestamp=ts['value'], fields=fields);
+ callback(from_jid=msg['from'], result="fields", nodeId=node['nodeId'], timestamp=ts['value'], fields=fields)
if msg['fields']['done'] == "true":
- callback(from_jid=msg['from'], result="done");
+ callback(from_jid=msg['from'], result="done")
# Session done
- del self.sessions[seqnr];
+ del self.sessions[seqnr]
def _handle_event_failure(self, msg):
"""
Received Msg with failure - our request failed
Delete the session.
"""
- seqnr = msg['failure']['seqnr'];
- callback = self.sessions[seqnr]["callback"];
- callback(from_jid=msg['from'], result="failure", nodeId=msg['failure']['error']['nodeId'], timestamp=msg['failure']['error']['timestamp'], error_msg=msg['failure']['error']['text']);
+ seqnr = msg['failure']['seqnr']
+ callback = self.sessions[seqnr]["callback"]
+ callback(from_jid=msg['from'], result="failure", nodeId=msg['failure']['error']['nodeId'], timestamp=msg['failure']['error']['timestamp'], error_msg=msg['failure']['error']['text'])
# Session failed
- del self.sessions[seqnr];
+ del self.sessions[seqnr]
def _handle_event_started(self, msg):
"""
Received Msg with started - our request was queued and is now started.
"""
- seqnr = msg['started']['seqnr'];
- callback = self.sessions[seqnr]["callback"];
- callback(from_jid=msg['from'], result="started");
+ seqnr = msg['started']['seqnr']
+ callback = self.sessions[seqnr]["callback"]
+ callback(from_jid=msg['from'], result="started")