1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
import asyncio
import unittest
from uuid import uuid4
from slixmpp import JID
from slixmpp.test.integration import SlixIntegration
# Random hex string generated once, when this module is imported.
# TestMUC.asyncSetUp uses it as the local part of the room JID
# (presumably so that a run does not collide with rooms created by
# other runs on the same MUC service -- confirm).
UNIQUE = uuid4().hex
class TestMUC(SlixIntegration):
    """Exercise the xep_0045 plugin with two clients sharing one room.

    The MUC service and both accounts are looked up through ``envjid`` and
    ``envstr`` using the ``CI_*`` keys named in ``asyncSetUp``.
    """

    async def asyncSetUp(self):
        """Build the room JID, register both clients and connect them."""
        server = self.envjid('CI_MUC_SERVER')
        self.mucserver = server
        self.muc = JID('%s@%s' % (UNIQUE, server))

        # (JID key, password key) for each client, in registration order.
        credentials = (
            ('CI_ACCOUNT1', 'CI_ACCOUNT1_PASSWORD'),
            ('CI_ACCOUNT2', 'CI_ACCOUNT2_PASSWORD'),
        )
        for jid_key, password_key in credentials:
            self.add_client(self.envjid(jid_key), self.envstr(password_key))

        self.register_plugins(['xep_0045'])
        await self.connect_clients()

    async def _join_as_owner(self):
        """Join the room as 'client1' with the first client.

        Waits for that client's ``got_online`` event for the room and
        asserts that the received presence carries the 'owner' affiliation.
        """
        first = self.clients[0]
        first['xep_0045'].join_muc(self.muc, 'client1')
        joined = await first.wait_until('muc::%s::got_online' % self.muc)
        self.assertEqual(joined['muc']['affiliation'], 'owner')

    async def test_initial_join(self):
        """Check that we can connect to a new muc"""
        await self._join_as_owner()

    async def test_setup_muc(self):
        """Check that sending the initial room config and affiliation list works

        After joining as owner, the room configuration form is fetched,
        flagged as non-persistent and members-only, and sent back. The
        second client's bare JID is then submitted with the 'member'
        affiliation.
        """
        await self._join_as_owner()
        muc_plugin = self.clients[0]['xep_0045']

        # Send initial configuration
        form = await muc_plugin.get_room_config(self.muc)
        fields = form.get_values()
        fields['muc#roomconfig_persistentroom'] = False
        fields['muc#roomconfig_membersonly'] = True
        form['values'] = fields
        # NOTE(review): reply() presumably turns the received form into a
        # submission -- confirm against the xep_0004 Form implementation.
        form.reply()
        await muc_plugin.set_room_config(self.muc, form)

        # Send affiliation list including client 2
        member_entry = (self.clients[1].boundjid.bare, 'member')
        await muc_plugin.send_affiliation_list(self.muc, [member_entry])

    async def test_join_after_config(self):
        """Join a room after being added to the affiliation list

        Reuses ``test_setup_muc`` to prepare the room, then has the second
        client join as 'client2' and waits for its ``got_online`` event.
        """
        await self.test_setup_muc()
        second = self.clients[1]
        second['xep_0045'].join_muc(self.muc, 'client2')
        await second.wait_until('muc::%s::got_online' % self.muc)

    async def test_leave(self):
        """Check that we leave properly

        The first client leaves with the status 'boooring'; the presence
        delivered with the second client's ``got_offline`` event must carry
        that status and the 'unavailable' type.
        """
        await self.test_join_after_config()
        self.clients[0]['xep_0045'].leave_muc(self.muc, 'client1', 'boooring')
        offline_event = 'muc::%s::got_offline' % self.muc
        departure = await self.clients[1].wait_until(offline_event)
        self.assertEqual(departure['status'], 'boooring')
        self.assertEqual(departure['type'], 'unavailable')

    async def test_kick(self):
        """Test kicking a user

        Sets the role of 'client2' to 'none' from the first client while
        that same client waits for a ``got_offline`` event on the room.
        """
        await self.test_join_after_config()
        first = self.clients[0]
        went_offline = first.wait_until('muc::%s::got_offline' % self.muc)
        role_change = first['xep_0045'].set_role(self.muc, 'client2', 'none')
        await asyncio.gather(went_offline, role_change)
# Module-level suite containing every test method of TestMUC, built with a
# fresh TestLoader (presumably collected by the project's integration test
# runner -- confirm).
suite = unittest.TestLoader().loadTestsFromTestCase(TestMUC)
|