import sys import time import threading from sleekxmpp.test import * class TestStreamPubsub(SleekTest): """ Test using the XEP-0030 plugin. """ def setUp(self): self.stream_start() def tearDown(self): self.stream_close() def testCreateInstantNode(self): """Test creating an instant node""" t = threading.Thread(name='create_node', target=self.xmpp['xep_0060'].create_node, args=('pubsub.example.com', None)) t.start() self.send(""" """) self.recv(""" """) t.join() def testCreateNodeNoConfig(self): """Test creating a node without a config""" t = threading.Thread(name='create_node', target=self.xmpp['xep_0060'].create_node, args=('pubsub.example.com', 'princely_musings')) t.start() self.send(""" """) self.recv(""" """) t.join() def testCreateNodeConfig(self): """Test creating a node with a config""" form = self.xmpp['xep_0004'].stanza.Form() form['type'] = 'submit' form.add_field(var='pubsub#access_model', value='whitelist') t = threading.Thread(name='create_node', target=self.xmpp['xep_0060'].create_node, args=('pubsub.example.com', 'princely_musings'), kwargs={'config': form}) t.start() self.send(""" whitelist http://jabber.org/protocol/pubsub#node_config """) self.recv(""" """) t.join() def testDeleteNode(self): """Test deleting a node""" t = threading.Thread(name='delete_node', target=self.xmpp['xep_0060'].delete_node, args=('pubsub.example.com', 'some_node')) t.start() self.send(""" """) self.recv(""" """) t.join() def testSubscribe(self): """Test subscribing to a node""" def run_test(jid, bare, ifrom, send, recv): t = threading.Thread(name='subscribe', target=self.xmpp['xep_0060'].subscribe, args=('pubsub.example.com', 'some_node'), kwargs={'subscribee': jid, 'bare': bare, 'ifrom': ifrom}) t.start() self.send(send) self.recv(recv) t.join() # Case 1: No subscribee, default 'from' JID, bare JID run_test(None, True, None, """ """, """ """) # Case 2: No subscribee, given 'from' JID, bare JID run_test(None, True, 'foo@comp.example.com/bar', """ """, """ """) # Case 3: No subscribee, given 'from' JID, full JID run_test(None, False, 'foo@comp.example.com/bar', """ """, """ """) # Case 4: Subscribee run_test('user@example.com/foo', True, 'foo@comp.example.com/bar', """ """, """ """) def testSubscribeWithOptions(self): pass def testUnubscribe(self): """Test unsubscribing from a node""" def run_test(jid, bare, ifrom, send, recv): t = threading.Thread(name='unsubscribe', target=self.xmpp['xep_0060'].unsubscribe, args=('pubsub.example.com', 'some_node'), kwargs={'subscribee': jid, 'bare': bare, 'ifrom': ifrom}) t.start() self.send(send) self.recv(recv) t.join() # Case 1: No subscribee, default 'from' JID, bare JID run_test(None, True, None, """ """, """ """) # Case 2: No subscribee, given 'from' JID, bare JID run_test(None, True, 'foo@comp.example.com/bar', """ """, """ """) # Case 3: No subscribee, given 'from' JID, full JID run_test(None, False, 'foo@comp.example.com/bar', """ """, """ """) # Case 4: Subscribee run_test('user@example.com/foo', True, 'foo@comp.example.com/bar', """ """, """ """) suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamPubsub)