From 67235c42145ef40d841e9cb7ffa4bbcac490568a Mon Sep 17 00:00:00 2001 From: Joe Hildebrand Date: Mon, 29 Oct 2012 10:03:32 -0600 Subject: Allow IQ timeouts to be asynchronous, by passing a timeout_callback parameter to send(). An example modification of disco is included. If this approach is approved, I'll go through and update the other plugins. --- sleekxmpp/plugins/xep_0030/disco.py | 10 ++++++++-- sleekxmpp/stanza/iq.py | 37 ++++++++++++++++++++++++++++++++----- tests/test_stream_handlers.py | 29 +++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 7 deletions(-) diff --git a/sleekxmpp/plugins/xep_0030/disco.py b/sleekxmpp/plugins/xep_0030/disco.py index 278b4a34..8a397923 100644 --- a/sleekxmpp/plugins/xep_0030/disco.py +++ b/sleekxmpp/plugins/xep_0030/disco.py @@ -324,6 +324,8 @@ class XEP_0030(BasePlugin): callback -- Optional callback to execute when a reply is received instead of blocking and waiting for the reply. + timeout_callback -- Optional callback to execute when no result + has been received in timeout seconds. """ if local is None: if jid is not None and not isinstance(jid, JID): @@ -364,7 +366,8 @@ class XEP_0030(BasePlugin): iq['disco_info']['node'] = node if node else '' return iq.send(timeout=kwargs.get('timeout', None), block=kwargs.get('block', True), - callback=kwargs.get('callback', None)) + callback=kwargs.get('callback', None), + timeout_callback=kwargs.get('timeout_callback', None)) def set_info(self, jid=None, node=None, info=None): """ @@ -405,6 +408,8 @@ class XEP_0030(BasePlugin): iterator -- If True, return a result set iterator using the XEP-0059 plugin, if the plugin is loaded. Otherwise the parameter is ignored. + timeout_callback -- Optional callback to execute when no result + has been received in timeout seconds. """ if local or local is None and jid is None: items = self.api['get_items'](jid, node, @@ -423,7 +428,8 @@ class XEP_0030(BasePlugin): else: return iq.send(timeout=kwargs.get('timeout', None), block=kwargs.get('block', True), - callback=kwargs.get('callback', None)) + callback=kwargs.get('callback', None), + timeout_callback=kwargs.get('timeout_callback', None)) def set_items(self, jid=None, node=None, **kwargs): """ diff --git a/sleekxmpp/stanza/iq.py b/sleekxmpp/stanza/iq.py index f45b3c67..71c0444d 100644 --- a/sleekxmpp/stanza/iq.py +++ b/sleekxmpp/stanza/iq.py @@ -154,7 +154,7 @@ class Iq(RootStanza): StanzaBase.reply(self, clear) return self - def send(self, block=True, timeout=None, callback=None, now=False): + def send(self, block=True, timeout=None, callback=None, now=False, timeout_callback=None): """ Send an stanza over the XML stream. @@ -181,15 +181,32 @@ class Iq(RootStanza): now -- Indicates if the send queue should be skipped and send the stanza immediately. Used during stream initialization. Defaults to False. + timeout_callback -- Optional reference to a stream handler function. + Will be executed when the timeout expires before a + response has been received with the originally-sent IQ + stanza. Only called if there is a callback parameter + (and therefore are in async mode). """ if timeout is None: timeout = self.stream.response_timeout if callback is not None and self['type'] in ('get', 'set'): handler_name = 'IqCallback_%s' % self['id'] - handler = Callback(handler_name, - MatcherId(self['id']), - callback, - once=True) + if timeout_callback: + self.callback = callback + self.timeout_callback = timeout_callback + self.stream.schedule('IqTimeout_%s' % self['id'], + timeout, + self._fire_timeout, + repeat=False) + handler = Callback(handler_name, + MatcherId(self['id']), + self._handle_result, + once=True) + else: + handler = Callback(handler_name, + MatcherId(self['id']), + callback, + once=True) self.stream.register_handler(handler) StanzaBase.send(self, now=now) return handler_name @@ -206,6 +223,16 @@ class Iq(RootStanza): else: return StanzaBase.send(self, now=now) + def _handle_result(self, iq): + # we got the IQ, so don't fire the timeout + self.stream.scheduler.remove('IqTimeout_%s' % self['id']) + self.callback(iq) + + def _fire_timeout(self): + # don't fire the handler for the IQ, if it finally does come in + self.stream.remove_handler('IqCallback_%s' % self['id']) + self.timeout_callback(self) + def _set_stanza_values(self, values): """ Set multiple stanza interface values using a dictionary. diff --git a/tests/test_stream_handlers.py b/tests/test_stream_handlers.py index 7fd4e648..cdd128bb 100644 --- a/tests/test_stream_handlers.py +++ b/tests/test_stream_handlers.py @@ -153,6 +153,35 @@ class TestHandlers(SleekTest): self.failUnless(events == ['foo'], "Iq callback was not executed: %s" % events) + def testIqTimeoutCallback(self): + """Test that iq.send(tcallback=handle_foo, timeout_callback=handle_timeout) works.""" + events = [] + + def handle_foo(iq): + events.append('foo') + + def handle_timeout(iq): + events.append('timeout') + + iq = self.Iq() + iq['type'] = 'get' + iq['id'] = 'test-foo' + iq['to'] = 'user@localhost' + iq['query'] = 'foo' + iq.send(callback=handle_foo, timeout_callback=handle_timeout, timeout=0.05) + + self.send(""" + + + + """) + + # Give event queue time to process + time.sleep(0.1) + + self.failUnless(events == ['timeout'], + "Iq timeout was not executed: %s" % events) + def testMultipleHandlersForStanza(self): """ Test that multiple handlers for a single stanza work -- cgit v1.2.3