summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--conn_tests/test_pubsubjobs.py37
-rw-r--r--sleekxmpp/xmlstream/scheduler.py51
-rw-r--r--sleekxmpp/xmlstream/xmlstream.py6
3 files changed, 52 insertions, 42 deletions
diff --git a/conn_tests/test_pubsubjobs.py b/conn_tests/test_pubsubjobs.py
index 73059f6c..edf22ccc 100644
--- a/conn_tests/test_pubsubjobs.py
+++ b/conn_tests/test_pubsubjobs.py
@@ -67,7 +67,7 @@ class TestPubsubServer(unittest.TestCase):
p = self.xmpp2.Presence()
p['to'] = self.pshost
p.send()
- self.failUnless(self.xmpp1['xep_0060'].create_node(self.pshost, 'testnode2', self.statev['defaultconfig'], ntype='queue'))
+ self.failUnless(self.xmpp1['xep_0060'].create_node(self.pshost, 'testnode2', self.statev['defaultconfig'], ntype='job'))
def test005reconfigure(self):
"""Retrieving node config and reconfiguring"""
@@ -86,31 +86,26 @@ class TestPubsubServer(unittest.TestCase):
item = ET.Element('{http://netflint.net/protocol/test}test')
w = Waiter('wait publish', StanzaPath('message/pubsub_event/items'))
self.xmpp2.registerHandler(w)
- result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test1', item),))
+ #result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test1', item),))
+ result = self.xmpp1['jobs'].createJob(self.pshost, "testnode2", 'test1', item)
msg = w.wait(5) # got to get a result in 5 seconds
self.failUnless(msg != False, "Account #2 did not get message event")
- result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test2', item),))
- iq = self.xmpp2.Iq()
- iq['to'] = self.pshost
- iq['type'] = 'set'
- iq['psstate']['node'] = 'testnode2'
- iq['psstate']['item'] = 'test1'
- iq['psstate']['payload'] = ET.Element('{http://andyet.net/protocol/pubsubqueue}claimed')
- result = iq.send()
- time.sleep(10)
- iq = self.xmpp2.Iq()
- iq['to'] = self.pshost
- iq['type'] = 'set'
- iq['psstate']['node'] = 'testnode2'
- iq['psstate']['item'] = 'test2'
- iq['psstate']['payload'] = ET.Element('{http://andyet.net/protocol/pubsubqueue}claimed')
- result = iq.send()
- self.failUnless(result['type'] == 'result')
+ #result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test2', item),))
+ result = self.xmpp1['jobs'].createJob(self.pshost, "testnode2", 'test2', item)
+ w = Waiter('wait publish2', StanzaPath('message/pubsub_event/items'))
+ self.xmpp2.registerHandler(w)
+ self.xmpp2['jobs'].claimJob(self.pshost, 'testnode2', 'test1')
+ msg = w.wait(5) # got to get a result in 5 seconds
+ self.xmpp2['jobs'].claimJob(self.pshost, 'testnode2', 'test2')
+ self.xmpp2['jobs'].finishJob(self.pshost, 'testnode2', 'test1')
+ self.xmpp2['jobs'].finishJob(self.pshost, 'testnode2', 'test2')
+ print result
#need to add check for update
def test900cleanup(self):
"Cleaning up"
- self.failUnless(self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode2'), "Could not delete test node.")
+ #self.failUnless(self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode2'), "Could not delete test node.")
+ time.sleep(10)
if __name__ == '__main__':
@@ -141,10 +136,12 @@ if __name__ == '__main__':
xmpp1.registerPlugin('xep_0030')
xmpp1.registerPlugin('xep_0060')
xmpp1.registerPlugin('xep_0199')
+ xmpp1.registerPlugin('jobs')
xmpp2.registerPlugin('xep_0004')
xmpp2.registerPlugin('xep_0030')
xmpp2.registerPlugin('xep_0060')
xmpp2.registerPlugin('xep_0199')
+ xmpp2.registerPlugin('jobs')
if not config.get('account1', 'server'):
# we don't know the server, but the lib can probably figure it out
diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py
index 7aa59f3d..18d9229e 100644
--- a/sleekxmpp/xmlstream/scheduler.py
+++ b/sleekxmpp/xmlstream/scheduler.py
@@ -31,11 +31,12 @@ class Task(object):
class Scheduler(object):
"""Threaded scheduler that allows for updates mid-execution unlike http://docs.python.org/library/sched.html#module-sched"""
- def __init__(self):
+ def __init__(self, parentqueue=None):
self.addq = queue.Queue()
self.schedule = []
self.thread = None
self.run = False
+ self.parentqueue = parentqueue
def process(self, threaded=True):
if threaded:
@@ -47,29 +48,37 @@ class Scheduler(object):
def _process(self):
self.run = True
while self.run:
- wait = 5
- updated = False
- if self.schedule:
- wait = self.schedule[0].next - time.time()
try:
- newtask = self.addq.get(True, wait)
- except queue.Empty:
- cleanup = []
- for task in self.schedule:
- if time.time() >= task.next:
- updated = True
- if not task.run():
- cleanup.append(task)
+ wait = 5
+ updated = False
+ if self.schedule:
+ wait = self.schedule[0].next - time.time()
+ try:
+ if wait <= 0.0:
+ newtask = self.addq.get(False)
else:
- break
- for task in cleanup:
- x = self.schedule.pop(self.schedule.index(task))
- else:
- updated = True
- self.schedule.append(newtask)
- finally:
- if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next)
+ newtask = self.addq.get(True, wait)
+ except queue.Empty:
+ cleanup = []
+ for task in self.schedule:
+ if time.time() >= task.next:
+ updated = True
+ if not task.run():
+ cleanup.append(task)
+ else:
+ break
+ for task in cleanup:
+ x = self.schedule.pop(self.schedule.index(task))
+ else:
+ updated = True
+ self.schedule.append(newtask)
+ finally:
+ if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next)
+ except KeyboardInterrupt:
+ self.run = False
logging.debug("Qutting Scheduler thread")
+ if self.parentqueue is not None:
+ self.parentqueue.put(('quit', None, None))
def add(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None):
self.addq.put(Task(name, seconds, callback, args, kwargs, repeat, qpointer))
diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py
index 006f3876..cea300a7 100644
--- a/sleekxmpp/xmlstream/xmlstream.py
+++ b/sleekxmpp/xmlstream/xmlstream.py
@@ -76,7 +76,7 @@ class XMLStream(object):
self.eventqueue = queue.Queue()
self.sendqueue = queue.Queue()
- self.scheduler = scheduler.Scheduler()
+ self.scheduler = scheduler.Scheduler(self.eventqueue)
self.namespace_map = {}
@@ -149,6 +149,7 @@ class XMLStream(object):
def process(self, threaded=True):
self.scheduler.process(threaded=True)
for t in range(0, HANDLER_THREADS):
+ logging.debug("Starting HANDLER THREAD")
self.__thread['eventhandle%s' % t] = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner)
self.__thread['eventhandle%s' % t].start()
self.__thread['sendthread'] = threading.Thread(name='sendthread', target=self._sendThread)
@@ -333,6 +334,9 @@ class XMLStream(object):
event = self.eventqueue.get(True, timeout=5)
except queue.Empty:
event = None
+ except KeyboardInterrupt:
+ self.run = False
+ self.scheduler.run = False
if event is not None:
etype = event[0]
handler = event[1]