From 9a34c9a9a1cb4ee6e34089f9e2aa204f7ed8c0c0 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Fri, 1 Oct 2010 13:49:58 -0400 Subject: Modified event handling to use the event queue. Updated tests to match. (Needed to add a small wait to make sure the event got through the queue before checking the results.) --- sleekxmpp/xmlstream/xmlstream.py | 39 +++++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 14 deletions(-) (limited to 'sleekxmpp/xmlstream/xmlstream.py') diff --git a/sleekxmpp/xmlstream/xmlstream.py b/sleekxmpp/xmlstream/xmlstream.py index 401865c6..948b19db 100644 --- a/sleekxmpp/xmlstream/xmlstream.py +++ b/sleekxmpp/xmlstream/xmlstream.py @@ -398,6 +398,11 @@ class XMLStream(object): stream processing and not during normal event processing. """ + # To prevent circular dependencies, we must load the matcher + # and handler classes here. + from sleekxmpp.xmlstream.matcher import MatchXMLMask + from sleekxmpp.xmlstream.handler import XMLCallback + if name is None: name = 'add_handler_%s' % self.getNewId() self.registerHandler(XMLCallback(name, MatchXMLMask(mask), pointer, @@ -478,19 +483,13 @@ class XMLStream(object): Defaults to an empty dictionary. """ for handler in self.__event_handlers.get(name, []): - func, threaded, disposable = handler - - handler_data = copy.copy(data) - if threaded: - x = threading.Thread(name="Event_%s" % str(func), - target=func, - args=(handler_data,)) - x.start() - else: - func(handler_data) - if disposable: + self.event_queue.put(('event', handler, copy.copy(data))) + if handler[2]: + # If the handler is disposable, we will go ahead and + # remove it now instead of waiting for it to be + # processed in the queue. with self.__event_handlers_lock: - handler_index = self.event_handlers[name].index(handler) + handler_index = self.__event_handlers[name].index(handler) self.__event_handlers[name].pop(handler_index) def schedule(self, name, seconds, callback, args=None, @@ -522,7 +521,7 @@ class XMLStream(object): """ return xml - def send(self, data, mask, timeout=RESPONSE_TIMEOUT): + def send(self, data, mask=None, timeout=RESPONSE_TIMEOUT): """ A wrapper for send_raw for sending stanza objects. @@ -772,7 +771,7 @@ class XMLStream(object): try: handler.run(args[0]) except Exception as e: - error_msg = 'Error processing event handler: %s' + error_msg = 'Error processing stream handler: %s' logging.exception(error_msg % handler.name) args[0].exception(e) elif etype == 'schedule': @@ -781,6 +780,18 @@ class XMLStream(object): handler(*args[0]) except: logging.exception('Error processing scheduled task') + elif etype == 'event': + func, threaded, disposable = handler + try: + if threaded: + x = threading.Thread(name="Event_%s" % str(func), + target=func, + args=args) + x.start() + else: + func(*args) + except: + logging.exception('Error processing event handler: %s') elif etype == 'quit': logging.debug("Quitting event runner thread") return False -- cgit v1.2.3