import sys import time import threading from sleekxmpp.test import * from sleekxmpp.stanza.atom import AtomEntry from sleekxmpp.xmlstream import register_stanza_plugin class TestStreamPubsub(SleekTest): """ Test using the XEP-0030 plugin. """ def setUp(self): self.stream_start() def tearDown(self): self.stream_close() def testCreateInstantNode(self): """Test creating an instant node""" t = threading.Thread(name='create_node', target=self.xmpp['xep_0060'].create_node, args=('pubsub.example.com', None)) t.start() self.send(""" """) self.recv(""" """) t.join() def testCreateNodeNoConfig(self): """Test creating a node without a config""" t = threading.Thread(name='create_node', target=self.xmpp['xep_0060'].create_node, args=('pubsub.example.com', 'princely_musings')) t.start() self.send(""" """) self.recv(""" """) t.join() def testCreateNodeConfig(self): """Test creating a node with a config""" form = self.xmpp['xep_0004'].stanza.Form() form['type'] = 'submit' form.add_field(var='pubsub#access_model', value='whitelist') t = threading.Thread(name='create_node', target=self.xmpp['xep_0060'].create_node, args=('pubsub.example.com', 'princely_musings'), kwargs={'config': form}) t.start() self.send(""" whitelist http://jabber.org/protocol/pubsub#node_config """) self.recv(""" """) t.join() def testDeleteNode(self): """Test deleting a node""" t = threading.Thread(name='delete_node', target=self.xmpp['xep_0060'].delete_node, args=('pubsub.example.com', 'some_node')) t.start() self.send(""" """) self.recv(""" """) t.join() def testSubscribe(self): """Test subscribing to a node""" def run_test(jid, bare, ifrom, send, recv): t = threading.Thread(name='subscribe', target=self.xmpp['xep_0060'].subscribe, args=('pubsub.example.com', 'some_node'), kwargs={'subscribee': jid, 'bare': bare, 'ifrom': ifrom}) t.start() self.send(send) self.recv(recv) t.join() # Case 1: No subscribee, default 'from' JID, bare JID run_test(None, True, None, """ """, """ """) # Case 2: No subscribee, given 'from' JID, bare JID run_test(None, True, 'foo@comp.example.com/bar', """ """, """ """) # Case 3: No subscribee, given 'from' JID, full JID run_test(None, False, 'foo@comp.example.com/bar', """ """, """ """) # Case 4: Subscribee run_test('user@example.com/foo', True, 'foo@comp.example.com/bar', """ """, """ """) def testSubscribeWithOptions(self): pass def testUnubscribe(self): """Test unsubscribing from a node""" def run_test(jid, bare, ifrom, send, recv): t = threading.Thread(name='unsubscribe', target=self.xmpp['xep_0060'].unsubscribe, args=('pubsub.example.com', 'some_node'), kwargs={'subscribee': jid, 'bare': bare, 'ifrom': ifrom}) t.start() self.send(send) self.recv(recv) t.join() # Case 1: No subscribee, default 'from' JID, bare JID run_test(None, True, None, """ """, """ """) # Case 2: No subscribee, given 'from' JID, bare JID run_test(None, True, 'foo@comp.example.com/bar', """ """, """ """) # Case 3: No subscribee, given 'from' JID, full JID run_test(None, False, 'foo@comp.example.com/bar', """ """, """ """) # Case 4: Subscribee run_test('user@example.com/foo', True, 'foo@comp.example.com/bar', """ """, """ """) def testGetDefaultConfig(self): """Test retrieving the default node configuration.""" t = threading.Thread(name='default_config', target=self.xmpp['xep_0060'].get_node_config, args=('pubsub.example.com',)) t.start() self.send(""" """, use_values=False) self.recv(""" """) t.join() def testGetDefaultNodeConfig(self): """Test retrieving the default node config for a pubsub service.""" t = threading.Thread(name='default_config', target=self.xmpp['xep_0060'].get_node_config, args=('pubsub.example.com', None)) t.start() self.send(""" """, use_values=False) self.recv(""" """) t.join() def testGetNodeConfig(self): """Test getting the config for a given node.""" t = threading.Thread(name='node_config', target=self.xmpp['xep_0060'].get_node_config, args=('pubsub.example.com', 'somenode')) t.start() self.send(""" """, use_values=False) self.recv(""" """) t.join() def testSetNodeConfig(self): """Test setting the configuration for a node.""" form = self.xmpp['xep_0004'].make_form() form.add_field(var='FORM_TYPE', ftype='hidden', value='http://jabber.org/protocol/pubsub#node_config') form.add_field(var='pubsub#title', ftype='text-single', value='This is awesome!') form['type'] = 'submit' t = threading.Thread(name='set_config', target=self.xmpp['xep_0060'].set_node_config, args=('pubsub.example.com', 'somenode', form)) t.start() self.send(""" http://jabber.org/protocol/pubsub#node_config This is awesome! """) self.recv(""" """) t.join() def testPublishSingle(self): """Test publishing a single item.""" payload = AtomEntry() payload['title'] = 'Test' register_stanza_plugin(self.xmpp['xep_0060'].stanza.Item, AtomEntry) t = threading.Thread(name='publish_single', target=self.xmpp['xep_0060'].publish, args=('pubsub.example.com', 'somenode'), kwargs={'item_id': 'ID42', 'payload': payload}) t.start() self.send(""" Test """) self.recv(""" """) t.join() def testPublishSingleOptions(self): """Test publishing a single item, with options.""" payload = AtomEntry() payload['title'] = 'Test' register_stanza_plugin(self.xmpp['xep_0060'].stanza.Item, AtomEntry) options = self.xmpp['xep_0004'].make_form() options.add_field(var='FORM_TYPE', ftype='hidden', value='http://jabber.org/protocol/pubsub#publish-options') options.add_field(var='pubsub#access_model', ftype='text-single', value='presence') options['type'] = 'submit' t = threading.Thread(name='publish_single_options', target=self.xmpp['xep_0060'].publish, args=('pubsub.example.com', 'somenode'), kwargs={'item_id': 'ID42', 'payload': payload, 'options': options}) t.start() self.send(""" Test http://jabber.org/protocol/pubsub#publish-options presence """, use_values=False) self.recv(""" """) t.join() def testPublishMulti(self): """Test publishing multiple items.""" payload1 = AtomEntry() payload1['title'] = 'Test 1' payload2 = AtomEntry() payload2['title'] = 'Test 2' register_stanza_plugin(self.xmpp['xep_0060'].stanza.Item, AtomEntry) t = threading.Thread(name='publish_multi', target=self.xmpp['xep_0060'].publish, args=('pubsub.example.com', 'somenode'), kwargs={'items': [('ID1', payload1), ('ID2', payload2)]}) t.start() self.send(""" Test 1 Test 2 """, use_values=False) self.recv(""" """) t.join() def testPublishMultiOptions(self): """Test publishing multiple items, with options.""" payload1 = AtomEntry() payload1['title'] = 'Test 1' payload2 = AtomEntry() payload2['title'] = 'Test 2' register_stanza_plugin(self.xmpp['xep_0060'].stanza.Item, AtomEntry) options = self.xmpp['xep_0004'].make_form() options.add_field(var='FORM_TYPE', ftype='hidden', value='http://jabber.org/protocol/pubsub#publish-options') options.add_field(var='pubsub#access_model', ftype='text-single', value='presence') options['type'] = 'submit' t = threading.Thread(name='publish_multi_options', target=self.xmpp['xep_0060'].publish, args=('pubsub.example.com', 'somenode'), kwargs={'items': [('ID1', payload1), ('ID2', payload2)], 'options': options}) t.start() self.send(""" Test 1 Test 2 http://jabber.org/protocol/pubsub#publish-options presence """, use_values=False) self.recv(""" """) t.join() def testRetract(self): """Test deleting an item.""" t = threading.Thread(name='retract', target=self.xmpp['xep_0060'].retract, args=('pubsub.example.com', 'somenode', 'ID1')) t.start() self.send(""" """, use_values=False) self.recv(""" """) t.join() def testPurge(self): """Test removing all items from a node.""" t = threading.Thread(name='purge', target=self.xmpp['xep_0060'].purge, args=('pubsub.example.com', 'somenode')) t.start() self.send(""" """, use_values=False) self.recv(""" """) t.join() def testGetItem(self): """Test retrieving a single item.""" pass def testGetLatestItems(self): """Test retrieving the most recent N items.""" pass def testGetAllItems(self): """Test retrieving all items.""" pass def testGetSpecificItems(self): """Test retrieving a specific set of items.""" pass suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamPubsub)