1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
import sys
import sleekxmpp
from sleekxmpp.exceptions import XMPPError
from sleekxmpp.test import *
class TestStreamExceptions(SleekTest):
"""
Test handling roster updates.
"""
def tearDown(self):
self.stream_close()
def testXMPPErrorException(self):
"""Test raising an XMPPError exception."""
def message(msg):
raise XMPPError(condition='feature-not-implemented',
text="We don't do things that way here.",
etype='cancel',
extension='foo',
extension_ns='foo:error',
extension_args={'test': 'true'})
self.stream_start()
self.xmpp.add_event_handler('message', message)
self.stream_recv("""
<message>
<body>This is going to cause an error.</body>
</message>
""")
self.stream_send_message("""
<message type="error">
<error type="cancel">
<feature-not-implemented
xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
<text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
We don't do things that way here.
</text>
<foo xmlns="foo:error" test="true" />
</error>
</message>
""", use_values=False)
def testThreadedXMPPErrorException(self):
"""Test raising an XMPPError exception in a threaded handler."""
def message(msg):
raise XMPPError(condition='feature-not-implemented',
text="We don't do things that way here.",
etype='cancel')
self.stream_start()
self.xmpp.add_event_handler('message', message,
threaded=True)
self.stream_recv("""
<message>
<body>This is going to cause an error.</body>
</message>
""")
self.stream_send_message("""
<message type="error">
<error type="cancel">
<feature-not-implemented
xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
<text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
We don't do things that way here.
</text>
</error>
</message>
""")
def testUnknownException(self):
"""Test raising an generic exception in a threaded handler."""
def message(msg):
raise ValueError("Did something wrong")
self.stream_start()
self.xmpp.add_event_handler('message', message)
self.stream_recv("""
<message>
<body>This is going to cause an error.</body>
</message>
""")
if sys.version_info < (3, 0):
self.stream_send_message("""
<message type="error">
<error type="cancel">
<undefined-condition
xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
<text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
SleekXMPP got into trouble.
</text>
</error>
</message>
""")
else:
# Unfortunately, tracebacks do not make for very portable tests.
pass
suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamExceptions)
|