1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
import sys
import sleekxmpp
from sleekxmpp.exceptions import XMPPError
from sleekxmpp.test import *
class TestStreamExceptions(SleekTest):
"""
Test handling roster updates.
"""
def tearDown(self):
sys.excepthook = sys.__excepthook__
self.stream_close()
def testXMPPErrorException(self):
"""Test raising an XMPPError exception."""
def message(msg):
raise XMPPError(condition='feature-not-implemented',
text="We don't do things that way here.",
etype='cancel',
extension='foo',
extension_ns='foo:error',
extension_args={'test': 'true'})
self.stream_start()
self.xmpp.add_event_handler('message', message)
self.recv("""
<message>
<body>This is going to cause an error.</body>
</message>
""")
self.send("""
<message type="error">
<error type="cancel">
<feature-not-implemented
xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
<text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
We don't do things that way here.
</text>
<foo xmlns="foo:error" test="true" />
</error>
</message>
""", use_values=False)
def testThreadedXMPPErrorException(self):
"""Test raising an XMPPError exception in a threaded handler."""
def message(msg):
raise XMPPError(condition='feature-not-implemented',
text="We don't do things that way here.",
etype='cancel')
self.stream_start()
self.xmpp.add_event_handler('message', message,
threaded=True)
self.recv("""
<message>
<body>This is going to cause an error.</body>
</message>
""")
self.send("""
<message type="error">
<error type="cancel">
<feature-not-implemented
xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
<text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
We don't do things that way here.
</text>
</error>
</message>
""")
def testUnknownException(self):
"""Test raising an generic exception in a threaded handler."""
raised_errors = []
def message(msg):
raise ValueError("Did something wrong")
def catch_error(*args, **kwargs):
raised_errors.append(True)
sys.excepthook = catch_error
self.stream_start()
self.xmpp.add_event_handler('message', message)
self.recv("""
<message>
<body>This is going to cause an error.</body>
</message>
""")
self.send("""
<message type="error">
<error type="cancel">
<undefined-condition
xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
<text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
SleekXMPP got into trouble.
</text>
</error>
</message>
""")
self.assertEqual(raised_errors, [True], "Exception was not raised: %s" % raised_errors)
suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamExceptions)
|