summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/api/plugins/xep_0047.rst4
-rw-r--r--docs/api/plugins/xep_0363.rst6
-rw-r--r--itests/test_bob.py35
-rw-r--r--itests/test_httpupload.py37
-rw-r--r--itests/test_ibb.py40
-rw-r--r--itests/test_pep.py64
-rw-r--r--slixmpp/plugins/xep_0047/ibb.py89
-rw-r--r--slixmpp/plugins/xep_0047/stanza.py3
-rw-r--r--slixmpp/plugins/xep_0047/stream.py86
-rw-r--r--slixmpp/plugins/xep_0222.py49
-rw-r--r--slixmpp/plugins/xep_0223.py49
-rw-r--r--slixmpp/plugins/xep_0231/bob.py51
-rw-r--r--slixmpp/plugins/xep_0280/carbons.py38
-rw-r--r--slixmpp/plugins/xep_0363/__init__.py9
-rw-r--r--slixmpp/plugins/xep_0363/http_upload.py109
-rw-r--r--tests/test_stream_xep_0047.py14
16 files changed, 512 insertions, 171 deletions
diff --git a/docs/api/plugins/xep_0047.rst b/docs/api/plugins/xep_0047.rst
index 4efded9b..c8aea741 100644
--- a/docs/api/plugins/xep_0047.rst
+++ b/docs/api/plugins/xep_0047.rst
@@ -8,6 +8,10 @@ XEP-0047: In-band Bytestreams
:members:
:exclude-members: session_bind, plugin_init, plugin_end
+.. module:: slixmpp.plugins.xep_0047
+
+.. autoclass:: IBBytestream
+ :members:
Stanza elements
---------------
diff --git a/docs/api/plugins/xep_0363.rst b/docs/api/plugins/xep_0363.rst
index ebbfdba1..4bbf95fa 100644
--- a/docs/api/plugins/xep_0363.rst
+++ b/docs/api/plugins/xep_0363.rst
@@ -8,6 +8,12 @@ XEP-0363: HTTP File Upload
:members:
:exclude-members: session_bind, plugin_init, plugin_end
+.. autoclass:: UploadServiceNotFound
+
+.. autoclass:: FileTooBig
+
+.. autoclass:: HTTPError
+
Stanza elements
---------------
diff --git a/itests/test_bob.py b/itests/test_bob.py
new file mode 100644
index 00000000..d0827df0
--- /dev/null
+++ b/itests/test_bob.py
@@ -0,0 +1,35 @@
+import asyncio
+import unittest
+from slixmpp.test.integration import SlixIntegration
+
+
+class TestBOB(SlixIntegration):
+ async def asyncSetUp(self):
+ await super().asyncSetUp()
+ self.add_client(
+ self.envjid('CI_ACCOUNT1'),
+ self.envstr('CI_ACCOUNT1_PASSWORD'),
+ )
+ self.add_client(
+ self.envjid('CI_ACCOUNT2'),
+ self.envstr('CI_ACCOUNT2_PASSWORD'),
+ )
+ self.register_plugins(['xep_0231'])
+ self.data = b'to' * 257
+ await self.connect_clients()
+
+ async def test_bob(self):
+ """Check we can send and receive a BOB."""
+ cid = self.clients[0]['xep_0231'].set_bob(
+ self.data,
+ 'image/jpeg',
+ )
+ recv = await self.clients[1]['xep_0231'].get_bob(
+ jid=self.clients[0].boundjid,
+ cid=cid,
+ )
+
+ self.assertEqual(self.data, recv['bob']['data'])
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestBOB)
diff --git a/itests/test_httpupload.py b/itests/test_httpupload.py
new file mode 100644
index 00000000..09e85c1d
--- /dev/null
+++ b/itests/test_httpupload.py
@@ -0,0 +1,37 @@
+try:
+ import aiohttp
+except ImportError:
+ aiohttp = None
+import unittest
+from io import BytesIO
+from slixmpp.test.integration import SlixIntegration
+
+
+class TestHTTPUpload(SlixIntegration):
+ async def asyncSetUp(self):
+ await super().asyncSetUp()
+ self.add_client(
+ self.envjid('CI_ACCOUNT1'),
+ self.envstr('CI_ACCOUNT1_PASSWORD'),
+ )
+ self.register_plugins(['xep_0363'])
+ # Minimal data, we do not want to clutter the remote server
+ self.data = b'tototo'
+ await self.connect_clients()
+
+
+ @unittest.skipIf(aiohttp is None, "aiohttp is not installed")
+ async def test_httpupload(self):
+ """Check we can upload a file properly."""
+ url = await self.clients[0]['xep_0363'].upload_file(
+ 'toto.txt',
+ input_file=BytesIO(self.data),
+ size=len(self.data),
+ )
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url) as resp:
+ text = await resp.text()
+ self.assertEqual(text.encode('utf-8'), self.data)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestHTTPUpload)
diff --git a/itests/test_ibb.py b/itests/test_ibb.py
new file mode 100644
index 00000000..7cda8e22
--- /dev/null
+++ b/itests/test_ibb.py
@@ -0,0 +1,40 @@
+import asyncio
+import unittest
+from slixmpp.test.integration import SlixIntegration
+
+
+class TestIBB(SlixIntegration):
+ async def asyncSetUp(self):
+ await super().asyncSetUp()
+ self.add_client(
+ self.envjid('CI_ACCOUNT1'),
+ self.envstr('CI_ACCOUNT1_PASSWORD'),
+ )
+ self.add_client(
+ self.envjid('CI_ACCOUNT2'),
+ self.envstr('CI_ACCOUNT2_PASSWORD'),
+ )
+ config = {'block_size': 256, 'auto_accept': True}
+ self.register_plugins(['xep_0047'], [config])
+ self.data = b'to' * 257
+ await self.connect_clients()
+
+ async def test_ibb(self):
+ """Check we can send and receive data through ibb"""
+ coro_in = self.clients[1].wait_until('ibb_stream_start')
+ coro_out = self.clients[0]['xep_0047'].open_stream(
+ self.clients[1].boundjid,
+ sid='toto'
+ )
+ instream, outstream = await asyncio.gather(coro_in, coro_out)
+
+ async def send_and_close():
+ await outstream.sendall(self.data)
+ await outstream.close()
+
+ in_data, _ = await asyncio.gather(instream.gather(), send_and_close())
+
+ self.assertEqual(self.data, in_data)
+
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestIBB)
diff --git a/itests/test_pep.py b/itests/test_pep.py
new file mode 100644
index 00000000..a674a348
--- /dev/null
+++ b/itests/test_pep.py
@@ -0,0 +1,64 @@
+import asyncio
+import unittest
+from uuid import uuid4
+from slixmpp.exceptions import IqError
+from slixmpp.test.integration import SlixIntegration
+from slixmpp.xmlstream import ElementBase, register_stanza_plugin
+from slixmpp.plugins.xep_0060.stanza import Item
+
+class Mystanza(ElementBase):
+ namespace = 'random-ns'
+ name = 'mystanza'
+ plugin_attrib = 'mystanza'
+ interfaces = {'test'}
+
+register_stanza_plugin(Item, Mystanza)
+
+class TestPEP(SlixIntegration):
+ async def asyncSetUp(self):
+ await super().asyncSetUp()
+ self.add_client(
+ self.envjid('CI_ACCOUNT1'),
+ self.envstr('CI_ACCOUNT1_PASSWORD'),
+ )
+ self.add_client(
+ self.envjid('CI_ACCOUNT2'),
+ self.envstr('CI_ACCOUNT2_PASSWORD'),
+ )
+ self.register_plugins(['xep_0222', 'xep_0223'])
+ for client in self.clients:
+ client.auto_authorize = True
+ await self.connect_clients()
+
+ async def test_pep_public(self):
+ """Check we can get and set public PEP data"""
+ stanza = Mystanza()
+ stanza['test'] = str(uuid4().hex)
+ await self.clients[0]['xep_0222'].store(stanza, id='toto')
+ fetched = await self.clients[0]['xep_0222'].retrieve(
+ stanza.namespace,
+ )
+ fetched_stanza = fetched['pubsub']['items']['item']['mystanza']
+ self.assertEqual(fetched_stanza['test'], stanza['test'])
+
+ async def test_pep_private(self):
+ """Check we can get and set private PEP data"""
+ stanza = Mystanza()
+ stanza['test'] = str(uuid4().hex)
+ await self.clients[0]['xep_0223'].store(
+ stanza, node='private-random', id='toto'
+ )
+ fetched = await self.clients[0]['xep_0223'].retrieve(
+ 'private-random',
+ )
+ fetched_stanza = fetched['pubsub']['items']['item']['mystanza']
+ self.assertEqual(fetched_stanza['test'], stanza['test'])
+
+ with self.assertRaises(IqError):
+ fetched = await self.clients[1]['xep_0060'].get_item(
+ jid=self.clients[0].boundjid.bare,
+ node='private-random',
+ item_id='toto',
+ )
+
+suite = unittest.TestLoader().loadTestsFromTestCase(TestPEP)
diff --git a/slixmpp/plugins/xep_0047/ibb.py b/slixmpp/plugins/xep_0047/ibb.py
index bd96eca2..ec08a8b3 100644
--- a/slixmpp/plugins/xep_0047/ibb.py
+++ b/slixmpp/plugins/xep_0047/ibb.py
@@ -1,8 +1,17 @@
+# Slixmpp: The Slick XMPP Library
+# This file is part of Slixmpp
+# See the file LICENSE for copying permission
import asyncio
import uuid
import logging
-from slixmpp import Message, Iq
+from typing import (
+ Optional,
+ Union,
+)
+
+from slixmpp import JID
+from slixmpp.stanza import Message, Iq
from slixmpp.exceptions import XMPPError
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
@@ -15,9 +24,27 @@ log = logging.getLogger(__name__)
class XEP_0047(BasePlugin):
+ """
+ XEP-0047: In-Band Bytestreams
+
+ Events registered by this plugin:
+
+ - :term:`ibb_stream_start`
+ - :term:`ibb_stream_end`
+ - :term:`ibb_stream_data`
+ - :term:`stream:[stream id]:[peer jid]`
+
+ Plugin Parameters:
+
+ - ``block_size`` (default: ``4096``): default block size to negociate
+ - ``max_block_size`` (default: ``8192``): max block size to accept
+ - ``auto_accept`` (default: ``False``): if incoming streams should be
+ accepted automatically.
+
+ """
name = 'xep_0047'
- description = 'XEP-0047: In-band Bytestreams'
+ description = 'XEP-0047: In-Band Bytestreams'
dependencies = {'xep_0030'}
stanza = stanza
default_config = {
@@ -105,17 +132,29 @@ class XEP_0047(BasePlugin):
def _preauthorize_sid(self, jid, sid, ifrom, data):
self._preauthed_sids[(jid, sid, ifrom)] = True
- def open_stream(self, jid, block_size=None, sid=None, use_messages=False,
- ifrom=None, timeout=None, callback=None):
+ async def open_stream(self, jid: JID, *, block_size: Optional[int] = None,
+ sid: Optional[str] = None, use_messages: bool = False,
+ ifrom: Optional[JID] = None,
+ **iqkwargs) -> IBBytestream:
+ """Open an IBB stream with a peer JID.
+
+ .. versionchanged:: 1.8.0
+ This function is now a coroutine and must be awaited.
+ All parameters except ``jid`` are keyword-args only.
+
+ :param jid: The remote JID to initiate the stream with.
+ :param block_size: The block size to advertise.
+ :param sid: The IBB stream id (if not provided, will be auto-generated).
+ :param use_messages: If the stream should use message stanzas instead of iqs.
+ :returns: The opened byte stream with the remote JID
+ :raises .IqError: When the remote entity denied the stream.
+ """
if sid is None:
sid = str(uuid.uuid4())
if block_size is None:
block_size = self.block_size
- iq = self.xmpp.Iq()
- iq['type'] = 'set'
- iq['to'] = jid
- iq['from'] = ifrom
+ iq = self.xmpp.make_iq_set(ito=jid, ifrom=ifrom)
iq['ibb_open']['block_size'] = block_size
iq['ibb_open']['sid'] = sid
iq['ibb_open']['stanza'] = 'message' if use_messages else 'iq'
@@ -123,25 +162,21 @@ class XEP_0047(BasePlugin):
stream = IBBytestream(self.xmpp, sid, block_size,
iq['from'], iq['to'], use_messages)
- stream_future = asyncio.Future()
+ callback = iqkwargs.pop('callback', None)
+ result = await iq.send(**iqkwargs)
- def _handle_opened_stream(iq):
- log.debug('IBB stream (%s) accepted by %s', stream.sid, iq['from'])
- stream.self_jid = iq['to']
- stream.peer_jid = iq['from']
- stream.stream_started = True
- self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream)
- stream_future.set_result(stream)
- if callback is not None:
- callback(stream)
- self.xmpp.event('ibb_stream_start', stream)
- self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream)
-
- iq.send(timeout=timeout, callback=_handle_opened_stream)
-
- return stream_future
+ log.debug('IBB stream (%s) accepted by %s', stream.sid, result['from'])
+ stream.self_jid = result['to']
+ stream.peer_jid = result['from']
+ stream.stream_started = True
+ self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream)
+ if callback is not None:
+ self.xmpp.add_event_handler('ibb_stream_start', callback, disposable=True)
+ self.xmpp.event('ibb_stream_start', stream)
+ self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream)
+ return stream
- def _handle_open_request(self, iq):
+ def _handle_open_request(self, iq: Iq):
sid = iq['ibb_open']['sid']
size = iq['ibb_open']['block_size'] or self.block_size
@@ -165,7 +200,7 @@ class XEP_0047(BasePlugin):
self.xmpp.event('ibb_stream_start', stream)
self.xmpp.event('stream:%s:%s' % (sid, stream.peer_jid), stream)
- def _handle_data(self, stanza):
+ def _handle_data(self, stanza: Union[Iq, Message]):
sid = stanza['ibb_data']['sid']
stream = self.api['get_stream'](stanza['to'], sid, stanza['from'])
if stream is not None and stanza['from'] == stream.peer_jid:
@@ -173,7 +208,7 @@ class XEP_0047(BasePlugin):
else:
raise XMPPError('item-not-found')
- def _handle_close(self, iq):
+ def _handle_close(self, iq: Iq):
sid = iq['ibb_close']['sid']
stream = self.api['get_stream'](iq['to'], sid, iq['from'])
if stream is not None and iq['from'] == stream.peer_jid:
diff --git a/slixmpp/plugins/xep_0047/stanza.py b/slixmpp/plugins/xep_0047/stanza.py
index 5c47508a..ac2935ac 100644
--- a/slixmpp/plugins/xep_0047/stanza.py
+++ b/slixmpp/plugins/xep_0047/stanza.py
@@ -1,3 +1,6 @@
+# Slixmpp: The Slick XMPP Library
+# This file is part of Slixmpp
+# See the file LICENSE for copying permission
import re
import base64
diff --git a/slixmpp/plugins/xep_0047/stream.py b/slixmpp/plugins/xep_0047/stream.py
index 535ba82b..f020ea68 100644
--- a/slixmpp/plugins/xep_0047/stream.py
+++ b/slixmpp/plugins/xep_0047/stream.py
@@ -1,17 +1,32 @@
+# Slixmpp: The Slick XMPP Library
+# This file is part of Slixmpp
+# See the file LICENSE for copying permission
import asyncio
import socket
import logging
-from slixmpp.stanza import Iq
-from slixmpp.exceptions import XMPPError
+from typing import (
+ Optional,
+ IO,
+ Union,
+)
+
+from slixmpp import JID
+from slixmpp.stanza import Iq, Message
+from slixmpp.exceptions import XMPPError, IqTimeout
log = logging.getLogger(__name__)
class IBBytestream(object):
+ """XEP-0047 Stream abstraction. Created by the ibb plugin automatically.
+
+ Provides send methods and triggers :term:`ibb_stream_data` events.
+ """
- def __init__(self, xmpp, sid, block_size, jid, peer, use_messages=False):
+ def __init__(self, xmpp, sid: str, block_size: int, jid: JID, peer: JID,
+ use_messages: bool = False):
self.xmpp = xmpp
self.sid = sid
self.block_size = block_size
@@ -31,7 +46,12 @@ class IBBytestream(object):
self.recv_queue = asyncio.Queue()
- async def send(self, data, timeout=None):
+ async def send(self, data: bytes, timeout: Optional[int] = None) -> int:
+ """Send a single block of data.
+
+ :param data: Data to send (will be truncated if above block size).
+ :returns: Number of bytes sent.
+ """
if not self.stream_started or self.stream_out_closed:
raise socket.error
if len(data) > self.block_size:
@@ -58,19 +78,62 @@ class IBBytestream(object):
await iq.send(timeout=timeout)
return len(data)
- async def sendall(self, data, timeout=None):
+ async def sendall(self, data: bytes, timeout: Optional[int] = None):
+ """Send all the contents of ``data`` in chunks.
+
+ :param data: Raw data to send.
+ """
sent_len = 0
while sent_len < len(data):
- sent_len += await self.send(data[sent_len:self.block_size], timeout=timeout)
-
- async def sendfile(self, file, timeout=None):
+ sent_len += await self.send(data[sent_len:sent_len+self.block_size], timeout=timeout)
+
+ async def gather(self, max_data: Optional[int] = None, timeout: int = 3600) -> bytes:
+ """Gather all data sent on a stream until it is closed, and return it.
+
+ .. versionadded:: 1.8.0
+
+ :param max_data: Max number of bytes to receive. (received data may be
+ over this limit depending on block_size)
+ :param timeout: Timeout after which an error will be raised.
+ :raises .IqTimeout: If the timeout is reached.
+ :returns: All bytes accumulated in the stream.
+ """
+ result = b''
+ end_future = asyncio.Future()
+
+ def on_close(stream):
+ if stream is self:
+ end_future.set_result(True)
+
+ def on_data(stream):
+ nonlocal result
+ if stream is self:
+ result += stream.read()
+ if max_data and len(result) > max_data:
+ end_future.set_result(True)
+
+ self.xmpp.add_event_handler('ibb_stream_end', on_close)
+ self.xmpp.add_event_handler('ibb_stream_data', on_data)
+ try:
+ await asyncio.wait_for(end_future, timeout, loop=self.xmpp.loop)
+ except asyncio.TimeoutError:
+ raise IqTimeout(result)
+ finally:
+ self.xmpp.del_event_handler('ibb_stream_end', on_close)
+ self.xmpp.del_event_handler('ibb_stream_data', on_data)
+ return result
+
+ async def sendfile(self, file: IO[bytes], timeout: Optional[int] = None):
+ """Send the contents of a file over the wire, in chunks.
+
+ :param file: The opened file (or file-like) object, in bytes mode."""
while True:
data = file.read(self.block_size)
if not data:
break
await self.send(data, timeout=timeout)
- def _recv_data(self, stanza):
+ def _recv_data(self, stanza: Union[Message, Iq]):
new_seq = stanza['ibb_data']['seq']
if new_seq != (self.recv_seq + 1) % 65536:
self.close()
@@ -96,7 +159,8 @@ class IBBytestream(object):
raise socket.error
return self.recv_queue.get_nowait()
- def close(self, timeout=None):
+ def close(self, timeout: Optional[int] = None) -> asyncio.Future:
+ """Close the stream."""
iq = self.xmpp.Iq()
iq['type'] = 'set'
iq['to'] = self.peer_jid
@@ -109,7 +173,7 @@ class IBBytestream(object):
self.xmpp.event('ibb_stream_end', self)
return future
- def _closed(self, iq):
+ def _closed(self, iq: Iq):
self.stream_in_closed = True
self.stream_out_closed = True
iq.reply().send()
diff --git a/slixmpp/plugins/xep_0222.py b/slixmpp/plugins/xep_0222.py
index 6b612e14..6a371046 100644
--- a/slixmpp/plugins/xep_0222.py
+++ b/slixmpp/plugins/xep_0222.py
@@ -5,6 +5,7 @@
# See the file LICENSE for copying permission.
import logging
+from asyncio import Future
from typing import Optional, Callable, List
from slixmpp import JID
from slixmpp.xmlstream import register_stanza_plugin, ElementBase
@@ -28,9 +29,11 @@ class XEP_0222(BasePlugin):
profile = {'pubsub#persist_items': True,
'pubsub#send_last_published_item': 'never'}
- def configure(self, node, ifrom=None, callback=None, timeout=None):
+ def configure(self, node: str, **iqkwargs) -> Future:
"""
Update a node's configuration to match the public storage profile.
+
+ :param node: Node to set the configuration at.
"""
config = self.xmpp['xep_0004'].Form()
config['type'] = 'submit'
@@ -38,29 +41,26 @@ class XEP_0222(BasePlugin):
for field, value in self.profile.items():
config.add_field(var=field, value=value)
- return self.xmpp['xep_0060'].set_node_config(None, node, config,
- ifrom=ifrom,
- callback=callback,
- timeout=timeout)
+ return self.xmpp['xep_0060'].set_node_config(
+ jid=None, node=node, config=config, **iqkwargs
+ )
def store(self, stanza: ElementBase, node: Optional[str] = None,
- id: Optional[str] = None, ifrom: Optional[JID] = None,
- options: Optional[Form] = None,
- callback: Optional[Callable] = None,
- timeout: Optional[int] = None):
+ id: Optional[str] = None, **pubsubkwargs) -> Future:
"""
Store public data via PEP.
This is just a (very) thin wrapper around the XEP-0060 publish()
method to set the defaults expected by PEP.
- :param stanza: The private content to store.
+ :param stanza: The public content to store.
:param node: The node to publish the content to. If not specified,
the stanza's namespace will be used.
:param id: Optionally specify the ID of the item.
:param options: Publish options to use, which will be modified to
fit the persistent storage option profile.
"""
+ options = pubsubkwargs.pop('options', None)
if not options:
options = self.xmpp['xep_0004'].stanza.Form()
options['type'] = 'submit'
@@ -75,17 +75,12 @@ class XEP_0222(BasePlugin):
options.add_field(var=field)
options.get_fields()[field]['value'] = value
- return self.xmpp['xep_0163'].publish(stanza, node,
- options=options,
- ifrom=ifrom,
- callback=callback,
- timeout=timeout)
+ pubsubkwargs['options'] = options
+
+ return self.xmpp['xep_0163'].publish(stanza, node, id=id, **pubsubkwargs)
def retrieve(self, node: str, id: Optional[str] = None,
- item_ids: Optional[List[str]] = None,
- ifrom: Optional[JID] = None,
- callback: Optional[Callable] = None,
- timeout: Optional[int] = None):
+ item_ids: Optional[List[str]] = None, **iqkwargs) -> Future:
"""
Retrieve public data via PEP.
@@ -96,23 +91,17 @@ class XEP_0222(BasePlugin):
:param id: Optionally specify the ID of the item.
:param item_ids: Specify a group of IDs. If id is also specified, it
will be included in item_ids.
- :param ifrom: Specify the sender's JID.
- :param timeout: The length of time (in seconds) to wait for a response
- before exiting the send call if blocking is used.
- Defaults to slixmpp.xmlstream.RESPONSE_TIMEOUT
- :param callback: Optional reference to a stream handler function. Will
- be executed when a reply stanza is received.
"""
if item_ids is None:
item_ids = []
if id is not None:
item_ids.append(id)
- return self.xmpp['xep_0060'].get_items(None, node,
- item_ids=item_ids,
- ifrom=ifrom,
- callback=callback,
- timeout=timeout)
+ return self.xmpp['xep_0060'].get_items(
+ jid=None, node=node,
+ item_ids=item_ids,
+ **iqkwargs
+ )
register_plugin(XEP_0222)
diff --git a/slixmpp/plugins/xep_0223.py b/slixmpp/plugins/xep_0223.py
index 27437913..6ed39285 100644
--- a/slixmpp/plugins/xep_0223.py
+++ b/slixmpp/plugins/xep_0223.py
@@ -5,6 +5,7 @@
# See the file LICENSE for copying permission.
import logging
+from asyncio import Future
from typing import Optional, Callable, List
from slixmpp import JID
from slixmpp.xmlstream import register_stanza_plugin, ElementBase
@@ -28,28 +29,24 @@ class XEP_0223(BasePlugin):
profile = {'pubsub#persist_items': True,
'pubsub#access_model': 'whitelist'}
- def configure(self, node, ifrom=None, callback=None, timeout=None):
+ def configure(self, node: str, **iqkwargs) -> Future:
"""
- Update a node's configuration to match the public storage profile.
+ Update a node's configuration to match the private storage profile.
+
+ :param node: Node to set the configuration at.
"""
- # TODO: that cannot possibly work, why is this here?
config = self.xmpp['xep_0004'].Form()
config['type'] = 'submit'
for field, value in self.profile.items():
config.add_field(var=field, value=value)
- return self.xmpp['xep_0060'].set_node_config(None, node, config,
- ifrom=ifrom,
- callback=callback,
- timeout=timeout)
+ return self.xmpp['xep_0060'].set_node_config(
+ jid=None, node=node, config=config, **iqkwargs
+ )
def store(self, stanza: ElementBase, node: Optional[str] = None,
- id: Optional[str] = None, ifrom: Optional[JID] = None,
- options: Optional[Form] = None,
- callback: Optional[Callable] = None,
- timeout: Optional[int] = None,
- timeout_callback: Optional[Callable] = None):
+ id: Optional[str] = None, **pubsubkwargs) -> Future:
"""
Store private data via PEP.
@@ -63,6 +60,7 @@ class XEP_0223(BasePlugin):
:param options: Publish options to use, which will be modified to
fit the persistent storage option profile.
"""
+ options = pubsubkwargs.pop('options', None)
if not options:
options = self.xmpp['xep_0004'].stanza.Form()
options['type'] = 'submit'
@@ -77,17 +75,11 @@ class XEP_0223(BasePlugin):
options.add_field(var=field)
options.get_fields()[field]['value'] = value
- return self.xmpp['xep_0163'].publish(stanza, node, options=options,
- ifrom=ifrom, callback=callback,
- timeout=timeout,
- timeout_callback=timeout_callback)
+ pubsubkwargs['options'] = options
+ return self.xmpp['xep_0163'].publish(stanza, node, id=id, **pubsubkwargs)
def retrieve(self, node: str, id: Optional[str] = None,
- item_ids: Optional[List[str]] = None,
- ifrom: Optional[JID] = None,
- callback: Optional[Callable] = None,
- timeout: Optional[int] = None,
- timeout_callback: Optional[Callable] = None):
+ item_ids: Optional[List[str]] = None, **iqkwargs) -> Future:
"""
Retrieve private data via PEP.
@@ -98,22 +90,17 @@ class XEP_0223(BasePlugin):
:param id: Optionally specify the ID of the item.
:param item_ids: Specify a group of IDs. If id is also specified, it
will be included in item_ids.
- :param ifrom: Specify the sender's JID.
- :param timeout: The length of time (in seconds) to wait for a response
- before exiting the send call if blocking is used.
- Defaults to slixmpp.xmlstream.RESPONSE_TIMEOUT
- :param callback: Optional reference to a stream handler function. Will
- be executed when a reply stanza is received.
"""
if item_ids is None:
item_ids = []
if id is not None:
item_ids.append(id)
- return self.xmpp['xep_0060'].get_items(None, node,
- item_ids=item_ids, ifrom=ifrom,
- callback=callback, timeout=timeout,
- timeout_callback=timeout_callback)
+ return self.xmpp['xep_0060'].get_items(
+ jid=None, node=node,
+ item_ids=item_ids,
+ **iqkwargs
+ )
register_plugin(XEP_0223)
diff --git a/slixmpp/plugins/xep_0231/bob.py b/slixmpp/plugins/xep_0231/bob.py
index b7e990b3..e554c38c 100644
--- a/slixmpp/plugins/xep_0231/bob.py
+++ b/slixmpp/plugins/xep_0231/bob.py
@@ -1,4 +1,3 @@
-
# Slixmpp: The Slick XMPP Library
# Copyright (C) 2012 Nathanael C. Fritz,
# Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
@@ -7,7 +6,10 @@
import logging
import hashlib
-from slixmpp import future_wrapper
+from asyncio import Future
+from typing import Optional
+
+from slixmpp import future_wrapper, JID
from slixmpp.stanza import Iq, Message, Presence
from slixmpp.exceptions import XMPPError
from slixmpp.xmlstream.handler import Callback
@@ -65,7 +67,20 @@ class XEP_0231(BasePlugin):
def session_bind(self, jid):
self.xmpp['xep_0030'].add_feature('urn:xmpp:bob')
- def set_bob(self, data, mtype, cid=None, max_age=None):
+ def set_bob(self, data: bytes, mtype: str, cid: Optional[str] = None,
+ max_age: Optional[int] = None) -> str:
+ """Register a blob of binary data as a BOB.
+
+ .. versionchanged:: 1.8.0
+ If ``max_age`` is specified, the registered data will be destroyed
+ after that time.
+
+ :param data: Data to register.
+ :param mtype: Mime Type of the data (e.g. ``image/jpeg``).
+ :param cid: Content-ID (will be auto-generated if left out).
+ :param max_age: Duration of content availability.
+ :returns: The cid value.
+ """
if cid is None:
cid = 'sha1+%s@bob.xmpp.org' % hashlib.sha1(data).hexdigest()
@@ -76,12 +91,24 @@ class XEP_0231(BasePlugin):
bob['max_age'] = max_age
self.api['set_bob'](args=bob)
-
+ # Schedule destruction of the data
+ if max_age is not None and max_age > 0:
+ self.xmpp.loop.call_later(max_age, self.del_bob, cid)
return cid
@future_wrapper
- def get_bob(self, jid=None, cid=None, cached=True, ifrom=None,
- timeout=None, callback=None):
+ def get_bob(self, jid: Optional[JID] = None, cid: Optional[str] = None,
+ cached: bool = True, ifrom: Optional[JID] = None,
+ **iqkwargs) -> Future:
+ """Get a BOB.
+
+ .. versionchanged:: 1.8.0
+ Results not in cache do not raise an error when ``cached`` is True.
+
+ :param jid: JID to fetch the BOB from.
+ :param cid: Content ID (actually required).
+ :param cached: To fetch the BOB from the local cache first (from CID only)
+ """
if cached:
data = self.api['get_bob'](None, None, ifrom, args=cid)
if data is not None:
@@ -91,17 +118,14 @@ class XEP_0231(BasePlugin):
return iq
return data
- iq = self.xmpp.Iq()
- iq['to'] = jid
- iq['from'] = ifrom
- iq['type'] = 'get'
+ iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom)
iq['bob']['cid'] = cid
- return iq.send(timeout=timeout, callback=callback)
+ return iq.send(**iqkwargs)
- def del_bob(self, cid):
+ def del_bob(self, cid: str):
self.api['del_bob'](args=cid)
- def _handle_bob_iq(self, iq):
+ def _handle_bob_iq(self, iq: Iq):
cid = iq['bob']['cid']
if iq['type'] == 'result':
@@ -131,7 +155,6 @@ class XEP_0231(BasePlugin):
def _get_bob(self, jid, node, ifrom, cid):
if cid in self._cids:
return self._cids[cid]
- raise XMPPError('item-not-found')
def _del_bob(self, jid, node, ifrom, cid):
if cid in self._cids:
diff --git a/slixmpp/plugins/xep_0280/carbons.py b/slixmpp/plugins/xep_0280/carbons.py
index 6a35bf84..c67e3fd9 100644
--- a/slixmpp/plugins/xep_0280/carbons.py
+++ b/slixmpp/plugins/xep_0280/carbons.py
@@ -5,7 +5,10 @@
# See the file LICENSE for copying permissio
import logging
-import slixmpp
+from asyncio import Future
+from typing import Optional
+
+from slixmpp import JID
from slixmpp.stanza import Message, Iq
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
@@ -21,6 +24,11 @@ class XEP_0280(BasePlugin):
"""
XEP-0280 Message Carbons
+
+ Events triggered by this plugin:
+
+ - :term:`carbon_received`
+ - :term:`carbon_sent`
"""
name = 'xep_0280'
@@ -57,28 +65,22 @@ class XEP_0280(BasePlugin):
def session_bind(self, jid):
self.xmpp.plugin['xep_0030'].add_feature('urn:xmpp:carbons:2')
- def _handle_carbon_received(self, msg):
+ def _handle_carbon_received(self, msg: Message):
if msg['from'].bare == self.xmpp.boundjid.bare:
self.xmpp.event('carbon_received', msg)
- def _handle_carbon_sent(self, msg):
+ def _handle_carbon_sent(self, msg: Message):
if msg['from'].bare == self.xmpp.boundjid.bare:
self.xmpp.event('carbon_sent', msg)
- def enable(self, ifrom=None, timeout=None, callback=None,
- timeout_callback=None):
- iq = self.xmpp.Iq()
- iq['type'] = 'set'
- iq['from'] = ifrom
+ def enable(self, ifrom: Optional[JID] = None, **iqkwargs) -> Future:
+ """Enable carbons."""
+ iq = self.xmpp.make_iq_set(ifrom=ifrom)
iq.enable('carbon_enable')
- return iq.send(timeout_callback=timeout_callback, timeout=timeout,
- callback=callback)
-
- def disable(self, ifrom=None, timeout=None, callback=None,
- timeout_callback=None):
- iq = self.xmpp.Iq()
- iq['type'] = 'set'
- iq['from'] = ifrom
+ return iq.send(**iqkwargs)
+
+ def disable(self, ifrom: Optional[JID] = None, **iqkwargs) -> Future:
+ """Disable carbons."""
+ iq = self.xmpp.make_iq_set(ifrom=ifrom)
iq.enable('carbon_disable')
- return iq.send(timeout_callback=timeout_callback, timeout=timeout,
- callback=callback)
+ return iq.send(**iqkwargs)
diff --git a/slixmpp/plugins/xep_0363/__init__.py b/slixmpp/plugins/xep_0363/__init__.py
index 0ed1d7c8..f693eb92 100644
--- a/slixmpp/plugins/xep_0363/__init__.py
+++ b/slixmpp/plugins/xep_0363/__init__.py
@@ -1,4 +1,3 @@
-
# slixmpp: The Slick XMPP Library
# Copyright (C) 2018 Emmanuel Gil Peyrot
# This file is part of slixmpp.
@@ -6,6 +5,12 @@
from slixmpp.plugins.base import register_plugin
from slixmpp.plugins.xep_0363.stanza import Request, Slot, Put, Get, Header
-from slixmpp.plugins.xep_0363.http_upload import XEP_0363
+from slixmpp.plugins.xep_0363.http_upload import (
+ XEP_0363,
+ UploadServiceNotFound,
+ FileTooBig,
+ HTTPError,
+ FileUploadError,
+)
register_plugin(XEP_0363)
diff --git a/slixmpp/plugins/xep_0363/http_upload.py b/slixmpp/plugins/xep_0363/http_upload.py
index 04b066cd..bae3ee7d 100644
--- a/slixmpp/plugins/xep_0363/http_upload.py
+++ b/slixmpp/plugins/xep_0363/http_upload.py
@@ -1,18 +1,21 @@
-"""
- slixmpp: The Slick XMPP Library
- Copyright (C) 2018 Emmanuel Gil Peyrot
- This file is part of slixmpp.
-
- See the file LICENSE for copying permission.
-"""
+# slixmpp: The Slick XMPP Library
+# Copyright (C) 2018 Emmanuel Gil Peyrot
+# This file is part of slixmpp.
+# See the file LICENSE for copying permission.
import logging
import os.path
from aiohttp import ClientSession
+from asyncio import Future
from mimetypes import guess_type
+from typing import (
+ Optional,
+ IO,
+)
-from slixmpp import Iq, __version__
+from slixmpp import JID, __version__
+from slixmpp.stanza import Iq
from slixmpp.plugins import BasePlugin
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.xmlstream.handler import Callback
@@ -25,19 +28,39 @@ class FileUploadError(Exception):
pass
class UploadServiceNotFound(FileUploadError):
- pass
+ """
+ Raised if no upload service can be found.
+ """
class FileTooBig(FileUploadError):
+ """
+ Raised if the file size is above advertised server limits.
+
+ args:
+
+ - size of the file
+ - max file size allowed
+ """
def __str__(self):
return 'File size too large: {} (max: {} bytes)' \
.format(self.args[0], self.args[1])
class HTTPError(FileUploadError):
+ """
+ Raised when we receive an HTTP error response during upload.
+
+ args:
+
+ - HTTP Error code
+ - Content of the HTTP response
+ """
def __str__(self):
return 'Could not upload file: %d (%s)' % (self.args[0], self.args[1])
class XEP_0363(BasePlugin):
- ''' This plugin only supports Python 3.5+ '''
+ """
+ XEP-0363: HTTP File Upload
+ """
name = 'xep_0363'
description = 'XEP-0363: HTTP File Upload'
@@ -62,9 +85,7 @@ class XEP_0363(BasePlugin):
self._handle_request))
def plugin_end(self):
- self._http_session.close()
self.xmpp.remove_handler('HTTP Upload Request')
- self.xmpp.remove_handler('HTTP Upload Slot')
self.xmpp['xep_0030'].del_feature(feature=Request.namespace)
def session_bind(self, jid):
@@ -73,9 +94,14 @@ class XEP_0363(BasePlugin):
def _handle_request(self, iq):
self.xmpp.event('http_upload_request', iq)
- async def find_upload_service(self, domain=None, timeout=None):
+ async def find_upload_service(self, domain: Optional[JID] = None, **iqkwargs) -> Optional[Iq]:
+ """Find an upload service on a domain (our own by default).
+
+ :param domain: Domain to disco to find a service.
+ """
results = await self.xmpp['xep_0030'].get_info_from_domain(
- domain=domain, timeout=timeout)
+ domain=domain, **iqkwargs
+ )
candidates = []
for info in results:
@@ -87,26 +113,49 @@ class XEP_0363(BasePlugin):
if feature == Request.namespace:
return info
- def request_slot(self, jid, filename, size, content_type=None, ifrom=None,
- timeout=None, callback=None, timeout_callback=None):
- iq = self.xmpp.Iq()
- iq['to'] = jid
- iq['from'] = ifrom
- iq['type'] = 'get'
+ def request_slot(self, jid: JID, filename: str, size: int,
+ content_type: Optional[str] = None, *,
+ ifrom: Optional[JID] = None, **iqkwargs) -> Future:
+ """Request an HTTP upload slot from a service.
+
+ :param jid: Service to request the slot from.
+ :param filename: Name of the file that will be uploaded.
+ :param size: size of the file in bytes.
+ :param content_type: Type of the file that will be uploaded.
+ """
+ iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom)
request = iq['http_upload_request']
request['filename'] = filename
request['size'] = str(size)
request['content-type'] = content_type or self.default_content_type
- return iq.send(timeout=timeout, callback=callback,
- timeout_callback=timeout_callback)
-
- async def upload_file(self, filename, size=None, content_type=None, *,
- input_file=None, ifrom=None, domain=None, timeout=None,
- callback=None, timeout_callback=None):
- ''' Helper function which does all of the uploading process. '''
+ return iq.send(**iqkwargs)
+
+ async def upload_file(self, filename: str, size: Optional[int] = None,
+ content_type: Optional[str] = None, *,
+ input_file: Optional[IO[bytes]]=None,
+ domain: Optional[JID] = None,
+ **iqkwargs) -> str:
+ '''Helper function which does all of the uploading discovery and
+ process.
+
+ :param filename: Path to the file to upload (or only the name if
+ ``input_file`` is provided.
+ :param size: size of the file in bytes.
+ :param content_type: Type of the file that will be uploaded.
+ :param input_file: Binary file stream on the file.
+ :param domain: Domain to query to find an HTTP upload service.
+ :raises .UploadServiceNotFound: If slixmpp is unable to find an
+ an available upload service.
+ :raises .FileTooBig: If the filesize is above what is accepted by
+ the service.
+ :raises .HTTPError: If there is an error in the HTTP operation.
+ :returns: The URL of the uploaded file.
+ '''
+ timeout = iqkwargs.get('timeout', None)
if self.upload_service is None:
info_iq = await self.find_upload_service(
- domain=domain, timeout=timeout)
+ domain=domain, **iqkwargs
+ )
if info_iq is None:
raise UploadServiceNotFound()
self.upload_service = info_iq['from']
@@ -137,9 +186,7 @@ class XEP_0363(BasePlugin):
basename = os.path.basename(filename)
slot_iq = await self.request_slot(self.upload_service, basename, size,
- content_type, ifrom, timeout,
- callback=callback,
- timeout_callback=timeout_callback)
+ content_type, **iqkwargs)
slot = slot_iq['http_upload_slot']
headers = {
diff --git a/tests/test_stream_xep_0047.py b/tests/test_stream_xep_0047.py
index f7276c0f..53225df5 100644
--- a/tests/test_stream_xep_0047.py
+++ b/tests/test_stream_xep_0047.py
@@ -14,7 +14,7 @@ class TestInBandByteStreams(SlixTest):
def tearDown(self):
self.stream_close()
- def testOpenStream(self):
+ async def testOpenStream(self):
"""Test requesting a stream, successfully"""
events = []
@@ -25,8 +25,8 @@ class TestInBandByteStreams(SlixTest):
self.xmpp.add_event_handler('ibb_stream_start', on_stream_start)
- self.xmpp['xep_0047'].open_stream('tester@localhost/receiver',
- sid='testing')
+ await self.xmpp['xep_0047'].open_stream('tester@localhost/receiver',
+ sid='testing')
self.send("""
<iq type="set" to="tester@localhost/receiver" id="1">
@@ -45,7 +45,7 @@ class TestInBandByteStreams(SlixTest):
self.assertEqual(events, ['ibb_stream_start'])
- def testAysncOpenStream(self):
+ async def testAysncOpenStream(self):
"""Test requesting a stream, aysnc"""
events = set()
@@ -58,9 +58,9 @@ class TestInBandByteStreams(SlixTest):
self.xmpp.add_event_handler('ibb_stream_start', on_stream_start)
- self.xmpp['xep_0047'].open_stream('tester@localhost/receiver',
- sid='testing',
- callback=stream_callback)
+ await self.xmpp['xep_0047'].open_stream('tester@localhost/receiver',
+ sid='testing',
+ callback=stream_callback)
self.send("""
<iq type="set" to="tester@localhost/receiver" id="1">