import sys
import time
import threading

# NOTE(review): `SleekTest` and `unittest` are not imported by name in this
# file; presumably both come from this wildcard import -- confirm.
from sleekxmpp.test import *
from sleekxmpp.stanza.atom import AtomEntry
from sleekxmpp.xmlstream import register_stanza_plugin


class TestStreamPubsub(SleekTest):

    """
    Test using the XEP-0060 plugin.

    Each test calls a method of self.xmpp['xep_0060'] and then hands
    self.send() the stanza text the call is expected to produce.

    NOTE(review): several of the expected-stanza literals below contain
    only whitespace or bare text with no XML markup. They are reproduced
    exactly as found; confirm against the canonical copy of this file
    whether markup was lost before relying on these assertions.
    """

    def setUp(self):
        self.stream_start()

    def tearDown(self):
        self.stream_close()

    def testCreateInstantNode(self):
        """Test creating an instant node"""
        # create_node is called here without block=False, so it is run in
        # a helper thread while this thread performs the send/recv checks.
        t = threading.Thread(name='create_node',
                             target=self.xmpp['xep_0060'].create_node,
                             args=('pubsub.example.com', None))
        t.start()

        self.send(""" """)

        self.recv(""" """)

        t.join()

    def testCreateNodeNoConfig(self):
        """Test creating a node without a config"""
        self.xmpp['xep_0060'].create_node(
            'pubsub.example.com',
            'princely_musings', block=False)
        self.send(""" """)

    def testCreateNodeConfig(self):
        """Test creating a node with a config"""
        form = self.xmpp['xep_0004'].stanza.Form()
        form['type'] = 'submit'
        form.add_field(var='pubsub#access_model', value='whitelist')

        self.xmpp['xep_0060'].create_node(
            'pubsub.example.com',
            'princely_musings',
            config=form, block=False)

        self.send(""" whitelist http://jabber.org/protocol/pubsub#node_config """)

    def testDeleteNode(self):
        """Test deleting a node"""
        self.xmpp['xep_0060'].delete_node(
            'pubsub.example.com',
            'some_node', block=False)
        self.send(""" """)

    def testSubscribeCase1(self):
        """
        Test subscribing to a node: Case 1:
        No subscribee, default 'from' JID, bare JID
        """
        self.xmpp['xep_0060'].subscribe(
            'pubsub.example.com',
            'somenode',
            block=False)
        self.send(""" """)

    def testSubscribeCase2(self):
        """
        Test subscribing to a node: Case 2:
        No subscribee, given 'from' JID, bare JID
        """
        self.xmpp['xep_0060'].subscribe(
            'pubsub.example.com',
            'somenode',
            ifrom='foo@comp.example.com/bar',
            block=False)
        self.send(""" """)

    def testSubscribeCase3(self):
        """
        Test subscribing to a node: Case 3:
        No subscribee, given 'from' JID, full JID
        """
        self.xmpp['xep_0060'].subscribe(
            'pubsub.example.com',
            'somenode',
            ifrom='foo@comp.example.com/bar',
            bare=False,
            block=False)
        self.send(""" """)

    def testSubscribeCase4(self):
        """
        Test subscribing to a node: Case 4:
        No subscribee, no 'from' JID, full JID
        """
        # Restart the stream with an explicit full JID for this case.
        self.stream_close()
        self.stream_start(jid='tester@localhost/full')

        self.xmpp['xep_0060'].subscribe(
            'pubsub.example.com',
            'somenode',
            bare=False,
            block=False)
        self.send(""" """)

    def testSubscribeCase5(self):
        """
        Test subscribing to a node: Case 5:
        Subscribee given
        """
        self.xmpp['xep_0060'].subscribe(
            'pubsub.example.com',
            'somenode',
            subscribee='user@example.com/foo',
            ifrom='foo@comp.example.com/bar',
            block=False)
        self.send(""" """)

    def testSubscribeWithOptions(self):
        # Placeholder: no assertions are made here yet.
        pass

    def testUnsubscribeCase1(self):
        """
        Test unsubscribing from a node: Case 1:
        No subscribee, default 'from' JID, bare JID
        """
        self.xmpp['xep_0060'].unsubscribe(
            'pubsub.example.com',
            'somenode',
            block=False)
        self.send(""" """)

    def testUnsubscribeCase2(self):
        """
        Test unsubscribing from a node: Case 2:
        No subscribee, given 'from' JID, bare JID
        """
        self.xmpp['xep_0060'].unsubscribe(
            'pubsub.example.com',
            'somenode',
            ifrom='foo@comp.example.com/bar',
            block=False)
        self.send(""" """)

    def testUnsubscribeCase3(self):
        """
        Test unsubscribing from a node: Case 3:
        No subscribee, given 'from' JID, full JID
        """
        self.xmpp['xep_0060'].unsubscribe(
            'pubsub.example.com',
            'somenode',
            ifrom='foo@comp.example.com/bar',
            bare=False,
            block=False)
        self.send(""" """)

    def testUnsubscribeCase4(self):
        """
        Test unsubscribing from a node: Case 4:
        No subscribee, no 'from' JID, full JID
        """
        # Restart the stream with an explicit full JID for this case.
        self.stream_close()
        self.stream_start(jid='tester@localhost/full')

        self.xmpp['xep_0060'].unsubscribe(
            'pubsub.example.com',
            'somenode',
            bare=False,
            block=False)
        self.send(""" """)

    def testUnsubscribeCase5(self):
        """
        Test unsubscribing from a node: Case 5:
        Subscribee given
        """
        self.xmpp['xep_0060'].unsubscribe(
            'pubsub.example.com',
            'somenode',
            subscribee='user@example.com/foo',
            ifrom='foo@comp.example.com/bar',
            block=False)
        self.send(""" """)

    def testGetDefaultNodeConfig(self):
        """Test retrieving the default node config for a pubsub service."""
        # No node name is passed, only the service JID.
        self.xmpp['xep_0060'].get_node_config(
            'pubsub.example.com',
            block=False)
        self.send(""" """, use_values=False)

    def testGetNodeConfig(self):
        """Test getting the config for a given node."""
        self.xmpp['xep_0060'].get_node_config(
            'pubsub.example.com',
            'somenode',
            block=False)
        self.send(""" """, use_values=False)

    def testSetNodeConfig(self):
        """Test setting the configuration for a node."""
        form = self.xmpp['xep_0004'].make_form()
        form.add_field(var='FORM_TYPE', ftype='hidden',
                       value='http://jabber.org/protocol/pubsub#node_config')
        form.add_field(var='pubsub#title', ftype='text-single',
                       value='This is awesome!')
        form['type'] = 'submit'

        self.xmpp['xep_0060'].set_node_config(
            'pubsub.example.com',
            'somenode',
            form,
            block=False)
        self.send(""" http://jabber.org/protocol/pubsub#node_config This is awesome! """)

    def testPublishSingle(self):
        """Test publishing a single item."""
        payload = AtomEntry()
        payload['title'] = 'Test'

        # Register AtomEntry as a plugin of the pubsub Item stanza.
        register_stanza_plugin(self.xmpp['xep_0060'].stanza.Item, AtomEntry)

        self.xmpp['xep_0060'].publish(
            'pubsub.example.com',
            'somenode',
            item_id='ID42',
            payload=payload,
            block=False)
        self.send(""" Test """)

    def testPublishSingleOptions(self):
        """Test publishing a single item, with options."""
        payload = AtomEntry()
        payload['title'] = 'Test'

        # Register AtomEntry as a plugin of the pubsub Item stanza.
        register_stanza_plugin(self.xmpp['xep_0060'].stanza.Item, AtomEntry)

        options = self.xmpp['xep_0004'].make_form()
        options.add_field(var='FORM_TYPE', ftype='hidden',
                          value='http://jabber.org/protocol/pubsub#publish-options')
        options.add_field(var='pubsub#access_model', ftype='text-single',
                          value='presence')
        options['type'] = 'submit'

        self.xmpp['xep_0060'].publish(
            'pubsub.example.com',
            'somenode',
            item_id='ID42',
            payload=payload,
            options=options,
            block=False)
        self.send(""" Test http://jabber.org/protocol/pubsub#publish-options presence """, use_values=False)

    def testPublishMulti(self):
        """Test publishing multiple items."""
        payload1 = AtomEntry()
        payload1['title'] = 'Test 1'

        payload2 = AtomEntry()
        payload2['title'] = 'Test 2'

        # Register AtomEntry as a plugin of the pubsub Item stanza.
        register_stanza_plugin(self.xmpp['xep_0060'].stanza.Item, AtomEntry)

        self.xmpp['xep_0060'].publish(
            'pubsub.example.com',
            'somenode',
            items=[('ID1', payload1),
                   ('ID2', payload2)],
            block=False)
        self.send(""" Test 1 Test 2 """, use_values=False)

    def testPublishMultiOptions(self):
        """Test publishing multiple items, with options."""
        payload1 = AtomEntry()
        payload1['title'] = 'Test 1'

        payload2 = AtomEntry()
        payload2['title'] = 'Test 2'

        # Register AtomEntry as a plugin of the pubsub Item stanza.
        register_stanza_plugin(self.xmpp['xep_0060'].stanza.Item, AtomEntry)

        options = self.xmpp['xep_0004'].make_form()
        options.add_field(var='FORM_TYPE', ftype='hidden',
                          value='http://jabber.org/protocol/pubsub#publish-options')
        options.add_field(var='pubsub#access_model', ftype='text-single',
                          value='presence')
        options['type'] = 'submit'

        self.xmpp['xep_0060'].publish(
            'pubsub.example.com',
            'somenode',
            items=[('ID1', payload1),
                   ('ID2', payload2)],
            options=options,
            block=False)
        self.send(""" Test 1 Test 2 http://jabber.org/protocol/pubsub#publish-options presence """, use_values=False)

    def testRetract(self):
        """Test deleting an item."""
        self.xmpp['xep_0060'].retract(
            'pubsub.example.com',
            'somenode',
            'ID1',
            block=False)
        self.send(""" """, use_values=False)

    def testPurge(self):
        """Test removing all items from a node."""
        self.xmpp['xep_0060'].purge(
            'pubsub.example.com',
            'somenode',
            block=False)
        self.send(""" """)

    def testGetItem(self):
        """Test retrieving a single item."""
        # Placeholder: no assertions are made here yet.
        pass

    def testGetLatestItems(self):
        """Test retrieving the most recent N items."""
        # Placeholder: no assertions are made here yet.
        pass

    def testGetAllItems(self):
        """Test retrieving all items."""
        # Placeholder: no assertions are made here yet.
        pass

    def testGetSpecificItems(self):
        """Test retrieving a specific set of items."""
        # Placeholder: no assertions are made here yet.
        pass


suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamPubsub)