summaryrefslogtreecommitdiff
path: root/tests/test_stream_presence.py
blob: 135d8eed00872672327425a6e0d8f5cbe8561910 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import time
from sleekxmpp.test import *


class TestStreamPresence(SleekTest):
    """
    Test handling roster updates.
    """

    def tearDown(self):
        self.stream_close()

    def testInitialUnavailablePresences(self):
        """
        Test receiving unavailable presences from JIDs that
        are not online.
        """
        events = set()

        def got_offline(presence):
            # The got_offline event should not be triggered.
            events.add('got_offline')

        def unavailable(presence):
            # The presence_unavailable event should be triggered.
            events.add('unavailable')

        self.stream_start()
        self.xmpp.add_event_handler('got_offline', got_offline)
        self.xmpp.add_event_handler('presence_unavailable', unavailable)

        self.stream_recv("""
          <presence type="unavailable" from="otheruser@localhost" />
        """)

        # Give event queue time to process.
        time.sleep(0.1)

        self.assertEqual(events, set(('unavailable',)),
                "Got offline incorrectly triggered: %s." % events)

    def testGotOffline(self):
        """Test that got_offline is triggered properly."""
        events = []
        def got_offline(presence):
            events.append('got_offline')

        self.stream_start()
        self.xmpp.add_event_handler('got_offline', got_offline)

        # Setup roster. Use a 'set' instead of 'result' so we
        # don't have to handle get_roster() blocking.
        #
        # We use the stream to initialize the roster to make
        # the test independent of the roster implementation.
        self.stream_recv("""
          <iq type="set">
            <query xmlns="jabber:iq:roster">
              <item jid="otheruser@localhost"
                    name="Other User"
                    subscription="both">
                <group>Testers</group>
              </item>
            </query>
          </iq>
        """)

        # Contact comes online.
        self.stream_recv("""
          <presence from="otheruser@localhost/foobar" />
        """)

        # Contact goes offline, should trigger got_offline.
        self.stream_recv("""
          <presence from="otheruser@localhost/foobar"
                    type="unavailable" />
        """)

        # Give event queue time to process.
        time.sleep(0.1)

        self.assertEqual(events, ['got_offline'],
                "Got offline incorrectly triggered: %s" % events)


suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamPresence)