diff options
Diffstat (limited to 'sleekxmpp')
29 files changed, 955 insertions, 709 deletions
diff --git a/sleekxmpp/basexmpp.py b/sleekxmpp/basexmpp.py index bf0ae4df..8cd61b63 100644 --- a/sleekxmpp/basexmpp.py +++ b/sleekxmpp/basexmpp.py @@ -25,7 +25,6 @@ from sleekxmpp.exceptions import IqError, IqTimeout from sleekxmpp.stanza import Message, Presence, Iq, StreamError from sleekxmpp.stanza.roster import Roster from sleekxmpp.stanza.nick import Nick -from sleekxmpp.stanza.htmlim import HTMLIM from sleekxmpp.xmlstream import XMLStream, JID from sleekxmpp.xmlstream import ET, register_stanza_plugin @@ -245,7 +244,7 @@ class BaseXMPP(XMLStream): self.plugin[name].post_inited = True return XMLStream.process(self, *args, **kwargs) - def register_plugin(self, plugin, pconfig={}, module=None): + def register_plugin(self, plugin, pconfig=None, module=None): """Register and configure a plugin for use in this stream. :param plugin: The name of the plugin class. Plugin names must diff --git a/sleekxmpp/componentxmpp.py b/sleekxmpp/componentxmpp.py index bac455e2..4b229a6f 100644 --- a/sleekxmpp/componentxmpp.py +++ b/sleekxmpp/componentxmpp.py @@ -49,8 +49,13 @@ class ComponentXMPP(BaseXMPP): Defaults to ``False``. """ - def __init__(self, jid, secret, host=None, port=None, - plugin_config={}, plugin_whitelist=[], use_jc_ns=False): + def __init__(self, jid, secret, host=None, port=None, plugin_config=None, plugin_whitelist=None, use_jc_ns=False): + + if not plugin_whitelist: + plugin_whitelist = [] + if not plugin_config: + plugin_config = {} + if use_jc_ns: default_ns = 'jabber:client' else: diff --git a/sleekxmpp/features/feature_mechanisms/mechanisms.py b/sleekxmpp/features/feature_mechanisms/mechanisms.py index 17ad5ed0..1d8f8798 100644 --- a/sleekxmpp/features/feature_mechanisms/mechanisms.py +++ b/sleekxmpp/features/feature_mechanisms/mechanisms.py @@ -187,14 +187,14 @@ class FeatureMechanisms(BasePlugin): except sasl.SASLCancelled: self.attempted_mechs.add(self.mech.name) self._send_auth() - except sasl.SASLFailed: - self.attempted_mechs.add(self.mech.name) - self._send_auth() except sasl.SASLMutualAuthFailed: log.error("Mutual authentication failed! " + \ "A security breach is possible.") self.attempted_mechs.add(self.mech.name) self.xmpp.disconnect() + except sasl.SASLFailed: + self.attempted_mechs.add(self.mech.name) + self._send_auth() else: resp.send(now=True) @@ -207,13 +207,13 @@ class FeatureMechanisms(BasePlugin): resp['value'] = self.mech.process(stanza['value']) except sasl.SASLCancelled: self.stanza.Abort(self.xmpp).send() - except sasl.SASLFailed: - self.stanza.Abort(self.xmpp).send() except sasl.SASLMutualAuthFailed: log.error("Mutual authentication failed! " + \ "A security breach is possible.") self.attempted_mechs.add(self.mech.name) self.xmpp.disconnect() + except sasl.SASLFailed: + self.stanza.Abort(self.xmpp).send() else: if resp.get_value() == '': resp.del_value() diff --git a/sleekxmpp/plugins/google/auth/stanza.py b/sleekxmpp/plugins/google/auth/stanza.py index 49c5cba7..2d13f85a 100644 --- a/sleekxmpp/plugins/google/auth/stanza.py +++ b/sleekxmpp/plugins/google/auth/stanza.py @@ -24,7 +24,7 @@ class GoogleAuth(ElementBase): print('setting up google extension') def get_client_uses_full_bind_result(self): - return self.parent()._get_attr(self.disovery_attr) == 'true' + return self.parent()._get_attr(self.discovery_attr) == 'true' def set_client_uses_full_bind_result(self, value): print('>>>', value) diff --git a/sleekxmpp/plugins/google/nosave/stanza.py b/sleekxmpp/plugins/google/nosave/stanza.py index d8701322..791d4b0c 100644 --- a/sleekxmpp/plugins/google/nosave/stanza.py +++ b/sleekxmpp/plugins/google/nosave/stanza.py @@ -52,7 +52,7 @@ class Item(ElementBase): def get_source(self): return JID(self._get_attr('source', '')) - def set_source(self): + def set_source(self, value): self._set_attr('source', str(value)) diff --git a/sleekxmpp/plugins/google/settings/settings.py b/sleekxmpp/plugins/google/settings/settings.py index 7122ff56..591956fc 100644 --- a/sleekxmpp/plugins/google/settings/settings.py +++ b/sleekxmpp/plugins/google/settings/settings.py @@ -6,8 +6,6 @@ See the file LICENSE for copying permission. """ -import logging - from sleekxmpp.stanza import Iq from sleekxmpp.xmlstream.handler import Callback from sleekxmpp.xmlstream.matcher import StanzaPath diff --git a/sleekxmpp/plugins/xep_0004/stanza/form.py b/sleekxmpp/plugins/xep_0004/stanza/form.py index f51e7f09..1d733760 100644 --- a/sleekxmpp/plugins/xep_0004/stanza/form.py +++ b/sleekxmpp/plugins/xep_0004/stanza/form.py @@ -151,7 +151,6 @@ class Form(ElementBase): return fields def get_instructions(self): - instructions = '' instsXML = self.xml.findall('{%s}instructions' % self.namespace) return "\n".join([instXML.text for instXML in instsXML]) diff --git a/sleekxmpp/plugins/xep_0009/remote.py b/sleekxmpp/plugins/xep_0009/remote.py index 8847ff24..b02f587e 100644 --- a/sleekxmpp/plugins/xep_0009/remote.py +++ b/sleekxmpp/plugins/xep_0009/remote.py @@ -6,7 +6,7 @@ See the file LICENSE for copying permission. """ -from binding import py2xml, xml2py, xml2fault, fault2xml +from sleekxmpp.plugins.xep_0009.binding import py2xml, xml2py, xml2fault, fault2xml from threading import RLock import abc import inspect @@ -18,6 +18,45 @@ import traceback log = logging.getLogger(__name__) +# Define a function _isstr() to check if an object is a string in a way +# compatible with Python 2 and Python 3 (basestring does not exists in Python 3). +try: + basestring # This evaluation will throw an exception if basestring does not exists (Python 3). + def _isstr(obj): + return isinstance(obj, basestring) +except NameError: + def _isstr(obj): + return isinstance(obj, str) + + +# Class decorator to declare a metaclass to a class in a way compatible with Python 2 and 3. +# This decorator is copied from 'six' (https://bitbucket.org/gutworth/six): +# +# Copyright (c) 2010-2015 Benjamin Peterson +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +def _add_metaclass(metaclass): + def wrapper(cls): + orig_vars = cls.__dict__.copy() + slots = orig_vars.get('__slots__') + if slots is not None: + if isinstance(slots, str): + slots = [slots] + for slots_var in slots: + orig_vars.pop(slots_var) + orig_vars.pop('__dict__', None) + orig_vars.pop('__weakref__', None) + return metaclass(cls.__name__, cls.__bases__, orig_vars) + return wrapper + def _intercept(method, name, public): def _resolver(instance, *args, **kwargs): log.debug("Locally calling %s.%s with arguments %s.", instance.FQN(), method.__name__, args) @@ -68,7 +107,7 @@ def remote(function_argument, public = True): if hasattr(function_argument, '__call__'): return _intercept(function_argument, None, public) else: - if not isinstance(function_argument, basestring): + if not _isstr(function_argument): if not isinstance(function_argument, bool): raise Exception('Expected an RPC method name or visibility modifier!') else: @@ -222,12 +261,11 @@ class TimeoutException(Exception): pass +@_add_metaclass(abc.ABCMeta) class Callback(object): ''' A base class for callback handlers. ''' - __metaclass__ = abc.ABCMeta - @abc.abstractproperty def set_value(self, value): @@ -291,7 +329,7 @@ class Future(Callback): self._event.set() - +@_add_metaclass(abc.ABCMeta) class Endpoint(object): ''' The Endpoint class is an abstract base class for all objects @@ -303,8 +341,6 @@ class Endpoint(object): which specifies which object an RPC call refers to. It is the first part in a RPC method name '<fqn>.<method>'. ''' - __metaclass__ = abc.ABCMeta - def __init__(self, session, target_jid): ''' @@ -491,7 +527,7 @@ class RemoteSession(object): def _find_key(self, dict, value): """return the key of dictionary dic given the value""" - search = [k for k, v in dict.iteritems() if v == value] + search = [k for k, v in dict.items() if v == value] if len(search) == 0: return None else: @@ -547,7 +583,7 @@ class RemoteSession(object): result = handler_cls(*args, **kwargs) Endpoint.__init__(result, self, self._client.boundjid.full) method_dict = result.get_methods() - for method_name, method in method_dict.iteritems(): + for method_name, method in method_dict.items(): #!!! self._client.plugin['xep_0009'].register_call(result.FQN(), method, method_name) self._register_call(result.FQN(), method, method_name) self._register_acl(result.FQN(), acl) @@ -697,7 +733,8 @@ class Remote(object): if(client.boundjid.bare in cls._sessions): raise RemoteException("There already is a session associated with these credentials!") else: - cls._sessions[client.boundjid.bare] = client; + cls._sessions[client.boundjid.bare] = client + def _session_close_callback(): with Remote._lock: del cls._sessions[client.boundjid.bare] diff --git a/sleekxmpp/plugins/xep_0009/rpc.py b/sleekxmpp/plugins/xep_0009/rpc.py index 3378c650..6179355e 100644 --- a/sleekxmpp/plugins/xep_0009/rpc.py +++ b/sleekxmpp/plugins/xep_0009/rpc.py @@ -61,7 +61,7 @@ class XEP_0009(BasePlugin): iq.enable('rpc_query')
iq['rpc_query']['method_call']['method_name'] = pmethod
iq['rpc_query']['method_call']['params'] = params
- return iq;
+ return iq
def make_iq_method_response(self, pid, pto, params):
iq = self.xmpp.makeIqResult(pid)
@@ -93,7 +93,7 @@ class XEP_0009(BasePlugin): def _item_not_found(self, iq):
payload = iq.get_payload()
- iq.reply().error().set_payload(payload);
+ iq.reply().error().set_payload(payload)
iq['error']['code'] = '404'
iq['error']['type'] = 'cancel'
iq['error']['condition'] = 'item-not-found'
diff --git a/sleekxmpp/plugins/xep_0030/disco.py b/sleekxmpp/plugins/xep_0030/disco.py index 8a397923..721f73f6 100644 --- a/sleekxmpp/plugins/xep_0030/disco.py +++ b/sleekxmpp/plugins/xep_0030/disco.py @@ -324,7 +324,7 @@ class XEP_0030(BasePlugin): callback -- Optional callback to execute when a reply is received instead of blocking and waiting for the reply. - timeout_callback -- Optional callback to execute when no result + timeout_callback -- Optional callback to execute when no result has been received in timeout seconds. """ if local is None: @@ -408,7 +408,7 @@ class XEP_0030(BasePlugin): iterator -- If True, return a result set iterator using the XEP-0059 plugin, if the plugin is loaded. Otherwise the parameter is ignored. - timeout_callback -- Optional callback to execute when no result + timeout_callback -- Optional callback to execute when no result has been received in timeout seconds. """ if local or local is None and jid is None: @@ -604,7 +604,7 @@ class XEP_0030(BasePlugin): """ self.api['del_features'](jid, node, None, kwargs) - def _run_node_handler(self, htype, jid, node=None, ifrom=None, data={}): + def _run_node_handler(self, htype, jid, node=None, ifrom=None, data=None): """ Execute the most specific node handler for the given JID/node combination. @@ -615,6 +615,9 @@ class XEP_0030(BasePlugin): node -- The node requested. data -- Optional, custom data to pass to the handler. """ + if not data: + data = {} + return self.api[htype](jid, node, ifrom, data) def _handle_disco_info(self, iq): diff --git a/sleekxmpp/plugins/xep_0065/proxy.py b/sleekxmpp/plugins/xep_0065/proxy.py index fdd9f97e..d890b57a 100644 --- a/sleekxmpp/plugins/xep_0065/proxy.py +++ b/sleekxmpp/plugins/xep_0065/proxy.py @@ -206,7 +206,7 @@ class XEP_0065(base_plugin): # Though this should not be neccessary remove the closed session anyway with self._sessions_lock: if sid in self._sessions: - log.warn(('SOCKS5 session with sid = "%s" was not ' + + log.warn(('SOCKS5 session with sid = "%s" was not ' + 'removed from _sessions by sock.close()') % sid) del self._sessions[sid] @@ -240,9 +240,9 @@ class XEP_0065(base_plugin): # The hostname MUST be SHA1(SID + Requester JID + Target JID) # where the output is hexadecimal-encoded (not binary). digest = sha1() - digest.update(sid) - digest.update(str(requester)) - digest.update(str(target)) + digest.update(sid.encode('utf-8')) + digest.update(str(requester).encode('utf-8')) + digest.update(str(target).encode('utf-8')) dest = digest.hexdigest() diff --git a/sleekxmpp/plugins/xep_0084/stanza.py b/sleekxmpp/plugins/xep_0084/stanza.py index 22f11b72..fd21e6f1 100644 --- a/sleekxmpp/plugins/xep_0084/stanza.py +++ b/sleekxmpp/plugins/xep_0084/stanza.py @@ -8,7 +8,7 @@ from base64 import b64encode, b64decode -from sleekxmpp.util import bytes +from sleekxmpp.util import bytes as sbytes from sleekxmpp.xmlstream import ET, ElementBase, register_stanza_plugin @@ -20,12 +20,15 @@ class Data(ElementBase): def get_value(self): if self.xml.text: - return b64decode(bytes(self.xml.text)) + return b64decode(sbytes(self.xml.text)) return '' def set_value(self, value): if value: - self.xml.text = b64encode(bytes(value)) + self.xml.text = b64encode(sbytes(value)) + # Python3 base64 encoded is bytes and needs to be decoded to string + if isinstance(self.xml.text, bytes): + self.xml.text = self.xml.text.decode() else: self.xml.text = '' diff --git a/sleekxmpp/plugins/xep_0138.py b/sleekxmpp/plugins/xep_0138.py new file mode 100644 index 00000000..c5d8f06f --- /dev/null +++ b/sleekxmpp/plugins/xep_0138.py @@ -0,0 +1,148 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2011 Nathanael C. Fritz + This file is part of SleekXMPP. + + See the file LICENSE for copying permission. +""" + +import logging +import socket +import zlib + +from sleekxmpp.thirdparty.suelta.util import bytes + + +from sleekxmpp.stanza import StreamFeatures +from sleekxmpp.xmlstream import RestartStream, register_stanza_plugin, ElementBase, StanzaBase +from sleekxmpp.xmlstream.matcher import * +from sleekxmpp.xmlstream.handler import * +from sleekxmpp.plugins import BasePlugin, register_plugin + +log = logging.getLogger(__name__) + + +class Compression(ElementBase): + name = 'compression' + namespace = 'http://jabber.org/features/compress' + interfaces = set(('methods',)) + plugin_attrib = 'compression' + plugin_tag_map = {} + plugin_attrib_map = {} + + def get_methods(self): + methods = [] + for method in self.xml.findall('{%s}method' % self.namespace): + methods.append(method.text) + return methods + + +class Compress(StanzaBase): + name = 'compress' + namespace = 'http://jabber.org/protocol/compress' + interfaces = set(('method',)) + sub_interfaces = interfaces + plugin_attrib = 'compress' + plugin_tag_map = {} + plugin_attrib_map = {} + + def setup(self, xml): + StanzaBase.setup(self, xml) + self.xml.tag = self.tag_name() + + +class Compressed(StanzaBase): + name = 'compressed' + namespace = 'http://jabber.org/protocol/compress' + interfaces = set() + plugin_tag_map = {} + plugin_attrib_map = {} + + def setup(self, xml): + StanzaBase.setup(self, xml) + self.xml.tag = self.tag_name() + + + + +class ZlibSocket(object): + + def __init__(self, socketobj): + self.__socket = socketobj + self.compressor = zlib.compressobj() + self.decompressor = zlib.decompressobj(zlib.MAX_WBITS) + + def __getattr__(self, name): + return getattr(self.__socket, name) + + def send(self, data): + sentlen = len(data) + data = self.compressor.compress(data) + data += self.compressor.flush(zlib.Z_SYNC_FLUSH) + log.debug(b'>>> (compressed)' + (data.encode("hex"))) + #return self.__socket.send(data) + sentactuallen = self.__socket.send(data) + assert(sentactuallen == len(data)) + + return sentlen + + def recv(self, *args, **kwargs): + data = self.__socket.recv(*args, **kwargs) + log.debug(b'<<< (compressed)' + data.encode("hex")) + return self.decompressor.decompress(self.decompressor.unconsumed_tail + data) + + +class XEP_0138(BasePlugin): + """ + XEP-0138: Compression + """ + name = "xep_0138" + description = "XEP-0138: Compression" + dependencies = set(["xep_0030"]) + + def plugin_init(self): + self.xep = '0138' + self.description = 'Stream Compression (Generic)' + + self.compression_methods = {'zlib': True} + + register_stanza_plugin(StreamFeatures, Compression) + self.xmpp.register_stanza(Compress) + self.xmpp.register_stanza(Compressed) + + self.xmpp.register_handler( + Callback('Compressed', + StanzaPath('compressed'), + self._handle_compressed, + instream=True)) + + self.xmpp.register_feature('compression', + self._handle_compression, + restart=True, + order=self.config.get('order', 5)) + + def register_compression_method(self, name, handler): + self.compression_methods[name] = handler + + def _handle_compression(self, features): + for method in features['compression']['methods']: + if method in self.compression_methods: + log.info('Attempting to use %s compression' % method) + c = Compress(self.xmpp) + c['method'] = method + c.send(now=True) + return True + return False + + def _handle_compressed(self, stanza): + self.xmpp.features.add('compression') + log.debug('Stream Compressed!') + compressed_socket = ZlibSocket(self.xmpp.socket) + self.xmpp.set_socket(compressed_socket) + raise RestartStream() + + def _handle_failure(self, stanza): + pass + +xep_0138 = XEP_0138 +register_plugin(XEP_0138) diff --git a/sleekxmpp/plugins/xep_0257/stanza.py b/sleekxmpp/plugins/xep_0257/stanza.py index 17e20136..c3c41db2 100644 --- a/sleekxmpp/plugins/xep_0257/stanza.py +++ b/sleekxmpp/plugins/xep_0257/stanza.py @@ -10,7 +10,7 @@ from sleekxmpp.xmlstream import ElementBase, ET, register_stanza_plugin class Certs(ElementBase): - name = 'query' + name = 'items' namespace = 'urn:xmpp:saslcert:1' plugin_attrib = 'sasl_certs' interfaces = set() diff --git a/sleekxmpp/plugins/xep_0323/device.py b/sleekxmpp/plugins/xep_0323/device.py index 0bc20327..80e6fd95 100644 --- a/sleekxmpp/plugins/xep_0323/device.py +++ b/sleekxmpp/plugins/xep_0323/device.py @@ -13,15 +13,18 @@ import logging class Device(object): """ - Example implementation of a device readout object. + Example implementation of a device readout object. Is registered in the XEP_0323.register_node call - The device object may be any custom implementation to support + The device object may be any custom implementation to support specific devices, but it must implement the functions: has_field request_fields """ - def __init__(self, nodeId, fields={}): + def __init__(self, nodeId, fields=None): + if not fields: + fields = {} + self.nodeId = nodeId self.fields = fields # see fields described below # {'type':'numeric', @@ -38,19 +41,19 @@ class Device(object): Returns true if the supplied field name exists in this device. Arguments: - field -- The field name + field -- The field name """ if field in self.fields.keys(): - return True; - return False; - + return True + return False + def refresh(self, fields): """ override method to do the refresh work refresh values from hardware or other """ pass - + def request_fields(self, fields, flags, session, callback): """ @@ -65,7 +68,7 @@ class Device(object): Formatted as a dictionary like { "flag name": "flag value" ... } session -- Session id, only used in the callback as identifier callback -- Callback function to call when data is available. - + The callback function must support the following arguments: session -- Session id, as supplied in the request_fields call @@ -73,11 +76,11 @@ class Device(object): result -- The current result status of the readout. Valid values are: "error" - Readout failed. "fields" - Contains readout data. - "done" - Indicates that the readout is complete. May contain + "done" - Indicates that the readout is complete. May contain readout data. - timestamp_block -- [optional] Only applies when result != "error" + timestamp_block -- [optional] Only applies when result != "error" The readout data. Structured as a dictionary: - { + { timestamp: timestamp for this datablock, fields: list of field dictionary (one per readout field). readout field dictionary format: @@ -89,10 +92,10 @@ class Device(object): dataType: The datatype of the field. Only applies to type enum. flags: [optional] data classifier flags for the field, e.g. momentary Formatted as a dictionary like { "flag name": "flag value" ... } - } + } } error_msg -- [optional] Only applies when result == "error". - Error details when a request failed. + Error details when a request failed. """ logging.debug("request_fields called looking for fields %s",fields) @@ -101,10 +104,10 @@ class Device(object): for f in fields: if f not in self.fields.keys(): self._send_reject(session, callback) - return False; + return False else: # Request all fields - fields = self.fields.keys(); + fields = self.fields.keys() # Refresh data from device @@ -114,27 +117,27 @@ class Device(object): if "momentary" in flags and flags['momentary'] == "true" or \ "all" in flags and flags['all'] == "true": - ts_block = {}; - timestamp = ""; + ts_block = {} + timestamp = "" if len(self.momentary_timestamp) > 0: - timestamp = self.momentary_timestamp; + timestamp = self.momentary_timestamp else: - timestamp = self._get_timestamp(); + timestamp = self._get_timestamp() - field_block = []; + field_block = [] for f in self.momentary_data: if f in fields: - field_block.append({"name": f, - "type": self.fields[f]["type"], + field_block.append({"name": f, + "type": self.fields[f]["type"], "unit": self.fields[f]["unit"], "dataType": self.fields[f]["dataType"], - "value": self.momentary_data[f]["value"], - "flags": self.momentary_data[f]["flags"]}); - ts_block["timestamp"] = timestamp; - ts_block["fields"] = field_block; + "value": self.momentary_data[f]["value"], + "flags": self.momentary_data[f]["flags"]}) + ts_block["timestamp"] = timestamp + ts_block["fields"] = field_block - callback(session, result="done", nodeId=self.nodeId, timestamp_block=ts_block); + callback(session, result="done", nodeId=self.nodeId, timestamp_block=ts_block) return from_flag = self._datetime_flag_parser(flags, 'from') @@ -142,36 +145,36 @@ class Device(object): for ts in sorted(self.timestamp_data.keys()): tsdt = datetime.datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S") - if not from_flag is None: - if tsdt < from_flag: + if not from_flag is None: + if tsdt < from_flag: #print (str(tsdt) + " < " + str(from_flag)) continue - if not to_flag is None: - if tsdt > to_flag: + if not to_flag is None: + if tsdt > to_flag: #print (str(tsdt) + " > " + str(to_flag)) continue - - ts_block = {}; - field_block = []; + + ts_block = {} + field_block = [] for f in self.timestamp_data[ts]: if f in fields: - field_block.append({"name": f, - "type": self.fields[f]["type"], + field_block.append({"name": f, + "type": self.fields[f]["type"], "unit": self.fields[f]["unit"], "dataType": self.fields[f]["dataType"], - "value": self.timestamp_data[ts][f]["value"], - "flags": self.timestamp_data[ts][f]["flags"]}); + "value": self.timestamp_data[ts][f]["value"], + "flags": self.timestamp_data[ts][f]["flags"]}) - ts_block["timestamp"] = ts; - ts_block["fields"] = field_block; - callback(session, result="fields", nodeId=self.nodeId, timestamp_block=ts_block); - callback(session, result="done", nodeId=self.nodeId, timestamp_block=None); + ts_block["timestamp"] = ts + ts_block["fields"] = field_block + callback(session, result="fields", nodeId=self.nodeId, timestamp_block=ts_block) + callback(session, result="done", nodeId=self.nodeId, timestamp_block=None) def _datetime_flag_parser(self, flags, flagname): if not flagname in flags: return None - + dt = None try: dt = datetime.datetime.strptime(flags[flagname], "%Y-%m-%dT%H:%M:%S") @@ -195,7 +198,7 @@ class Device(object): session -- Session id, see definition in request_fields function callback -- Callback function, see definition in request_fields function """ - callback(session, result="error", nodeId=self.nodeId, timestamp_block=None, error_msg="Reject"); + callback(session, result="error", nodeId=self.nodeId, timestamp_block=None, error_msg="Reject") def _add_field(self, name, typename, unit=None, dataType=None): """ @@ -207,7 +210,7 @@ class Device(object): unit -- [optional] only applies to "numeric". Unit for the field. dataType -- [optional] only applies to "enum". Datatype for the field. """ - self.fields[name] = {"type": typename, "unit": unit, "dataType": dataType}; + self.fields[name] = {"type": typename, "unit": unit, "dataType": dataType} def _add_field_timestamp_data(self, name, timestamp, value, flags=None): """ @@ -221,12 +224,12 @@ class Device(object): Formatted as a dictionary like { "flag name": "flag value" ... } """ if not name in self.fields.keys(): - return False; + return False if not timestamp in self.timestamp_data: - self.timestamp_data[timestamp] = {}; + self.timestamp_data[timestamp] = {} - self.timestamp_data[timestamp][name] = {"value": value, "flags": flags}; - return True; + self.timestamp_data[timestamp][name] = {"value": value, "flags": flags} + return True def _add_field_momentary_data(self, name, value, flags=None): """ @@ -239,17 +242,17 @@ class Device(object): Formatted as a dictionary like { "flag name": "flag value" ... } """ if name not in self.fields: - return False; + return False if flags is None: - flags = {}; - + flags = {} + flags["momentary"] = "true" - self.momentary_data[name] = {"value": value, "flags": flags}; - return True; + self.momentary_data[name] = {"value": value, "flags": flags} + return True def _set_momentary_timestamp(self, timestamp): """ This function is only for unit testing to produce predictable results. """ - self.momentary_timestamp = timestamp; + self.momentary_timestamp = timestamp diff --git a/sleekxmpp/plugins/xep_0323/sensordata.py b/sleekxmpp/plugins/xep_0323/sensordata.py index 2e2f2470..30c28504 100644 --- a/sleekxmpp/plugins/xep_0323/sensordata.py +++ b/sleekxmpp/plugins/xep_0323/sensordata.py @@ -29,12 +29,12 @@ log = logging.getLogger(__name__) class XEP_0323(BasePlugin): """ - XEP-0323: IoT Sensor Data + XEP-0323: IoT Sensor Data This XEP provides the underlying architecture, basic operations and data structures for sensor data communication over XMPP networks. It includes - a hardware abstraction model, removing any technical detail implemented + a hardware abstraction model, removing any technical detail implemented in underlying technologies. Also see <http://xmpp.org/extensions/xep-0323.html> @@ -55,10 +55,10 @@ class XEP_0323(BasePlugin): Sensordata Event:Rejected -- Received a reject from sensor for a request Sensordata Event:Cancelled -- Received a cancel confirm from sensor Sensordata Event:Fields -- Received fields from sensor for a request - This may be triggered multiple times since + This may be triggered multiple times since the sensor can split up its response in multiple messages. - Sensordata Event:Failure -- Received a failure indication from sensor + Sensordata Event:Failure -- Received a failure indication from sensor for a request. Typically a comm timeout. Attributes: @@ -69,7 +69,7 @@ class XEP_0323(BasePlugin): relevant to a request's session. This dictionary is used both by the client and sensor side. On client side, seqnr is used as key, while on sensor side, a session_id is used - as key. This ensures that the two will not collide, so + as key. This ensures that the two will not collide, so one instance can be both client and sensor. Sensor side ----------- @@ -89,12 +89,12 @@ class XEP_0323(BasePlugin): Sensor side ----------- - register_node -- Register a sensor as available from this XMPP + register_node -- Register a sensor as available from this XMPP instance. Client side ----------- - request_data -- Initiates a request for data from one or more + request_data -- Initiates a request for data from one or more sensors. Non-blocking, a callback function will be called when data is available. @@ -102,7 +102,7 @@ class XEP_0323(BasePlugin): name = 'xep_0323' description = 'XEP-0323 Internet of Things - Sensor Data' - dependencies = set(['xep_0030']) + dependencies = set(['xep_0030']) stanza = stanza @@ -155,11 +155,11 @@ class XEP_0323(BasePlugin): self._handle_event_started)) # Server side dicts - self.nodes = {}; - self.sessions = {}; + self.nodes = {} + self.sessions = {} - self.last_seqnr = 0; - self.seqnr_lock = Lock(); + self.last_seqnr = 0 + self.seqnr_lock = Lock() ## For testning only self.test_authenticated_from = "" @@ -182,7 +182,7 @@ class XEP_0323(BasePlugin): def plugin_end(self): """ Stop the XEP-0323 plugin """ - self.sessions.clear(); + self.sessions.clear() self.xmpp.remove_handler('Sensordata Event:Req') self.xmpp.remove_handler('Sensordata Event:Accepted') self.xmpp.remove_handler('Sensordata Event:Rejected') @@ -198,9 +198,9 @@ class XEP_0323(BasePlugin): def register_node(self, nodeId, device, commTimeout, sourceId=None, cacheType=None): """ Register a sensor/device as available for serving of data through this XMPP - instance. + instance. - The device object may by any custom implementation to support + The device object may by any custom implementation to support specific devices, but it must implement the functions: has_field request_fields @@ -212,25 +212,25 @@ class XEP_0323(BasePlugin): commTimeout -- Time in seconds to wait between each callback from device during a data readout. Float. sourceId -- [optional] identifying the data source controlling the device - cacheType -- [optional] narrowing down the search to a specific kind of node + cacheType -- [optional] narrowing down the search to a specific kind of node """ - self.nodes[nodeId] = {"device": device, + self.nodes[nodeId] = {"device": device, "commTimeout": commTimeout, - "sourceId": sourceId, - "cacheType": cacheType}; + "sourceId": sourceId, + "cacheType": cacheType} def _set_authenticated(self, auth=''): """ Internal testing function """ - self.test_authenticated_from = auth; + self.test_authenticated_from = auth def _handle_event_req(self, iq): """ Event handler for reception of an Iq with req - this is a request. - Verifies that + Verifies that - all the requested nodes are available - - at least one of the requested fields is available from at least + - at least one of the requested fields is available from at least one of the nodes If the request passes verification, an accept response is sent, and @@ -238,42 +238,42 @@ class XEP_0323(BasePlugin): If the verification fails, a reject message is sent. """ - seqnr = iq['req']['seqnr']; - error_msg = ''; - req_ok = True; + seqnr = iq['req']['seqnr'] + error_msg = '' + req_ok = True # Authentication if len(self.test_authenticated_from) > 0 and not iq['from'] == self.test_authenticated_from: # Invalid authentication - req_ok = False; - error_msg = "Access denied"; + req_ok = False + error_msg = "Access denied" # Nodes - process_nodes = []; + process_nodes = [] if len(iq['req']['nodes']) > 0: for n in iq['req']['nodes']: if not n['nodeId'] in self.nodes: - req_ok = False; - error_msg = "Invalid nodeId " + n['nodeId']; - process_nodes = [n['nodeId'] for n in iq['req']['nodes']]; + req_ok = False + error_msg = "Invalid nodeId " + n['nodeId'] + process_nodes = [n['nodeId'] for n in iq['req']['nodes']] else: - process_nodes = self.nodes.keys(); + process_nodes = self.nodes.keys() # Fields - if we just find one we are happy, otherwise we reject - process_fields = []; + process_fields = [] if len(iq['req']['fields']) > 0: found = False for f in iq['req']['fields']: for node in self.nodes: if self.nodes[node]["device"].has_field(f['name']): - found = True; - break; + found = True + break if not found: - req_ok = False; - error_msg = "Invalid field " + f['name']; - process_fields = [f['name'] for n in iq['req']['fields']]; + req_ok = False + error_msg = "Invalid field " + f['name'] + process_fields = [f['name'] for n in iq['req']['fields']] - req_flags = iq['req']._get_flags(); + req_flags = iq['req']._get_flags() request_delay_sec = None if 'when' in req_flags: @@ -283,7 +283,7 @@ class XEP_0323(BasePlugin): try: dt = datetime.datetime.strptime(req_flags['when'], "%Y-%m-%dT%H:%M:%S") except ValueError: - req_ok = False; + req_ok = False error_msg = "Invalid datetime in 'when' flag, please use ISO format (i.e. 2013-04-05T15:00:03)." if not dt is None: @@ -292,30 +292,30 @@ class XEP_0323(BasePlugin): dtdiff = dt - dtnow request_delay_sec = dtdiff.seconds + dtdiff.days * 24 * 3600 if request_delay_sec <= 0: - req_ok = False; - error_msg = "Invalid datetime in 'when' flag, cannot set a time in the past. Current time: " + dtnow.isoformat(); + req_ok = False + error_msg = "Invalid datetime in 'when' flag, cannot set a time in the past. Current time: " + dtnow.isoformat() if req_ok: - session = self._new_session(); - self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr}; - self.sessions[session]["commTimers"] = {}; - self.sessions[session]["nodeDone"] = {}; + session = self._new_session() + self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr} + self.sessions[session]["commTimers"] = {} + self.sessions[session]["nodeDone"] = {} #print("added session: " + str(self.sessions)) - iq.reply(); - iq['accepted']['seqnr'] = seqnr; + iq.reply() + iq['accepted']['seqnr'] = seqnr if not request_delay_sec is None: iq['accepted']['queued'] = "true" - iq.send(block=False); + iq.send(block=False) - self.sessions[session]["node_list"] = process_nodes; + self.sessions[session]["node_list"] = process_nodes if not request_delay_sec is None: # Delay request to requested time timer = Timer(request_delay_sec, self._event_delayed_req, args=(session, process_fields, req_flags)) - self.sessions[session]["commTimers"]["delaytimer"] = timer; - timer.start(); + self.sessions[session]["commTimers"]["delaytimer"] = timer + timer.start() return if self.threaded: @@ -324,19 +324,19 @@ class XEP_0323(BasePlugin): tr_req.start() #print("started thread") else: - self._threaded_node_request(session, process_fields, req_flags); + self._threaded_node_request(session, process_fields, req_flags) else: - iq.reply(); - iq['type'] = 'error'; - iq['rejected']['seqnr'] = seqnr; - iq['rejected']['error'] = error_msg; - iq.send(block=False); + iq.reply() + iq['type'] = 'error' + iq['rejected']['seqnr'] = seqnr + iq['rejected']['error'] = error_msg + iq.send(block=False) def _threaded_node_request(self, session, process_fields, flags): - """ + """ Helper function to handle the device readouts in a separate thread. - + Arguments: session -- The request session id process_fields -- The fields to request from the devices @@ -344,41 +344,41 @@ class XEP_0323(BasePlugin): Formatted as a dictionary like { "flag name": "flag value" ... } """ for node in self.sessions[session]["node_list"]: - self.sessions[session]["nodeDone"][node] = False; + self.sessions[session]["nodeDone"][node] = False for node in self.sessions[session]["node_list"]: - timer = TimerReset(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node)); - self.sessions[session]["commTimers"][node] = timer; + timer = TimerReset(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node)) + self.sessions[session]["commTimers"][node] = timer #print("Starting timer " + str(timer) + ", timeout: " + str(self.nodes[node]['commTimeout'])) - timer.start(); - self.nodes[node]['device'].request_fields(process_fields, flags=flags, session=session, callback=self._device_field_request_callback); + timer.start() + self.nodes[node]['device'].request_fields(process_fields, flags=flags, session=session, callback=self._device_field_request_callback) def _event_comm_timeout(self, session, nodeId): - """ + """ Triggered if any of the readout operations timeout. Sends a failure message back to the client, stops communicating with the failing device. - + Arguments: session -- The request session id nodeId -- The id of the device which timed out """ - msg = self.xmpp.Message(); - msg['from'] = self.sessions[session]['to']; - msg['to'] = self.sessions[session]['from']; - msg['failure']['seqnr'] = self.sessions[session]['seqnr']; - msg['failure']['error']['text'] = "Timeout"; - msg['failure']['error']['nodeId'] = nodeId; - msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat(); + msg = self.xmpp.Message() + msg['from'] = self.sessions[session]['to'] + msg['to'] = self.sessions[session]['from'] + msg['failure']['seqnr'] = self.sessions[session]['seqnr'] + msg['failure']['error']['text'] = "Timeout" + msg['failure']['error']['nodeId'] = nodeId + msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat() # Drop communication with this device and check if we are done - self.sessions[session]["nodeDone"][nodeId] = True; + self.sessions[session]["nodeDone"][nodeId] = True if (self._all_nodes_done(session)): - msg['failure']['done'] = 'true'; - msg.send(); + msg['failure']['done'] = 'true' + msg.send() # The session is complete, delete it #print("del session " + session + " due to timeout") - del self.sessions[session]; + del self.sessions[session] def _event_delayed_req(self, session, process_fields, req_flags): """ @@ -390,47 +390,47 @@ class XEP_0323(BasePlugin): flags -- [optional] flags to pass to the devices, e.g. momentary Formatted as a dictionary like { "flag name": "flag value" ... } """ - msg = self.xmpp.Message(); - msg['from'] = self.sessions[session]['to']; - msg['to'] = self.sessions[session]['from']; - msg['started']['seqnr'] = self.sessions[session]['seqnr']; - msg.send(); + msg = self.xmpp.Message() + msg['from'] = self.sessions[session]['to'] + msg['to'] = self.sessions[session]['from'] + msg['started']['seqnr'] = self.sessions[session]['seqnr'] + msg.send() if self.threaded: tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields, req_flags)) tr_req.start() else: - self._threaded_node_request(session, process_fields, req_flags); + self._threaded_node_request(session, process_fields, req_flags) def _all_nodes_done(self, session): - """ + """ Checks wheter all devices are done replying to the readout. - + Arguments: session -- The request session id """ for n in self.sessions[session]["nodeDone"]: if not self.sessions[session]["nodeDone"][n]: - return False; - return True; + return False + return True def _device_field_request_callback(self, session, nodeId, result, timestamp_block, error_msg=None): - """ + """ Callback function called by the devices when they have any additional data. - Composes a message with the data and sends it back to the client, and resets + Composes a message with the data and sends it back to the client, and resets the timeout timer for the device. - + Arguments: session -- The request session id nodeId -- The device id which initiated the callback result -- The current result status of the readout. Valid values are: "error" - Readout failed. "fields" - Contains readout data. - "done" - Indicates that the readout is complete. May contain + "done" - Indicates that the readout is complete. May contain readout data. - timestamp_block -- [optional] Only applies when result != "error" + timestamp_block -- [optional] Only applies when result != "error" The readout data. Structured as a dictionary: - { + { timestamp: timestamp for this datablock, fields: list of field dictionary (one per readout field). readout field dictionary format: @@ -442,7 +442,7 @@ class XEP_0323(BasePlugin): dataType: The datatype of the field. Only applies to type enum. flags: [optional] data classifier flags for the field, e.g. momentary Formatted as a dictionary like { "flag name": "flag value" ... } - } + } } error_msg -- [optional] Only applies when result == "error". Error details when a request failed. @@ -452,99 +452,99 @@ class XEP_0323(BasePlugin): return if result == "error": - self.sessions[session]["commTimers"][nodeId].cancel(); + self.sessions[session]["commTimers"][nodeId].cancel() - msg = self.xmpp.Message(); - msg['from'] = self.sessions[session]['to']; - msg['to'] = self.sessions[session]['from']; - msg['failure']['seqnr'] = self.sessions[session]['seqnr']; - msg['failure']['error']['text'] = error_msg; - msg['failure']['error']['nodeId'] = nodeId; - msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat(); + msg = self.xmpp.Message() + msg['from'] = self.sessions[session]['to'] + msg['to'] = self.sessions[session]['from'] + msg['failure']['seqnr'] = self.sessions[session]['seqnr'] + msg['failure']['error']['text'] = error_msg + msg['failure']['error']['nodeId'] = nodeId + msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat() # Drop communication with this device and check if we are done - self.sessions[session]["nodeDone"][nodeId] = True; + self.sessions[session]["nodeDone"][nodeId] = True if (self._all_nodes_done(session)): - msg['failure']['done'] = 'true'; + msg['failure']['done'] = 'true' # The session is complete, delete it # print("del session " + session + " due to error") - del self.sessions[session]; - msg.send(); + del self.sessions[session] + msg.send() else: - msg = self.xmpp.Message(); - msg['from'] = self.sessions[session]['to']; - msg['to'] = self.sessions[session]['from']; - msg['fields']['seqnr'] = self.sessions[session]['seqnr']; + msg = self.xmpp.Message() + msg['from'] = self.sessions[session]['to'] + msg['to'] = self.sessions[session]['from'] + msg['fields']['seqnr'] = self.sessions[session]['seqnr'] if timestamp_block is not None and len(timestamp_block) > 0: - node = msg['fields'].add_node(nodeId); - ts = node.add_timestamp(timestamp_block["timestamp"]); + node = msg['fields'].add_node(nodeId) + ts = node.add_timestamp(timestamp_block["timestamp"]) for f in timestamp_block["fields"]: - data = ts.add_data( typename=f['type'], - name=f['name'], - value=f['value'], - unit=f['unit'], - dataType=f['dataType'], - flags=f['flags']); + data = ts.add_data( typename=f['type'], + name=f['name'], + value=f['value'], + unit=f['unit'], + dataType=f['dataType'], + flags=f['flags']) if result == "done": - self.sessions[session]["commTimers"][nodeId].cancel(); - self.sessions[session]["nodeDone"][nodeId] = True; - msg['fields']['done'] = 'true'; + self.sessions[session]["commTimers"][nodeId].cancel() + self.sessions[session]["nodeDone"][nodeId] = True + msg['fields']['done'] = 'true' if (self._all_nodes_done(session)): # The session is complete, delete it # print("del session " + session + " due to complete") - del self.sessions[session]; + del self.sessions[session] else: # Restart comm timer - self.sessions[session]["commTimers"][nodeId].reset(); + self.sessions[session]["commTimers"][nodeId].reset() - msg.send(); + msg.send() def _handle_event_cancel(self, iq): - """ Received Iq with cancel - this is a cancel request. + """ Received Iq with cancel - this is a cancel request. Delete the session and confirm. """ - seqnr = iq['cancel']['seqnr']; + seqnr = iq['cancel']['seqnr'] # Find the session for s in self.sessions: if self.sessions[s]['from'] == iq['from'] and self.sessions[s]['to'] == iq['to'] and self.sessions[s]['seqnr'] == seqnr: # found it. Cancel all timers for n in self.sessions[s]["commTimers"]: - self.sessions[s]["commTimers"][n].cancel(); + self.sessions[s]["commTimers"][n].cancel() # Confirm - iq.reply(); - iq['type'] = 'result'; - iq['cancelled']['seqnr'] = seqnr; - iq.send(block=False); - + iq.reply() + iq['type'] = 'result' + iq['cancelled']['seqnr'] = seqnr + iq.send(block=False) + # Delete session del self.sessions[s] return # Could not find session, send reject - iq.reply(); - iq['type'] = 'error'; - iq['rejected']['seqnr'] = seqnr; - iq['rejected']['error'] = "Cancel request received, no matching request is active."; - iq.send(block=False); + iq.reply() + iq['type'] = 'error' + iq['rejected']['seqnr'] = seqnr + iq['rejected']['error'] = "Cancel request received, no matching request is active." + iq.send(block=False) - # ================================================================= + # ================================================================= # Client side (data retriever) API def request_data(self, from_jid, to_jid, callback, nodeIds=None, fields=None, flags=None): - """ + """ Called on the client side to initiade a data readout. Composes a message with the request and sends it to the device(s). Does not block, the callback will be called when data is available. - + Arguments: from_jid -- The jid of the requester to_jid -- The jid of the device(s) - callback -- The callback function to call when data is availble. - + callback -- The callback function to call when data is availble. + The callback function must support the following arguments: from_jid -- The jid of the responding device(s) @@ -565,7 +565,7 @@ class XEP_0323(BasePlugin): The timestamp of data in this callback. One callback will only contain data from one timestamp. fields -- [optional] Mandatory when result == "fields". - List of field dictionaries representing the readout data. + List of field dictionaries representing the readout data. Dictionary format: { typename: The field type (numeric, boolean, dateTime, timeSpan, string, enum) @@ -575,11 +575,11 @@ class XEP_0323(BasePlugin): dataType: The datatype of the field. Only applies to type enum. flags: [optional] data classifier flags for the field, e.g. momentary. Formatted as a dictionary like { "flag name": "flag value" ... } - } + } error_msg -- [optional] Mandatory when result == "rejected" or "failure". - Details about why the request is rejected or failed. - "rejected" means that the request is stopped, but note that the + Details about why the request is rejected or failed. + "rejected" means that the request is stopped, but note that the request will continue even after a "failure". "failure" only means that communication was stopped to that specific device, other device(s) (if any) will continue their readout. @@ -593,131 +593,131 @@ class XEP_0323(BasePlugin): session -- Session identifier. Client can use this as a reference to cancel the request. """ - iq = self.xmpp.Iq(); - iq['from'] = from_jid; - iq['to'] = to_jid; - iq['type'] = "get"; - seqnr = self._get_new_seqnr(); - iq['id'] = seqnr; - iq['req']['seqnr'] = seqnr; + iq = self.xmpp.Iq() + iq['from'] = from_jid + iq['to'] = to_jid + iq['type'] = "get" + seqnr = self._get_new_seqnr() + iq['id'] = seqnr + iq['req']['seqnr'] = seqnr if nodeIds is not None: for nodeId in nodeIds: - iq['req'].add_node(nodeId); + iq['req'].add_node(nodeId) if fields is not None: for field in fields: - iq['req'].add_field(field); + iq['req'].add_field(field) - iq['req']._set_flags(flags); + iq['req']._set_flags(flags) - self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr, "callback": callback}; - iq.send(block=False); + self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr, "callback": callback} + iq.send(block=False) - return seqnr; + return seqnr def cancel_request(self, session): - """ + """ Called on the client side to cancel a request for data readout. Composes a message with the cancellation and sends it to the device(s). - Does not block, the callback will be called when cancellation is + Does not block, the callback will be called when cancellation is confirmed. - + Arguments: session -- The session id of the request to cancel """ seqnr = session - iq = self.xmpp.Iq(); + iq = self.xmpp.Iq() iq['from'] = self.sessions[seqnr]['from'] - iq['to'] = self.sessions[seqnr]['to']; - iq['type'] = "get"; - iq['id'] = seqnr; - iq['cancel']['seqnr'] = seqnr; - iq.send(block=False); + iq['to'] = self.sessions[seqnr]['to'] + iq['type'] = "get" + iq['id'] = seqnr + iq['cancel']['seqnr'] = seqnr + iq.send(block=False) def _get_new_seqnr(self): """ Returns a unique sequence number (unique across threads) """ - self.seqnr_lock.acquire(); - self.last_seqnr = self.last_seqnr + 1; - self.seqnr_lock.release(); - return str(self.last_seqnr); + self.seqnr_lock.acquire() + self.last_seqnr += 1 + self.seqnr_lock.release() + return str(self.last_seqnr) def _handle_event_accepted(self, iq): """ Received Iq with accepted - request was accepted """ - seqnr = iq['accepted']['seqnr']; + seqnr = iq['accepted']['seqnr'] result = "accepted" if iq['accepted']['queued'] == 'true': result = "queued" - callback = self.sessions[seqnr]["callback"]; - callback(from_jid=iq['from'], result=result); + callback = self.sessions[seqnr]["callback"] + callback(from_jid=iq['from'], result=result) def _handle_event_rejected(self, iq): - """ Received Iq with rejected - this is a reject. + """ Received Iq with rejected - this is a reject. Delete the session. """ - seqnr = iq['rejected']['seqnr']; - callback = self.sessions[seqnr]["callback"]; - callback(from_jid=iq['from'], result="rejected", error_msg=iq['rejected']['error']); + seqnr = iq['rejected']['seqnr'] + callback = self.sessions[seqnr]["callback"] + callback(from_jid=iq['from'], result="rejected", error_msg=iq['rejected']['error']) # Session terminated - del self.sessions[seqnr]; + del self.sessions[seqnr] def _handle_event_cancelled(self, iq): - """ - Received Iq with cancelled - this is a cancel confirm. - Delete the session. + """ + Received Iq with cancelled - this is a cancel confirm. + Delete the session. """ #print("Got cancelled") - seqnr = iq['cancelled']['seqnr']; - callback = self.sessions[seqnr]["callback"]; - callback(from_jid=iq['from'], result="cancelled"); + seqnr = iq['cancelled']['seqnr'] + callback = self.sessions[seqnr]["callback"] + callback(from_jid=iq['from'], result="cancelled") # Session cancelled - del self.sessions[seqnr]; + del self.sessions[seqnr] def _handle_event_fields(self, msg): - """ + """ Received Msg with fields - this is a data reponse to a request. If this is the last data block, issue a "done" callback. """ - seqnr = msg['fields']['seqnr']; - callback = self.sessions[seqnr]["callback"]; + seqnr = msg['fields']['seqnr'] + callback = self.sessions[seqnr]["callback"] for node in msg['fields']['nodes']: for ts in node['timestamps']: - fields = []; + fields = [] for d in ts['datas']: - field_block = {}; - field_block["name"] = d['name']; - field_block["typename"] = d._get_typename(); - field_block["value"] = d['value']; + field_block = {} + field_block["name"] = d['name'] + field_block["typename"] = d._get_typename() + field_block["value"] = d['value'] if not d['unit'] == "": field_block["unit"] = d['unit']; if not d['dataType'] == "": field_block["dataType"] = d['dataType']; - flags = d._get_flags(); + flags = d._get_flags() if not len(flags) == 0: - field_block["flags"] = flags; - fields.append(field_block); + field_block["flags"] = flags + fields.append(field_block) + + callback(from_jid=msg['from'], result="fields", nodeId=node['nodeId'], timestamp=ts['value'], fields=fields) - callback(from_jid=msg['from'], result="fields", nodeId=node['nodeId'], timestamp=ts['value'], fields=fields); - if msg['fields']['done'] == "true": - callback(from_jid=msg['from'], result="done"); + callback(from_jid=msg['from'], result="done") # Session done - del self.sessions[seqnr]; + del self.sessions[seqnr] def _handle_event_failure(self, msg): - """ + """ Received Msg with failure - our request failed - Delete the session. + Delete the session. """ - seqnr = msg['failure']['seqnr']; - callback = self.sessions[seqnr]["callback"]; - callback(from_jid=msg['from'], result="failure", nodeId=msg['failure']['error']['nodeId'], timestamp=msg['failure']['error']['timestamp'], error_msg=msg['failure']['error']['text']); + seqnr = msg['failure']['seqnr'] + callback = self.sessions[seqnr]["callback"] + callback(from_jid=msg['from'], result="failure", nodeId=msg['failure']['error']['nodeId'], timestamp=msg['failure']['error']['timestamp'], error_msg=msg['failure']['error']['text']) # Session failed - del self.sessions[seqnr]; + del self.sessions[seqnr] def _handle_event_started(self, msg): - """ - Received Msg with started - our request was queued and is now started. """ - seqnr = msg['started']['seqnr']; - callback = self.sessions[seqnr]["callback"]; - callback(from_jid=msg['from'], result="started"); - + Received Msg with started - our request was queued and is now started. + """ + seqnr = msg['started']['seqnr'] + callback = self.sessions[seqnr]["callback"] + callback(from_jid=msg['from'], result="started") + diff --git a/sleekxmpp/plugins/xep_0323/stanza/sensordata.py b/sleekxmpp/plugins/xep_0323/stanza/sensordata.py index a11c3e94..e8718161 100644 --- a/sleekxmpp/plugins/xep_0323/stanza/sensordata.py +++ b/sleekxmpp/plugins/xep_0323/stanza/sensordata.py @@ -20,14 +20,14 @@ class Sensordata(ElementBase): interfaces = set(tuple()) class FieldTypes(): - """ + """ All field types are optional booleans that default to False """ field_types = set([ 'momentary','peak','status','computed','identity','historicalSecond','historicalMinute','historicalHour', \ 'historicalDay','historicalWeek','historicalMonth','historicalQuarter','historicalYear','historicalOther']) class FieldStatus(): - """ + """ All field statuses are optional booleans that default to False """ field_status = set([ 'missing','automaticEstimate','manualEstimate','manualReadout','automaticReadout','timeOffset','warning','error', \ @@ -38,12 +38,12 @@ class Request(ElementBase): name = 'req' plugin_attrib = name interfaces = set(['seqnr','nodes','fields','serviceToken','deviceToken','userToken','from','to','when','historical','all']) - interfaces.update(FieldTypes.field_types); - _flags = set(['serviceToken','deviceToken','userToken','from','to','when','historical','all']); - _flags.update(FieldTypes.field_types); - + interfaces.update(FieldTypes.field_types) + _flags = set(['serviceToken','deviceToken','userToken','from','to','when','historical','all']) + _flags.update(FieldTypes.field_types) + def __init__(self, xml=None, parent=None): - ElementBase.__init__(self, xml, parent); + ElementBase.__init__(self, xml, parent) self._nodes = set() self._fields = set() @@ -64,27 +64,27 @@ class Request(ElementBase): def _get_flags(self): """ - Helper function for getting of flags. Returns all flags in - dictionary format: { "flag name": "flag value" ... } + Helper function for getting of flags. Returns all flags in + dictionary format: { "flag name": "flag value" ... } """ - flags = {}; + flags = {} for f in self._flags: if not self[f] == "": - flags[f] = self[f]; - return flags; + flags[f] = self[f] + return flags def _set_flags(self, flags): """ - Helper function for setting of flags. + Helper function for setting of flags. Arguments: - flags -- Flags in dictionary format: { "flag name": "flag value" ... } + flags -- Flags in dictionary format: { "flag name": "flag value" ... } """ for f in self._flags: if flags is not None and f in flags: - self[f] = flags[f]; + self[f] = flags[f] else: - self[f] = None; + self[f] = None def add_node(self, nodeId, sourceId=None, cacheType=None): """ @@ -94,7 +94,7 @@ class Request(ElementBase): Arguments: nodeId -- The ID for the node. sourceId -- [optional] identifying the data source controlling the device - cacheType -- [optional] narrowing down the search to a specific kind of node + cacheType -- [optional] narrowing down the search to a specific kind of node """ if nodeId not in self._nodes: self._nodes.add((nodeId)) @@ -269,7 +269,7 @@ class Error(ElementBase): :param value: string """ - self.xml.text = value; + self.xml.text = value return self def del_text(self): @@ -292,7 +292,7 @@ class Fields(ElementBase): interfaces = set(['seqnr','done','nodes']) def __init__(self, xml=None, parent=None): - ElementBase.__init__(self, xml, parent); + ElementBase.__init__(self, xml, parent) self._nodes = set() def setup(self, xml=None): @@ -318,7 +318,7 @@ class Fields(ElementBase): Arguments: nodeId -- The ID for the node. sourceId -- [optional] identifying the data source controlling the device - cacheType -- [optional] narrowing down the search to a specific kind of node + cacheType -- [optional] narrowing down the search to a specific kind of node """ if nodeId not in self._nodes: self._nodes.add((nodeId)) @@ -392,7 +392,7 @@ class FieldsNode(ElementBase): interfaces = set(['nodeId','sourceId','cacheType','timestamps']) def __init__(self, xml=None, parent=None): - ElementBase.__init__(self, xml, parent); + ElementBase.__init__(self, xml, parent) self._timestamps = set() def setup(self, xml=None): @@ -411,7 +411,7 @@ class FieldsNode(ElementBase): def add_timestamp(self, timestamp, substanzas=None): """ - Add a new timestamp element. + Add a new timestamp element. Arguments: timestamp -- The timestamp in ISO format. @@ -423,7 +423,7 @@ class FieldsNode(ElementBase): ts = Timestamp(parent=self) ts['value'] = timestamp if not substanzas is None: - ts.set_datas(substanzas); + ts.set_datas(substanzas) #print("add_timestamp with substanzas: " + str(substanzas)) self.iterables.append(ts) #print(str(id(self)) + " added_timestamp: " + str(id(ts))) @@ -485,7 +485,7 @@ class FieldsNode(ElementBase): self.iterables.remove(timestamp) class Field(ElementBase): - """ + """ Field element in response Timestamp. This is a base class, all instances of fields added to Timestamp must be of types: DataNumeric @@ -494,17 +494,17 @@ class Field(ElementBase): DataDateTime DataTimeSpan DataEnum - """ + """ namespace = 'urn:xmpp:iot:sensordata' name = 'field' plugin_attrib = name - interfaces = set(['name','module','stringIds']); - interfaces.update(FieldTypes.field_types); - interfaces.update(FieldStatus.field_status); + interfaces = set(['name','module','stringIds']) + interfaces.update(FieldTypes.field_types) + interfaces.update(FieldStatus.field_status) - _flags = set(); - _flags.update(FieldTypes.field_types); - _flags.update(FieldStatus.field_status); + _flags = set() + _flags.update(FieldTypes.field_types) + _flags.update(FieldStatus.field_status) def set_stringIds(self, value): """Verifies stringIds according to regexp from specification XMPP-0323. @@ -514,7 +514,7 @@ class Field(ElementBase): pattern = re.compile("^\d+([|]\w+([.]\w+)*([|][^,]*)?)?(,\d+([|]\w+([.]\w+)*([|][^,]*)?)?)*$") if pattern.match(value) is not None: - self.xml.stringIds = value; + self.xml.stringIds = value else: # Bad content, add nothing pass @@ -523,30 +523,30 @@ class Field(ElementBase): def _get_flags(self): """ - Helper function for getting of flags. Returns all flags in - dictionary format: { "flag name": "flag value" ... } + Helper function for getting of flags. Returns all flags in + dictionary format: { "flag name": "flag value" ... } """ - flags = {}; + flags = {} for f in self._flags: if not self[f] == "": - flags[f] = self[f]; - return flags; + flags[f] = self[f] + return flags def _set_flags(self, flags): """ - Helper function for setting of flags. + Helper function for setting of flags. Arguments: - flags -- Flags in dictionary format: { "flag name": "flag value" ... } + flags -- Flags in dictionary format: { "flag name": "flag value" ... } """ for f in self._flags: if flags is not None and f in flags: - self[f] = flags[f]; + self[f] = flags[f] else: - self[f] = None; + self[f] = None def _get_typename(self): - return "invalid type, use subclasses!"; + return "invalid type, use subclasses!" class Timestamp(ElementBase): @@ -557,7 +557,7 @@ class Timestamp(ElementBase): interfaces = set(['value','datas']) def __init__(self, xml=None, parent=None): - ElementBase.__init__(self, xml, parent); + ElementBase.__init__(self, xml, parent) self._datas = set() def setup(self, xml=None): @@ -576,7 +576,7 @@ class Timestamp(ElementBase): def add_data(self, typename, name, value, module=None, stringIds=None, unit=None, dataType=None, flags=None): """ - Add a new data element. + Add a new data element. Arguments: typename -- The type of data element (numeric, string, boolean, dateTime, timeSpan or enum) @@ -587,29 +587,29 @@ class Timestamp(ElementBase): dataType -- [optional] The dataType. Only applicable for type enum """ if name not in self._datas: - dataObj = None; + dataObj = None if typename == "numeric": - dataObj = DataNumeric(parent=self); - dataObj['unit'] = unit; + dataObj = DataNumeric(parent=self) + dataObj['unit'] = unit elif typename == "string": - dataObj = DataString(parent=self); + dataObj = DataString(parent=self) elif typename == "boolean": - dataObj = DataBoolean(parent=self); + dataObj = DataBoolean(parent=self) elif typename == "dateTime": - dataObj = DataDateTime(parent=self); + dataObj = DataDateTime(parent=self) elif typename == "timeSpan": - dataObj = DataTimeSpan(parent=self); + dataObj = DataTimeSpan(parent=self) elif typename == "enum": - dataObj = DataEnum(parent=self); - dataObj['dataType'] = dataType; + dataObj = DataEnum(parent=self) + dataObj['dataType'] = dataType - dataObj['name'] = name; - dataObj['value'] = value; - dataObj['module'] = module; - dataObj['stringIds'] = stringIds; + dataObj['name'] = name + dataObj['value'] = value + dataObj['module'] = module + dataObj['stringIds'] = stringIds if flags is not None: - dataObj._set_flags(flags); + dataObj._set_flags(flags) self._datas.add(name) self.iterables.append(dataObj) @@ -661,87 +661,87 @@ class Timestamp(ElementBase): self.iterables.remove(data) class DataNumeric(Field): - """ - Field data of type numeric. - Note that the value is expressed as a string. + """ + Field data of type numeric. + Note that the value is expressed as a string. """ namespace = 'urn:xmpp:iot:sensordata' name = 'numeric' plugin_attrib = name - interfaces = set(['value', 'unit']); - interfaces.update(Field.interfaces); + interfaces = set(['value', 'unit']) + interfaces.update(Field.interfaces) def _get_typename(self): - return "numeric" + return "numeric" class DataString(Field): - """ - Field data of type string + """ + Field data of type string """ namespace = 'urn:xmpp:iot:sensordata' name = 'string' plugin_attrib = name - interfaces = set(['value']); - interfaces.update(Field.interfaces); + interfaces = set(['value']) + interfaces.update(Field.interfaces) def _get_typename(self): - return "string" + return "string" class DataBoolean(Field): - """ + """ Field data of type boolean. - Note that the value is expressed as a string. + Note that the value is expressed as a string. """ namespace = 'urn:xmpp:iot:sensordata' name = 'boolean' plugin_attrib = name - interfaces = set(['value']); - interfaces.update(Field.interfaces); + interfaces = set(['value']) + interfaces.update(Field.interfaces) def _get_typename(self): - return "boolean" + return "boolean" class DataDateTime(Field): - """ + """ Field data of type dateTime. - Note that the value is expressed as a string. + Note that the value is expressed as a string. """ namespace = 'urn:xmpp:iot:sensordata' name = 'dateTime' plugin_attrib = name - interfaces = set(['value']); - interfaces.update(Field.interfaces); + interfaces = set(['value']) + interfaces.update(Field.interfaces) def _get_typename(self): - return "dateTime" + return "dateTime" class DataTimeSpan(Field): - """ + """ Field data of type timeSpan. - Note that the value is expressed as a string. + Note that the value is expressed as a string. """ namespace = 'urn:xmpp:iot:sensordata' name = 'timeSpan' plugin_attrib = name - interfaces = set(['value']); - interfaces.update(Field.interfaces); + interfaces = set(['value']) + interfaces.update(Field.interfaces) def _get_typename(self): - return "timeSpan" + return "timeSpan" class DataEnum(Field): - """ + """ Field data of type enum. - Note that the value is expressed as a string. + Note that the value is expressed as a string. """ namespace = 'urn:xmpp:iot:sensordata' name = 'enum' plugin_attrib = name - interfaces = set(['value', 'dataType']); - interfaces.update(Field.interfaces); + interfaces = set(['value', 'dataType']) + interfaces.update(Field.interfaces) def _get_typename(self): - return "enum" + return "enum" class Done(ElementBase): """ Done element used to signal that all data has been transferred """ diff --git a/sleekxmpp/plugins/xep_0323/timerreset.py b/sleekxmpp/plugins/xep_0323/timerreset.py index 578f1efe..398b47c1 100644 --- a/sleekxmpp/plugins/xep_0323/timerreset.py +++ b/sleekxmpp/plugins/xep_0323/timerreset.py @@ -23,7 +23,12 @@ class _TimerReset(Thread): t.cancel() # stop the timer's action if it's still waiting """ - def __init__(self, interval, function, args=[], kwargs={}): + def __init__(self, interval, function, args=None, kwargs=None): + if not kwargs: + kwargs = {} + if not args: + args = [] + Thread.__init__(self) self.interval = interval self.function = function diff --git a/sleekxmpp/plugins/xep_0325/control.py b/sleekxmpp/plugins/xep_0325/control.py index e34eb2c2..11e7a045 100644 --- a/sleekxmpp/plugins/xep_0325/control.py +++ b/sleekxmpp/plugins/xep_0325/control.py @@ -12,7 +12,6 @@ import logging import time from threading import Thread, Timer, Lock -from sleekxmpp.xmlstream import JID from sleekxmpp.xmlstream.handler import Callback from sleekxmpp.xmlstream.matcher import StanzaPath from sleekxmpp.plugins.base import BasePlugin @@ -26,16 +25,16 @@ log = logging.getLogger(__name__) class XEP_0325(BasePlugin): """ - XEP-0325: IoT Control + XEP-0325: IoT Control - Actuators are devices in sensor networks that can be controlled through - the network and act with the outside world. In sensor networks and - Internet of Things applications, actuators make it possible to automate - real-world processes. - This plugin implements a mechanism whereby actuators can be controlled - in XMPP-based sensor networks, making it possible to integrate sensors - and actuators of different brands, makes and models into larger + Actuators are devices in sensor networks that can be controlled through + the network and act with the outside world. In sensor networks and + Internet of Things applications, actuators make it possible to automate + real-world processes. + This plugin implements a mechanism whereby actuators can be controlled + in XMPP-based sensor networks, making it possible to integrate sensors + and actuators of different brands, makes and models into larger Internet of Things applications. Also see <http://xmpp.org/extensions/xep-0325.html> @@ -52,9 +51,9 @@ class XEP_0325(BasePlugin): Client side ----------- - Control Event:SetResponse -- Received a response to a + Control Event:SetResponse -- Received a response to a control request, type result - Control Event:SetResponseError -- Received a response to a + Control Event:SetResponseError -- Received a response to a control request, type error Attributes: @@ -65,7 +64,7 @@ class XEP_0325(BasePlugin): relevant to a request's session. This dictionary is used both by the client and sensor side. On client side, seqnr is used as key, while on sensor side, a session_id is used - as key. This ensures that the two will not collide, so + as key. This ensures that the two will not collide, so one instance can be both client and sensor. Sensor side ----------- @@ -85,15 +84,15 @@ class XEP_0325(BasePlugin): Sensor side ----------- - register_node -- Register a sensor as available from this XMPP + register_node -- Register a sensor as available from this XMPP instance. Client side ----------- - set_request -- Initiates a control request to modify data in + set_request -- Initiates a control request to modify data in sensor(s). Non-blocking, a callback function will be called when the sensor has responded. - set_command -- Initiates a control command to modify data in + set_command -- Initiates a control command to modify data in sensor(s). Non-blocking. The sensor(s) will not respond regardless of the result of the command, so no callback is made. @@ -102,7 +101,7 @@ class XEP_0325(BasePlugin): name = 'xep_0325' description = 'XEP-0325 Internet of Things - Control' - dependencies = set(['xep_0030']) + dependencies = set(['xep_0030']) stanza = stanza @@ -135,11 +134,11 @@ class XEP_0325(BasePlugin): self._handle_set_response)) # Server side dicts - self.nodes = {}; - self.sessions = {}; + self.nodes = {} + self.sessions = {} - self.last_seqnr = 0; - self.seqnr_lock = Lock(); + self.last_seqnr = 0 + self.seqnr_lock = Lock() ## For testning only self.test_authenticated_from = "" @@ -156,13 +155,13 @@ class XEP_0325(BasePlugin): def plugin_end(self): """ Stop the XEP-0325 plugin """ - self.sessions.clear(); + self.sessions.clear() self.xmpp.remove_handler('Control Event:DirectSet') self.xmpp.remove_handler('Control Event:SetReq') self.xmpp.remove_handler('Control Event:SetResponse') self.xmpp.remove_handler('Control Event:SetResponseError') self.xmpp['xep_0030'].del_feature(feature=Control.namespace) - self.xmpp['xep_0030'].set_items(node=Control.namespace, items=tuple()); + self.xmpp['xep_0030'].set_items(node=Control.namespace, items=tuple()) # ================================================================= @@ -170,10 +169,10 @@ class XEP_0325(BasePlugin): def register_node(self, nodeId, device, commTimeout, sourceId=None, cacheType=None): """ - Register a sensor/device as available for control requests/commands - through this XMPP instance. + Register a sensor/device as available for control requests/commands + through this XMPP instance. - The device object may by any custom implementation to support + The device object may by any custom implementation to support specific devices, but it must implement the functions: has_control_field set_control_fields @@ -185,30 +184,30 @@ class XEP_0325(BasePlugin): commTimeout -- Time in seconds to wait between each callback from device during a data readout. Float. sourceId -- [optional] identifying the data source controlling the device - cacheType -- [optional] narrowing down the search to a specific kind of node + cacheType -- [optional] narrowing down the search to a specific kind of node """ - self.nodes[nodeId] = {"device": device, + self.nodes[nodeId] = {"device": device, "commTimeout": commTimeout, - "sourceId": sourceId, - "cacheType": cacheType}; + "sourceId": sourceId, + "cacheType": cacheType} def _set_authenticated(self, auth=''): """ Internal testing function """ - self.test_authenticated_from = auth; + self.test_authenticated_from = auth def _get_new_seqnr(self): """ Returns a unique sequence number (unique across threads) """ - self.seqnr_lock.acquire(); - self.last_seqnr = self.last_seqnr + 1; - self.seqnr_lock.release(); - return str(self.last_seqnr); + self.seqnr_lock.acquire() + self.last_seqnr += 1 + self.seqnr_lock.release() + return str(self.last_seqnr) def _handle_set_req(self, iq): """ - Event handler for reception of an Iq with set req - this is a + Event handler for reception of an Iq with set req - this is a control request. - Verifies that + Verifies that - all the requested nodes are available (if no nodes are specified in the request, assume all nodes) - all the control fields are available from all requested nodes @@ -216,80 +215,79 @@ class XEP_0325(BasePlugin): If the request passes verification, the control request is passed to the devices (in a separate thread). - If the verification fails, a setResponse with error indication + If the verification fails, a setResponse with error indication is sent. """ - error_msg = ''; - req_ok = True; - missing_node = None; - missing_field = None; + error_msg = '' + req_ok = True + missing_node = None + missing_field = None # Authentication if len(self.test_authenticated_from) > 0 and not iq['from'] == self.test_authenticated_from: # Invalid authentication - req_ok = False; - error_msg = "Access denied"; + req_ok = False + error_msg = "Access denied" # Nodes - process_nodes = []; if len(iq['set']['nodes']) > 0: for n in iq['set']['nodes']: if not n['nodeId'] in self.nodes: - req_ok = False; - missing_node = n['nodeId']; - error_msg = "Invalid nodeId " + n['nodeId']; - process_nodes = [n['nodeId'] for n in iq['set']['nodes']]; + req_ok = False + missing_node = n['nodeId'] + error_msg = "Invalid nodeId " + n['nodeId'] + process_nodes = [n['nodeId'] for n in iq['set']['nodes']] else: - process_nodes = self.nodes.keys(); + process_nodes = self.nodes.keys() # Fields - for control we need to find all in all devices, otherwise we reject - process_fields = []; + process_fields = [] if len(iq['set']['datas']) > 0: for f in iq['set']['datas']: for node in self.nodes: if not self.nodes[node]["device"].has_control_field(f['name'], f._get_typename()): - req_ok = False; - missing_field = f['name']; - error_msg = "Invalid field " + f['name']; - break; - process_fields = [(f['name'], f._get_typename(), f['value']) for f in iq['set']['datas']]; + req_ok = False + missing_field = f['name'] + error_msg = "Invalid field " + f['name'] + break + process_fields = [(f['name'], f._get_typename(), f['value']) for f in iq['set']['datas']] if req_ok: - session = self._new_session(); - self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": iq['id']}; - self.sessions[session]["commTimers"] = {}; - self.sessions[session]["nodeDone"] = {}; + session = self._new_session() + self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": iq['id']} + self.sessions[session]["commTimers"] = {} + self.sessions[session]["nodeDone"] = {} # Flag that a reply is exected when we are done - self.sessions[session]["reply"] = True; + self.sessions[session]["reply"] = True - self.sessions[session]["node_list"] = process_nodes; + self.sessions[session]["node_list"] = process_nodes if self.threaded: #print("starting thread") tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields)) tr_req.start() #print("started thread") else: - self._threaded_node_request(session, process_fields); + self._threaded_node_request(session, process_fields) else: - iq.reply(); - iq['type'] = 'error'; - iq['setResponse']['responseCode'] = "NotFound"; + iq.reply() + iq['type'] = 'error' + iq['setResponse']['responseCode'] = "NotFound" if missing_node is not None: - iq['setResponse'].add_node(missing_node); + iq['setResponse'].add_node(missing_node) if missing_field is not None: - iq['setResponse'].add_data(missing_field); - iq['setResponse']['error']['var'] = "Output"; - iq['setResponse']['error']['text'] = error_msg; - iq.send(block=False); + iq['setResponse'].add_data(missing_field) + iq['setResponse']['error']['var'] = "Output" + iq['setResponse']['error']['text'] = error_msg + iq.send(block=False) def _handle_direct_set(self, msg): """ - Event handler for reception of a Message with set command - this is a + Event handler for reception of a Message with set command - this is a direct control command. - Verifies that + Verifies that - all the requested nodes are available (if no nodes are specified in the request, assume all nodes) - all the control fields are available from all requested nodes @@ -299,73 +297,72 @@ class XEP_0325(BasePlugin): to the devices (in a separate thread). If the verification fails, do nothing. """ - req_ok = True; + req_ok = True # Nodes - process_nodes = []; if len(msg['set']['nodes']) > 0: for n in msg['set']['nodes']: if not n['nodeId'] in self.nodes: - req_ok = False; - error_msg = "Invalid nodeId " + n['nodeId']; - process_nodes = [n['nodeId'] for n in msg['set']['nodes']]; + req_ok = False + error_msg = "Invalid nodeId " + n['nodeId'] + process_nodes = [n['nodeId'] for n in msg['set']['nodes']] else: - process_nodes = self.nodes.keys(); + process_nodes = self.nodes.keys() # Fields - for control we need to find all in all devices, otherwise we reject - process_fields = []; + process_fields = [] if len(msg['set']['datas']) > 0: for f in msg['set']['datas']: for node in self.nodes: if not self.nodes[node]["device"].has_control_field(f['name'], f._get_typename()): - req_ok = False; - missing_field = f['name']; - error_msg = "Invalid field " + f['name']; - break; - process_fields = [(f['name'], f._get_typename(), f['value']) for f in msg['set']['datas']]; + req_ok = False + missing_field = f['name'] + error_msg = "Invalid field " + f['name'] + break + process_fields = [(f['name'], f._get_typename(), f['value']) for f in msg['set']['datas']] if req_ok: - session = self._new_session(); - self.sessions[session] = {"from": msg['from'], "to": msg['to']}; - self.sessions[session]["commTimers"] = {}; - self.sessions[session]["nodeDone"] = {}; - self.sessions[session]["reply"] = False; + session = self._new_session() + self.sessions[session] = {"from": msg['from'], "to": msg['to']} + self.sessions[session]["commTimers"] = {} + self.sessions[session]["nodeDone"] = {} + self.sessions[session]["reply"] = False - self.sessions[session]["node_list"] = process_nodes; + self.sessions[session]["node_list"] = process_nodes if self.threaded: #print("starting thread") tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields)) tr_req.start() #print("started thread") else: - self._threaded_node_request(session, process_fields); + self._threaded_node_request(session, process_fields) def _threaded_node_request(self, session, process_fields): - """ + """ Helper function to handle the device control in a separate thread. - + Arguments: session -- The request session id process_fields -- The fields to set in the devices. List of tuple format: (name, datatype, value) """ for node in self.sessions[session]["node_list"]: - self.sessions[session]["nodeDone"][node] = False; + self.sessions[session]["nodeDone"][node] = False for node in self.sessions[session]["node_list"]: - timer = Timer(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node)); - self.sessions[session]["commTimers"][node] = timer; - timer.start(); - self.nodes[node]['device'].set_control_fields(process_fields, session=session, callback=self._device_set_command_callback); + timer = Timer(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node)) + self.sessions[session]["commTimers"][node] = timer + timer.start() + self.nodes[node]['device'].set_control_fields(process_fields, session=session, callback=self._device_set_command_callback) def _event_comm_timeout(self, session, nodeId): - """ + """ Triggered if any of the control operations timeout. Stop communicating with the failing device. - If the control command was an Iq request, sends a failure - message back to the client. - + If the control command was an Iq request, sends a failure + message back to the client. + Arguments: session -- The request session id nodeId -- The id of the device which timed out @@ -373,51 +370,51 @@ class XEP_0325(BasePlugin): if self.sessions[session]["reply"]: # Reply is exected when we are done - iq = self.xmpp.Iq(); - iq['from'] = self.sessions[session]['to']; - iq['to'] = self.sessions[session]['from']; - iq['type'] = "error"; - iq['id'] = self.sessions[session]['seqnr']; - iq['setResponse']['responseCode'] = "OtherError"; - iq['setResponse'].add_node(nodeId); - iq['setResponse']['error']['var'] = "Output"; - iq['setResponse']['error']['text'] = "Timeout."; - iq.send(block=False); + iq = self.xmpp.Iq() + iq['from'] = self.sessions[session]['to'] + iq['to'] = self.sessions[session]['from'] + iq['type'] = "error" + iq['id'] = self.sessions[session]['seqnr'] + iq['setResponse']['responseCode'] = "OtherError" + iq['setResponse'].add_node(nodeId) + iq['setResponse']['error']['var'] = "Output" + iq['setResponse']['error']['text'] = "Timeout." + iq.send(block=False) ## TODO - should we send one timeout per node?? # Drop communication with this device and check if we are done - self.sessions[session]["nodeDone"][nodeId] = True; + self.sessions[session]["nodeDone"][nodeId] = True if (self._all_nodes_done(session)): # The session is complete, delete it - del self.sessions[session]; + del self.sessions[session] def _all_nodes_done(self, session): - """ + """ Checks wheter all devices are done replying to the control command. - + Arguments: session -- The request session id """ for n in self.sessions[session]["nodeDone"]: if not self.sessions[session]["nodeDone"][n]: - return False; - return True; + return False + return True def _device_set_command_callback(self, session, nodeId, result, error_field=None, error_msg=None): - """ - Callback function called by the devices when the control command is + """ + Callback function called by the devices when the control command is complete or failed. - If needed, composes a message with the result and sends it back to the + If needed, composes a message with the result and sends it back to the client. - + Arguments: session -- The request session id nodeId -- The device id which initiated the callback result -- The current result status of the control command. Valid values are: "error" - Set fields failed. "ok" - All fields were set. - error_field -- [optional] Only applies when result == "error" + error_field -- [optional] Only applies when result == "error" The field name that failed (usually means it is missing) error_msg -- [optional] Only applies when result == "error". Error details when a request failed. @@ -428,62 +425,62 @@ class XEP_0325(BasePlugin): return if result == "error": - self.sessions[session]["commTimers"][nodeId].cancel(); + self.sessions[session]["commTimers"][nodeId].cancel() if self.sessions[session]["reply"]: # Reply is exected when we are done - iq = self.xmpp.Iq(); - iq['from'] = self.sessions[session]['to']; - iq['to'] = self.sessions[session]['from']; - iq['type'] = "error"; - iq['id'] = self.sessions[session]['seqnr']; - iq['setResponse']['responseCode'] = "OtherError"; - iq['setResponse'].add_node(nodeId); + iq = self.xmpp.Iq() + iq['from'] = self.sessions[session]['to'] + iq['to'] = self.sessions[session]['from'] + iq['type'] = "error" + iq['id'] = self.sessions[session]['seqnr'] + iq['setResponse']['responseCode'] = "OtherError" + iq['setResponse'].add_node(nodeId) if error_field is not None: - iq['setResponse'].add_data(error_field); - iq['setResponse']['error']['var'] = error_field; - iq['setResponse']['error']['text'] = error_msg; - iq.send(block=False); + iq['setResponse'].add_data(error_field) + iq['setResponse']['error']['var'] = error_field + iq['setResponse']['error']['text'] = error_msg + iq.send(block=False) # Drop communication with this device and check if we are done - self.sessions[session]["nodeDone"][nodeId] = True; + self.sessions[session]["nodeDone"][nodeId] = True if (self._all_nodes_done(session)): # The session is complete, delete it - del self.sessions[session]; + del self.sessions[session] else: - self.sessions[session]["commTimers"][nodeId].cancel(); + self.sessions[session]["commTimers"][nodeId].cancel() - self.sessions[session]["nodeDone"][nodeId] = True; + self.sessions[session]["nodeDone"][nodeId] = True if (self._all_nodes_done(session)): if self.sessions[session]["reply"]: # Reply is exected when we are done - iq = self.xmpp.Iq(); - iq['from'] = self.sessions[session]['to']; - iq['to'] = self.sessions[session]['from']; - iq['type'] = "result"; - iq['id'] = self.sessions[session]['seqnr']; - iq['setResponse']['responseCode'] = "OK"; - iq.send(block=False); + iq = self.xmpp.Iq() + iq['from'] = self.sessions[session]['to'] + iq['to'] = self.sessions[session]['from'] + iq['type'] = "result" + iq['id'] = self.sessions[session]['seqnr'] + iq['setResponse']['responseCode'] = "OK" + iq.send(block=False) # The session is complete, delete it - del self.sessions[session]; + del self.sessions[session] # ================================================================= # Client side (data controller) API def set_request(self, from_jid, to_jid, callback, fields, nodeIds=None): - """ + """ Called on the client side to initiade a control request. Composes a message with the request and sends it to the device(s). - Does not block, the callback will be called when the device(s) + Does not block, the callback will be called when the device(s) has responded. - + Arguments: from_jid -- The jid of the requester to_jid -- The jid of the device(s) - callback -- The callback function to call when data is availble. - + callback -- The callback function to call when data is availble. + The callback function must support the following arguments: from_jid -- The jid of the responding device(s) @@ -494,46 +491,46 @@ class XEP_0325(BasePlugin): "Locked" - Field(s) is locked and cannot be changed at the moment. "NotImplemented" - Request feature not implemented. - "FormError" - Error while setting with + "FormError" - Error while setting with a form (not implemented). - "OtherError" - Indicates other types of - errors, such as timeout. + "OtherError" - Indicates other types of + errors, such as timeout. Details in the error_msg. - - nodeId -- [optional] Only applicable when result == "error" - List of node Ids of failing device(s). - fields -- [optional] Only applicable when result == "error" + nodeId -- [optional] Only applicable when result == "error" + List of node Ids of failing device(s). + + fields -- [optional] Only applicable when result == "error" List of fields that failed.[optional] Mandatory when result == "rejected" or "failure". - - error_msg -- Details about why the request failed. + + error_msg -- Details about why the request failed. fields -- Fields to set. List of tuple format: (name, typename, value). nodeIds -- [optional] Limits the request to the node Ids in this list. """ - iq = self.xmpp.Iq(); - iq['from'] = from_jid; - iq['to'] = to_jid; - seqnr = self._get_new_seqnr(); - iq['id'] = seqnr; - iq['type'] = "set"; + iq = self.xmpp.Iq() + iq['from'] = from_jid + iq['to'] = to_jid + seqnr = self._get_new_seqnr() + iq['id'] = seqnr + iq['type'] = "set" if nodeIds is not None: for nodeId in nodeIds: - iq['set'].add_node(nodeId); + iq['set'].add_node(nodeId) if fields is not None: for name, typename, value in fields: - iq['set'].add_data(name=name, typename=typename, value=value); + iq['set'].add_data(name=name, typename=typename, value=value) - self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "callback": callback}; - iq.send(block=False); + self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "callback": callback} + iq.send(block=False) def set_command(self, from_jid, to_jid, fields, nodeIds=None): - """ + """ Called on the client side to initiade a control command. Composes a message with the set commandand sends it to the device(s). Does not block. Device(s) will not respond, regardless of result. - + Arguments: from_jid -- The jid of the requester to_jid -- The jid of the device(s) @@ -541,34 +538,32 @@ class XEP_0325(BasePlugin): fields -- Fields to set. List of tuple format: (name, typename, value). nodeIds -- [optional] Limits the request to the node Ids in this list. """ - msg = self.xmpp.Message(); - msg['from'] = from_jid; - msg['to'] = to_jid; - msg['type'] = "set"; + msg = self.xmpp.Message() + msg['from'] = from_jid + msg['to'] = to_jid + msg['type'] = "set" if nodeIds is not None: for nodeId in nodeIds: - msg['set'].add_node(nodeId); + msg['set'].add_node(nodeId) if fields is not None: for name, typename, value in fields: - msg['set'].add_data(name, typename, value); + msg['set'].add_data(name, typename, value) # We won't get any reply, so don't create a session - msg.send(); + msg.send() def _handle_set_response(self, iq): """ Received response from device(s) """ #print("ooh") - seqnr = iq['id']; - from_jid = str(iq['from']); - result = iq['setResponse']['responseCode']; - nodeIds = [n['name'] for n in iq['setResponse']['nodes']]; - fields = [f['name'] for f in iq['setResponse']['datas']]; - error_msg = None; + seqnr = iq['id'] + from_jid = str(iq['from']) + result = iq['setResponse']['responseCode'] + nodeIds = [n['name'] for n in iq['setResponse']['nodes']] + fields = [f['name'] for f in iq['setResponse']['datas']] + error_msg = None if not iq['setResponse'].find('error') is None and not iq['setResponse']['error']['text'] == "": - error_msg = iq['setResponse']['error']['text']; - - callback = self.sessions[seqnr]["callback"]; - callback(from_jid=from_jid, result=result, nodeIds=nodeIds, fields=fields, error_msg=error_msg); + error_msg = iq['setResponse']['error']['text'] - + callback = self.sessions[seqnr]["callback"] + callback(from_jid=from_jid, result=result, nodeIds=nodeIds, fields=fields, error_msg=error_msg) diff --git a/sleekxmpp/plugins/xep_0325/device.py b/sleekxmpp/plugins/xep_0325/device.py index a60d5f9a..f1ed0733 100644 --- a/sleekxmpp/plugins/xep_0325/device.py +++ b/sleekxmpp/plugins/xep_0325/device.py @@ -13,16 +13,16 @@ import datetime class Device(object): """ Example implementation of a device control object. - - The device object may by any custom implementation to support + + The device object may by any custom implementation to support specific devices, but it must implement the functions: has_control_field set_control_fields """ def __init__(self, nodeId): - self.nodeId = nodeId; - self.control_fields = {}; + self.nodeId = nodeId + self.control_fields = {} def has_control_field(self, field, typename): """ @@ -30,12 +30,12 @@ class Device(object): and the type matches for control in this device. Arguments: - field -- The field name + field -- The field name typename -- The expected type """ if field in self.control_fields and self.control_fields[field]["type"] == typename: - return True; - return False; + return True + return False def set_control_fields(self, fields, session, callback): """ @@ -43,22 +43,22 @@ class Device(object): sets the data and (if needed) and calls the callback. Arguments: - fields -- List of control fields in tuple format: + fields -- List of control fields in tuple format: (name, typename, value) session -- Session id, only used in the callback as identifier callback -- Callback function to call when control set is complete. The callback function must support the following arguments: - session -- Session id, as supplied in the + session -- Session id, as supplied in the request_fields call nodeId -- Identifier for this device - result -- The current result status of the readout. + result -- The current result status of the readout. Valid values are: "error" - Set fields failed. "ok" - All fields were set. - error_field -- [optional] Only applies when result == "error" - The field name that failed + error_field -- [optional] Only applies when result == "error" + The field name that failed (usually means it is missing) error_msg -- [optional] Only applies when result == "error". Error details when a request failed. @@ -69,12 +69,12 @@ class Device(object): for name, typename, value in fields: if not self.has_control_field(name, typename): self._send_control_reject(session, name, "NotFound", callback) - return False; + return False for name, typename, value in fields: self._set_field_value(name, value) - callback(session, result="ok", nodeId=self.nodeId); + callback(session, result="ok", nodeId=self.nodeId) return True def _send_control_reject(self, session, field, message, callback): @@ -82,12 +82,12 @@ class Device(object): Sends a reject to the caller Arguments: - session -- Session id, see definition in + session -- Session id, see definition in set_control_fields function - callback -- Callback function, see definition in + callback -- Callback function, see definition in set_control_fields function """ - callback(session, result="error", nodeId=self.nodeId, error_field=field, error_msg=message); + callback(session, result="error", nodeId=self.nodeId, error_field=field, error_msg=message) def _add_control_field(self, name, typename, value): """ @@ -95,12 +95,12 @@ class Device(object): Arguments: name -- Name of the field - typename -- Type of the field, one of: - (boolean, color, string, date, dateTime, + typename -- Type of the field, one of: + (boolean, color, string, date, dateTime, double, duration, int, long, time) value -- Field value """ - self.control_fields[name] = {"type": typename, "value": value}; + self.control_fields[name] = {"type": typename, "value": value} def _set_field_value(self, name, value): """ @@ -111,7 +111,7 @@ class Device(object): value -- New value for the field """ if name in self.control_fields: - self.control_fields[name]["value"] = value; + self.control_fields[name]["value"] = value def _get_field_value(self, name): """ @@ -121,5 +121,5 @@ class Device(object): name -- Name of the field """ if name in self.control_fields: - return self.control_fields[name]["value"]; - return None; + return self.control_fields[name]["value"] + return None diff --git a/sleekxmpp/plugins/xep_0325/stanza/control.py b/sleekxmpp/plugins/xep_0325/stanza/control.py index 67107ecb..1fd5c35d 100644 --- a/sleekxmpp/plugins/xep_0325/stanza/control.py +++ b/sleekxmpp/plugins/xep_0325/stanza/control.py @@ -26,7 +26,7 @@ class ControlSet(ElementBase): interfaces = set(['nodes','datas']) def __init__(self, xml=None, parent=None): - ElementBase.__init__(self, xml, parent); + ElementBase.__init__(self, xml, parent) self._nodes = set() self._datas = set() @@ -53,7 +53,7 @@ class ControlSet(ElementBase): Arguments: nodeId -- The ID for the node. sourceId -- [optional] identifying the data source controlling the device - cacheType -- [optional] narrowing down the search to a specific kind of node + cacheType -- [optional] narrowing down the search to a specific kind of node """ if nodeId not in self._nodes: self._nodes.add((nodeId)) @@ -117,40 +117,40 @@ class ControlSet(ElementBase): def add_data(self, name, typename, value): """ - Add a new data element. + Add a new data element. Arguments: name -- The name of the data element - typename -- The type of data element - (boolean, color, string, date, dateTime, + typename -- The type of data element + (boolean, color, string, date, dateTime, double, duration, int, long, time) value -- The value of the data element """ if name not in self._datas: - dataObj = None; + dataObj = None if typename == "boolean": - dataObj = BooleanParameter(parent=self); + dataObj = BooleanParameter(parent=self) elif typename == "color": - dataObj = ColorParameter(parent=self); + dataObj = ColorParameter(parent=self) elif typename == "string": - dataObj = StringParameter(parent=self); + dataObj = StringParameter(parent=self) elif typename == "date": - dataObj = DateParameter(parent=self); + dataObj = DateParameter(parent=self) elif typename == "dateTime": - dataObj = DateTimeParameter(parent=self); + dataObj = DateTimeParameter(parent=self) elif typename == "double": - dataObj = DoubleParameter(parent=self); + dataObj = DoubleParameter(parent=self) elif typename == "duration": - dataObj = DurationParameter(parent=self); + dataObj = DurationParameter(parent=self) elif typename == "int": - dataObj = IntParameter(parent=self); + dataObj = IntParameter(parent=self) elif typename == "long": - dataObj = LongParameter(parent=self); + dataObj = LongParameter(parent=self) elif typename == "time": - dataObj = TimeParameter(parent=self); + dataObj = TimeParameter(parent=self) - dataObj['name'] = name; - dataObj['value'] = value; + dataObj['name'] = name + dataObj['value'] = value self._datas.add(name) self.iterables.append(dataObj) @@ -217,7 +217,7 @@ class ControlSetResponse(ElementBase): interfaces = set(['responseCode']) def __init__(self, xml=None, parent=None): - ElementBase.__init__(self, xml, parent); + ElementBase.__init__(self, xml, parent) self._nodes = set() self._datas = set() @@ -244,7 +244,7 @@ class ControlSetResponse(ElementBase): Arguments: nodeId -- The ID for the node. sourceId -- [optional] identifying the data source controlling the device - cacheType -- [optional] narrowing down the search to a specific kind of node + cacheType -- [optional] narrowing down the search to a specific kind of node """ if nodeId not in self._nodes: self._nodes.add(nodeId) @@ -308,7 +308,7 @@ class ControlSetResponse(ElementBase): def add_data(self, name): """ - Add a new ResponseParameter element. + Add a new ResponseParameter element. Arguments: name -- Name of the parameter @@ -316,7 +316,7 @@ class ControlSetResponse(ElementBase): if name not in self._datas: self._datas.add(name) data = ResponseParameter(parent=self) - data['name'] = name; + data['name'] = name self.iterables.append(data) return data return None @@ -383,26 +383,26 @@ class Error(ElementBase): value -- string """ - self.xml.text = value; + self.xml.text = value return self def del_text(self): """Remove the contents inside the XML tag.""" self.xml.text = "" - return self + return self class ResponseParameter(ElementBase): - """ - Parameter element in ControlSetResponse. - """ + """ + Parameter element in ControlSetResponse. + """ namespace = 'urn:xmpp:iot:control' name = 'parameter' plugin_attrib = name - interfaces = set(['name']); + interfaces = set(['name']) class BaseParameter(ElementBase): - """ + """ Parameter element in SetCommand. This is a base class, all instances of parameters added to SetCommand must be of types: BooleanParameter @@ -415,90 +415,91 @@ class BaseParameter(ElementBase): IntParameter LongParameter TimeParameter - """ + """ namespace = 'urn:xmpp:iot:control' name = 'baseParameter' plugin_attrib = name - interfaces = set(['name','value']); + interfaces = set(['name','value']) def _get_typename(self): - return self.name; + return self.name + class BooleanParameter(BaseParameter): - """ - Field data of type boolean. - Note that the value is expressed as a string. + """ + Field data of type boolean. + Note that the value is expressed as a string. """ name = 'boolean' plugin_attrib = name class ColorParameter(BaseParameter): - """ - Field data of type color. - Note that the value is expressed as a string. + """ + Field data of type color. + Note that the value is expressed as a string. """ name = 'color' plugin_attrib = name class StringParameter(BaseParameter): - """ - Field data of type string. + """ + Field data of type string. """ name = 'string' plugin_attrib = name class DateParameter(BaseParameter): - """ - Field data of type date. - Note that the value is expressed as a string. + """ + Field data of type date. + Note that the value is expressed as a string. """ name = 'date' plugin_attrib = name class DateTimeParameter(BaseParameter): - """ - Field data of type dateTime. - Note that the value is expressed as a string. + """ + Field data of type dateTime. + Note that the value is expressed as a string. """ name = 'dateTime' plugin_attrib = name class DoubleParameter(BaseParameter): - """ - Field data of type double. - Note that the value is expressed as a string. + """ + Field data of type double. + Note that the value is expressed as a string. """ name = 'double' plugin_attrib = name class DurationParameter(BaseParameter): - """ - Field data of type duration. - Note that the value is expressed as a string. + """ + Field data of type duration. + Note that the value is expressed as a string. """ name = 'duration' plugin_attrib = name class IntParameter(BaseParameter): - """ - Field data of type int. - Note that the value is expressed as a string. + """ + Field data of type int. + Note that the value is expressed as a string. """ name = 'int' plugin_attrib = name class LongParameter(BaseParameter): - """ - Field data of type long (64-bit int). - Note that the value is expressed as a string. + """ + Field data of type long (64-bit int). + Note that the value is expressed as a string. """ name = 'long' plugin_attrib = name class TimeParameter(BaseParameter): - """ - Field data of type time. - Note that the value is expressed as a string. + """ + Field data of type time. + Note that the value is expressed as a string. """ name = 'time' plugin_attrib = name diff --git a/sleekxmpp/roster/single.py b/sleekxmpp/roster/single.py index f080ae8a..e9ce4f21 100644 --- a/sleekxmpp/roster/single.py +++ b/sleekxmpp/roster/single.py @@ -237,8 +237,7 @@ class RosterNode(object): if not self.xmpp.is_component: return self.update(jid, subscription='remove') - def update(self, jid, name=None, subscription=None, groups=[], - block=True, timeout=None, callback=None): + def update(self, jid, name=None, subscription=None, groups=None, block=True, timeout=None, callback=None): """ Update a JID's subscription information. @@ -258,6 +257,9 @@ class RosterNode(object): Will be executed when the roster is received. Implies block=False. """ + if not groups: + groups = [] + self[jid]['name'] = name self[jid]['groups'] = groups self[jid].save() diff --git a/sleekxmpp/stanza/atom.py b/sleekxmpp/stanza/atom.py index 244ef315..4e9591a5 100644 --- a/sleekxmpp/stanza/atom.py +++ b/sleekxmpp/stanza/atom.py @@ -6,8 +6,7 @@ See the file LICENSE for copying permission. """ -from sleekxmpp.xmlstream import ElementBase - +from sleekxmpp.xmlstream import register_stanza_plugin, ElementBase class AtomEntry(ElementBase): @@ -22,5 +21,23 @@ class AtomEntry(ElementBase): namespace = 'http://www.w3.org/2005/Atom' name = 'entry' plugin_attrib = 'entry' - interfaces = set(('title', 'summary')) - sub_interfaces = set(('title', 'summary')) + interfaces = set(('title', 'summary', 'id', 'published', 'updated')) + sub_interfaces = set(('title', 'summary', 'id', 'published', + 'updated')) + +class AtomAuthor(ElementBase): + + """ + An Atom author. + + Stanza Interface: + name -- The printable author name + uri -- The bare jid of the author + """ + + name = 'author' + plugin_attrib = 'author' + interfaces = set(('name', 'uri')) + sub_interfaces = set(('name', 'uri')) + +register_stanza_plugin(AtomEntry, AtomAuthor) diff --git a/sleekxmpp/test/sleektest.py b/sleekxmpp/test/sleektest.py index d28f77e2..e26f99ce 100644 --- a/sleekxmpp/test/sleektest.py +++ b/sleekxmpp/test/sleektest.py @@ -288,11 +288,8 @@ class SleekTest(unittest.TestCase): if self.xmpp: self.xmpp.socket.disconnect_error() - def stream_start(self, mode='client', skip=True, header=None, - socket='mock', jid='tester@localhost', - password='test', server='localhost', - port=5222, sasl_mech=None, - plugins=None, plugin_config={}): + def stream_start(self, mode='client', skip=True, header=None, socket='mock', jid='tester@localhost', + password='test', server='localhost', port=5222, sasl_mech=None, plugins=None, plugin_config=None): """ Initialize an XMPP client or component using a dummy XML stream. @@ -315,6 +312,9 @@ class SleekTest(unittest.TestCase): plugins -- List of plugins to register. By default, all plugins are loaded. """ + if not plugin_config: + plugin_config = {} + if mode == 'client': self.xmpp = ClientXMPP(jid, password, sasl_mech=sasl_mech, @@ -425,8 +425,7 @@ class SleekTest(unittest.TestCase): parts.append('xmlns="%s"' % default_ns) return header % ' '.join(parts) - def recv(self, data, defaults=[], method='exact', - use_values=True, timeout=1): + def recv(self, data, defaults=None, method='exact', use_values=True, timeout=1): """ Pass data to the dummy XMPP client as if it came from an XMPP server. @@ -447,6 +446,9 @@ class SleekTest(unittest.TestCase): timeout -- Time to wait in seconds for data to be received by a live connection. """ + if not defaults: + defaults = [] + if self.xmpp.socket.is_live: # we are working with a live connection, so we should # verify what has been received instead of simulating diff --git a/sleekxmpp/thirdparty/socks.py b/sleekxmpp/thirdparty/socks.py index 9239a7b9..34090d51 100644 --- a/sleekxmpp/thirdparty/socks.py +++ b/sleekxmpp/thirdparty/socks.py @@ -28,6 +28,9 @@ OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMANGE. This module provides a standard socket-like interface for Python for tunneling connections through SOCKS proxies. +""" + +""" Minor modifications made by Christopher Gilbert (http://motomastyle.com/) for use in PyLoris (http://pyloris.sourceforge.net/) @@ -35,10 +38,13 @@ for use in PyLoris (http://pyloris.sourceforge.net/) Minor modifications made by Mario Vilas (http://breakingcode.wordpress.com/) mainly to merge bug fixes found in Sourceforge +Minor modifications made by Eugene Dementiev (http://www.dementiev.eu/) + """ import socket import struct +import sys PROXY_TYPE_SOCKS4 = 1 PROXY_TYPE_SOCKS5 = 2 @@ -208,12 +214,12 @@ class socksocket(socket.socket): if self.__proxy[3]: # Resolve remotely ipaddr = None - req = req + chr(0x03).encode() + chr(len(destaddr)).encode() + destaddr + req = req + chr(0x03).encode() + chr(len(destaddr)).encode() + destaddr.encode() else: # Resolve locally ipaddr = socket.inet_aton(socket.gethostbyname(destaddr)) req = req + chr(0x01).encode() + ipaddr - req = req + struct.pack(">H", destport) + req += struct.pack(">H", destport) self.sendall(req) # Get the response resp = self.__recvall(4) @@ -282,7 +288,7 @@ class socksocket(socket.socket): # The username parameter is considered userid for SOCKS4 if self.__proxy[4] != None: req = req + self.__proxy[4] - req = req + chr(0x00).encode() + req += chr(0x00).encode() # DNS name if remote resolving is required # NOTE: This is actually an extension to the SOCKS4 protocol # called SOCKS4A and may not be supported in all cases. @@ -323,7 +329,10 @@ class socksocket(socket.socket): # We read the response until we get the string "\r\n\r\n" resp = self.recv(1) while resp.find("\r\n\r\n".encode()) == -1: - resp = resp + self.recv(1) + recv = self.recv(1) + if not recv: + raise GeneralProxyError((1, _generalerrors[1])) + resp = resp + recv # We just need the first line to check if the connection # was successful statusline = resp.splitlines()[0].split(" ".encode(), 2) diff --git a/sleekxmpp/thirdparty/statemachine.py b/sleekxmpp/thirdparty/statemachine.py index 113320fa..6c504dce 100644 --- a/sleekxmpp/thirdparty/statemachine.py +++ b/sleekxmpp/thirdparty/statemachine.py @@ -34,7 +34,7 @@ class StateMachine(object): self.lock.release() - def transition(self, from_state, to_state, wait=0.0, func=None, args=[], kwargs={}): + def transition(self, from_state, to_state, wait=0.0, func=None, args=None, kwargs=None): ''' Transition from the given `from_state` to the given `to_state`. This method will return `True` if the state machine is now in `to_state`. It @@ -65,15 +65,23 @@ class StateMachine(object): values for `args` and `kwargs` are provided, they are expanded and passed like so: `func( *args, **kwargs )`. ''' + if not args: + args = [] + if not kwargs: + kwargs = {} return self.transition_any((from_state,), to_state, wait=wait, func=func, args=args, kwargs=kwargs) - def transition_any(self, from_states, to_state, wait=0.0, func=None, args=[], kwargs={}): + def transition_any(self, from_states, to_state, wait=0.0, func=None, args=None, kwargs=None): ''' Transition from any of the given `from_states` to the given `to_state`. ''' + if not args: + args = [] + if not kwargs: + kwargs = {} if not isinstance(from_states, (tuple, list, set)): raise ValueError("from_states should be a list, tuple, or set") diff --git a/sleekxmpp/util/__init__.py b/sleekxmpp/util/__init__.py index 05286d33..47a935af 100644 --- a/sleekxmpp/util/__init__.py +++ b/sleekxmpp/util/__init__.py @@ -32,12 +32,17 @@ def _gevent_threads_enabled(): if _gevent_threads_enabled(): import gevent.queue as queue - Queue = queue.JoinableQueue + _queue = queue.JoinableQueue else: try: import queue except ImportError: import Queue as queue - Queue = queue.Queue + _queue = queue.Queue +class Queue(_queue): + def put(self, item, block=True, timeout=None): + if _queue.full(self): + _queue.get(self) + return _queue.put(self, item, block, timeout) QueueEmpty = queue.Empty diff --git a/sleekxmpp/version.py b/sleekxmpp/version.py index ecf62550..acea9334 100644 --- a/sleekxmpp/version.py +++ b/sleekxmpp/version.py @@ -9,5 +9,5 @@ # We don't want to have to import the entire library # just to get the version info for setup.py -__version__ = '1.3.1' -__version_info__ = (1, 3, 1, '', 0) +__version__ = '1.4.0' +__version_info__ = (1, 4, 0, '', 0) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index e011cf3d..f9ec4947 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -291,7 +291,7 @@ class XMLStream(object): self.event_queue = Queue() #: A queue of string data to be sent over the stream. - self.send_queue = Queue() + self.send_queue = Queue(maxsize=256) self.send_queue_lock = threading.Lock() self.send_lock = threading.RLock() @@ -460,9 +460,11 @@ class XMLStream(object): def _connect(self, reattempt=True): self.scheduler.remove('Session timeout check') - if self.reconnect_delay is None or not reattempt: + if self.reconnect_delay is None: delay = 1.0 - else: + self.reconnect_delay = delay + + if reattempt: delay = min(self.reconnect_delay * 2, self.reconnect_max_delay) delay = random.normalvariate(delay, delay * 0.1) log.debug('Waiting %s seconds before connecting.', delay) @@ -523,7 +525,8 @@ class XMLStream(object): 'keyfile': self.keyfile, 'ca_certs': self.ca_certs, 'cert_reqs': cert_policy, - 'do_handshake_on_connect': False + 'do_handshake_on_connect': False, + "ssl_version": self.ssl_version }) if sys.version_info >= (2, 7): @@ -847,13 +850,14 @@ class XMLStream(object): 'keyfile': self.keyfile, 'ca_certs': self.ca_certs, 'cert_reqs': cert_policy, - 'do_handshake_on_connect': False + 'do_handshake_on_connect': False, + "ssl_version": self.ssl_version }) if sys.version_info >= (2, 7): ssl_args['ciphers'] = self.ciphers - ssl_socket = ssl.wrap_socket(self.socket, **ssl_args); + ssl_socket = ssl.wrap_socket(self.socket, **ssl_args) if hasattr(self.socket, 'socket'): # We are using a testing socket, so preserve the top @@ -1149,7 +1153,7 @@ class XMLStream(object): """ return len(self.__event_handlers.get(name, [])) - def event(self, name, data={}, direct=False): + def event(self, name, data=None, direct=False): """Manually trigger a custom event. :param name: The name of the event to trigger. @@ -1160,6 +1164,9 @@ class XMLStream(object): event queue. All event handlers will run in the same thread. """ + if not data: + data = {} + log.debug("Event triggered: " + name) handlers = self.__event_handlers.get(name, []) @@ -1319,9 +1326,6 @@ class XMLStream(object): try: sent += self.socket.send(data[sent:]) count += 1 - except Socket.error as serr: - if serr.errno != errno.EINTR: - raise except ssl.SSLError as serr: if tries >= self.ssl_retry_max: log.debug('SSL error: max retries reached') @@ -1336,6 +1340,9 @@ class XMLStream(object): if not self.stop.is_set(): time.sleep(self.ssl_retry_delay) tries += 1 + except Socket.error as serr: + if serr.errno != errno.EINTR: + raise if count > 1: log.debug('SENT: %d chunks', count) except (Socket.error, ssl.SSLError) as serr: @@ -1745,9 +1752,6 @@ class XMLStream(object): try: sent += self.socket.send(enc_data[sent:]) count += 1 - except Socket.error as serr: - if serr.errno != errno.EINTR: - raise except ssl.SSLError as serr: if tries >= self.ssl_retry_max: log.debug('SSL error: max retries reached') @@ -1760,6 +1764,9 @@ class XMLStream(object): if not self.stop.is_set(): time.sleep(self.ssl_retry_delay) tries += 1 + except Socket.error as serr: + if serr.errno != errno.EINTR: + raise if count > 1: log.debug('SENT: %d chunks', count) self.send_queue.task_done() |