diff options
-rw-r--r-- | slixmpp/plugins/xep_0325/control.py | 35 |
1 files changed, 6 insertions, 29 deletions
diff --git a/slixmpp/plugins/xep_0325/control.py b/slixmpp/plugins/xep_0325/control.py index b4cb8a20..81ed9039 100644 --- a/slixmpp/plugins/xep_0325/control.py +++ b/slixmpp/plugins/xep_0325/control.py @@ -10,8 +10,9 @@ import logging import time -from threading import Thread, Timer, Lock +from slixmpp import asyncio +from functools import partial from slixmpp.xmlstream import JID from slixmpp.xmlstream.handler import Callback from slixmpp.xmlstream.matcher import StanzaPath @@ -40,10 +41,6 @@ class XEP_0325(BasePlugin): Also see <http://xmpp.org/extensions/xep-0325.html> - Configuration Values: - threaded -- Indicates if communication with sensors should be threaded. - Defaults to True. - Events: Sensor side ----------- @@ -58,8 +55,6 @@ class XEP_0325(BasePlugin): control request, type error Attributes: - threaded -- Indicates if command events should be threaded. - Defaults to True. sessions -- A dictionary or equivalent backend mapping session IDs to dictionaries containing data relevant to a request's session. This dictionary is used @@ -107,7 +102,6 @@ class XEP_0325(BasePlugin): default_config = { - 'threaded': True # 'session_db': None } @@ -139,7 +133,6 @@ class XEP_0325(BasePlugin): self.sessions = {} self.last_seqnr = 0 - self.seqnr_lock = Lock() ## For testning only self.test_authenticated_from = "" @@ -198,9 +191,7 @@ class XEP_0325(BasePlugin): def _get_new_seqnr(self): """ Returns a unique sequence number (unique across threads) """ - self.seqnr_lock.acquire() self.last_seqnr = self.last_seqnr + 1 - self.seqnr_lock.release() return str(self.last_seqnr) def _handle_set_req(self, iq): @@ -264,14 +255,7 @@ class XEP_0325(BasePlugin): self.sessions[session]["reply"] = True self.sessions[session]["node_list"] = process_nodes - if self.threaded: - #print("starting thread") - tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields)) - tr_req.start() - #print("started thread") - else: - self._threaded_node_request(session, process_fields) - + self._node_request(session, process_fields) else: iq = iq.reply() iq['type'] = 'error' @@ -332,16 +316,10 @@ class XEP_0325(BasePlugin): self.sessions[session]["reply"] = False self.sessions[session]["node_list"] = process_nodes - if self.threaded: - #print("starting thread") - tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields)) - tr_req.start() - #print("started thread") - else: - self._threaded_node_request(session, process_fields) + self._node_request(session, process_fields) - def _threaded_node_request(self, session, process_fields): + def _node_request(self, session, process_fields): """ Helper function to handle the device control in a separate thread. @@ -354,9 +332,8 @@ class XEP_0325(BasePlugin): self.sessions[session]["nodeDone"][node] = False for node in self.sessions[session]["node_list"]: - timer = Timer(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node)) + timer = asyncio.get_event_loop().call_later(self.nodes[node]['commTimeout'], partial(self._event_comm_timeout, args=(session, node))) self.sessions[session]["commTimers"][node] = timer - timer.start() self.nodes[node]['device'].set_control_fields(process_fields, session=session, callback=self._device_set_command_callback) def _event_comm_timeout(self, session, nodeId): |