From 257bcadd9614041d5c501d8e513f59de6f88befa Mon Sep 17 00:00:00 2001 From: Nathan Fritz Date: Sat, 29 May 2010 10:19:28 +0800 Subject: control-c fixes --- conn_tests/test_pubsubjobs.py | 37 ++++++++++++++--------------- sleekxmpp/xmlstream/scheduler.py | 51 +++++++++++++++++++++++----------------- sleekxmpp/xmlstream/xmlstream.py | 6 ++++- 3 files changed, 52 insertions(+), 42 deletions(-) diff --git a/conn_tests/test_pubsubjobs.py b/conn_tests/test_pubsubjobs.py index 73059f6c..edf22ccc 100644 --- a/conn_tests/test_pubsubjobs.py +++ b/conn_tests/test_pubsubjobs.py @@ -67,7 +67,7 @@ class TestPubsubServer(unittest.TestCase): p = self.xmpp2.Presence() p['to'] = self.pshost p.send() - self.failUnless(self.xmpp1['xep_0060'].create_node(self.pshost, 'testnode2', self.statev['defaultconfig'], ntype='queue')) + self.failUnless(self.xmpp1['xep_0060'].create_node(self.pshost, 'testnode2', self.statev['defaultconfig'], ntype='job')) def test005reconfigure(self): """Retrieving node config and reconfiguring""" @@ -86,31 +86,26 @@ class TestPubsubServer(unittest.TestCase): item = ET.Element('{http://netflint.net/protocol/test}test') w = Waiter('wait publish', StanzaPath('message/pubsub_event/items')) self.xmpp2.registerHandler(w) - result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test1', item),)) + #result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test1', item),)) + result = self.xmpp1['jobs'].createJob(self.pshost, "testnode2", 'test1', item) msg = w.wait(5) # got to get a result in 5 seconds self.failUnless(msg != False, "Account #2 did not get message event") - result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test2', item),)) - iq = self.xmpp2.Iq() - iq['to'] = self.pshost - iq['type'] = 'set' - iq['psstate']['node'] = 'testnode2' - iq['psstate']['item'] = 'test1' - iq['psstate']['payload'] = ET.Element('{http://andyet.net/protocol/pubsubqueue}claimed') - result = iq.send() - time.sleep(10) - iq = self.xmpp2.Iq() - iq['to'] = self.pshost - iq['type'] = 'set' - iq['psstate']['node'] = 'testnode2' - iq['psstate']['item'] = 'test2' - iq['psstate']['payload'] = ET.Element('{http://andyet.net/protocol/pubsubqueue}claimed') - result = iq.send() - self.failUnless(result['type'] == 'result') + #result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test2', item),)) + result = self.xmpp1['jobs'].createJob(self.pshost, "testnode2", 'test2', item) + w = Waiter('wait publish2', StanzaPath('message/pubsub_event/items')) + self.xmpp2.registerHandler(w) + self.xmpp2['jobs'].claimJob(self.pshost, 'testnode2', 'test1') + msg = w.wait(5) # got to get a result in 5 seconds + self.xmpp2['jobs'].claimJob(self.pshost, 'testnode2', 'test2') + self.xmpp2['jobs'].finishJob(self.pshost, 'testnode2', 'test1') + self.xmpp2['jobs'].finishJob(self.pshost, 'testnode2', 'test2') + print result #need to add check for update def test900cleanup(self): "Cleaning up" - self.failUnless(self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode2'), "Could not delete test node.") + #self.failUnless(self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode2'), "Could not delete test node.") + time.sleep(10) if __name__ == '__main__': @@ -141,10 +136,12 @@ if __name__ == '__main__': xmpp1.registerPlugin('xep_0030') xmpp1.registerPlugin('xep_0060') xmpp1.registerPlugin('xep_0199') + xmpp1.registerPlugin('jobs') xmpp2.registerPlugin('xep_0004') xmpp2.registerPlugin('xep_0030') xmpp2.registerPlugin('xep_0060') xmpp2.registerPlugin('xep_0199') + xmpp2.registerPlugin('jobs') if not config.get('account1', 'server'): # we don't know the server, but the lib can probably figure it out diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py index 7aa59f3d..18d9229e 100644 --- a/sleekxmpp/xmlstream/scheduler.py +++ b/sleekxmpp/xmlstream/scheduler.py @@ -31,11 +31,12 @@ class Task(object): class Scheduler(object): """Threaded scheduler that allows for updates mid-execution unlike http://docs.python.org/library/sched.html#module-sched""" - def __init__(self): + def __init__(self, parentqueue=None): self.addq = queue.Queue() self.schedule = [] self.thread = None self.run = False + self.parentqueue = parentqueue def process(self, threaded=True): if threaded: @@ -47,29 +48,37 @@ class Scheduler(object): def _process(self): self.run = True while self.run: - wait = 5 - updated = False - if self.schedule: - wait = self.schedule[0].next - time.time() try: - newtask = self.addq.get(True, wait) - except queue.Empty: - cleanup = [] - for task in self.schedule: - if time.time() >= task.next: - updated = True - if not task.run(): - cleanup.append(task) + wait = 5 + updated = False + if self.schedule: + wait = self.schedule[0].next - time.time() + try: + if wait <= 0.0: + newtask = self.addq.get(False) else: - break - for task in cleanup: - x = self.schedule.pop(self.schedule.index(task)) - else: - updated = True - self.schedule.append(newtask) - finally: - if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next) + newtask = self.addq.get(True, wait) + except queue.Empty: + cleanup = [] + for task in self.schedule: + if time.time() >= task.next: + updated = True + if not task.run(): + cleanup.append(task) + else: + break + for task in cleanup: + x = self.schedule.pop(self.schedule.index(task)) + else: + updated = True + self.schedule.append(newtask) + finally: + if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next) + except KeyboardInterrupt: + self.run = False logging.debug("Qutting Scheduler thread") + if self.parentqueue is not None: + self.parentqueue.put(('quit', None, None)) def add(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None): self.addq.put(Task(name, seconds, callback, args, kwargs, repeat, qpointer)) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 006f3876..cea300a7 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -76,7 +76,7 @@ class XMLStream(object): self.eventqueue = queue.Queue() self.sendqueue = queue.Queue() - self.scheduler = scheduler.Scheduler() + self.scheduler = scheduler.Scheduler(self.eventqueue) self.namespace_map = {} @@ -149,6 +149,7 @@ class XMLStream(object): def process(self, threaded=True): self.scheduler.process(threaded=True) for t in range(0, HANDLER_THREADS): + logging.debug("Starting HANDLER THREAD") self.__thread['eventhandle%s' % t] = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner) self.__thread['eventhandle%s' % t].start() self.__thread['sendthread'] = threading.Thread(name='sendthread', target=self._sendThread) @@ -333,6 +334,9 @@ class XMLStream(object): event = self.eventqueue.get(True, timeout=5) except queue.Empty: event = None + except KeyboardInterrupt: + self.run = False + self.scheduler.run = False if event is not None: etype = event[0] handler = event[1] -- cgit v1.2.3