1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
import asyncio
import unittest
from slixmpp import JID
from slixmpp.test.integration import SlixIntegration
class TestUserAvatar(SlixIntegration):
async def asyncSetUp(self):
await super().asyncSetUp()
self.add_client(
self.envjid('CI_ACCOUNT1'),
self.envstr('CI_ACCOUNT1_PASSWORD'),
)
self.register_plugins(['xep_0084', 'xep_0115'])
self.data = b'coucou coucou'
await self.connect_clients()
await asyncio.gather(
self.clients[0]['xep_0115'].update_caps(),
)
async def _clear_avatar(self):
"""Utility for purging remote state"""
await self.clients[0]['xep_0084'].stop()
await self.clients[0]['xep_0084'].publish_avatar(b'')
async def test_set_avatar(self):
"""Check we can set and get a PEP avatar and metadata"""
await self._clear_avatar()
await self.clients[0]['xep_0084'].publish_avatar(
self.data
)
metadata = {
'id': self.clients[0]['xep_0084'].generate_id(self.data),
'bytes': 13,
'type': 'image/jpeg',
}
# Wait for metadata publish event
event = self.clients[0].wait_until('avatar_metadata_publish')
publish = self.clients[0]['xep_0084'].publish_avatar_metadata(
metadata,
)
res = await asyncio.gather(
event,
publish,
)
message = res[0]
recv_meta = message['pubsub_event']['items']['item']['avatar_metadata']
info = recv_meta['info']
self.assertEqual(info['bytes'], metadata['bytes'])
self.assertEqual(info['type'], metadata['type'])
self.assertEqual(info['id'], metadata['id'])
recv = await self.clients[0]['xep_0084'].retrieve_avatar(
JID(self.clients[0].boundjid.bare),
info['id']
)
avatar = recv['pubsub']['items']['item']['avatar_data']['value']
self.assertEqual(avatar, self.data)
await self._clear_avatar()
suite = unittest.TestLoader().loadTestsFromTestCase(TestUserAvatar)
|