From 0bda5fd3f28da14478db87c98fb89f647994deaa Mon Sep 17 00:00:00 2001 From: Nathan Fritz Date: Wed, 26 May 2010 18:32:28 -0700 Subject: adding scheduler --- sleekxmpp/xmlstream/scheduler.py | 76 ++++++++++++++++++++++++++++++++++++++++ sleekxmpp/xmlstream/xmlstream.py | 9 +++-- 2 files changed, 82 insertions(+), 3 deletions(-) create mode 100644 sleekxmpp/xmlstream/scheduler.py diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py new file mode 100644 index 00000000..5cb8aff0 --- /dev/null +++ b/sleekxmpp/xmlstream/scheduler.py @@ -0,0 +1,76 @@ +try: + import queue +except ImportError: + import Queue as queue +import time +import threading + +class Task(object): + """Task object for the Scheduler class""" + def __init__(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None): + self.name = name + self.seconds = seconds + self.callback = callback + self.args = args or tuple() + self.kwargs = kwargs or {} + self.repeat = repeat + self.next = time.time() + self.seconds + self.qpointer = qpointer + + def run(self): + if self.qpointer is not None: + self.qpointer.put(('schedule', self.callback, self.args)) + else: + self.callback(*self.args, **self.kwargs) + self.reset() + return self.repeat + + def reset(self): + self.next = time.time() + self.seconds + +class Scheduler(object): + """Threaded scheduler that allows for updates mid-execution unlike http://docs.python.org/library/sched.html#module-sched""" + def __init__(self): + self.addq = queue.Queue() + self.schedule = [] + self.thread = None + self.run = True + + def process(self, threaded=True): + if threaded: + self.thread = threading.Thread(name='shedulerprocess', target=self._process) + self.thread.start() + else: + self._process() + + def _process(self): + while self.run: + wait = 5 + updated = False + if self.schedule: + wait = self.schedule[0].next - time.time() + try: + newtask = self.addq.get(True, wait) + except queue.Empty: + cleanup = [] + for task in self.schedule: + if time.time() >= task.next: + updated = True + if not task.run(): + cleanup.append(task) + else: + break + for task in cleanup: + x = self.schedule.pop(self.schedule.index(task)) + else: + updated = True + self.schedule.append(newtask) + finally: + if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next) + print [x.name for x in self.schedule] + + def add(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None): + self.addq.put(Task(name, seconds, callback, args, kwargs, repeat, qpointer)) + + def quit(self): + self.run = False diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 025884b7..96b47411 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -22,6 +22,7 @@ import time import traceback import types import xml.sax.saxutils +from . import scheduler HANDLER_THREADS = 1 @@ -75,6 +76,7 @@ class XMLStream(object): self.eventqueue = queue.Queue() self.sendqueue = queue.Queue() + self.scheduler = scheduler.Scheduler() self.namespace_map = {} @@ -145,6 +147,7 @@ class XMLStream(object): raise RestartStream() def process(self, threaded=True): + self.scheduler.process(threaded=True) for t in range(0, HANDLER_THREADS): self.__thread['eventhandle%s' % t] = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner) self.__thread['eventhandle%s' % t].start() @@ -156,8 +159,8 @@ class XMLStream(object): else: self._process() - def schedule(self, seconds, handler, args=None): - threading.Timer(seconds, handler, args).start() + def schedule(self, name, seconds, callback, args=None, kwargs=None, repeat=False): + self.scheduler.add(name, seconds, callback, args, kwargs, repeat, qpointer=self.eventqueue) def _process(self): "Start processing the socket." @@ -334,7 +337,7 @@ class XMLStream(object): except Exception as e: traceback.print_exc() args[0].exception(e) - elif etype == 'sched': + elif etype == 'schedule': try: handler.run(*args) except: -- cgit v1.2.3 From 3a28f9e5d247b9124e0d14c26c2a6e79aaee86ff Mon Sep 17 00:00:00 2001 From: Nathan Fritz Date: Thu, 27 May 2010 04:58:57 -0700 Subject: added pubsub state stanzas and scheduled events --- conn_tests/test_pubsubserver.py | 1 - sleekxmpp/plugins/stanza_pubsub.py | 45 ++++++++++++++++++++++++++++---------- sleekxmpp/plugins/xep_0004.py | 1 - sleekxmpp/plugins/xep_0060.py | 10 +++++---- sleekxmpp/xmlstream/scheduler.py | 6 +++-- sleekxmpp/xmlstream/stanzabase.py | 4 ++-- sleekxmpp/xmlstream/xmlstream.py | 8 ++++++- tests/test_pubsubstanzas.py | 15 +++++++++++++ 8 files changed, 67 insertions(+), 23 deletions(-) diff --git a/conn_tests/test_pubsubserver.py b/conn_tests/test_pubsubserver.py index d1e2208f..15635b4b 100644 --- a/conn_tests/test_pubsubserver.py +++ b/conn_tests/test_pubsubserver.py @@ -5,7 +5,6 @@ from xml.etree import cElementTree as ET import os import time import sys -import thread import unittest import sleekxmpp.plugins.xep_0004 from sleekxmpp.xmlstream.matcher.stanzapath import StanzaPath diff --git a/sleekxmpp/plugins/stanza_pubsub.py b/sleekxmpp/plugins/stanza_pubsub.py index 1dd73d99..0a75e1e7 100644 --- a/sleekxmpp/plugins/stanza_pubsub.py +++ b/sleekxmpp/plugins/stanza_pubsub.py @@ -10,6 +10,39 @@ def stanzaPlugin(stanza, plugin): stanza.plugin_attrib_map[plugin.plugin_attrib] = plugin stanza.plugin_tag_map["{%s}%s" % (plugin.namespace, plugin.name)] = plugin +class PubsubState(ElementBase): + namespace = 'http://jabber.org/protocol/psstate' + name = 'state' + plugin_attrib = 'psstate' + interfaces = set(('node', 'item', 'payload')) + plugin_attrib_map = {} + plugin_tag_map = {} + + def setPayload(self, value): + self.xml.append(value) + + def getPayload(self): + childs = self.xml.getchildren() + if len(childs) > 0: + return childs[0] + + def delPayload(self): + for child in self.xml.getchildren(): + self.xml.remove(child) + +stanzaPlugin(Iq, PubsubState) + +class PubsubStateEvent(ElementBase): + namespace = 'http://jabber.org/protocol/psstate#event' + name = 'event' + plugin_attrib = 'psstate_event' + intefaces = set(tuple()) + plugin_attrib_map = {} + plugin_tag_map = {} + +stanzaPlugin(Message, PubsubState) +stanzaPlugin(PubsubState, PubsubStateEvent) + class Pubsub(ElementBase): namespace = 'http://jabber.org/protocol/pubsub' name = 'pubsub' @@ -321,18 +354,6 @@ class Options(ElementBase): stanzaPlugin(Pubsub, Options) stanzaPlugin(Subscribe, Options) -#iq = Iq() -#iq['pubsub']['defaultconfig'] -#print(iq) - -#from xml.etree import cElementTree as ET -#iq = Iq() -#item = Item() -#item['payload'] = ET.Element("{http://netflint.net/p/crap}stupidshit") -#item['id'] = 'aa11bbcc' -#iq['pubsub']['items'].append(item) -#print(iq) - class OwnerAffiliations(Affiliations): namespace = 'http://jabber.org/protocol/pubsub#owner' interfaces = set(('node')) diff --git a/sleekxmpp/plugins/xep_0004.py b/sleekxmpp/plugins/xep_0004.py index 56d18929..015bd8bc 100644 --- a/sleekxmpp/plugins/xep_0004.py +++ b/sleekxmpp/plugins/xep_0004.py @@ -188,7 +188,6 @@ class Form(FieldContainer): #def getXML(self, tostring = False): def getXML(self, ftype=None): - logging.debug("creating form as %s" % ftype) if ftype: self.type = ftype form = ET.Element('{jabber:x:data}x') diff --git a/sleekxmpp/plugins/xep_0060.py b/sleekxmpp/plugins/xep_0060.py index 44a70e9a..bff158a0 100644 --- a/sleekxmpp/plugins/xep_0060.py +++ b/sleekxmpp/plugins/xep_0060.py @@ -14,12 +14,14 @@ class xep_0060(base.base_plugin): self.xep = '0060' self.description = 'Publish-Subscribe' - def create_node(self, jid, node, config=None, collection=False): + def create_node(self, jid, node, config=None, collection=False, ntype=None): pubsub = ET.Element('{http://jabber.org/protocol/pubsub}pubsub') create = ET.Element('create') create.set('node', node) pubsub.append(create) configure = ET.Element('configure') + if collection: + ntype = 'collection' #if config is None: # submitform = self.xmpp.plugin['xep_0004'].makeForm('submit') #else: @@ -29,11 +31,11 @@ class xep_0060(base.base_plugin): submitform.field['FORM_TYPE'].setValue('http://jabber.org/protocol/pubsub#node_config') else: submitform.addField('FORM_TYPE', 'hidden', value='http://jabber.org/protocol/pubsub#node_config') - if collection: + if ntype: if 'pubsub#node_type' in submitform.field: - submitform.field['pubsub#node_type'].setValue('collection') + submitform.field['pubsub#node_type'].setValue(ntype) else: - submitform.addField('pubsub#node_type', value='collection') + submitform.addField('pubsub#node_type', value=ntype) else: if 'pubsub#node_type' in submitform.field: submitform.field['pubsub#node_type'].setValue('leaf') diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py index 5cb8aff0..7aa59f3d 100644 --- a/sleekxmpp/xmlstream/scheduler.py +++ b/sleekxmpp/xmlstream/scheduler.py @@ -4,6 +4,7 @@ except ImportError: import Queue as queue import time import threading +import logging class Task(object): """Task object for the Scheduler class""" @@ -34,7 +35,7 @@ class Scheduler(object): self.addq = queue.Queue() self.schedule = [] self.thread = None - self.run = True + self.run = False def process(self, threaded=True): if threaded: @@ -44,6 +45,7 @@ class Scheduler(object): self._process() def _process(self): + self.run = True while self.run: wait = 5 updated = False @@ -67,7 +69,7 @@ class Scheduler(object): self.schedule.append(newtask) finally: if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next) - print [x.name for x in self.schedule] + logging.debug("Qutting Scheduler thread") def add(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None): self.addq.put(Task(name, seconds, callback, args, kwargs, repeat, qpointer)) diff --git a/sleekxmpp/xmlstream/stanzabase.py b/sleekxmpp/xmlstream/stanzabase.py index 3f3f5e08..c40922be 100644 --- a/sleekxmpp/xmlstream/stanzabase.py +++ b/sleekxmpp/xmlstream/stanzabase.py @@ -319,6 +319,8 @@ class StanzaBase(ElementBase): def __init__(self, stream=None, xml=None, stype=None, sto=None, sfrom=None, sid=None): self.stream = stream + if stream is not None: + self.namespace = stream.default_ns ElementBase.__init__(self, xml) if stype is not None: self['type'] = stype @@ -326,8 +328,6 @@ class StanzaBase(ElementBase): self['to'] = sto if sfrom is not None: self['from'] = sfrom - if stream is not None: - self.namespace = stream.default_ns self.tag = "{%s}%s" % (self.namespace, self.name) def setType(self, value): diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 96b47411..7fbc5f9e 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -180,6 +180,7 @@ class XMLStream(object): self.state.set('reconnect', False) self.disconnect() self.run = False + self.scheduler.run = False self.eventqueue.put(('quit', None, None)) return except CloseStream: @@ -226,6 +227,7 @@ class XMLStream(object): edepth += -1 if edepth == 0 and event == b'end': self.disconnect(reconnect=self.state['reconnect']) + logging.debug("Ending readXML loop") return False elif edepth == 1: #self.xmlin.put(xmlobj) @@ -234,11 +236,13 @@ class XMLStream(object): except RestartStream: return True except CloseStream: + logging.debug("Ending readXML loop") return False if root: root.clear() if event == b'start': edepth += 1 + logging.debug("Ending readXML loop") def _sendThread(self): while self.run: @@ -268,6 +272,7 @@ class XMLStream(object): logging.debug("Disconnecting...") self.state.set('disconnecting', True) self.run = False + self.scheduler.run = False if self.state['connected']: self.sendRaw(self.stream_footer) time.sleep(1) @@ -339,7 +344,8 @@ class XMLStream(object): args[0].exception(e) elif etype == 'schedule': try: - handler.run(*args) + logging.debug(args) + handler(*args[0]) except: logging.error(traceback.format_exc()) elif etype == 'quit': diff --git a/tests/test_pubsubstanzas.py b/tests/test_pubsubstanzas.py index 55407c16..089ee180 100644 --- a/tests/test_pubsubstanzas.py +++ b/tests/test_pubsubstanzas.py @@ -97,6 +97,21 @@ class testpubsubstanzas(unittest.TestCase): iq3.setValues(values) self.failUnless(xmlstring == str(iq) == str(iq2) == str(iq3)) + def testState(self): + "Testing iq/psstate stanzas" + from sleekxmpp.plugins import xep_0004 + iq = self.ps.Iq() + iq['psstate']['node']= 'mynode' + iq['psstate']['item']= 'myitem' + pl = ET.Element('{http://andyet.net/protocol/pubsubqueue}claimed') + iq['psstate']['payload'] = pl + xmlstring = """""" + iq2 = self.ps.Iq(None, self.ps.ET.fromstring(xmlstring)) + iq3 = self.ps.Iq() + values = iq2.getValues() + iq3.setValues(values) + self.failUnless(xmlstring == str(iq) == str(iq2) == str(iq3)) + def testDefault(self): "Testing iq/pubsub_owner/default stanzas" from sleekxmpp.plugins import xep_0004 -- cgit v1.2.3 From bde181840024716e233881a4319daaaeab1555f0 Mon Sep 17 00:00:00 2001 From: Nathan Fritz Date: Thu, 27 May 2010 04:59:41 -0700 Subject: added pubsubjobs test --- conn_tests/test_pubsubjobs.py | 174 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 conn_tests/test_pubsubjobs.py diff --git a/conn_tests/test_pubsubjobs.py b/conn_tests/test_pubsubjobs.py new file mode 100644 index 00000000..73059f6c --- /dev/null +++ b/conn_tests/test_pubsubjobs.py @@ -0,0 +1,174 @@ +import logging +import sleekxmpp +from optparse import OptionParser +from xml.etree import cElementTree as ET +import os +import time +import sys +import unittest +import sleekxmpp.plugins.xep_0004 +from sleekxmpp.xmlstream.matcher.stanzapath import StanzaPath +from sleekxmpp.xmlstream.handler.waiter import Waiter +try: + import configparser +except ImportError: + import ConfigParser as configparser +try: + import queue +except ImportError: + import Queue as queue + +class TestClient(sleekxmpp.ClientXMPP): + def __init__(self, jid, password): + sleekxmpp.ClientXMPP.__init__(self, jid, password) + self.add_event_handler("session_start", self.start) + #self.add_event_handler("message", self.message) + self.waitforstart = queue.Queue() + + def start(self, event): + self.getRoster() + self.sendPresence() + self.waitforstart.put(True) + + +class TestPubsubServer(unittest.TestCase): + statev = {} + + def __init__(self, *args, **kwargs): + unittest.TestCase.__init__(self, *args, **kwargs) + + def setUp(self): + pass + + def test001getdefaultconfig(self): + """Get the default node config""" + self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode2') + self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode3') + self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode4') + self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode5') + result = self.xmpp1['xep_0060'].getNodeConfig(self.pshost) + self.statev['defaultconfig'] = result + self.failUnless(isinstance(result, sleekxmpp.plugins.xep_0004.Form)) + + def test002createdefaultnode(self): + """Create a node without config""" + self.failUnless(self.xmpp1['xep_0060'].create_node(self.pshost, 'testnode1')) + + def test003deletenode(self): + """Delete recently created node""" + self.failUnless(self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode1')) + + def test004createnode(self): + """Create a node with a config""" + self.statev['defaultconfig'].field['pubsub#access_model'].setValue('open') + self.statev['defaultconfig'].field['pubsub#notify_retract'].setValue(True) + self.statev['defaultconfig'].field['pubsub#persist_items'].setValue(True) + self.statev['defaultconfig'].field['pubsub#presence_based_delivery'].setValue(True) + p = self.xmpp2.Presence() + p['to'] = self.pshost + p.send() + self.failUnless(self.xmpp1['xep_0060'].create_node(self.pshost, 'testnode2', self.statev['defaultconfig'], ntype='queue')) + + def test005reconfigure(self): + """Retrieving node config and reconfiguring""" + nconfig = self.xmpp1['xep_0060'].getNodeConfig(self.pshost, 'testnode2') + self.failUnless(nconfig, "No configuration returned") + #print("\n%s ==\n %s" % (nconfig.getValues(), self.statev['defaultconfig'].getValues())) + self.failUnless(nconfig.getValues() == self.statev['defaultconfig'].getValues(), "Configuration does not match") + self.failUnless(self.xmpp1['xep_0060'].setNodeConfig(self.pshost, 'testnode2', nconfig)) + + def test006subscribetonode(self): + """Subscribe to node from account 2""" + self.failUnless(self.xmpp2['xep_0060'].subscribe(self.pshost, "testnode2")) + + def test007publishitem(self): + """Publishing item""" + item = ET.Element('{http://netflint.net/protocol/test}test') + w = Waiter('wait publish', StanzaPath('message/pubsub_event/items')) + self.xmpp2.registerHandler(w) + result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test1', item),)) + msg = w.wait(5) # got to get a result in 5 seconds + self.failUnless(msg != False, "Account #2 did not get message event") + result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test2', item),)) + iq = self.xmpp2.Iq() + iq['to'] = self.pshost + iq['type'] = 'set' + iq['psstate']['node'] = 'testnode2' + iq['psstate']['item'] = 'test1' + iq['psstate']['payload'] = ET.Element('{http://andyet.net/protocol/pubsubqueue}claimed') + result = iq.send() + time.sleep(10) + iq = self.xmpp2.Iq() + iq['to'] = self.pshost + iq['type'] = 'set' + iq['psstate']['node'] = 'testnode2' + iq['psstate']['item'] = 'test2' + iq['psstate']['payload'] = ET.Element('{http://andyet.net/protocol/pubsubqueue}claimed') + result = iq.send() + self.failUnless(result['type'] == 'result') + #need to add check for update + + def test900cleanup(self): + "Cleaning up" + self.failUnless(self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode2'), "Could not delete test node.") + + +if __name__ == '__main__': + #parse command line arguements + optp = OptionParser() + optp.add_option('-q','--quiet', help='set logging to ERROR', action='store_const', dest='loglevel', const=logging.ERROR, default=logging.INFO) + optp.add_option('-d','--debug', help='set logging to DEBUG', action='store_const', dest='loglevel', const=logging.DEBUG, default=logging.INFO) + optp.add_option('-v','--verbose', help='set logging to COMM', action='store_const', dest='loglevel', const=5, default=logging.INFO) + optp.add_option("-c","--config", dest="configfile", default="config.xml", help="set config file to use") + optp.add_option("-n","--nodenum", dest="nodenum", default="1", help="set node number to use") + optp.add_option("-p","--pubsub", dest="pubsub", default="1", help="set pubsub host to use") + opts,args = optp.parse_args() + + logging.basicConfig(level=opts.loglevel, format='%(levelname)-8s %(message)s') + + #load xml config + logging.info("Loading config file: %s" % opts.configfile) + config = configparser.RawConfigParser() + config.read(opts.configfile) + + #init + logging.info("Account 1 is %s" % config.get('account1', 'jid')) + xmpp1 = TestClient(config.get('account1','jid'), config.get('account1','pass')) + logging.info("Account 2 is %s" % config.get('account2', 'jid')) + xmpp2 = TestClient(config.get('account2','jid'), config.get('account2','pass')) + + xmpp1.registerPlugin('xep_0004') + xmpp1.registerPlugin('xep_0030') + xmpp1.registerPlugin('xep_0060') + xmpp1.registerPlugin('xep_0199') + xmpp2.registerPlugin('xep_0004') + xmpp2.registerPlugin('xep_0030') + xmpp2.registerPlugin('xep_0060') + xmpp2.registerPlugin('xep_0199') + + if not config.get('account1', 'server'): + # we don't know the server, but the lib can probably figure it out + xmpp1.connect() + else: + xmpp1.connect((config.get('account1', 'server'), 5222)) + xmpp1.process(threaded=True) + + #init + if not config.get('account2', 'server'): + # we don't know the server, but the lib can probably figure it out + xmpp2.connect() + else: + xmpp2.connect((config.get('account2', 'server'), 5222)) + xmpp2.process(threaded=True) + + TestPubsubServer.xmpp1 = xmpp1 + TestPubsubServer.xmpp2 = xmpp2 + TestPubsubServer.pshost = config.get('settings', 'pubsub') + xmpp1.waitforstart.get(True) + xmpp2.waitforstart.get(True) + testsuite = unittest.TestLoader().loadTestsFromTestCase(TestPubsubServer) + + alltests_suite = unittest.TestSuite([testsuite]) + result = unittest.TextTestRunner(verbosity=2).run(alltests_suite) + xmpp1.disconnect() + xmpp2.disconnect() -- cgit v1.2.3 From 2f1ba368e2adbac27434f70753a6497c0a257301 Mon Sep 17 00:00:00 2001 From: Nathan Fritz Date: Fri, 28 May 2010 19:19:28 -0700 Subject: control-c fixes --- conn_tests/test_pubsubjobs.py | 37 ++++++++++++++--------------- sleekxmpp/xmlstream/scheduler.py | 51 +++++++++++++++++++++++----------------- sleekxmpp/xmlstream/xmlstream.py | 6 ++++- 3 files changed, 52 insertions(+), 42 deletions(-) diff --git a/conn_tests/test_pubsubjobs.py b/conn_tests/test_pubsubjobs.py index 73059f6c..edf22ccc 100644 --- a/conn_tests/test_pubsubjobs.py +++ b/conn_tests/test_pubsubjobs.py @@ -67,7 +67,7 @@ class TestPubsubServer(unittest.TestCase): p = self.xmpp2.Presence() p['to'] = self.pshost p.send() - self.failUnless(self.xmpp1['xep_0060'].create_node(self.pshost, 'testnode2', self.statev['defaultconfig'], ntype='queue')) + self.failUnless(self.xmpp1['xep_0060'].create_node(self.pshost, 'testnode2', self.statev['defaultconfig'], ntype='job')) def test005reconfigure(self): """Retrieving node config and reconfiguring""" @@ -86,31 +86,26 @@ class TestPubsubServer(unittest.TestCase): item = ET.Element('{http://netflint.net/protocol/test}test') w = Waiter('wait publish', StanzaPath('message/pubsub_event/items')) self.xmpp2.registerHandler(w) - result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test1', item),)) + #result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test1', item),)) + result = self.xmpp1['jobs'].createJob(self.pshost, "testnode2", 'test1', item) msg = w.wait(5) # got to get a result in 5 seconds self.failUnless(msg != False, "Account #2 did not get message event") - result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test2', item),)) - iq = self.xmpp2.Iq() - iq['to'] = self.pshost - iq['type'] = 'set' - iq['psstate']['node'] = 'testnode2' - iq['psstate']['item'] = 'test1' - iq['psstate']['payload'] = ET.Element('{http://andyet.net/protocol/pubsubqueue}claimed') - result = iq.send() - time.sleep(10) - iq = self.xmpp2.Iq() - iq['to'] = self.pshost - iq['type'] = 'set' - iq['psstate']['node'] = 'testnode2' - iq['psstate']['item'] = 'test2' - iq['psstate']['payload'] = ET.Element('{http://andyet.net/protocol/pubsubqueue}claimed') - result = iq.send() - self.failUnless(result['type'] == 'result') + #result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test2', item),)) + result = self.xmpp1['jobs'].createJob(self.pshost, "testnode2", 'test2', item) + w = Waiter('wait publish2', StanzaPath('message/pubsub_event/items')) + self.xmpp2.registerHandler(w) + self.xmpp2['jobs'].claimJob(self.pshost, 'testnode2', 'test1') + msg = w.wait(5) # got to get a result in 5 seconds + self.xmpp2['jobs'].claimJob(self.pshost, 'testnode2', 'test2') + self.xmpp2['jobs'].finishJob(self.pshost, 'testnode2', 'test1') + self.xmpp2['jobs'].finishJob(self.pshost, 'testnode2', 'test2') + print result #need to add check for update def test900cleanup(self): "Cleaning up" - self.failUnless(self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode2'), "Could not delete test node.") + #self.failUnless(self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode2'), "Could not delete test node.") + time.sleep(10) if __name__ == '__main__': @@ -141,10 +136,12 @@ if __name__ == '__main__': xmpp1.registerPlugin('xep_0030') xmpp1.registerPlugin('xep_0060') xmpp1.registerPlugin('xep_0199') + xmpp1.registerPlugin('jobs') xmpp2.registerPlugin('xep_0004') xmpp2.registerPlugin('xep_0030') xmpp2.registerPlugin('xep_0060') xmpp2.registerPlugin('xep_0199') + xmpp2.registerPlugin('jobs') if not config.get('account1', 'server'): # we don't know the server, but the lib can probably figure it out diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py index 7aa59f3d..18d9229e 100644 --- a/sleekxmpp/xmlstream/scheduler.py +++ b/sleekxmpp/xmlstream/scheduler.py @@ -31,11 +31,12 @@ class Task(object): class Scheduler(object): """Threaded scheduler that allows for updates mid-execution unlike http://docs.python.org/library/sched.html#module-sched""" - def __init__(self): + def __init__(self, parentqueue=None): self.addq = queue.Queue() self.schedule = [] self.thread = None self.run = False + self.parentqueue = parentqueue def process(self, threaded=True): if threaded: @@ -47,29 +48,37 @@ class Scheduler(object): def _process(self): self.run = True while self.run: - wait = 5 - updated = False - if self.schedule: - wait = self.schedule[0].next - time.time() try: - newtask = self.addq.get(True, wait) - except queue.Empty: - cleanup = [] - for task in self.schedule: - if time.time() >= task.next: - updated = True - if not task.run(): - cleanup.append(task) + wait = 5 + updated = False + if self.schedule: + wait = self.schedule[0].next - time.time() + try: + if wait <= 0.0: + newtask = self.addq.get(False) else: - break - for task in cleanup: - x = self.schedule.pop(self.schedule.index(task)) - else: - updated = True - self.schedule.append(newtask) - finally: - if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next) + newtask = self.addq.get(True, wait) + except queue.Empty: + cleanup = [] + for task in self.schedule: + if time.time() >= task.next: + updated = True + if not task.run(): + cleanup.append(task) + else: + break + for task in cleanup: + x = self.schedule.pop(self.schedule.index(task)) + else: + updated = True + self.schedule.append(newtask) + finally: + if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next) + except KeyboardInterrupt: + self.run = False logging.debug("Qutting Scheduler thread") + if self.parentqueue is not None: + self.parentqueue.put(('quit', None, None)) def add(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None): self.addq.put(Task(name, seconds, callback, args, kwargs, repeat, qpointer)) diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 7fbc5f9e..6b92abca 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -76,7 +76,7 @@ class XMLStream(object): self.eventqueue = queue.Queue() self.sendqueue = queue.Queue() - self.scheduler = scheduler.Scheduler() + self.scheduler = scheduler.Scheduler(self.eventqueue) self.namespace_map = {} @@ -149,6 +149,7 @@ class XMLStream(object): def process(self, threaded=True): self.scheduler.process(threaded=True) for t in range(0, HANDLER_THREADS): + logging.debug("Starting HANDLER THREAD") self.__thread['eventhandle%s' % t] = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner) self.__thread['eventhandle%s' % t].start() self.__thread['sendthread'] = threading.Thread(name='sendthread', target=self._sendThread) @@ -331,6 +332,9 @@ class XMLStream(object): event = self.eventqueue.get(True, timeout=5) except queue.Empty: event = None + except KeyboardInterrupt: + self.run = False + self.scheduler.run = False if event is not None: etype = event[0] handler = event[1] -- cgit v1.2.3 From fd573880ebcf0c4885da3b22ee6d07a34091d9bd Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Wed, 26 May 2010 21:34:01 +0800 Subject: Updated the XEP-0030 plugin to work with stanza objects instead of manipulating XML directly. Four new events have been added: disco_info - A disco#info result has been received disco_info_request - A disco#info request has been received disco_items - A disco#items result has been received disco_items_request - A disco#items request has been received For disco_info_request and disco_items_request two default handlers are registered. These handlers will only run if they are the only handler for these two events so that multiple responses are not returned and cause errors. In your own handlers for these two events, you can call the default handlers to preserve the static node behaviour as so: self.plugin['xep_0030'].handle_disco_info(iq, True) The forwarded=True will disable the check for other registered handlers. Agents can now dynamically respond to disco requests by using these events. (cherry picked from commit 0fc3381492a8bd75e6a9858539a972334881d8ff) --- sleekxmpp/plugins/xep_0030.py | 405 ++++++++++++++++++++++++++++++++---------- 1 file changed, 308 insertions(+), 97 deletions(-) diff --git a/sleekxmpp/plugins/xep_0030.py b/sleekxmpp/plugins/xep_0030.py index 5432dd56..4370e588 100644 --- a/sleekxmpp/plugins/xep_0030.py +++ b/sleekxmpp/plugins/xep_0030.py @@ -1,25 +1,184 @@ """ - SleekXMPP: The Sleek XMPP Library - Copyright (C) 2007 Nathanael C. Fritz - This file is part of SleekXMPP. - - SleekXMPP is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - - SleekXMPP is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with SleekXMPP; if not, write to the Free Software - Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout + This file is part of SleekXMPP. + + See the file license.txt for copying permissio """ -from . import base + import logging -from xml.etree import cElementTree as ET +from . import base +from .. xmlstream.handler.callback import Callback +from .. xmlstream.matcher.xpath import MatchXPath +from .. xmlstream.stanzabase import ElementBase, ET, JID +from .. stanza.iq import Iq + +class DiscoInfo(ElementBase): + namespace = 'http://jabber.org/protocol/disco#info' + name = 'query' + plugin_attrib = 'disco_info' + interfaces = set(('node', 'features', 'identities')) + + def getFeatures(self): + features = [] + featuresXML = self.xml.findall('{%s}feature' % self.namespace) + for feature in featuresXML: + features.append(feature.attrib['var']) + return features + + def setFeatures(self, features): + self.delFeatures() + for name in features: + self.addFeature(name) + + def delFeatures(self): + featuresXML = self.xml.findall('{%s}feature' % self.namespace) + for feature in featuresXML: + self.xml.remove(feature) + + def addFeature(self, feature): + featureXML = ET.Element('{%s}feature' % self.namespace, + {'var': feature}) + self.xml.append(featureXML) + + def delFeature(self, feature): + featuresXML = self.xml.findall('{%s}feature' % self.namespace) + for featureXML in featuresXML: + if featureXML.attrib['var'] == feature: + self.xml.remove(featureXML) + + def getIdentities(self): + ids = [] + idsXML = self.xml.findall('{%s}identity' % self.namespace) + for idXML in idsXML: + idData = (idXML.attrib['category'], + idXML.attrib['type'], + idXML.attrib.get('name', '')) + ids.append(idData) + return ids + + def setIdentities(self, ids): + self.delIdentities() + for idData in ids: + self.addIdentity(*idData) + + def delIdentities(self): + idsXML = self.xml.findall('{%s}identity' % self.namespace) + for idXML in idsXML: + self.xml.remove(idXML) + + def addIdentity(self, category, id_type, name=''): + idXML = ET.Element('{%s}identity' % self.namespace, + {'category': category, + 'type': id_type, + 'name': name}) + self.xml.append(idXML) + + def delIdentity(self, category, id_type, name=''): + idsXML = self.xml.findall('{%s}identity' % self.namespace) + for idXML in idsXML: + idData = (idXML.attrib['category'], + idXML.attrib['type']) + delId = (category, id_type) + if idData == delId: + self.xml.remove(idXML) + + +class DiscoItems(ElementBase): + namespace = 'http://jabber.org/protocol/disco#items' + name = 'query' + plugin_attrib = 'disco_items' + interfaces = set(('node', 'items')) + + def getItems(self): + items = [] + itemsXML = self.xml.findall('{%s}item' % self.namespace) + for item in itemsXML: + itemData = (item.attrib['jid'], + item.attrib.get('node'), + item.attrib.get('name')) + items.append(itemData) + return items + + def setItems(self, items): + self.delItems() + for item in items: + self.addItem(*item) + + def delItems(self): + itemsXML = self.xml.findall('{%s}item' % self.namespace) + for item in itemsXML: + self.xml.remove(item) + + def addItem(self, jid, node='', name=''): + itemXML = ET.Element('{%s}item' % self.namespace, {'jid': jid}) + if name: + itemXML.attrib['name'] = name + if node: + itemXML.attrib['node'] = node + self.xml.append(itemXML) + + def delItem(self, jid, node=''): + itemsXML = self.xml.findall('{%s}item' % self.namespace) + for itemXML in itemsXML: + itemData = (itemXML.attrib['jid'], + itemXML.attrib.get('node', '')) + itemDel = (jid, node) + if itemData == itemDel: + self.xml.remove(itemXML) + + +class DiscoNode(object): + """ + Collection object for grouping info and item information + into nodes. + """ + def __init__(self, name): + self.name = name + self.info = DiscoInfo() + self.items = DiscoItems() + + # This is a bit like poor man's inheritance, but + # to simplify adding information to the node we + # map node functions to either the info or items + # stanza objects. + # + # We don't want to make DiscoNode inherit from + # DiscoInfo and DiscoItems because DiscoNode is + # not an actual stanza, and doing so would create + # confusion and potential bugs. + + self._map(self.items, 'items', ['get', 'set', 'del']) + self._map(self.items, 'item', ['add', 'del']) + self._map(self.info, 'identities', ['get', 'set', 'del']) + self._map(self.info, 'identity', ['add', 'del']) + self._map(self.info, 'features', ['get', 'set', 'del']) + self._map(self.info, 'feature', ['add', 'del']) + + def isEmpty(self): + """ + Test if the node contains any information. Useful for + determining if a node can be deleted. + """ + ids = self.getIdentities() + features = self.getFeatures() + items = self.getItems() + + if not ids and not features and not items: + return True + return False + + def _map(self, obj, interface, access): + """ + Map functions of the form obj.accessInterface + to self.accessInterface for each given access type. + """ + interface = interface.title() + for access_type in access: + method = access_type + interface + if hasattr(obj, method): + setattr(self, method, getattr(obj, method)) + class xep_0030(base.base_plugin): """ @@ -29,85 +188,137 @@ class xep_0030(base.base_plugin): def plugin_init(self): self.xep = '0030' self.description = 'Service Discovery' - self.features = {'main': ['http://jabber.org/protocol/disco#info', 'http://jabber.org/protocol/disco#items']} - self.identities = {'main': [{'category': 'client', 'type': 'pc', 'name': 'SleekXMPP'}]} - self.items = {'main': []} - self.xmpp.add_handler("" % self.xmpp.default_ns, self.info_handler) - self.xmpp.add_handler("" % self.xmpp.default_ns, self.item_handler) + + self.xmpp.registerHandler( + Callback('Disco Items', + MatchXPath('{%s}iq/{%s}query' % (self.xmpp.default_ns, + DiscoItems.namespace)), + self.handle_item_query)) + + self.xmpp.registerHandler( + Callback('Disco Info', + MatchXPath('{%s}iq/{%s}query' % (self.xmpp.default_ns, + DiscoInfo.namespace)), + self.handle_info_query)) + + self.xmpp.stanzaPlugin(Iq, DiscoInfo) + self.xmpp.stanzaPlugin(Iq, DiscoItems) + + self.xmpp.add_event_handler('disco_items_request', self.handle_disco_items) + self.xmpp.add_event_handler('disco_info_request', self.handle_disco_info) + + self.nodes = {'main': DiscoNode('main')} + + def add_node(self, node): + if node not in self.nodes: + self.nodes[node] = DiscoNode(node) + + def del_node(self, node): + if node in self.nodes: + del self.nodes[node] + + def handle_item_query(self, iq): + if iq['type'] == 'get': + logging.debug("Items requested by %s" % iq['from']) + self.xmpp.event('disco_items_request', iq) + elif iq['type'] == 'result': + logging.debug("Items result from %s" % iq['from']) + self.xmpp.event('disco_items', iq) + + def handle_info_query(self, iq): + if iq['type'] == 'get': + logging.debug("Info requested by %s" % iq['from']) + self.xmpp.event('disco_info_request', iq) + elif iq['type'] == 'result': + logging.debug("Info result from %s" % iq['from']) + self.xmpp.event('disco_info', iq) + + def handle_disco_info(self, iq, forwarded=False): + """ + A default handler for disco#info requests. If another + handler is registered, this one will defer and not run. + """ + handlers = self.xmpp.event_handlers['disco_info_request'] + if not forwarded and len(handlers) > 1: + return + + node_name = iq['disco_info']['node'] + if not node_name: + node_name = 'main' + + logging.debug("Using default handler for disco#info on node '%s'." % node_name) + + if node_name in self.nodes: + node = self.nodes[node_name] + iq.reply().setPayload(node.info.xml).send() + else: + logging.debug("Node %s requested, but does not exist." % node_name) + iq.reply().error().setPayload(iq['disco_info'].xml) + iq['error']['code'] = '404' + iq['error']['type'] = 'cancel' + iq['error']['condition'] = 'item-not-found' + iq.send() + + def handle_disco_items(self, iq, forwarded=False): + """ + A default handler for disco#items requests. If another + handler is registered, this one will defer and not run. + + If this handler is called by your own custom handler with + forwarded set to True, then it will run as normal. + """ + handlers = self.xmpp.event_handlers['disco_items_request'] + if not forwarded and len(handlers) > 1: + return + + node_name = iq['disco_items']['node'] + if not node_name: + node_name = 'main' + + logging.debug("Using default handler for disco#items on node '%s'." % node_name) + + if node_name in self.nodes: + node = self.nodes[node_name] + iq.reply().setPayload(node.items.xml).send() + else: + logging.debug("Node %s requested, but does not exist." % node_name) + iq.reply().error().setPayload(iq['disco_items'].xml) + iq['error']['code'] = '404' + iq['error']['type'] = 'cancel' + iq['error']['condition'] = 'item-not-found' + iq.send() + + # Older interface methods for backwards compatibility + + def getInfo(self, jid, node=''): + iq = Iq(self.xmpp) + iq['type'] = 'get' + iq['to'] = jid + iq['from'] = self.xmpp.fulljid + iq['disco_info']['node'] = node + iq.send() + + def getItems(self, jid, node=''): + iq = Iq(self.xmpp) + iq['type'] = 'get' + iq['to'] = jid + iq['from'] = self.xmpp.fulljid + iq['disco_items']['node'] = node + iq.send() def add_feature(self, feature, node='main'): - if not node in self.features: - self.features[node] = [] - self.features[node].append(feature) - - def add_identity(self, category=None, itype=None, name=None, node='main'): - if not node in self.identities: - self.identities[node] = [] - self.identities[node].append({'category': category, 'type': itype, 'name': name}) + self.add_node(node) + self.nodes[node].addFeature(feature) - def add_item(self, jid=None, name=None, node='main', subnode=''): - if not node in self.items: - self.items[node] = [] - self.items[node].append({'jid': jid, 'name': name, 'node': subnode}) - - def info_handler(self, xml): - logging.debug("Info request from %s" % xml.get('from', '')) - iq = self.xmpp.makeIqResult(xml.get('id', self.xmpp.getNewId())) - iq.attrib['from'] = xml.get('to') - iq.attrib['to'] = xml.get('from', self.xmpp.server) - query = xml.find('{http://jabber.org/protocol/disco#info}query') - node = query.get('node', 'main') - for identity in self.identities.get(node, []): - idxml = ET.Element('identity') - for attrib in identity: - if identity[attrib]: - idxml.attrib[attrib] = identity[attrib] - query.append(idxml) - for feature in self.features.get(node, []): - featxml = ET.Element('feature') - featxml.attrib['var'] = feature - query.append(featxml) - iq.append(query) - #print ET.tostring(iq) - self.xmpp.send(iq) - - def item_handler(self, xml): - logging.debug("Item request from %s" % xml.get('from', '')) - iq = self.xmpp.makeIqResult(xml.get('id', self.xmpp.getNewId())) - iq.attrib['from'] = xml.get('to') - iq.attrib['to'] = xml.get('from', self.xmpp.server) - query = self.xmpp.makeIqQuery(iq, 'http://jabber.org/protocol/disco#items').find('{http://jabber.org/protocol/disco#items}query') - node = xml.find('{http://jabber.org/protocol/disco#items}query').get('node', 'main') - for item in self.items.get(node, []): - itemxml = ET.Element('item') - itemxml.attrib = item - if itemxml.attrib['jid'] is None: - itemxml.attrib['jid'] = xml.get('to') - query.append(itemxml) - self.xmpp.send(iq) + def add_identity(self, category='', itype='', name='', node='main'): + self.add_node(node) + self.nodes[node].addIdentity(category=category, + id_type=itype, + name=name) - def getItems(self, jid, node=None): - iq = self.xmpp.makeIqGet() - iq.attrib['from'] = self.xmpp.fulljid - iq.attrib['to'] = jid - self.xmpp.makeIqQuery(iq, 'http://jabber.org/protocol/disco#items') - if node: - iq.find('{http://jabber.org/protocol/disco#items}query').attrib['node'] = node - return iq.send() - - def getInfo(self, jid, node=None): - iq = self.xmpp.makeIqGet() - iq.attrib['from'] = self.xmpp.fulljid - iq.attrib['to'] = jid - self.xmpp.makeIqQuery(iq, 'http://jabber.org/protocol/disco#info') - if node: - iq.find('{http://jabber.org/protocol/disco#info}query').attrib['node'] = node - return iq.send() - - def parseInfo(self, xml): - result = {'identity': {}, 'feature': []} - for identity in xml.findall('{http://jabber.org/protocol/disco#info}query/{{http://jabber.org/protocol/disco#info}identity'): - result['identity'][identity['name']] = identity.attrib - for feature in xml.findall('{http://jabber.org/protocol/disco#info}query/{{http://jabber.org/protocol/disco#info}feature'): - result['feature'].append(feature.get('var', '__unknown__')) - return result + def add_item(self, jid=None, name='', node='main', subnode=''): + self.add_node(node) + self.add_node(subnode) + if jid is None: + jid = self.xmpp.fulljid + self.nodes[node].addItem(jid=jid, name=name, node=subnode) -- cgit v1.2.3 From 9fee87c25863956370a36ac87741c1f4cec5e23d Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Wed, 26 May 2010 21:40:12 +0800 Subject: Added unit tests for the new XEP-0030 stanza objects. All pass. (cherry picked from commit e1b814f27bf160f20bb30c315ca30769d217482d) --- tests/test_disco.py | 155 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 tests/test_disco.py diff --git a/tests/test_disco.py b/tests/test_disco.py new file mode 100644 index 00000000..bbe285a6 --- /dev/null +++ b/tests/test_disco.py @@ -0,0 +1,155 @@ +import unittest +from xml.etree import cElementTree as ET +from sleekxmpp.xmlstream.matcher.stanzapath import StanzaPath +from . import xmlcompare + +import sleekxmpp.plugins.xep_0030 as sd + +def stanzaPlugin(stanza, plugin): + stanza.plugin_attrib_map[plugin.plugin_attrib] = plugin + stanza.plugin_tag_map["{%s}%s" % (plugin.namespace, plugin.name)] = plugin + +class testdisco(unittest.TestCase): + + def setUp(self): + self.sd = sd + stanzaPlugin(self.sd.Iq, self.sd.DiscoInfo) + stanzaPlugin(self.sd.Iq, self.sd.DiscoItems) + + def try3Methods(self, xmlstring, iq): + iq2 = self.sd.Iq(None, self.sd.ET.fromstring(xmlstring)) + values = iq2.getValues() + iq3 = self.sd.Iq() + iq3.setValues(values) + self.failUnless(xmlstring == str(iq) == str(iq2) == str(iq3), str(iq)+"3 methods for creating stanza don't match") + + def testCreateInfoQueryNoNode(self): + """Testing disco#info query with no node.""" + iq = self.sd.Iq() + iq['id'] = "0" + iq['disco_info']['node'] = '' + xmlstring = """""" + self.try3Methods(xmlstring, iq) + + def testCreateInfoQueryWithNode(self): + """Testing disco#info query with a node.""" + iq = self.sd.Iq() + iq['id'] = "0" + iq['disco_info']['node'] = 'foo' + xmlstring = """""" + self.try3Methods(xmlstring, iq) + + def testCreateInfoQueryNoNode(self): + """Testing disco#items query with no node.""" + iq = self.sd.Iq() + iq['id'] = "0" + iq['disco_items']['node'] = '' + xmlstring = """""" + self.try3Methods(xmlstring, iq) + + def testCreateItemsQueryWithNode(self): + """Testing disco#items query with a node.""" + iq = self.sd.Iq() + iq['id'] = "0" + iq['disco_items']['node'] = 'foo' + xmlstring = """""" + self.try3Methods(xmlstring, iq) + + def testInfoIdentities(self): + """Testing adding identities to disco#info.""" + iq = self.sd.Iq() + iq['id'] = "0" + iq['disco_info']['node'] = 'foo' + iq['disco_info'].addIdentity('conference', 'text', 'Chatroom') + xmlstring = """""" + self.try3Methods(xmlstring, iq) + + def testInfoFeatures(self): + """Testing adding features to disco#info.""" + iq = self.sd.Iq() + iq['id'] = "0" + iq['disco_info']['node'] = 'foo' + iq['disco_info'].addFeature('foo') + iq['disco_info'].addFeature('bar') + xmlstring = """""" + self.try3Methods(xmlstring, iq) + + def testItems(self): + """Testing adding features to disco#info.""" + iq = self.sd.Iq() + iq['id'] = "0" + iq['disco_items']['node'] = 'foo' + iq['disco_items'].addItem('user@localhost') + iq['disco_items'].addItem('user@localhost', 'foo') + iq['disco_items'].addItem('user@localhost', 'bar', 'Testing') + xmlstring = """""" + self.try3Methods(xmlstring, iq) + + def testAddRemoveIdentities(self): + """Test adding and removing identities to disco#info stanza""" + ids = [('automation', 'commands', 'AdHoc'), + ('conference', 'text', 'ChatRoom')] + + info = self.sd.DiscoInfo() + info.addIdentity(*ids[0]) + self.failUnless(info.getIdentities() == [ids[0]]) + + info.delIdentity('automation', 'commands') + self.failUnless(info.getIdentities() == []) + + info.setIdentities(ids) + self.failUnless(info.getIdentities() == ids) + + info.delIdentity('automation', 'commands') + self.failUnless(info.getIdentities() == [ids[1]]) + + info.delIdentities() + self.failUnless(info.getIdentities() == []) + + def testAddRemoveFeatures(self): + """Test adding and removing features to disco#info stanza""" + features = ['foo', 'bar', 'baz'] + + info = self.sd.DiscoInfo() + info.addFeature(features[0]) + self.failUnless(info.getFeatures() == [features[0]]) + + info.delFeature('foo') + self.failUnless(info.getFeatures() == []) + + info.setFeatures(features) + self.failUnless(info.getFeatures() == features) + + info.delFeature('bar') + self.failUnless(info.getFeatures() == ['foo', 'baz']) + + info.delFeatures() + self.failUnless(info.getFeatures() == []) + + def testAddRemoveItems(self): + """Test adding and removing items to disco#items stanza""" + items = [('user@localhost', None, None), + ('user@localhost', 'foo', None), + ('user@localhost', 'bar', 'Test')] + + info = self.sd.DiscoItems() + self.failUnless(True, ""+str(items[0])) + + info.addItem(*(items[0])) + self.failUnless(info.getItems() == [items[0]], info.getItems()) + + info.delItem('user@localhost') + self.failUnless(info.getItems() == []) + + info.setItems(items) + self.failUnless(info.getItems() == items) + + info.delItem('user@localhost', 'foo') + self.failUnless(info.getItems() == [items[0], items[2]]) + + info.delItems() + self.failUnless(info.getItems() == []) + + + +suite = unittest.TestLoader().loadTestsFromTestCase(testdisco) -- cgit v1.2.3 From 938066bd504ea9b1c47c1ead757fac42e1ef5726 Mon Sep 17 00:00:00 2001 From: Lance stout Date: Fri, 28 May 2010 22:26:20 +0800 Subject: Added 'resource-constraint' to the list of error conditions. --- sleekxmpp/stanza/error.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sleekxmpp/stanza/error.py b/sleekxmpp/stanza/error.py index f87b6490..1f4b3d88 100644 --- a/sleekxmpp/stanza/error.py +++ b/sleekxmpp/stanza/error.py @@ -11,7 +11,7 @@ class Error(ElementBase): namespace = 'jabber:client' name = 'error' plugin_attrib = 'error' - conditions = set(('bad-request', 'conflict', 'feature-not-implemented', 'forbidden', 'gone', 'item-not-found', 'jid-malformed', 'not-acceptable', 'not-allowed', 'not-authorized', 'payment-required', 'recipient-unavailable', 'redirect', 'registration-required', 'remote-server-not-found', 'remote-server-timeout', 'service-unavailable', 'subscription-required', 'undefined-condition', 'unexpected-request')) + conditions = set(('bad-request', 'conflict', 'feature-not-implemented', 'forbidden', 'gone', 'item-not-found', 'jid-malformed', 'not-acceptable', 'not-allowed', 'not-authorized', 'payment-required', 'recipient-unavailable', 'redirect', 'registration-required', 'remote-server-not-found', 'remote-server-timeout', 'resource-constraint', 'service-unavailable', 'subscription-required', 'undefined-condition', 'unexpected-request')) interfaces = set(('code', 'condition', 'text', 'type')) types = set(('cancel', 'continue', 'modify', 'auth', 'wait')) sub_interfaces = set(('text',)) -- cgit v1.2.3 From 4f864a07f5629b685b4c21c350916ffd4fba68bf Mon Sep 17 00:00:00 2001 From: Lance stout Date: Fri, 28 May 2010 22:45:48 +0800 Subject: Touched up the style of creating an Iq stanza. --- sleekxmpp/plugins/xep_0030.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sleekxmpp/plugins/xep_0030.py b/sleekxmpp/plugins/xep_0030.py index 4370e588..6a31d243 100644 --- a/sleekxmpp/plugins/xep_0030.py +++ b/sleekxmpp/plugins/xep_0030.py @@ -291,7 +291,7 @@ class xep_0030(base.base_plugin): # Older interface methods for backwards compatibility def getInfo(self, jid, node=''): - iq = Iq(self.xmpp) + iq = self.xmpp.Iq() iq['type'] = 'get' iq['to'] = jid iq['from'] = self.xmpp.fulljid @@ -299,7 +299,7 @@ class xep_0030(base.base_plugin): iq.send() def getItems(self, jid, node=''): - iq = Iq(self.xmpp) + iq = self.xmpp.Iq() iq['type'] = 'get' iq['to'] = jid iq['from'] = self.xmpp.fulljid -- cgit v1.2.3 From cff3079a04f9f12da5530c68d385d0c0f03af69e Mon Sep 17 00:00:00 2001 From: Lance stout Date: Sat, 29 May 2010 03:22:35 +0800 Subject: Added missing 'internal-server-error' condition to error stanza interface. --- sleekxmpp/stanza/error.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sleekxmpp/stanza/error.py b/sleekxmpp/stanza/error.py index 1f4b3d88..ee46722a 100644 --- a/sleekxmpp/stanza/error.py +++ b/sleekxmpp/stanza/error.py @@ -11,7 +11,7 @@ class Error(ElementBase): namespace = 'jabber:client' name = 'error' plugin_attrib = 'error' - conditions = set(('bad-request', 'conflict', 'feature-not-implemented', 'forbidden', 'gone', 'item-not-found', 'jid-malformed', 'not-acceptable', 'not-allowed', 'not-authorized', 'payment-required', 'recipient-unavailable', 'redirect', 'registration-required', 'remote-server-not-found', 'remote-server-timeout', 'resource-constraint', 'service-unavailable', 'subscription-required', 'undefined-condition', 'unexpected-request')) + conditions = set(('bad-request', 'conflict', 'feature-not-implemented', 'forbidden', 'gone', 'internal-server-error', 'item-not-found', 'jid-malformed', 'not-acceptable', 'not-allowed', 'not-authorized', 'payment-required', 'recipient-unavailable', 'redirect', 'registration-required', 'remote-server-not-found', 'remote-server-timeout', 'resource-constraint', 'service-unavailable', 'subscription-required', 'undefined-condition', 'unexpected-request')) interfaces = set(('code', 'condition', 'text', 'type')) types = set(('cancel', 'continue', 'modify', 'auth', 'wait')) sub_interfaces = set(('text',)) -- cgit v1.2.3 From 82a3918aa475dbd4abc8941b8c5601f61ab2e508 Mon Sep 17 00:00:00 2001 From: Nathan Fritz Date: Mon, 31 May 2010 03:36:25 -0700 Subject: Scheduler waits too longer, and pubsubstate registration was backwards --- sleekxmpp/plugins/stanza_pubsub.py | 4 ++-- sleekxmpp/xmlstream/scheduler.py | 2 +- sleekxmpp/xmlstream/stanzabase.py | 3 +++ 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/sleekxmpp/plugins/stanza_pubsub.py b/sleekxmpp/plugins/stanza_pubsub.py index 0a75e1e7..1a1526f0 100644 --- a/sleekxmpp/plugins/stanza_pubsub.py +++ b/sleekxmpp/plugins/stanza_pubsub.py @@ -40,8 +40,8 @@ class PubsubStateEvent(ElementBase): plugin_attrib_map = {} plugin_tag_map = {} -stanzaPlugin(Message, PubsubState) -stanzaPlugin(PubsubState, PubsubStateEvent) +stanzaPlugin(Message, PubsubStateEvent) +stanzaPlugin(PubsubStateEvent, PubsubState) class Pubsub(ElementBase): namespace = 'http://jabber.org/protocol/pubsub' diff --git a/sleekxmpp/xmlstream/scheduler.py b/sleekxmpp/xmlstream/scheduler.py index 18d9229e..945d9fad 100644 --- a/sleekxmpp/xmlstream/scheduler.py +++ b/sleekxmpp/xmlstream/scheduler.py @@ -49,7 +49,7 @@ class Scheduler(object): self.run = True while self.run: try: - wait = 5 + wait = 1 updated = False if self.schedule: wait = self.schedule[0].next - time.time() diff --git a/sleekxmpp/xmlstream/stanzabase.py b/sleekxmpp/xmlstream/stanzabase.py index c40922be..64020c8f 100644 --- a/sleekxmpp/xmlstream/stanzabase.py +++ b/sleekxmpp/xmlstream/stanzabase.py @@ -78,6 +78,9 @@ class ElementBase(tostring.ToString): def __iter__(self): self.idx = 0 return self + + def __bool__(self): + return True def __next__(self): self.idx += 1 -- cgit v1.2.3 From aa916c9ac893a976c8e26ee07ca4a9768a5e1680 Mon Sep 17 00:00:00 2001 From: Nathan Fritz Date: Mon, 31 May 2010 13:57:39 -0700 Subject: included jobs plugin --- sleekxmpp/plugins/jobs.py | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 sleekxmpp/plugins/jobs.py diff --git a/sleekxmpp/plugins/jobs.py b/sleekxmpp/plugins/jobs.py new file mode 100644 index 00000000..bb2e2554 --- /dev/null +++ b/sleekxmpp/plugins/jobs.py @@ -0,0 +1,44 @@ +from . import base +import logging +from xml.etree import cElementTree as ET + +class jobs(base.base_plugin): + def plugin_init(self): + self.xep = 'pubsubjob' + self.description = "Job distribution over Pubsub" + + def post_init(self): + pass + #TODO add event + + def createJobNode(self, host, jid, node, config=None): + pass + + def createJob(self, host, node, jobid=None, payload=None): + return self.xmpp.plugin['xep_0060'].setItem(host, node, ((jobid, payload),)) + + def claimJob(self, host, node, jobid, ifrom=None): + return self._setState(host, node, jobid, ET.Element('{http://andyet.net/protocol/pubsubjob}claimed')) + + def unclaimJob(self, jobid): + return self._setState(host, node, jobid, ET.Element('{http://andyet.net/protocol/pubsubjob}unclaimed')) + + def finishJob(self, host, node, jobid, payload=None): + finished = ET.Element('{http://andyet.net/protocol/pubsubjob}finished') + if payload is not None: + finished.append(payload) + return self._setState(host, node, jobid, finished) + + def _setState(self, host, node, jobid, state, ifrom=None): + iq = self.xmpp.Iq() + iq['to'] = host + if ifrom: iq['from'] = ifrom + iq['type'] = 'set' + iq['psstate']['node'] = node + iq['psstate']['item'] = jobid + iq['psstate']['payload'] = state + result = iq.send() + if result is None or result['type'] != 'result': + return False + return True + -- cgit v1.2.3