diff options
Diffstat (limited to 'slixmpp')
-rw-r--r-- | slixmpp/__init__.py | 2 | ||||
-rw-r--r-- | slixmpp/stanza/iq.py | 118 | ||||
-rw-r--r-- | slixmpp/xmlstream/asyncio.py | 23 |
3 files changed, 51 insertions, 92 deletions
diff --git a/slixmpp/__init__.py b/slixmpp/__init__.py index 7164846f..0730cc60 100644 --- a/slixmpp/__init__.py +++ b/slixmpp/__init__.py @@ -16,7 +16,7 @@ from slixmpp.xmlstream.stanzabase import ET, ElementBase, register_stanza_plugin from slixmpp.xmlstream.handler import * from slixmpp.xmlstream import XMLStream from slixmpp.xmlstream.matcher import * -from slixmpp.xmlstream.asyncio import asyncio, coroutine_wrapper +from slixmpp.xmlstream.asyncio import asyncio, future_wrapper from slixmpp.basexmpp import BaseXMPP from slixmpp.clientxmpp import ClientXMPP from slixmpp.componentxmpp import ComponentXMPP diff --git a/slixmpp/stanza/iq.py b/slixmpp/stanza/iq.py index f7b222e6..2730314f 100644 --- a/slixmpp/stanza/iq.py +++ b/slixmpp/stanza/iq.py @@ -152,59 +152,15 @@ class Iq(RootStanza): new_iq['type'] = 'result' return new_iq - @asyncio.coroutine - def _send_coroutine(self, matcher=None, timeout=None): - """Send an <iq> stanza over the XML stream. - - Blocks (with asyncio) until a the reply is received. - Use with yield from iq.send(coroutine=True). - - Overrides StanzaBase.send - - :param int timeout: The length of time (in seconds) to wait for a - response before an IqTimeout is raised - """ - - future = asyncio.Future() - - def callback(result): - future.set_result(result) - - def callback_timeout(): - future.set_result(None) - - handler_name = 'IqCallback_%s' % self['id'] - - if timeout: - self.callback = callback - self.stream.schedule('IqTimeout_%s' % self['id'], - timeout, - callback_timeout, - repeat=False) - handler = Callback(handler_name, - matcher, - self._handle_result, - once=True) - else: - handler = Callback(handler_name, - matcher, - callback, - once=True) - self.stream.register_handler(handler) - StanzaBase.send(self) - result = yield from future - if result is None: - raise IqTimeout(self) - if result['type'] == 'error': - raise IqError(result) - return result - def send(self, callback=None, timeout=None, timeout_callback=None, coroutine=False): """Send an <iq> stanza over the XML stream. A callback handler can be provided that will be executed when the Iq stanza's result reply is received. + Returns a future which result will be set to the result Iq if it is of type 'get' or 'set' + (when it is received), or a future with the result set to None if it has another type. + Overrides StanzaBase.send :param function callback: Optional reference to a stream handler @@ -218,8 +174,7 @@ class Iq(RootStanza): timeout expires before a response has been received for the originally-sent IQ stanza. - :param bool coroutine: This function will return a coroutine if this - argument is True. + :rtype: asyncio.Future """ if self.stream.session_bind_event.is_set(): matcher = MatchIDSender({ @@ -230,36 +185,45 @@ class Iq(RootStanza): else: matcher = MatcherId(self['id']) - if not coroutine: - if callback is not None and self['type'] in ('get', 'set'): - handler_name = 'IqCallback_%s' % self['id'] - if asyncio.iscoroutinefunction(callback): - constr = CoroutineCallback - else: - constr = Callback - if timeout_callback: - self.callback = callback - self.timeout_callback = timeout_callback - self.stream.schedule('IqTimeout_%s' % self['id'], - timeout, - self._fire_timeout, - repeat=False) - handler = constr(handler_name, - matcher, - self._handle_result, - once=True) - else: - handler = constr(handler_name, - matcher, - callback, - once=True) - self.stream.register_handler(handler) - StanzaBase.send(self) - return handler_name + future = asyncio.Future() + + def callback_success(result): + if result['type'] == 'error': + future.set_exception(IqError(result)) else: - return StanzaBase.send(self) + future.set_result(result) + + if timeout_callback is not None and timeout is not None: + self.stream.cancel_schedule('IqTimeout_%s' % self['id']) + if callback is not None: + callback(result) + + def callback_timeout(): + future.set_exception(IqTimeout(self)) + self.stream.remove_handler('IqCallback_%s' % self['id']) + if timeout_callback is not None: + timeout_callback(self) + + if self['type'] in ('get', 'set'): + handler_name = 'IqCallback_%s' % self['id'] + if asyncio.iscoroutinefunction(callback): + constr = CoroutineCallback + else: + constr = Callback + if timeout_callback is not None and timeout is not None: + self.stream.schedule('IqTimeout_%s' % self['id'], + timeout, + callback_timeout, + repeat=False) + handler = constr(handler_name, + matcher, + callback_success, + once=True) + self.stream.register_handler(handler) else: - return self._send_coroutine(timeout=timeout, matcher=matcher) + future.set_result(None) + StanzaBase.send(self) + return future def _handle_result(self, iq): # we got the IQ, so don't fire the timeout diff --git a/slixmpp/xmlstream/asyncio.py b/slixmpp/xmlstream/asyncio.py index 76195237..0e0f610a 100644 --- a/slixmpp/xmlstream/asyncio.py +++ b/slixmpp/xmlstream/asyncio.py @@ -33,23 +33,18 @@ cls.idle_call = idle_call real_run_once = cls._run_once cls._run_once = my_run_once - -def coroutine_wrapper(func): +def future_wrapper(func): """ - Make sure the result of a function call is a coroutine - if the ``coroutine`` keyword argument is true. + Make sure the result of a function call is an asyncio.Future() + object. """ - def wrap_coro(result): - if asyncio.iscoroutinefunction(result): - return result - else: - return asyncio.coroutine(lambda: result)() - @wraps(func) def wrapper(*args, **kwargs): - if kwargs.get('coroutine', False): - return wrap_coro(func(*args, **kwargs)) - else: - return func(*args, **kwargs) + result = func(*args, **kwargs) + if isinstance(result, asyncio.Future): + return result + future = asyncio.Future() + future.set_result(result) + return future return wrapper |