summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--slixmpp/plugins/xep_0325/control.py35
1 files changed, 6 insertions, 29 deletions
diff --git a/slixmpp/plugins/xep_0325/control.py b/slixmpp/plugins/xep_0325/control.py
index b4cb8a20..81ed9039 100644
--- a/slixmpp/plugins/xep_0325/control.py
+++ b/slixmpp/plugins/xep_0325/control.py
@@ -10,8 +10,9 @@
import logging
import time
-from threading import Thread, Timer, Lock
+from slixmpp import asyncio
+from functools import partial
from slixmpp.xmlstream import JID
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
@@ -40,10 +41,6 @@ class XEP_0325(BasePlugin):
Also see <http://xmpp.org/extensions/xep-0325.html>
- Configuration Values:
- threaded -- Indicates if communication with sensors should be threaded.
- Defaults to True.
-
Events:
Sensor side
-----------
@@ -58,8 +55,6 @@ class XEP_0325(BasePlugin):
control request, type error
Attributes:
- threaded -- Indicates if command events should be threaded.
- Defaults to True.
sessions -- A dictionary or equivalent backend mapping
session IDs to dictionaries containing data
relevant to a request's session. This dictionary is used
@@ -107,7 +102,6 @@ class XEP_0325(BasePlugin):
default_config = {
- 'threaded': True
# 'session_db': None
}
@@ -139,7 +133,6 @@ class XEP_0325(BasePlugin):
self.sessions = {}
self.last_seqnr = 0
- self.seqnr_lock = Lock()
## For testning only
self.test_authenticated_from = ""
@@ -198,9 +191,7 @@ class XEP_0325(BasePlugin):
def _get_new_seqnr(self):
""" Returns a unique sequence number (unique across threads) """
- self.seqnr_lock.acquire()
self.last_seqnr = self.last_seqnr + 1
- self.seqnr_lock.release()
return str(self.last_seqnr)
def _handle_set_req(self, iq):
@@ -264,14 +255,7 @@ class XEP_0325(BasePlugin):
self.sessions[session]["reply"] = True
self.sessions[session]["node_list"] = process_nodes
- if self.threaded:
- #print("starting thread")
- tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields))
- tr_req.start()
- #print("started thread")
- else:
- self._threaded_node_request(session, process_fields)
-
+ self._node_request(session, process_fields)
else:
iq = iq.reply()
iq['type'] = 'error'
@@ -332,16 +316,10 @@ class XEP_0325(BasePlugin):
self.sessions[session]["reply"] = False
self.sessions[session]["node_list"] = process_nodes
- if self.threaded:
- #print("starting thread")
- tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields))
- tr_req.start()
- #print("started thread")
- else:
- self._threaded_node_request(session, process_fields)
+ self._node_request(session, process_fields)
- def _threaded_node_request(self, session, process_fields):
+ def _node_request(self, session, process_fields):
"""
Helper function to handle the device control in a separate thread.
@@ -354,9 +332,8 @@ class XEP_0325(BasePlugin):
self.sessions[session]["nodeDone"][node] = False
for node in self.sessions[session]["node_list"]:
- timer = Timer(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node))
+ timer = asyncio.get_event_loop().call_later(self.nodes[node]['commTimeout'], partial(self._event_comm_timeout, args=(session, node)))
self.sessions[session]["commTimers"][node] = timer
- timer.start()
self.nodes[node]['device'].set_control_fields(process_fields, session=session, callback=self._device_set_command_callback)
def _event_comm_timeout(self, session, nodeId):