summaryrefslogtreecommitdiff
path: root/slixmpp/xmlstream/xmlstream.py
diff options
context:
space:
mode:
Diffstat (limited to 'slixmpp/xmlstream/xmlstream.py')
-rw-r--r--slixmpp/xmlstream/xmlstream.py86
1 files changed, 28 insertions, 58 deletions
diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py
index 838f3649..3df98862 100644
--- a/slixmpp/xmlstream/xmlstream.py
+++ b/slixmpp/xmlstream/xmlstream.py
@@ -612,16 +612,13 @@ class XMLStream(object):
"""
return len(self.__event_handlers.get(name, []))
- def event(self, name, data={}, direct=False):
+ def event(self, name, data={}):
"""Manually trigger a custom event.
:param name: The name of the event to trigger.
:param data: Data that will be passed to each event handler.
Defaults to an empty dictionary, but is usually
a stanza object.
- :param direct: Runs the event directly if True, skipping the
- event queue. All event handlers will run in the
- same thread.
"""
log.debug("Event triggered: " + name)
@@ -633,18 +630,13 @@ class XMLStream(object):
out_data = copy.copy(data) if len(handlers) > 1 else data
old_exception = getattr(data, 'exception', None)
- if direct:
- try:
- handler_callback(out_data)
- except Exception as e:
- error_msg = 'Error processing event handler: %s'
- log.exception(error_msg, str(handler_callback))
- if old_exception:
- old_exception(e)
- else:
- self.exception(e)
- else:
- self.run_event(('event', handler, out_data))
+ try:
+ handler_callback(out_data)
+ except Exception as e:
+ if old_exception:
+ old_exception(e)
+ else:
+ self.exception(e)
if disposable:
# If the handler is disposable, we will go ahead and
# remove it now instead of waiting for it to be
@@ -687,11 +679,19 @@ class XMLStream(object):
except KeyError:
log.debug("Tried to cancel unscheduled event: %s" % (name,))
+ def _safe_cb_run(self, name, cb):
+ log.debug('Scheduled event: %s', name)
+ try:
+ cb()
+ except Exception as e:
+ log.exception('Error processing scheduled task')
+ self.exception(e)
+
def _execute_and_reschedule(self, name, cb, seconds):
"""Simple method that calls the given callback, and then schedule itself to
be called after the given number of seconds.
"""
- cb()
+ self._safe_cb_run(name, cb)
loop = asyncio.get_event_loop()
handle = loop.call_later(seconds, self._execute_and_reschedule,
name, cb, seconds)
@@ -701,7 +701,7 @@ class XMLStream(object):
"""
Execute the callback and remove the handler for it.
"""
- cb()
+ self._safe_cb_run(name, cb)
del self.scheduled_events[name]
def incoming_filter(self, xml):
@@ -817,7 +817,7 @@ class XMLStream(object):
# Match the stanza against registered handlers. Handlers marked
# to run "in stream" will be executed immediately; the rest will
# be queued.
- unhandled = True
+ handled = False
matched_handlers = [h for h in self.__handlers if h.match(stanza)]
for handler in matched_handlers:
if len(matched_handlers) > 1:
@@ -825,51 +825,21 @@ class XMLStream(object):
else:
stanza_copy = stanza
handler.prerun(stanza_copy)
- self.run_event(('stanza', handler, stanza_copy))
try:
- if handler.check_delete():
- self.__handlers.remove(handler)
- except:
- pass # not thread safe
- unhandled = False
+ handler.run(stanza_copy)
+ except Exception as e:
+ error_msg = 'Error processing stream handler: %s'
+ log.exception(error_msg, handler.name)
+ stanza_copy.exception(e)
+ if handler.check_delete():
+ self.__handlers.remove(handler)
+ handled = True
# Some stanzas require responses, such as Iq queries. A default
# handler will be executed immediately for this case.
- if unhandled:
+ if not handled:
stanza.unhandled()
- def run_event(self, event):
- etype, handler = event[0:2]
- args = event[2:]
- orig = copy.copy(args[0])
-
- if etype == 'stanza':
- try:
- handler.run(args[0])
- except Exception as e:
- error_msg = 'Error processing stream handler: %s'
- log.exception(error_msg, handler.name)
- orig.exception(e)
- elif etype == 'schedule':
- name = args[2]
- try:
- log.debug('Scheduled event: %s: %s', name, args[0])
- handler(*args[0], **args[1])
- except Exception as e:
- log.exception('Error processing scheduled task')
- self.exception(e)
- elif etype == 'event':
- func, disposable = handler
- try:
- func(*args)
- except Exception as e:
- error_msg = 'Error processing event handler: %s'
- log.exception(error_msg, str(func))
- if hasattr(orig, 'exception'):
- orig.exception(e)
- else:
- self.exception(e)
-
def exception(self, exception):
"""Process an unknown exception.