diff options
-rw-r--r-- | docs/differences.rst | 10 | ||||
-rw-r--r-- | docs/using_asyncio.rst | 40 | ||||
-rw-r--r-- | slixmpp/__init__.py | 2 | ||||
-rw-r--r-- | slixmpp/stanza/iq.py | 118 | ||||
-rw-r--r-- | slixmpp/xmlstream/asyncio.py | 23 |
5 files changed, 87 insertions, 106 deletions
diff --git a/docs/differences.rst b/docs/differences.rst index e6e26d2c..f6e005b5 100644 --- a/docs/differences.rst +++ b/docs/differences.rst @@ -32,12 +32,12 @@ Differences from SleekXMPP handlers, which will be also handled in the event loop. The :class:`~.slixmpp.stanza.Iq` object’s :meth:`~.slixmpp.stanza.Iq.send` - method now takes a *coroutine* parameter which, if set to ``True``, - will return a coroutine which will (asyncio-)block until the reply - is received. + method now **always** return a :class:`~.asyncio.Future` which result will be set + to the IQ reply when it is received, or to ``None`` if the IQ is not of + type ``get`` or ``set``. - Many plugins (WIP) calls which retrieve information also accept this - ``coroutine`` parameter. + Many plugins (WIP) calls which retrieve information also return the same + future. **Architectural differences** slixmpp does not have an event queue anymore, and instead processes diff --git a/docs/using_asyncio.rst b/docs/using_asyncio.rst index 7f63d29d..c84148e8 100644 --- a/docs/using_asyncio.rst +++ b/docs/using_asyncio.rst @@ -7,23 +7,46 @@ Using asyncio Block on IQ sending ~~~~~~~~~~~~~~~~~~~ -:meth:`.Iq.send` now accepts a ``coroutine`` parameter which, if ``True``, -will return a coroutine waiting for the IQ reply to be received. +:meth:`.Iq.send` now returns a :class:`~.Future` so you can easily block with: .. code-block:: python - result = yield from iq.send(coroutine=True) + result = yield from iq.send() + +.. warning:: + + If the reply is an IQ with an ``error`` type, this will raise an + :class:`.IqError`, and if it timeouts, it will raise an + :class:`.IqTimeout`. Don't forget to catch it. + +You can still use callbacks instead. XEP plugin integration ~~~~~~~~~~~~~~~~~~~~~~ -Many XEP plugins have been modified to accept this ``coroutine`` parameter as -well, so you can do things like: +The same changes from the SleekXMPP API apply, so you can do: + +.. code-block:: python + + iq_info = yield from self.xmpp['xep_0030'].get_info(jid) + +But the following will only return a Future: .. code-block:: python - iq_info = yield from self.xmpp['xep_0030'].get_info(jid, coroutine=True) + iq_info = self.xmpp['xep_0030'].get_info(jid) + + +Callbacks, Event Handlers, and Stream Handlers +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +IQ callbacks and :term:`Event Handlers <event handler>` can be coroutine +functions; in this case, they will be scheduled in the event loop using +:meth:`.asyncio.async` and not ran immediately. +A :class:`.CoroutineCallback` class has been added as well for +:term:`Stream Handlers <stream handler>`, which will use +:meth:`.asyncio.async` to schedule the callback. Running the event loop ~~~~~~~~~~~~~~~~~~~~~~ @@ -52,7 +75,7 @@ callbacks while everything is not ready. client = slixmpp.ClientXMPP('jid@example', 'password') client.connected_event = asyncio.Event() - callback = lambda event: client.connected_event.set() + callback = lambda _: client.connected_event.set() client.add_event_handler('session_start', callback) client.connect() loop.run_until_complete(event.wait()) @@ -112,8 +135,7 @@ JID indicating its findings. def on_message(self, event): # You should probably handle IqError and IqTimeout exceptions here # but this is an example. - version = yield from self['xep_0092'].get_version(message['from'], - coroutine=True) + version = yield from self['xep_0092'].get_version(message['from']) text = "%s sent me a message, he runs %s" % (message['from'], version['software_version']['name']) self.send_message(mto='master@example.tld', mbody=text) diff --git a/slixmpp/__init__.py b/slixmpp/__init__.py index 7164846f..0730cc60 100644 --- a/slixmpp/__init__.py +++ b/slixmpp/__init__.py @@ -16,7 +16,7 @@ from slixmpp.xmlstream.stanzabase import ET, ElementBase, register_stanza_plugin from slixmpp.xmlstream.handler import * from slixmpp.xmlstream import XMLStream from slixmpp.xmlstream.matcher import * -from slixmpp.xmlstream.asyncio import asyncio, coroutine_wrapper +from slixmpp.xmlstream.asyncio import asyncio, future_wrapper from slixmpp.basexmpp import BaseXMPP from slixmpp.clientxmpp import ClientXMPP from slixmpp.componentxmpp import ComponentXMPP diff --git a/slixmpp/stanza/iq.py b/slixmpp/stanza/iq.py index f7b222e6..2730314f 100644 --- a/slixmpp/stanza/iq.py +++ b/slixmpp/stanza/iq.py @@ -152,59 +152,15 @@ class Iq(RootStanza): new_iq['type'] = 'result' return new_iq - @asyncio.coroutine - def _send_coroutine(self, matcher=None, timeout=None): - """Send an <iq> stanza over the XML stream. - - Blocks (with asyncio) until a the reply is received. - Use with yield from iq.send(coroutine=True). - - Overrides StanzaBase.send - - :param int timeout: The length of time (in seconds) to wait for a - response before an IqTimeout is raised - """ - - future = asyncio.Future() - - def callback(result): - future.set_result(result) - - def callback_timeout(): - future.set_result(None) - - handler_name = 'IqCallback_%s' % self['id'] - - if timeout: - self.callback = callback - self.stream.schedule('IqTimeout_%s' % self['id'], - timeout, - callback_timeout, - repeat=False) - handler = Callback(handler_name, - matcher, - self._handle_result, - once=True) - else: - handler = Callback(handler_name, - matcher, - callback, - once=True) - self.stream.register_handler(handler) - StanzaBase.send(self) - result = yield from future - if result is None: - raise IqTimeout(self) - if result['type'] == 'error': - raise IqError(result) - return result - def send(self, callback=None, timeout=None, timeout_callback=None, coroutine=False): """Send an <iq> stanza over the XML stream. A callback handler can be provided that will be executed when the Iq stanza's result reply is received. + Returns a future which result will be set to the result Iq if it is of type 'get' or 'set' + (when it is received), or a future with the result set to None if it has another type. + Overrides StanzaBase.send :param function callback: Optional reference to a stream handler @@ -218,8 +174,7 @@ class Iq(RootStanza): timeout expires before a response has been received for the originally-sent IQ stanza. - :param bool coroutine: This function will return a coroutine if this - argument is True. + :rtype: asyncio.Future """ if self.stream.session_bind_event.is_set(): matcher = MatchIDSender({ @@ -230,36 +185,45 @@ class Iq(RootStanza): else: matcher = MatcherId(self['id']) - if not coroutine: - if callback is not None and self['type'] in ('get', 'set'): - handler_name = 'IqCallback_%s' % self['id'] - if asyncio.iscoroutinefunction(callback): - constr = CoroutineCallback - else: - constr = Callback - if timeout_callback: - self.callback = callback - self.timeout_callback = timeout_callback - self.stream.schedule('IqTimeout_%s' % self['id'], - timeout, - self._fire_timeout, - repeat=False) - handler = constr(handler_name, - matcher, - self._handle_result, - once=True) - else: - handler = constr(handler_name, - matcher, - callback, - once=True) - self.stream.register_handler(handler) - StanzaBase.send(self) - return handler_name + future = asyncio.Future() + + def callback_success(result): + if result['type'] == 'error': + future.set_exception(IqError(result)) else: - return StanzaBase.send(self) + future.set_result(result) + + if timeout_callback is not None and timeout is not None: + self.stream.cancel_schedule('IqTimeout_%s' % self['id']) + if callback is not None: + callback(result) + + def callback_timeout(): + future.set_exception(IqTimeout(self)) + self.stream.remove_handler('IqCallback_%s' % self['id']) + if timeout_callback is not None: + timeout_callback(self) + + if self['type'] in ('get', 'set'): + handler_name = 'IqCallback_%s' % self['id'] + if asyncio.iscoroutinefunction(callback): + constr = CoroutineCallback + else: + constr = Callback + if timeout_callback is not None and timeout is not None: + self.stream.schedule('IqTimeout_%s' % self['id'], + timeout, + callback_timeout, + repeat=False) + handler = constr(handler_name, + matcher, + callback_success, + once=True) + self.stream.register_handler(handler) else: - return self._send_coroutine(timeout=timeout, matcher=matcher) + future.set_result(None) + StanzaBase.send(self) + return future def _handle_result(self, iq): # we got the IQ, so don't fire the timeout diff --git a/slixmpp/xmlstream/asyncio.py b/slixmpp/xmlstream/asyncio.py index 76195237..0e0f610a 100644 --- a/slixmpp/xmlstream/asyncio.py +++ b/slixmpp/xmlstream/asyncio.py @@ -33,23 +33,18 @@ cls.idle_call = idle_call real_run_once = cls._run_once cls._run_once = my_run_once - -def coroutine_wrapper(func): +def future_wrapper(func): """ - Make sure the result of a function call is a coroutine - if the ``coroutine`` keyword argument is true. + Make sure the result of a function call is an asyncio.Future() + object. """ - def wrap_coro(result): - if asyncio.iscoroutinefunction(result): - return result - else: - return asyncio.coroutine(lambda: result)() - @wraps(func) def wrapper(*args, **kwargs): - if kwargs.get('coroutine', False): - return wrap_coro(func(*args, **kwargs)) - else: - return func(*args, **kwargs) + result = func(*args, **kwargs) + if isinstance(result, asyncio.Future): + return result + future = asyncio.Future() + future.set_result(result) + return future return wrapper |