blob: 2a81480515d0ccc328326346d22c62a791aac675 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
import threading
import time
from sleekxmpp.test import *
class TestStreamChatStates(SleekTest):
    """Stream test: received chat state stanzas fire ``chatstate_*`` events.

    A client-mode test stream is started with the ``xep_0030`` and
    ``xep_0085`` plugins, one message per chat state is fed into it, and
    the states reported by the resulting events are compared with the
    order in which the stanzas were received.
    """

    def tearDown(self):
        """Close the test stream after each test method."""
        self.stream_close()

    def testChatStates(self):
        """Check that every chat state stanza triggers its matching event."""
        self.stream_start(mode='client', plugins=['xep_0030', 'xep_0085'])

        # States in the order their stanzas are received below; this is
        # also the order the handler is expected to observe them in.
        expected = ['active', 'inactive', 'paused', 'composing', 'gone']

        # One message stanza per state; only the child element name varies.
        stanza = ('\n<message>\n'
                  '<%s xmlns="http://jabber.org/protocol/chatstates" />\n'
                  '</message>\n')

        results = []

        def handle_state(msg):
            # Record the chat state carried by the message behind the event.
            results.append(msg['chat_state'])

        for state in expected:
            self.xmpp.add_event_handler('chatstate_%s' % state, handle_state)

        for state in expected:
            self.recv(stanza % state)

        # Give event queue time to process
        time.sleep(0.3)

        # assertEqual replaces the deprecated failUnless alias (removed in
        # Python 3.12) and reports a list diff when the check fails.
        self.assertEqual(results, expected,
                         "Chat state event not handled: %s" % results)
# Module-level suite built from the test methods of TestStreamChatStates.
suite = unittest.TestLoader().loadTestsFromTestCase(
    TestStreamChatStates
)
|