import sys
import time
import threading
from sleekxmpp.test import *
class TestStreamPubsub(SleekTest):

    """
    Test using the XEP-0060 (publish-subscribe) plugin.

    Every test calls a method of self.xmpp['xep_0060'] in a worker
    thread while the test thread drives self.send() and self.recv().

    NOTE(review): send() and recv() are inherited from SleekTest, which
    is defined outside this file; presumably send() checks the stanza
    the plugin emitted and recv() feeds the reply into the stream --
    confirm against sleekxmpp.test.

    The stanza literals are kept flush-left on purpose so that the text
    handed to send()/recv() is not altered by indentation.
    """

    def setUp(self):
        self.stream_start()

    def tearDown(self):
        self.stream_close()

    def _run_threaded_exchange(self, name, target, args, send, recv,
                               kwargs=None):
        """
        Run a plugin call in a worker thread around a send/recv pair.

        The worker is started first, then self.send(send) and
        self.recv(recv) are called on the test thread, and finally the
        worker is joined.

        Arguments:
            name   -- Name given to the worker thread.
            target -- Callable executed by the worker thread.
            args   -- Positional arguments for target.
            send   -- Stanza text passed to self.send().
            recv   -- Stanza text passed to self.recv().
            kwargs -- Optional keyword arguments for target.
        """
        worker = threading.Thread(name=name,
                                  target=target,
                                  args=args,
                                  kwargs=kwargs)
        worker.start()
        self.send(send)
        self.recv(recv)
        worker.join()

    def testCreateInstantNode(self):
        """Test creating an instant node"""
        self._run_threaded_exchange(
            'create_node',
            self.xmpp['xep_0060'].create_node,
            ('pubsub.example.com', None),
            send="""
""",
            recv="""
""")

    def testCreateNodeNoConfig(self):
        """Test creating a node without a config"""
        self._run_threaded_exchange(
            'create_node',
            self.xmpp['xep_0060'].create_node,
            ('pubsub.example.com', 'princely_musings'),
            send="""
""",
            recv="""
""")

    def testCreateNodeConfig(self):
        """Test creating a node with a config"""
        form = self.xmpp['xep_0004'].stanza.Form()
        form['type'] = 'submit'
        form.add_field(var='pubsub#access_model', value='whitelist')

        self._run_threaded_exchange(
            'create_node',
            self.xmpp['xep_0060'].create_node,
            ('pubsub.example.com', 'princely_musings'),
            kwargs={'config': form},
            send="""
whitelist
http://jabber.org/protocol/pubsub#node_config
""",
            recv="""
""")

    def testDeleteNode(self):
        """Test deleting a node"""
        self._run_threaded_exchange(
            'delete_node',
            self.xmpp['xep_0060'].delete_node,
            ('pubsub.example.com', 'some_node'),
            send="""
""",
            recv="""
""")

    def testSubscribe(self):
        """Test subscribing to a node"""
        # Each entry is (subscribee, bare, ifrom, send, recv).
        cases = [
            # Case 1: No subscribee, default 'from' JID, bare JID
            (None, True, None,
             """
""",
             """
"""),
            # Case 2: No subscribee, given 'from' JID, bare JID
            (None, True, 'foo@comp.example.com/bar',
             """
""",
             """
"""),
            # Case 3: No subscribee, given 'from' JID, full JID
            (None, False, 'foo@comp.example.com/bar',
             """
""",
             """
"""),
            # Case 4: Subscribee
            ('user@example.com/foo', True, 'foo@comp.example.com/bar',
             """
""",
             """
"""),
        ]

        # The cases run one after another, each in its own worker thread.
        for jid, bare, ifrom, send, recv in cases:
            self._run_threaded_exchange(
                'subscribe',
                self.xmpp['xep_0060'].subscribe,
                ('pubsub.example.com', 'some_node'),
                send,
                recv,
                kwargs={'subscribee': jid,
                        'bare': bare,
                        'ifrom': ifrom})

    def testSubscribeWithOptions(self):
        # Placeholder: no assertions have been written for this case yet.
        pass

    # The misspelled method name is kept as-is so existing test ids
    # (e.g. names passed to a test runner) keep working.
    def testUnubscribe(self):
        """Test unsubscribing from a node"""
        # Each entry is (subscribee, bare, ifrom, send, recv).
        cases = [
            # Case 1: No subscribee, default 'from' JID, bare JID
            (None, True, None,
             """
""",
             """
"""),
            # Case 2: No subscribee, given 'from' JID, bare JID
            (None, True, 'foo@comp.example.com/bar',
             """
""",
             """
"""),
            # Case 3: No subscribee, given 'from' JID, full JID
            (None, False, 'foo@comp.example.com/bar',
             """
""",
             """
"""),
            # Case 4: Subscribee
            ('user@example.com/foo', True, 'foo@comp.example.com/bar',
             """
""",
             """
"""),
        ]

        # The cases run one after another, each in its own worker thread.
        for jid, bare, ifrom, send, recv in cases:
            self._run_threaded_exchange(
                'unsubscribe',
                self.xmpp['xep_0060'].unsubscribe,
                ('pubsub.example.com', 'some_node'),
                send,
                recv,
                kwargs={'subscribee': jid,
                        'bare': bare,
                        'ifrom': ifrom})
# Module-level suite built from the test methods of TestStreamPubsub.
# NOTE(review): `unittest` is not imported by name in this file; presumably
# it is provided by the star import from sleekxmpp.test -- confirm.
suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamPubsub)