import sys import time import threading from sleekxmpp.test import * class TestStreamPubsub(SleekTest): """ Test using the XEP-0030 plugin. """ def setUp(self): self.stream_start() def tearDown(self): self.stream_close() def testCreateInstantNode(self): """Test creating an instant node""" t = threading.Thread(name='create_node', target=self.xmpp['xep_0060'].create_node, args=('pubsub.example.com', None)) t.start() self.send(""" """) self.recv(""" """) t.join() def testCreateNodeNoConfig(self): """Test creating a node without a config""" t = threading.Thread(name='create_node', target=self.xmpp['xep_0060'].create_node, args=('pubsub.example.com', 'princely_musings')) t.start() self.send(""" """) self.recv(""" """) t.join() def testCreateNodeConfig(self): """Test creating a node with a config""" form = self.xmpp['xep_0004'].stanza.Form() form['type'] = 'submit' form.add_field(var='pubsub#access_model', value='whitelist') t = threading.Thread(name='create_node', target=self.xmpp['xep_0060'].create_node, args=('pubsub.example.com', 'princely_musings'), kwargs={'config': form}) t.start() self.send(""" whitelist http://jabber.org/protocol/pubsub#node_config """) self.recv(""" """) t.join() def testDeleteNode(self): """Test deleting a node""" t = threading.Thread(name='delete_node', target=self.xmpp['xep_0060'].delete_node, args=('pubsub.example.com', 'some_node')) t.start() self.send(""" """) self.recv(""" """) t.join() suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamPubsub)