summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/architecture.rst4
-rw-r--r--slixmpp/componentxmpp.py2
-rw-r--r--slixmpp/features/feature_bind/bind.py2
-rw-r--r--slixmpp/features/feature_mechanisms/mechanisms.py8
-rw-r--r--slixmpp/plugins/xep_0077/register.py2
-rw-r--r--slixmpp/plugins/xep_0078/legacyauth.py10
-rw-r--r--slixmpp/xmlstream/xmlstream.py86
7 files changed, 40 insertions, 74 deletions
diff --git a/docs/architecture.rst b/docs/architecture.rst
index 52bb8d9c..2d0170d2 100644
--- a/docs/architecture.rst
+++ b/docs/architecture.rst
@@ -108,10 +108,6 @@ when this bit of XML is received (with an assumed namespace of
handlers <event handler>`. Each stanza/handler pair is then put into the
event queue.
- .. note::
- It is possible to skip the event queue and process an event immediately
- by using ``direct=True`` when raising the event.
-
The code for :meth:`BaseXMPP._handle_message` follows this pattern, and
raises a ``'message'`` event::
diff --git a/slixmpp/componentxmpp.py b/slixmpp/componentxmpp.py
index 632db189..52829dfa 100644
--- a/slixmpp/componentxmpp.py
+++ b/slixmpp/componentxmpp.py
@@ -152,7 +152,7 @@ class ComponentXMPP(BaseXMPP):
"""
self.session_bind_event.set()
self.session_started_event.set()
- self.event('session_bind', self.boundjid, direct=True)
+ self.event('session_bind', self.boundjid)
self.event('session_start')
def _handle_probe(self, pres):
diff --git a/slixmpp/features/feature_bind/bind.py b/slixmpp/features/feature_bind/bind.py
index e26c3ce6..d41bbc3f 100644
--- a/slixmpp/features/feature_bind/bind.py
+++ b/slixmpp/features/feature_bind/bind.py
@@ -54,7 +54,7 @@ class FeatureBind(BasePlugin):
def _on_bind_response(self, response):
self.xmpp.boundjid = JID(response['bind']['jid'], cache_lock=True)
self.xmpp.bound = True
- self.xmpp.event('session_bind', self.xmpp.boundjid, direct=True)
+ self.xmpp.event('session_bind', self.xmpp.boundjid)
self.xmpp.session_bind_event.set()
self.xmpp.features.add('bind')
diff --git a/slixmpp/features/feature_mechanisms/mechanisms.py b/slixmpp/features/feature_mechanisms/mechanisms.py
index ee7db41e..fe38fbd5 100644
--- a/slixmpp/features/feature_mechanisms/mechanisms.py
+++ b/slixmpp/features/feature_mechanisms/mechanisms.py
@@ -172,8 +172,8 @@ class FeatureMechanisms(BasePlugin):
min_mech=self.min_mech)
except sasl.SASLNoAppropriateMechanism:
log.error("No appropriate login method.")
- self.xmpp.event("no_auth", direct=True)
- self.xmpp.event("failed_auth", direct=True)
+ self.xmpp.event("no_auth")
+ self.xmpp.event("failed_auth")
self.attempted_mechs = set()
return self.xmpp.disconnect()
except StringPrepError:
@@ -232,7 +232,7 @@ class FeatureMechanisms(BasePlugin):
self.attempted_mechs = set()
self.xmpp.authenticated = True
self.xmpp.features.add('mechanisms')
- self.xmpp.event('auth_success', stanza, direct=True)
+ self.xmpp.event('auth_success', stanza)
# Restart the stream
self.xmpp.init_parser()
self.xmpp.send_raw(self.xmpp.stream_header)
@@ -241,6 +241,6 @@ class FeatureMechanisms(BasePlugin):
"""SASL authentication failed. Disconnect and shutdown."""
self.attempted_mechs.add(self.mech.name)
log.info("Authentication failed: %s", stanza['condition'])
- self.xmpp.event("failed_auth", stanza, direct=True)
+ self.xmpp.event("failed_auth", stanza)
self._send_auth()
return True
diff --git a/slixmpp/plugins/xep_0077/register.py b/slixmpp/plugins/xep_0077/register.py
index d83ff1a7..7c6d99a0 100644
--- a/slixmpp/plugins/xep_0077/register.py
+++ b/slixmpp/plugins/xep_0077/register.py
@@ -77,7 +77,7 @@ class XEP_0077(BasePlugin):
if self.create_account and self.xmpp.event_handled('register'):
form = self.get_registration()
- self.xmpp.event('register', form, direct=True)
+ self.xmpp.event('register', form)
return True
return False
diff --git a/slixmpp/plugins/xep_0078/legacyauth.py b/slixmpp/plugins/xep_0078/legacyauth.py
index 8d2ea230..eac1b57e 100644
--- a/slixmpp/plugins/xep_0078/legacyauth.py
+++ b/slixmpp/plugins/xep_0078/legacyauth.py
@@ -82,12 +82,12 @@ class XEP_0078(BasePlugin):
resp = iq.send(now=True)
except IqError as err:
log.info("Authentication failed: %s", err.iq['error']['condition'])
- self.xmpp.event('failed_auth', direct=True)
+ self.xmpp.event('failed_auth')
self.xmpp.disconnect()
return True
except IqTimeout:
log.info("Authentication failed: %s", 'timeout')
- self.xmpp.event('failed_auth', direct=True)
+ self.xmpp.event('failed_auth')
self.xmpp.disconnect()
return True
@@ -123,11 +123,11 @@ class XEP_0078(BasePlugin):
result = iq.send(now=True)
except IqError as err:
log.info("Authentication failed")
- self.xmpp.event("failed_auth", direct=True)
+ self.xmpp.event("failed_auth")
self.xmpp.disconnect()
except IqTimeout:
log.info("Authentication failed")
- self.xmpp.event("failed_auth", direct=True)
+ self.xmpp.event("failed_auth")
self.xmpp.disconnect()
self.xmpp.features.add('auth')
@@ -137,7 +137,7 @@ class XEP_0078(BasePlugin):
self.xmpp.boundjid = JID(self.xmpp.requested_jid,
resource=resource,
cache_lock=True)
- self.xmpp.event('session_bind', self.xmpp.boundjid, direct=True)
+ self.xmpp.event('session_bind', self.xmpp.boundjid)
log.debug("Established Session")
self.xmpp.sessionstarted = True
diff --git a/slixmpp/xmlstream/xmlstream.py b/slixmpp/xmlstream/xmlstream.py
index 838f3649..3df98862 100644
--- a/slixmpp/xmlstream/xmlstream.py
+++ b/slixmpp/xmlstream/xmlstream.py
@@ -612,16 +612,13 @@ class XMLStream(object):
"""
return len(self.__event_handlers.get(name, []))
- def event(self, name, data={}, direct=False):
+ def event(self, name, data={}):
"""Manually trigger a custom event.
:param name: The name of the event to trigger.
:param data: Data that will be passed to each event handler.
Defaults to an empty dictionary, but is usually
a stanza object.
- :param direct: Runs the event directly if True, skipping the
- event queue. All event handlers will run in the
- same thread.
"""
log.debug("Event triggered: " + name)
@@ -633,18 +630,13 @@ class XMLStream(object):
out_data = copy.copy(data) if len(handlers) > 1 else data
old_exception = getattr(data, 'exception', None)
- if direct:
- try:
- handler_callback(out_data)
- except Exception as e:
- error_msg = 'Error processing event handler: %s'
- log.exception(error_msg, str(handler_callback))
- if old_exception:
- old_exception(e)
- else:
- self.exception(e)
- else:
- self.run_event(('event', handler, out_data))
+ try:
+ handler_callback(out_data)
+ except Exception as e:
+ if old_exception:
+ old_exception(e)
+ else:
+ self.exception(e)
if disposable:
# If the handler is disposable, we will go ahead and
# remove it now instead of waiting for it to be
@@ -687,11 +679,19 @@ class XMLStream(object):
except KeyError:
log.debug("Tried to cancel unscheduled event: %s" % (name,))
+ def _safe_cb_run(self, name, cb):
+ log.debug('Scheduled event: %s', name)
+ try:
+ cb()
+ except Exception as e:
+ log.exception('Error processing scheduled task')
+ self.exception(e)
+
def _execute_and_reschedule(self, name, cb, seconds):
"""Simple method that calls the given callback, and then schedule itself to
be called after the given number of seconds.
"""
- cb()
+ self._safe_cb_run(name, cb)
loop = asyncio.get_event_loop()
handle = loop.call_later(seconds, self._execute_and_reschedule,
name, cb, seconds)
@@ -701,7 +701,7 @@ class XMLStream(object):
"""
Execute the callback and remove the handler for it.
"""
- cb()
+ self._safe_cb_run(name, cb)
del self.scheduled_events[name]
def incoming_filter(self, xml):
@@ -817,7 +817,7 @@ class XMLStream(object):
# Match the stanza against registered handlers. Handlers marked
# to run "in stream" will be executed immediately; the rest will
# be queued.
- unhandled = True
+ handled = False
matched_handlers = [h for h in self.__handlers if h.match(stanza)]
for handler in matched_handlers:
if len(matched_handlers) > 1:
@@ -825,51 +825,21 @@ class XMLStream(object):
else:
stanza_copy = stanza
handler.prerun(stanza_copy)
- self.run_event(('stanza', handler, stanza_copy))
try:
- if handler.check_delete():
- self.__handlers.remove(handler)
- except:
- pass # not thread safe
- unhandled = False
+ handler.run(stanza_copy)
+ except Exception as e:
+ error_msg = 'Error processing stream handler: %s'
+ log.exception(error_msg, handler.name)
+ stanza_copy.exception(e)
+ if handler.check_delete():
+ self.__handlers.remove(handler)
+ handled = True
# Some stanzas require responses, such as Iq queries. A default
# handler will be executed immediately for this case.
- if unhandled:
+ if not handled:
stanza.unhandled()
- def run_event(self, event):
- etype, handler = event[0:2]
- args = event[2:]
- orig = copy.copy(args[0])
-
- if etype == 'stanza':
- try:
- handler.run(args[0])
- except Exception as e:
- error_msg = 'Error processing stream handler: %s'
- log.exception(error_msg, handler.name)
- orig.exception(e)
- elif etype == 'schedule':
- name = args[2]
- try:
- log.debug('Scheduled event: %s: %s', name, args[0])
- handler(*args[0], **args[1])
- except Exception as e:
- log.exception('Error processing scheduled task')
- self.exception(e)
- elif etype == 'event':
- func, disposable = handler
- try:
- func(*args)
- except Exception as e:
- error_msg = 'Error processing event handler: %s'
- log.exception(error_msg, str(func))
- if hasattr(orig, 'exception'):
- orig.exception(e)
- else:
- self.exception(e)
-
def exception(self, exception):
"""Process an unknown exception.