import sys
import time
import threading
from sleekxmpp.test import *
from sleekxmpp.stanza.atom import AtomEntry
from sleekxmpp.xmlstream import register_stanza_plugin
class TestStreamPubsub(SleekTest):
    """
    Stream-level tests for the XEP-0060 (pubsub) plugin.

    Every test calls a method of ``self.xmpp['xep_0060']`` and then passes
    a stanza literal to ``self.send`` (plus ``self.recv`` for the
    instant-node case).
    """

    # NOTE(review): the literals handed to self.send/self.recv below hold
    # only a newline, or a few bare text lines, with no XML markup --
    # confirm the markup was not lost. They are kept exactly as found,
    # which is why their continuation lines start at column 0.

    def setUp(self):
        """Start the test stream."""
        self.stream_start()

    def tearDown(self):
        """Close the test stream."""
        self.stream_close()

    def testCreateInstantNode(self):
        """Create a node with ``None`` as its name from a worker thread."""
        pubsub = self.xmpp['xep_0060']
        worker = threading.Thread(target=pubsub.create_node,
                                  name='create_node',
                                  args=('pubsub.example.com', None))
        worker.start()

        self.send("""
""")
        self.recv("""
""")

        # Wait for the create_node call to return before the test ends.
        worker.join()

    def testCreateNodeNoConfig(self):
        """Create a named node without supplying a config form."""
        pubsub = self.xmpp['xep_0060']
        pubsub.create_node('pubsub.example.com', 'princely_musings',
                           block=False)
        self.send("""
""")

    def testCreateNodeConfig(self):
        """Create a named node, supplying a 'submit' form as its config."""
        node_config = self.xmpp['xep_0004'].stanza.Form()
        node_config['type'] = 'submit'
        node_config.add_field(var='pubsub#access_model', value='whitelist')

        pubsub = self.xmpp['xep_0060']
        pubsub.create_node('pubsub.example.com', 'princely_musings',
                           config=node_config, block=False)
        self.send("""
whitelist
http://jabber.org/protocol/pubsub#node_config
""")

    def testDeleteNode(self):
        """Delete a node."""
        pubsub = self.xmpp['xep_0060']
        pubsub.delete_node('pubsub.example.com', 'some_node', block=False)
        self.send("""
""")

    def testSubscribeCase1(self):
        """
        Subscribe to a node, case 1:
        no subscribee, default 'from' JID, bare JID.
        """
        pubsub = self.xmpp['xep_0060']
        pubsub.subscribe('pubsub.example.com', 'somenode', block=False)
        self.send("""
""")

    def testSubscribeCase2(self):
        """
        Subscribe to a node, case 2:
        no subscribee, given 'from' JID, bare JID.
        """
        pubsub = self.xmpp['xep_0060']
        pubsub.subscribe('pubsub.example.com', 'somenode',
                         ifrom='foo@comp.example.com/bar',
                         block=False)
        self.send("""
""")

    def testSubscribeCase3(self):
        """
        Subscribe to a node, case 3:
        no subscribee, given 'from' JID, full JID.
        """
        pubsub = self.xmpp['xep_0060']
        pubsub.subscribe('pubsub.example.com', 'somenode',
                         ifrom='foo@comp.example.com/bar',
                         bare=False,
                         block=False)
        self.send("""
""")

    def testSubscribeCase4(self):
        """
        Subscribe to a node, case 4:
        no subscribee, no 'from' JID, full JID.
        """
        # Restart the stream under a JID that carries a resource.
        self.stream_close()
        self.stream_start(jid='tester@localhost/full')

        pubsub = self.xmpp['xep_0060']
        pubsub.subscribe('pubsub.example.com', 'somenode',
                         bare=False,
                         block=False)
        self.send("""
""")

    def testSubscribeCase5(self):
        """
        Subscribe to a node, case 5:
        subscribee given.
        """
        pubsub = self.xmpp['xep_0060']
        pubsub.subscribe('pubsub.example.com', 'somenode',
                         subscribee='user@example.com/foo',
                         ifrom='foo@comp.example.com/bar',
                         block=False)
        self.send("""
""")

    def testSubscribeWithOptions(self):
        """Placeholder: nothing is exercised here yet."""
        pass

    def testUnsubscribeCase1(self):
        """
        Unsubscribe from a node, case 1:
        no subscribee, default 'from' JID, bare JID.
        """
        pubsub = self.xmpp['xep_0060']
        pubsub.unsubscribe('pubsub.example.com', 'somenode', block=False)
        self.send("""
""")

    def testUnsubscribeCase2(self):
        """
        Unsubscribe from a node, case 2:
        no subscribee, given 'from' JID, bare JID.
        """
        pubsub = self.xmpp['xep_0060']
        pubsub.unsubscribe('pubsub.example.com', 'somenode',
                           ifrom='foo@comp.example.com/bar',
                           block=False)
        self.send("""
""")

    def testUnsubscribeCase3(self):
        """
        Unsubscribe from a node, case 3:
        no subscribee, given 'from' JID, full JID.
        """
        pubsub = self.xmpp['xep_0060']
        pubsub.unsubscribe('pubsub.example.com', 'somenode',
                           ifrom='foo@comp.example.com/bar',
                           bare=False,
                           block=False)
        self.send("""
""")

    def testUnsubscribeCase4(self):
        """
        Unsubscribe from a node, case 4:
        no subscribee, no 'from' JID, full JID.
        """
        # Restart the stream under a JID that carries a resource.
        self.stream_close()
        self.stream_start(jid='tester@localhost/full')

        pubsub = self.xmpp['xep_0060']
        pubsub.unsubscribe('pubsub.example.com', 'somenode',
                           bare=False,
                           block=False)
        self.send("""
""")

    def testUnsubscribeCase5(self):
        """
        Unsubscribe from a node, case 5:
        subscribee given.
        """
        pubsub = self.xmpp['xep_0060']
        pubsub.unsubscribe('pubsub.example.com', 'somenode',
                           subscribee='user@example.com/foo',
                           ifrom='foo@comp.example.com/bar',
                           block=False)
        self.send("""
""")

    def testGetDefaultNodeConfig(self):
        """Request a node config from the service without naming a node."""
        pubsub = self.xmpp['xep_0060']
        pubsub.get_node_config('pubsub.example.com', block=False)
        self.send("""
""", use_values=False)

    def testGetNodeConfig(self):
        """Request the config of one named node."""
        pubsub = self.xmpp['xep_0060']
        pubsub.get_node_config('pubsub.example.com', 'somenode', block=False)
        self.send("""
""", use_values=False)

    def testSetNodeConfig(self):
        """Submit a config form for a node."""
        node_config = self.xmpp['xep_0004'].make_form()
        node_config.add_field(
            var='FORM_TYPE',
            ftype='hidden',
            value='http://jabber.org/protocol/pubsub#node_config')
        node_config.add_field(
            var='pubsub#title',
            ftype='text-single',
            value='This is awesome!')
        node_config['type'] = 'submit'

        pubsub = self.xmpp['xep_0060']
        pubsub.set_node_config('pubsub.example.com', 'somenode', node_config,
                               block=False)
        self.send("""
http://jabber.org/protocol/pubsub#node_config
This is awesome!
""")

    def testPublishSingle(self):
        """Publish one Atom entry under an explicit item id."""
        entry = AtomEntry()
        entry['title'] = 'Test'

        pubsub = self.xmpp['xep_0060']
        register_stanza_plugin(pubsub.stanza.Item, AtomEntry)

        pubsub.publish('pubsub.example.com', 'somenode',
                       item_id='ID42',
                       payload=entry,
                       block=False)
        self.send("""
-
Test
""")

    def testPublishSingleOptions(self):
        """Publish one Atom entry together with a publish-options form."""
        entry = AtomEntry()
        entry['title'] = 'Test'

        pubsub = self.xmpp['xep_0060']
        register_stanza_plugin(pubsub.stanza.Item, AtomEntry)

        publish_options = self.xmpp['xep_0004'].make_form()
        publish_options.add_field(
            var='FORM_TYPE',
            ftype='hidden',
            value='http://jabber.org/protocol/pubsub#publish-options')
        publish_options.add_field(
            var='pubsub#access_model',
            ftype='text-single',
            value='presence')
        publish_options['type'] = 'submit'

        pubsub.publish('pubsub.example.com', 'somenode',
                       item_id='ID42',
                       payload=entry,
                       options=publish_options,
                       block=False)
        self.send("""
-
Test
http://jabber.org/protocol/pubsub#publish-options
presence
""", use_values=False)

    def testPublishMulti(self):
        """Publish two Atom entries in a single call."""
        first_entry = AtomEntry()
        first_entry['title'] = 'Test 1'
        second_entry = AtomEntry()
        second_entry['title'] = 'Test 2'

        pubsub = self.xmpp['xep_0060']
        register_stanza_plugin(pubsub.stanza.Item, AtomEntry)

        entries = [('ID1', first_entry), ('ID2', second_entry)]
        pubsub.publish('pubsub.example.com', 'somenode',
                       items=entries,
                       block=False)
        self.send("""
-
Test 1
-
Test 2
""", use_values=False)

    def testPublishMultiOptions(self):
        """Publish two Atom entries together with a publish-options form."""
        first_entry = AtomEntry()
        first_entry['title'] = 'Test 1'
        second_entry = AtomEntry()
        second_entry['title'] = 'Test 2'

        pubsub = self.xmpp['xep_0060']
        register_stanza_plugin(pubsub.stanza.Item, AtomEntry)

        publish_options = self.xmpp['xep_0004'].make_form()
        publish_options.add_field(
            var='FORM_TYPE',
            ftype='hidden',
            value='http://jabber.org/protocol/pubsub#publish-options')
        publish_options.add_field(
            var='pubsub#access_model',
            ftype='text-single',
            value='presence')
        publish_options['type'] = 'submit'

        entries = [('ID1', first_entry), ('ID2', second_entry)]
        pubsub.publish('pubsub.example.com', 'somenode',
                       items=entries,
                       options=publish_options,
                       block=False)
        self.send("""
-
Test 1
-
Test 2
http://jabber.org/protocol/pubsub#publish-options
presence
""", use_values=False)

    def testRetract(self):
        """Retract the item with id 'ID1' from a node."""
        pubsub = self.xmpp['xep_0060']
        pubsub.retract('pubsub.example.com', 'somenode', 'ID1', block=False)
        self.send("""
""", use_values=False)

    def testPurge(self):
        """Purge a node."""
        pubsub = self.xmpp['xep_0060']
        pubsub.purge('pubsub.example.com', 'somenode', block=False)
        self.send("""
""")

    def testGetItem(self):
        """Placeholder for retrieving a single item; nothing is run yet."""
        pass

    def testGetLatestItems(self):
        """Placeholder for retrieving the most recent N items."""
        pass

    def testGetAllItems(self):
        """Placeholder for retrieving all items."""
        pass

    def testGetSpecificItems(self):
        """Placeholder for retrieving a specific set of items."""
        pass
# Collect the test methods of TestStreamPubsub into a module-level suite.
# NOTE(review): `unittest` is not imported by name in the visible imports;
# presumably it arrives through `from sleekxmpp.test import *` -- confirm.
suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamPubsub)