From 0fffbb82000a1a6c3c23d62fedcbd8e8141f8994 Mon Sep 17 00:00:00 2001 From: Lance Stout Date: Thu, 7 Oct 2010 10:58:13 -0400 Subject: Unit test reorganization. Moved SleekTest to sleekxmpp.test. Organized test suites by their focus. - Suites focused on testing stanza objects are named test_stanza_X.py - Suites focused on testing stream behavior are name test_stream_X.py --- tests/__init__.py | 0 tests/sleektest.py | 650 ----------------------------------------- tests/test_addresses.py | 111 ------- tests/test_chatstates.py | 44 --- tests/test_disco.py | 176 ----------- tests/test_elementbase.py | 659 ----------------------------------------- tests/test_errorstanzas.py | 56 ---- tests/test_events.py | 3 +- tests/test_forms.py | 115 -------- tests/test_gmail.py | 88 ------ tests/test_handlers.py | 112 ------- tests/test_iqstanzas.py | 90 ------ tests/test_jid.py | 4 +- tests/test_messagestanzas.py | 57 ---- tests/test_presencestanzas.py | 67 ----- tests/test_pubsubstanzas.py | 511 -------------------------------- tests/test_roster.py | 84 ------ tests/test_stanza_base.py | 79 +++++ tests/test_stanza_element.py | 660 ++++++++++++++++++++++++++++++++++++++++++ tests/test_stanza_error.py | 57 ++++ tests/test_stanza_gmail.py | 88 ++++++ tests/test_stanza_iq.py | 90 ++++++ tests/test_stanza_message.py | 57 ++++ tests/test_stanza_presence.py | 66 +++++ tests/test_stanza_roster.py | 84 ++++++ tests/test_stanza_xep_0004.py | 115 ++++++++ tests/test_stanza_xep_0030.py | 176 +++++++++++ tests/test_stanza_xep_0033.py | 111 +++++++ tests/test_stanza_xep_0060.py | 511 ++++++++++++++++++++++++++++++++ tests/test_stanza_xep_0085.py | 44 +++ tests/test_stanzabase.py | 79 ----- tests/test_stream.py | 60 ++++ tests/test_stream_handlers.py | 112 +++++++ tests/test_streamtester.py | 60 ---- tests/test_tostring.py | 2 +- 35 files changed, 2315 insertions(+), 2963 deletions(-) delete mode 100644 tests/__init__.py delete mode 100644 tests/sleektest.py delete mode 100644 tests/test_addresses.py delete mode 100644 tests/test_chatstates.py delete mode 100644 tests/test_disco.py delete mode 100644 tests/test_elementbase.py delete mode 100644 tests/test_errorstanzas.py delete mode 100644 tests/test_forms.py delete mode 100644 tests/test_gmail.py delete mode 100644 tests/test_handlers.py delete mode 100644 tests/test_iqstanzas.py delete mode 100644 tests/test_messagestanzas.py delete mode 100644 tests/test_presencestanzas.py delete mode 100644 tests/test_pubsubstanzas.py delete mode 100644 tests/test_roster.py create mode 100644 tests/test_stanza_base.py create mode 100644 tests/test_stanza_element.py create mode 100644 tests/test_stanza_error.py create mode 100644 tests/test_stanza_gmail.py create mode 100644 tests/test_stanza_iq.py create mode 100644 tests/test_stanza_message.py create mode 100644 tests/test_stanza_presence.py create mode 100644 tests/test_stanza_roster.py create mode 100644 tests/test_stanza_xep_0004.py create mode 100644 tests/test_stanza_xep_0030.py create mode 100644 tests/test_stanza_xep_0033.py create mode 100644 tests/test_stanza_xep_0060.py create mode 100644 tests/test_stanza_xep_0085.py delete mode 100644 tests/test_stanzabase.py create mode 100644 tests/test_stream.py create mode 100644 tests/test_stream_handlers.py delete mode 100644 tests/test_streamtester.py (limited to 'tests') diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/sleektest.py b/tests/sleektest.py deleted file mode 100644 index 28901877..00000000 --- a/tests/sleektest.py +++ /dev/null @@ -1,650 +0,0 @@ -""" - SleekXMPP: The Sleek XMPP Library - Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout - This file is part of SleekXMPP. - - See the file LICENSE for copying permission. -""" - -import unittest -import socket -try: - import queue -except ImportError: - import Queue as queue - -import sleekxmpp -from sleekxmpp import ClientXMPP, ComponentXMPP -from sleekxmpp.stanza import Message, Iq, Presence -from sleekxmpp.xmlstream.stanzabase import registerStanzaPlugin, ET -from sleekxmpp.xmlstream.tostring import tostring - - -class TestSocket(object): - - """ - A dummy socket that reads and writes to queues instead - of an actual networking socket. - - Methods: - next_sent -- Return the next sent stanza. - recv_data -- Make a stanza available to read next. - recv -- Read the next stanza from the socket. - send -- Write a stanza to the socket. - makefile -- Dummy call, returns self. - read -- Read the next stanza from the socket. - """ - - def __init__(self, *args, **kwargs): - """ - Create a new test socket. - - Arguments: - Same as arguments for socket.socket - """ - self.socket = socket.socket(*args, **kwargs) - self.recv_queue = queue.Queue() - self.send_queue = queue.Queue() - - def __getattr__(self, name): - """ - Return attribute values of internal, dummy socket. - - Some attributes and methods are disabled to prevent the - socket from connecting to the network. - - Arguments: - name -- Name of the attribute requested. - """ - - def dummy(*args): - """Method to do nothing and prevent actual socket connections.""" - return None - - overrides = {'connect': dummy, - 'close': dummy, - 'shutdown': dummy} - - return overrides.get(name, getattr(self.socket, name)) - - # ------------------------------------------------------------------ - # Testing Interface - - def next_sent(self, timeout=None): - """ - Get the next stanza that has been 'sent'. - - Arguments: - timeout -- Optional timeout for waiting for a new value. - """ - args = {'block': False} - if timeout is not None: - args = {'block': True, 'timeout': timeout} - try: - return self.send_queue.get(**args) - except: - return None - - def recv_data(self, data): - """ - Add data to the receiving queue. - - Arguments: - data -- String data to 'write' to the socket to be received - by the XMPP client. - """ - self.recv_queue.put(data) - - # ------------------------------------------------------------------ - # Socket Interface - - def recv(self, *args, **kwargs): - """ - Read a value from the received queue. - - Arguments: - Placeholders. Same as for socket.Socket.recv. - """ - return self.read(block=True) - - def send(self, data): - """ - Send data by placing it in the send queue. - - Arguments: - data -- String value to write. - """ - self.send_queue.put(data) - - # ------------------------------------------------------------------ - # File Socket - - def makefile(self, *args, **kwargs): - """ - File socket version to use with ElementTree. - - Arguments: - Placeholders, same as socket.Socket.makefile() - """ - return self - - def read(self, block=True, timeout=None, **kwargs): - """ - Implement the file socket interface. - - Arguments: - block -- Indicate if the read should block until a - value is ready. - timeout -- Time in seconds a block should last before - returning None. - """ - if timeout is not None: - block = True - try: - return self.recv_queue.get(block, timeout) - except: - return None - - -class SleekTest(unittest.TestCase): - - """ - A SleekXMPP specific TestCase class that provides - methods for comparing message, iq, and presence stanzas. - - Methods: - Message -- Create a Message stanza object. - Iq -- Create an Iq stanza object. - Presence -- Create a Presence stanza object. - check_stanza -- Compare a generic stanza against an XML string. - check_message -- Compare a Message stanza against an XML string. - check_iq -- Compare an Iq stanza against an XML string. - check_presence -- Compare a Presence stanza against an XML string. - stream_start -- Initialize a dummy XMPP client. - stream_recv -- Queue data for XMPP client to receive. - stream_make_header -- Create a stream header. - stream_send_header -- Check that the given header has been sent. - stream_send_message -- Check that the XMPP client sent the given - Message stanza. - stream_send_iq -- Check that the XMPP client sent the given - Iq stanza. - stream_send_presence -- Check thatt the XMPP client sent the given - Presence stanza. - stream_send_stanza -- Check that the XMPP client sent the given - generic stanza. - stream_close -- Disconnect the XMPP client. - fix_namespaces -- Add top-level namespace to an XML object. - compare -- Compare XML objects against each other. - """ - - # ------------------------------------------------------------------ - # Shortcut methods for creating stanza objects - - def Message(self, *args, **kwargs): - """ - Create a Message stanza. - - Uses same arguments as StanzaBase.__init__ - - Arguments: - xml -- An XML object to use for the Message's values. - """ - return Message(None, *args, **kwargs) - - def Iq(self, *args, **kwargs): - """ - Create an Iq stanza. - - Uses same arguments as StanzaBase.__init__ - - Arguments: - xml -- An XML object to use for the Iq's values. - """ - return Iq(None, *args, **kwargs) - - def Presence(self, *args, **kwargs): - """ - Create a Presence stanza. - - Uses same arguments as StanzaBase.__init__ - - Arguments: - xml -- An XML object to use for the Iq's values. - """ - return Presence(None, *args, **kwargs) - - # ------------------------------------------------------------------ - # Methods for comparing stanza objects to XML strings - - def check_stanza(self, stanza_class, stanza, xml_string, - defaults=None, use_values=True): - """ - Create and compare several stanza objects to a correct XML string. - - If use_values is False, test using getStanzaValues() and - setStanzaValues() will not be used. - - Some stanzas provide default values for some interfaces, but - these defaults can be problematic for testing since they can easily - be forgotten when supplying the XML string. A list of interfaces that - use defaults may be provided and the generated stanzas will use the - default values for those interfaces if needed. - - However, correcting the supplied XML is not possible for interfaces - that add or remove XML elements. Only interfaces that map to XML - attributes may be set using the defaults parameter. The supplied XML - must take into account any extra elements that are included by default. - - Arguments: - stanza_class -- The class of the stanza being tested. - stanza -- The stanza object to test. - xml_string -- A string version of the correct XML expected. - defaults -- A list of stanza interfaces that have default - values. These interfaces will be set to their - defaults for the given and generated stanzas to - prevent unexpected test failures. - use_values -- Indicates if testing using getStanzaValues() and - setStanzaValues() should be used. Defaults to - True. - """ - xml = ET.fromstring(xml_string) - - # Ensure that top level namespaces are used, even if they - # were not provided. - self.fix_namespaces(stanza.xml, 'jabber:client') - self.fix_namespaces(xml, 'jabber:client') - - stanza2 = stanza_class(xml=xml) - - if use_values: - # Using getStanzaValues() and setStanzaValues() will add - # XML for any interface that has a default value. We need - # to set those defaults on the existing stanzas and XML - # so that they will compare correctly. - default_stanza = stanza_class() - if defaults is None: - defaults = [] - for interface in defaults: - stanza[interface] = stanza[interface] - stanza2[interface] = stanza2[interface] - # Can really only automatically add defaults for top - # level attribute values. Anything else must be accounted - # for in the provided XML string. - if interface not in xml.attrib: - if interface in default_stanza.xml.attrib: - value = default_stanza.xml.attrib[interface] - xml.attrib[interface] = value - - values = stanza2.getStanzaValues() - stanza3 = stanza_class() - stanza3.setStanzaValues(values) - - debug = "Three methods for creating stanzas do not match.\n" - debug += "Given XML:\n%s\n" % tostring(xml) - debug += "Given stanza:\n%s\n" % tostring(stanza.xml) - debug += "Generated stanza:\n%s\n" % tostring(stanza2.xml) - debug += "Second generated stanza:\n%s\n" % tostring(stanza3.xml) - result = self.compare(xml, stanza.xml, stanza2.xml, stanza3.xml) - else: - debug = "Two methods for creating stanzas do not match.\n" - debug += "Given XML:\n%s\n" % tostring(xml) - debug += "Given stanza:\n%s\n" % tostring(stanza.xml) - debug += "Generated stanza:\n%s\n" % tostring(stanza2.xml) - result = self.compare(xml, stanza.xml, stanza2.xml) - - self.failUnless(result, debug) - - def check_message(self, msg, xml_string, use_values=True): - """ - Create and compare several message stanza objects to a - correct XML string. - - If use_values is False, the test using getStanzaValues() and - setStanzaValues() will not be used. - - Arguments: - msg -- The Message stanza object to check. - xml_string -- The XML contents to compare against. - use_values -- Indicates if the test using getStanzaValues - and setStanzaValues should be used. Defaults - to True. - """ - - return self.check_stanza(Message, msg, xml_string, - defaults=['type'], - use_values=use_values) - - def check_iq(self, iq, xml_string, use_values=True): - """ - Create and compare several iq stanza objects to a - correct XML string. - - If use_values is False, the test using getStanzaValues() and - setStanzaValues() will not be used. - - Arguments: - iq -- The Iq stanza object to check. - xml_string -- The XML contents to compare against. - use_values -- Indicates if the test using getStanzaValues - and setStanzaValues should be used. Defaults - to True. - """ - return self.check_stanza(Iq, iq, xml_string, use_values=use_values) - - def check_presence(self, pres, xml_string, use_values=True): - """ - Create and compare several presence stanza objects to a - correct XML string. - - If use_values is False, the test using getStanzaValues() and - setStanzaValues() will not be used. - - Arguments: - iq -- The Iq stanza object to check. - xml_string -- The XML contents to compare against. - use_values -- Indicates if the test using getStanzaValues - and setStanzaValues should be used. Defaults - to True. - """ - return self.check_stanza(Presence, pres, xml_string, - defaults=['priority'], - use_values=use_values) - - # ------------------------------------------------------------------ - # Methods for simulating stanza streams. - - def stream_start(self, mode='client', skip=True, header=None): - """ - Initialize an XMPP client or component using a dummy XML stream. - - Arguments: - mode -- Either 'client' or 'component'. Defaults to 'client'. - skip -- Indicates if the first item in the sent queue (the - stream header) should be removed. Tests that wish - to test initializing the stream should set this to - False. Otherwise, the default of True should be used. - """ - if mode == 'client': - self.xmpp = ClientXMPP('tester@localhost', 'test') - elif mode == 'component': - self.xmpp = ComponentXMPP('tester.localhost', 'test', - 'localhost', 8888) - else: - raise ValueError("Unknown XMPP connection mode.") - - self.xmpp.setSocket(TestSocket()) - self.xmpp.state.set('reconnect', False) - self.xmpp.state.set('is client', True) - self.xmpp.state.set('connected', True) - - # Must have the stream header ready for xmpp.process() to work. - if not header: - header = self.xmpp.stream_header - self.xmpp.socket.recv_data(header) - - self.xmpp.connect = lambda a=None, b=None, c=None, d=None: True - self.xmpp.process(threaded=True) - if skip: - # Clear startup stanzas - self.xmpp.socket.next_sent(timeout=0.01) - if mode == 'component': - self.xmpp.socket.next_sent(timeout=0.01) - - def stream_recv(self, data): - """ - Pass data to the dummy XMPP client as if it came from an XMPP server. - - Arguments: - data -- String stanza XML to be received and processed by the - XMPP client or component. - """ - data = str(data) - self.xmpp.socket.recv_data(data) - - def stream_make_header(self, sto='', - sfrom='', - sid='', - stream_ns="http://etherx.jabber.org/streams", - default_ns="jabber:client", - version="1.0", - xml_header=True): - """ - Create a stream header to be received by the test XMPP agent. - - The header must be saved and passed to stream_start. - - Arguments: - sto -- The recipient of the stream header. - sfrom -- The agent sending the stream header. - sid -- The stream's id. - stream_ns -- The namespace of the stream's root element. - default_ns -- The default stanza namespace. - version -- The stream version. - xml_header -- Indicates if the XML version header should be - appended before the stream header. - """ - header = '' - parts = [] - if xml_header: - header = '' + header - if sto: - parts.append('to="%s"' % sto) - if sfrom: - parts.append('from="%s"' % sfrom) - if sid: - parts.append('id="%s"' % sid) - parts.append('version="%s"' % version) - parts.append('xmlns:stream="%s"' % stream_ns) - parts.append('xmlns="%s"' % default_ns) - return header % ' '.join(parts) - - def stream_send_header(self, sto='', - sfrom='', - sid='', - stream_ns="http://etherx.jabber.org/streams", - default_ns="jabber:client", - version="1.0", - xml_header=False, - timeout=0.1): - """ - Check that a given stream header was sent. - - Arguments: - sto -- The recipient of the stream header. - sfrom -- The agent sending the stream header. - sid -- The stream's id. - stream_ns -- The namespace of the stream's root element. - default_ns -- The default stanza namespace. - version -- The stream version. - xml_header -- Indicates if the XML version header should be - appended before the stream header. - timeout -- Length of time to wait in seconds for a - response. - """ - header = self.stream_make_header(sto, sfrom, sid, - stream_ns=stream_ns, - default_ns=default_ns, - version=version, - xml_header=xml_header) - sent_header = self.xmpp.socket.next_sent(timeout) - if sent_header is None: - raise ValueError("Socket did not return data.") - - # Apply closing elements so that we can construct - # XML objects for comparison. - header2 = header + '' - sent_header2 = sent_header + '' - - xml = ET.fromstring(header2) - sent_xml = ET.fromstring(sent_header2) - - self.failUnless( - self.compare(xml, sent_xml), - "Stream headers do not match:\nDesired:\n%s\nSent:\n%s" % ( - header, sent_header)) - - def stream_send_stanza(self, stanza_class, data, defaults=None, - use_values=True, timeout=.1): - """ - Check that the XMPP client sent the given stanza XML. - - Extracts the next sent stanza and compares it with the given - XML using check_stanza. - - Arguments: - stanza_class -- The class of the sent stanza object. - data -- The XML string of the expected Message stanza, - or an equivalent stanza object. - use_values -- Modifies the type of tests used by check_message. - defaults -- A list of stanza interfaces that have defaults - values which may interfere with comparisons. - timeout -- Time in seconds to wait for a stanza before - failing the check. - """ - if isintance(data, str): - data = stanza_class(xml=ET.fromstring(data)) - sent = self.xmpp.socket.next_sent(timeout) - self.check_stanza(stanza_class, data, sent, - defaults=defaults, - use_values=use_values) - - def stream_send_message(self, data, use_values=True, timeout=.1): - """ - Check that the XMPP client sent the given stanza XML. - - Extracts the next sent stanza and compares it with the given - XML using check_message. - - Arguments: - data -- The XML string of the expected Message stanza, - or an equivalent stanza object. - use_values -- Modifies the type of tests used by check_message. - timeout -- Time in seconds to wait for a stanza before - failing the check. - """ - if isinstance(data, str): - data = self.Message(xml=ET.fromstring(data)) - sent = self.xmpp.socket.next_sent(timeout) - self.check_message(data, sent, use_values) - - def stream_send_iq(self, data, use_values=True, timeout=.1): - """ - Check that the XMPP client sent the given stanza XML. - - Extracts the next sent stanza and compares it with the given - XML using check_iq. - - Arguments: - data -- The XML string of the expected Iq stanza, - or an equivalent stanza object. - use_values -- Modifies the type of tests used by check_iq. - timeout -- Time in seconds to wait for a stanza before - failing the check. - """ - if isinstance(data, str): - data = self.Iq(xml=ET.fromstring(data)) - sent = self.xmpp.socket.next_sent(timeout) - self.check_iq(data, sent, use_values) - - def stream_send_presence(self, data, use_values=True, timeout=.1): - """ - Check that the XMPP client sent the given stanza XML. - - Extracts the next sent stanza and compares it with the given - XML using check_presence. - - Arguments: - data -- The XML string of the expected Presence stanza, - or an equivalent stanza object. - use_values -- Modifies the type of tests used by check_presence. - timeout -- Time in seconds to wait for a stanza before - failing the check. - """ - if isinstance(data, str): - data = self.Presence(xml=ET.fromstring(data)) - sent = self.xmpp.socket.next_sent(timeout) - self.check_presence(data, sent, use_values) - - def stream_close(self): - """ - Disconnect the dummy XMPP client. - - Can be safely called even if stream_start has not been called. - - Must be placed in the tearDown method of a test class to ensure - that the XMPP client is disconnected after an error. - """ - if hasattr(self, 'xmpp') and self.xmpp is not None: - self.xmpp.disconnect() - self.xmpp.socket.recv_data(self.xmpp.stream_footer) - - # ------------------------------------------------------------------ - # XML Comparison and Cleanup - - def fix_namespaces(self, xml, ns): - """ - Assign a namespace to an element and any children that - don't have a namespace. - - Arguments: - xml -- The XML object to fix. - ns -- The namespace to add to the XML object. - """ - if xml.tag.startswith('{'): - return - xml.tag = '{%s}%s' % (ns, xml.tag) - for child in xml.getchildren(): - self.fix_namespaces(child, ns) - - def compare(self, xml, *other): - """ - Compare XML objects. - - Arguments: - xml -- The XML object to compare against. - *other -- The list of XML objects to compare. - """ - if not other: - return False - - # Compare multiple objects - if len(other) > 1: - for xml2 in other: - if not self.compare(xml, xml2): - return False - return True - - other = other[0] - - # Step 1: Check tags - if xml.tag != other.tag: - return False - - # Step 2: Check attributes - if xml.attrib != other.attrib: - return False - - # Step 3: Check text - if xml.text is None: - xml.text = "" - if other.text is None: - other.text = "" - xml.text = xml.text.strip() - other.text = other.text.strip() - - if xml.text != other.text: - return False - - # Step 4: Recursively check children - for child in xml: - child2s = other.findall("%s" % child.tag) - if child2s is None: - return False - for child2 in child2s: - if self.compare(child, child2): - break - else: - return False - - # Everything matches - return True diff --git a/tests/test_addresses.py b/tests/test_addresses.py deleted file mode 100644 index 5c510c78..00000000 --- a/tests/test_addresses.py +++ /dev/null @@ -1,111 +0,0 @@ -from . sleektest import * -import sleekxmpp.plugins.xep_0033 as xep_0033 - - -class TestAddresses(SleekTest): - - def setUp(self): - registerStanzaPlugin(Message, xep_0033.Addresses) - - def testAddAddress(self): - """Testing adding extended stanza address.""" - msg = self.Message() - msg['addresses'].addAddress(atype='to', jid='to@header1.org') - self.check_message(msg, """ - - -
- - - """) - - msg = self.Message() - msg['addresses'].addAddress(atype='replyto', - jid='replyto@header1.org', - desc='Reply address') - self.check_message(msg, """ - - -
- - - """) - - def testAddAddresses(self): - """Testing adding multiple extended stanza addresses.""" - - xmlstring = """ - - -
-
-
- - - """ - - msg = self.Message() - msg['addresses'].setAddresses([ - {'type':'replyto', - 'jid':'replyto@header1.org', - 'desc':'Reply address'}, - {'type':'cc', - 'jid':'cc@header2.org'}, - {'type':'bcc', - 'jid':'bcc@header2.org'}]) - self.check_message(msg, xmlstring) - - msg = self.Message() - msg['addresses']['replyto'] = [{'jid':'replyto@header1.org', - 'desc':'Reply address'}] - msg['addresses']['cc'] = [{'jid':'cc@header2.org'}] - msg['addresses']['bcc'] = [{'jid':'bcc@header2.org'}] - self.check_message(msg, xmlstring) - - def testAddURI(self): - """Testing adding URI attribute to extended stanza address.""" - - msg = self.Message() - addr = msg['addresses'].addAddress(atype='to', - jid='to@header1.org', - node='foo') - self.check_message(msg, """ - - -
- - - """) - - addr['uri'] = 'mailto:to@header2.org' - self.check_message(msg, """ - - -
- - - """) - - def testDelivered(self): - """Testing delivered attribute of extended stanza addresses.""" - - xmlstring = """ - - -
- - - """ - - msg = self.Message() - addr = msg['addresses'].addAddress(jid='to@header1.org', atype='to') - self.check_message(msg, xmlstring % '') - - addr['delivered'] = True - self.check_message(msg, xmlstring % 'delivered="true"') - - addr['delivered'] = False - self.check_message(msg, xmlstring % '') - - -suite = unittest.TestLoader().loadTestsFromTestCase(TestAddresses) diff --git a/tests/test_chatstates.py b/tests/test_chatstates.py deleted file mode 100644 index d5e3979a..00000000 --- a/tests/test_chatstates.py +++ /dev/null @@ -1,44 +0,0 @@ -from . sleektest import * -import sleekxmpp.plugins.xep_0085 as xep_0085 - -class TestChatStates(SleekTest): - - def setUp(self): - registerStanzaPlugin(Message, xep_0085.Active) - registerStanzaPlugin(Message, xep_0085.Composing) - registerStanzaPlugin(Message, xep_0085.Gone) - registerStanzaPlugin(Message, xep_0085.Inactive) - registerStanzaPlugin(Message, xep_0085.Paused) - - def testCreateChatState(self): - """Testing creating chat states.""" - - xmlstring = """ - - <%s xmlns="http://jabber.org/protocol/chatstates" /> - - """ - - msg = self.Message() - msg['chat_state'].active() - self.check_message(msg, xmlstring % 'active', - use_values=False) - - msg['chat_state'].composing() - self.check_message(msg, xmlstring % 'composing', - use_values=False) - - - msg['chat_state'].gone() - self.check_message(msg, xmlstring % 'gone', - use_values=False) - - msg['chat_state'].inactive() - self.check_message(msg, xmlstring % 'inactive', - use_values=False) - - msg['chat_state'].paused() - self.check_message(msg, xmlstring % 'paused', - use_values=False) - -suite = unittest.TestLoader().loadTestsFromTestCase(TestChatStates) diff --git a/tests/test_disco.py b/tests/test_disco.py deleted file mode 100644 index f65fdaf4..00000000 --- a/tests/test_disco.py +++ /dev/null @@ -1,176 +0,0 @@ -from . sleektest import * -import sleekxmpp.plugins.xep_0030 as xep_0030 - - -class TestDisco(SleekTest): - - def setUp(self): - registerStanzaPlugin(Iq, xep_0030.DiscoInfo) - registerStanzaPlugin(Iq, xep_0030.DiscoItems) - - def testCreateInfoQueryNoNode(self): - """Testing disco#info query with no node.""" - iq = self.Iq() - iq['id'] = "0" - iq['disco_info']['node'] = '' - - self.check_iq(iq, """ - - - - """) - - def testCreateInfoQueryWithNode(self): - """Testing disco#info query with a node.""" - iq = self.Iq() - iq['id'] = "0" - iq['disco_info']['node'] = 'foo' - - self.check_iq(iq, """ - - - - """) - - def testCreateInfoQueryNoNode(self): - """Testing disco#items query with no node.""" - iq = self.Iq() - iq['id'] = "0" - iq['disco_items']['node'] = '' - - self.check_iq(iq, """ - - - - """) - - def testCreateItemsQueryWithNode(self): - """Testing disco#items query with a node.""" - iq = self.Iq() - iq['id'] = "0" - iq['disco_items']['node'] = 'foo' - - self.check_iq(iq, """ - - - - """) - - def testInfoIdentities(self): - """Testing adding identities to disco#info.""" - iq = self.Iq() - iq['id'] = "0" - iq['disco_info']['node'] = 'foo' - iq['disco_info'].addIdentity('conference', 'text', 'Chatroom') - - self.check_iq(iq, """ - - - - - - """) - - def testInfoFeatures(self): - """Testing adding features to disco#info.""" - iq = self.Iq() - iq['id'] = "0" - iq['disco_info']['node'] = 'foo' - iq['disco_info'].addFeature('foo') - iq['disco_info'].addFeature('bar') - - self.check_iq(iq, """ - - - - - - - """) - - def testItems(self): - """Testing adding features to disco#info.""" - iq = self.Iq() - iq['id'] = "0" - iq['disco_items']['node'] = 'foo' - iq['disco_items'].addItem('user@localhost') - iq['disco_items'].addItem('user@localhost', 'foo') - iq['disco_items'].addItem('user@localhost', 'bar', 'Testing') - - self.check_iq(iq, """ - - - - - - - - """) - - def testAddRemoveIdentities(self): - """Test adding and removing identities to disco#info stanza""" - ids = [('automation', 'commands', 'AdHoc'), - ('conference', 'text', 'ChatRoom')] - - info = xep_0030.DiscoInfo() - info.addIdentity(*ids[0]) - self.failUnless(info.getIdentities() == [ids[0]]) - - info.delIdentity('automation', 'commands') - self.failUnless(info.getIdentities() == []) - - info.setIdentities(ids) - self.failUnless(info.getIdentities() == ids) - - info.delIdentity('automation', 'commands') - self.failUnless(info.getIdentities() == [ids[1]]) - - info.delIdentities() - self.failUnless(info.getIdentities() == []) - - def testAddRemoveFeatures(self): - """Test adding and removing features to disco#info stanza""" - features = ['foo', 'bar', 'baz'] - - info = xep_0030.DiscoInfo() - info.addFeature(features[0]) - self.failUnless(info.getFeatures() == [features[0]]) - - info.delFeature('foo') - self.failUnless(info.getFeatures() == []) - - info.setFeatures(features) - self.failUnless(info.getFeatures() == features) - - info.delFeature('bar') - self.failUnless(info.getFeatures() == ['foo', 'baz']) - - info.delFeatures() - self.failUnless(info.getFeatures() == []) - - def testAddRemoveItems(self): - """Test adding and removing items to disco#items stanza""" - items = [('user@localhost', None, None), - ('user@localhost', 'foo', None), - ('user@localhost', 'bar', 'Test')] - - info = xep_0030.DiscoItems() - self.failUnless(True, ""+str(items[0])) - - info.addItem(*(items[0])) - self.failUnless(info.getItems() == [items[0]], info.getItems()) - - info.delItem('user@localhost') - self.failUnless(info.getItems() == []) - - info.setItems(items) - self.failUnless(info.getItems() == items) - - info.delItem('user@localhost', 'foo') - self.failUnless(info.getItems() == [items[0], items[2]]) - - info.delItems() - self.failUnless(info.getItems() == []) - - -suite = unittest.TestLoader().loadTestsFromTestCase(TestDisco) diff --git a/tests/test_elementbase.py b/tests/test_elementbase.py deleted file mode 100644 index 19794c90..00000000 --- a/tests/test_elementbase.py +++ /dev/null @@ -1,659 +0,0 @@ -from . sleektest import * -from sleekxmpp.xmlstream.stanzabase import ElementBase - -class TestElementBase(SleekTest): - - def testFixNs(self): - """Test fixing namespaces in an XPath expression.""" - - e = ElementBase() - ns = "http://jabber.org/protocol/disco#items" - result = e._fix_ns("{%s}foo/bar/{abc}baz/{%s}more" % (ns, ns)) - - expected = "/".join(["{%s}foo" % ns, - "{%s}bar" % ns, - "{abc}baz", - "{%s}more" % ns]) - self.failUnless(expected == result, - "Incorrect namespace fixing result: %s" % str(result)) - - - def testExtendedName(self): - """Test element names of the form tag1/tag2/tag3.""" - - class TestStanza(ElementBase): - name = "foo/bar/baz" - namespace = "test" - - stanza = TestStanza() - self.check_stanza(TestStanza, stanza, """ - - - - - - """) - - def testGetStanzaValues(self): - """Test getStanzaValues using plugins and substanzas.""" - - class TestStanzaPlugin(ElementBase): - name = "foo2" - namespace = "foo" - interfaces = set(('bar', 'baz')) - plugin_attrib = "foo2" - - class TestSubStanza(ElementBase): - name = "subfoo" - namespace = "foo" - interfaces = set(('bar', 'baz')) - - class TestStanza(ElementBase): - name = "foo" - namespace = "foo" - interfaces = set(('bar', 'baz')) - subitem = set((TestSubStanza,)) - - registerStanzaPlugin(TestStanza, TestStanzaPlugin) - - stanza = TestStanza() - stanza['bar'] = 'a' - stanza['foo2']['baz'] = 'b' - substanza = TestSubStanza() - substanza['bar'] = 'c' - stanza.append(substanza) - - values = stanza.getStanzaValues() - expected = {'bar': 'a', - 'baz': '', - 'foo2': {'bar': '', - 'baz': 'b'}, - 'substanzas': [{'__childtag__': '{foo}subfoo', - 'bar': 'c', - 'baz': ''}]} - self.failUnless(values == expected, - "Unexpected stanza values:\n%s\n%s" % (str(expected), str(values))) - - - def testSetStanzaValues(self): - """Test using setStanzaValues with substanzas and plugins.""" - - class TestStanzaPlugin(ElementBase): - name = "pluginfoo" - namespace = "foo" - interfaces = set(('bar', 'baz')) - plugin_attrib = "plugin_foo" - - class TestStanzaPlugin2(ElementBase): - name = "pluginfoo2" - namespace = "foo" - interfaces = set(('bar', 'baz')) - plugin_attrib = "plugin_foo2" - - class TestSubStanza(ElementBase): - name = "subfoo" - namespace = "foo" - interfaces = set(('bar', 'baz')) - - class TestStanza(ElementBase): - name = "foo" - namespace = "foo" - interfaces = set(('bar', 'baz')) - subitem = set((TestSubStanza,)) - - registerStanzaPlugin(TestStanza, TestStanzaPlugin) - registerStanzaPlugin(TestStanza, TestStanzaPlugin2) - - stanza = TestStanza() - values = {'bar': 'a', - 'baz': '', - 'plugin_foo': {'bar': '', - 'baz': 'b'}, - 'plugin_foo2': {'bar': 'd', - 'baz': 'e'}, - 'substanzas': [{'__childtag__': '{foo}subfoo', - 'bar': 'c', - 'baz': ''}]} - stanza.setStanzaValues(values) - - self.check_stanza(TestStanza, stanza, """ - - - - - - """) - - def testGetItem(self): - """Test accessing stanza interfaces.""" - - class TestStanza(ElementBase): - name = "foo" - namespace = "foo" - interfaces = set(('bar', 'baz', 'qux')) - sub_interfaces = set(('baz',)) - - def getQux(self): - return 'qux' - - class TestStanzaPlugin(ElementBase): - name = "foobar" - namespace = "foo" - plugin_attrib = "foobar" - interfaces = set(('fizz',)) - - TestStanza.subitem = (TestStanza,) - registerStanzaPlugin(TestStanza, TestStanzaPlugin) - - stanza = TestStanza() - substanza = TestStanza() - stanza.append(substanza) - stanza.setStanzaValues({'bar': 'a', - 'baz': 'b', - 'qux': 42, - 'foobar': {'fizz': 'c'}}) - - # Test non-plugin interfaces - expected = {'substanzas': [substanza], - 'bar': 'a', - 'baz': 'b', - 'qux': 'qux', - 'meh': ''} - for interface, value in expected.items(): - result = stanza[interface] - self.failUnless(result == value, - "Incorrect stanza interface access result: %s" % result) - - # Test plugin interfaces - self.failUnless(isinstance(stanza['foobar'], TestStanzaPlugin), - "Incorrect plugin object result.") - self.failUnless(stanza['foobar']['fizz'] == 'c', - "Incorrect plugin subvalue result.") - - def testSetItem(self): - """Test assigning to stanza interfaces.""" - - class TestStanza(ElementBase): - name = "foo" - namespace = "foo" - interfaces = set(('bar', 'baz', 'qux')) - sub_interfaces = set(('baz',)) - - def setQux(self, value): - pass - - class TestStanzaPlugin(ElementBase): - name = "foobar" - namespace = "foo" - plugin_attrib = "foobar" - interfaces = set(('foobar',)) - - registerStanzaPlugin(TestStanza, TestStanzaPlugin) - - stanza = TestStanza() - - stanza['bar'] = 'attribute!' - stanza['baz'] = 'element!' - stanza['qux'] = 'overridden' - stanza['foobar'] = 'plugin' - - self.check_stanza(TestStanza, stanza, """ - - element! - - - """) - - def testDelItem(self): - """Test deleting stanza interface values.""" - - class TestStanza(ElementBase): - name = "foo" - namespace = "foo" - interfaces = set(('bar', 'baz', 'qux')) - sub_interfaces = set(('bar',)) - - def delQux(self): - pass - - class TestStanzaPlugin(ElementBase): - name = "foobar" - namespace = "foo" - plugin_attrib = "foobar" - interfaces = set(('foobar',)) - - registerStanzaPlugin(TestStanza, TestStanzaPlugin) - - stanza = TestStanza() - stanza['bar'] = 'a' - stanza['baz'] = 'b' - stanza['qux'] = 'c' - stanza['foobar']['foobar'] = 'd' - - self.check_stanza(TestStanza, stanza, """ - - a - - - """) - - del stanza['bar'] - del stanza['baz'] - del stanza['qux'] - del stanza['foobar'] - - self.check_stanza(TestStanza, stanza, """ - - """) - - def testModifyingAttributes(self): - """Test modifying top level attributes of a stanza's XML object.""" - - class TestStanza(ElementBase): - name = "foo" - namespace = "foo" - interfaces = set(('bar', 'baz')) - - stanza = TestStanza() - - self.check_stanza(TestStanza, stanza, """ - - """) - - self.failUnless(stanza._getAttr('bar') == '', - "Incorrect value returned for an unset XML attribute.") - - stanza._setAttr('bar', 'a') - stanza._setAttr('baz', 'b') - - self.check_stanza(TestStanza, stanza, """ - - """) - - self.failUnless(stanza._getAttr('bar') == 'a', - "Retrieved XML attribute value is incorrect.") - - stanza._setAttr('bar', None) - stanza._delAttr('baz') - - self.check_stanza(TestStanza, stanza, """ - - """) - - self.failUnless(stanza._getAttr('bar', 'c') == 'c', - "Incorrect default value returned for an unset XML attribute.") - - def testGetSubText(self): - """Test retrieving the contents of a sub element.""" - - class TestStanza(ElementBase): - name = "foo" - namespace = "foo" - interfaces = set(('bar',)) - - def setBar(self, value): - wrapper = ET.Element("{foo}wrapper") - bar = ET.Element("{foo}bar") - bar.text = value - wrapper.append(bar) - self.xml.append(wrapper) - - def getBar(self): - return self._getSubText("wrapper/bar", default="not found") - - stanza = TestStanza() - self.failUnless(stanza['bar'] == 'not found', - "Default _getSubText value incorrect.") - - stanza['bar'] = 'found' - self.check_stanza(TestStanza, stanza, """ - - - found - - - """) - self.failUnless(stanza['bar'] == 'found', - "_getSubText value incorrect: %s." % stanza['bar']) - - def testSubElement(self): - """Test setting the contents of a sub element.""" - - class TestStanza(ElementBase): - name = "foo" - namespace = "foo" - interfaces = set(('bar', 'baz')) - - def setBaz(self, value): - self._setSubText("wrapper/baz", text=value) - - def getBaz(self): - return self._getSubText("wrapper/baz") - - def setBar(self, value): - self._setSubText("wrapper/bar", text=value) - - def getBar(self): - return self._getSubText("wrapper/bar") - - stanza = TestStanza() - stanza['bar'] = 'a' - stanza['baz'] = 'b' - self.check_stanza(TestStanza, stanza, """ - - - a - b - - - """) - stanza._setSubText('wrapper/bar', text='', keep=True) - self.check_stanza(TestStanza, stanza, """ - - - - b - - - """, use_values=False) - - stanza['bar'] = 'a' - stanza._setSubText('wrapper/bar', text='') - self.check_stanza(TestStanza, stanza, """ - - - b - - - """) - - def testDelSub(self): - """Test removing sub elements.""" - - class TestStanza(ElementBase): - name = "foo" - namespace = "foo" - interfaces = set(('bar', 'baz')) - - def setBar(self, value): - self._setSubText("path/to/only/bar", value); - - def getBar(self): - return self._getSubText("path/to/only/bar") - - def delBar(self): - self._delSub("path/to/only/bar") - - def setBaz(self, value): - self._setSubText("path/to/just/baz", value); - - def getBaz(self): - return self._getSubText("path/to/just/baz") - - def delBaz(self): - self._delSub("path/to/just/baz") - - stanza = TestStanza() - stanza['bar'] = 'a' - stanza['baz'] = 'b' - - self.check_stanza(TestStanza, stanza, """ - - - - - a - - - b - - - - - """) - - del stanza['bar'] - del stanza['baz'] - - self.check_stanza(TestStanza, stanza, """ - - - - - - - - - """, use_values=False) - - stanza['bar'] = 'a' - stanza['baz'] = 'b' - - stanza._delSub('path/to/only/bar', all=True) - - self.check_stanza(TestStanza, stanza, """ - - - - - b - - - - - """) - - def testMatch(self): - """Test matching a stanza against an XPath expression.""" - - class TestSubStanza(ElementBase): - name = "sub" - namespace = "baz" - interfaces = set(('attrib',)) - - class TestStanza(ElementBase): - name = "foo" - namespace = "foo" - interfaces = set(('bar','baz', 'qux')) - sub_interfaces = set(('qux',)) - subitem = (TestSubStanza,) - - def setQux(self, value): - self._setSubText('qux', text=value) - - def getQux(self): - return self._getSubText('qux') - - class TestStanzaPlugin(ElementBase): - name = "plugin" - namespace = "http://test/slash/bar" - interfaces = set(('attrib',)) - - registerStanzaPlugin(TestStanza, TestStanzaPlugin) - - stanza = TestStanza() - self.failUnless(stanza.match("foo"), - "Stanza did not match its own tag name.") - - self.failUnless(stanza.match("{foo}foo"), - "Stanza did not match its own namespaced name.") - - stanza['bar'] = 'a' - self.failUnless(stanza.match("foo@bar=a"), - "Stanza did not match its own name with attribute value check.") - - stanza['baz'] = 'b' - self.failUnless(stanza.match("foo@bar=a@baz=b"), - "Stanza did not match its own name with multiple attributes.") - - stanza['qux'] = 'c' - self.failUnless(stanza.match("foo/qux"), - "Stanza did not match with subelements.") - - stanza['qux'] = '' - self.failUnless(stanza.match("foo/qux") == False, - "Stanza matched missing subinterface element.") - - self.failUnless(stanza.match("foo/bar") == False, - "Stanza matched nonexistent element.") - - stanza['plugin']['attrib'] = 'c' - self.failUnless(stanza.match("foo/plugin@attrib=c"), - "Stanza did not match with plugin and attribute.") - - self.failUnless(stanza.match("foo/{http://test/slash/bar}plugin"), - "Stanza did not match with namespaced plugin.") - - substanza = TestSubStanza() - substanza['attrib'] = 'd' - stanza.append(substanza) - self.failUnless(stanza.match("foo/sub@attrib=d"), - "Stanza did not match with substanzas and attribute.") - - self.failUnless(stanza.match("foo/{baz}sub"), - "Stanza did not match with namespaced substanza.") - - def testComparisons(self): - """Test comparing ElementBase objects.""" - - class TestStanza(ElementBase): - name = "foo" - namespace = "foo" - interfaces = set(('bar', 'baz')) - - stanza1 = TestStanza() - stanza1['bar'] = 'a' - - self.failUnless(stanza1, - "Stanza object does not evaluate to True") - - stanza2 = TestStanza() - stanza2['baz'] = 'b' - - self.failUnless(stanza1 != stanza2, - "Different stanza objects incorrectly compared equal.") - - stanza1['baz'] = 'b' - stanza2['bar'] = 'a' - - self.failUnless(stanza1 == stanza2, - "Equal stanzas incorrectly compared inequal.") - - def testKeys(self): - """Test extracting interface names from a stanza object.""" - - class TestStanza(ElementBase): - name = "foo" - namespace = "foo" - interfaces = set(('bar', 'baz')) - plugin_attrib = 'qux' - - registerStanzaPlugin(TestStanza, TestStanza) - - stanza = TestStanza() - - self.failUnless(set(stanza.keys()) == set(('bar', 'baz')), - "Returned set of interface keys does not match expected.") - - stanza.enable('qux') - - self.failUnless(set(stanza.keys()) == set(('bar', 'baz', 'qux')), - "Incorrect set of interface and plugin keys.") - - def testGet(self): - """Test accessing stanza interfaces using get().""" - - class TestStanza(ElementBase): - name = "foo" - namespace = "foo" - interfaces = set(('bar', 'baz')) - - stanza = TestStanza() - stanza['bar'] = 'a' - - self.failUnless(stanza.get('bar') == 'a', - "Incorrect value returned by stanza.get") - - self.failUnless(stanza.get('baz', 'b') == 'b', - "Incorrect default value returned by stanza.get") - - def testSubStanzas(self): - """Test manipulating substanzas of a stanza object.""" - - class TestSubStanza(ElementBase): - name = "foobar" - namespace = "foo" - interfaces = set(('qux',)) - - class TestStanza(ElementBase): - name = "foo" - namespace = "foo" - interfaces = set(('bar', 'baz')) - subitem = (TestSubStanza,) - - stanza = TestStanza() - substanza1 = TestSubStanza() - substanza2 = TestSubStanza() - substanza1['qux'] = 'a' - substanza2['qux'] = 'b' - - # Test appending substanzas - self.failUnless(len(stanza) == 0, - "Incorrect empty stanza size.") - - stanza.append(substanza1) - self.check_stanza(TestStanza, stanza, """ - - - - """) - self.failUnless(len(stanza) == 1, - "Incorrect stanza size with 1 substanza.") - - stanza.append(substanza2) - self.check_stanza(TestStanza, stanza, """ - - - - - """) - self.failUnless(len(stanza) == 2, - "Incorrect stanza size with 2 substanzas.") - - # Test popping substanzas - stanza.pop(0) - self.check_stanza(TestStanza, stanza, """ - - - - """) - - # Test iterating over substanzas - stanza.append(substanza1) - results = [] - for substanza in stanza: - results.append(substanza['qux']) - self.failUnless(results == ['b', 'a'], - "Iteration over substanzas failed: %s." % str(results)) - - def testCopy(self): - """Test copying stanza objects.""" - - class TestStanza(ElementBase): - name = "foo" - namespace = "foo" - interfaces = set(('bar', 'baz')) - - stanza1 = TestStanza() - stanza1['bar'] = 'a' - - stanza2 = stanza1.__copy__() - - self.failUnless(stanza1 == stanza2, - "Copied stanzas are not equal to each other.") - - stanza1['baz'] = 'b' - self.failUnless(stanza1 != stanza2, - "Divergent stanza copies incorrectly compared equal.") - -suite = unittest.TestLoader().loadTestsFromTestCase(TestElementBase) diff --git a/tests/test_errorstanzas.py b/tests/test_errorstanzas.py deleted file mode 100644 index a883b7ef..00000000 --- a/tests/test_errorstanzas.py +++ /dev/null @@ -1,56 +0,0 @@ -from . sleektest import * - -class TestErrorStanzas(SleekTest): - - def testSetup(self): - """Test setting initial values in error stanza.""" - msg = self.Message() - msg.enable('error') - self.check_message(msg, """ - - - - - - """) - - def testCondition(self): - """Test modifying the error condition.""" - msg = self.Message() - msg['error']['condition'] = 'item-not-found' - - self.check_message(msg, """ - - - - - - """) - - self.failUnless(msg['error']['condition'] == 'item-not-found', "Error condition doesn't match.") - - del msg['error']['condition'] - - self.check_message(msg, """ - - - - """) - - def testDelCondition(self): - """Test that deleting error conditions doesn't remove extra elements.""" - msg = self.Message() - msg['error']['text'] = 'Error!' - msg['error']['condition'] = 'internal-server-error' - - del msg['error']['condition'] - - self.check_message(msg, """ - - - Error! - - - """) - -suite = unittest.TestLoader().loadTestsFromTestCase(TestErrorStanzas) diff --git a/tests/test_events.py b/tests/test_events.py index d9d80c30..ea4cf8a4 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -1,6 +1,5 @@ -import sleekxmpp import time -from . sleektest import * +from sleekxmpp.test import * class TestEvents(SleekTest): diff --git a/tests/test_forms.py b/tests/test_forms.py deleted file mode 100644 index 67b0f6a8..00000000 --- a/tests/test_forms.py +++ /dev/null @@ -1,115 +0,0 @@ -from . sleektest import * -import sleekxmpp.plugins.xep_0004 as xep_0004 - - -class TestDataForms(SleekTest): - - def setUp(self): - registerStanzaPlugin(Message, xep_0004.Form) - registerStanzaPlugin(xep_0004.Form, xep_0004.FormField) - registerStanzaPlugin(xep_0004.FormField, xep_0004.FieldOption) - - def testMultipleInstructions(self): - """Testing using multiple instructions elements in a data form.""" - msg = self.Message() - msg['form']['instructions'] = "Instructions\nSecond batch" - - self.check_message(msg, """ - - - Instructions - Second batch - - - """) - - def testAddField(self): - """Testing adding fields to a data form.""" - - msg = self.Message() - form = msg['form'] - form.addField(var='f1', - ftype='text-single', - label='Text', - desc='A text field', - required=True, - value='Some text!') - - self.check_message(msg, """ - - - - A text field - - Some text! - - - - """) - - form['fields'] = [('f1', {'type': 'text-single', - 'label': 'Username', - 'required': True}), - ('f2', {'type': 'text-private', - 'label': 'Password', - 'required': True}), - ('f3', {'type': 'text-multi', - 'label': 'Message', - 'value': 'Enter message.\nA long one even.'}), - ('f4', {'type': 'list-single', - 'label': 'Message Type', - 'options': [{'label': 'Cool!', - 'value': 'cool'}, - {'label': 'Urgh!', - 'value': 'urgh'}]})] - self.check_message(msg, """ - - - - - - - - - - Enter message. - A long one even. - - - - - - - - """) - - def testSetValues(self): - """Testing setting form values""" - - msg = self.Message() - form = msg['form'] - form.setFields([ - ('foo', {'type': 'text-single'}), - ('bar', {'type': 'list-multi'})]) - - form.setValues({'foo': 'Foo!', - 'bar': ['a', 'b']}) - - self.check_message(msg, """ - - - - Foo! - - - a - b - - - """) - -suite = unittest.TestLoader().loadTestsFromTestCase(TestDataForms) diff --git a/tests/test_gmail.py b/tests/test_gmail.py deleted file mode 100644 index 5636e877..00000000 --- a/tests/test_gmail.py +++ /dev/null @@ -1,88 +0,0 @@ -from . sleektest import * -import sleekxmpp.plugins.gmail_notify as gmail - - -class TestGmail(SleekTest): - - def setUp(self): - registerStanzaPlugin(Iq, gmail.GmailQuery) - registerStanzaPlugin(Iq, gmail.MailBox) - registerStanzaPlugin(Iq, gmail.NewMail) - - def testCreateQuery(self): - """Testing querying Gmail for emails.""" - - iq = self.Iq() - iq['type'] = 'get' - iq['gmail']['search'] = 'is:starred' - iq['gmail']['newer-than-time'] = '1140638252542' - iq['gmail']['newer-than-tid'] = '11134623426430234' - - self.check_iq(iq, """ - - - - """) - - def testMailBox(self): - """Testing reading from Gmail mailbox result""" - - # Use the example from Google's documentation at - # http://code.google.com/apis/talk/jep_extensions/gmail.html#notifications - xml = ET.fromstring(""" - - - - - - - - - act1scene3 - Put thy rapier up. - Ay, ay, a scratch, a scratch; marry, 'tis enough. - - - - """) - - iq = self.Iq(xml=xml) - mailbox = iq['mailbox'] - self.failUnless(mailbox['result-time'] == '1118012394209', "result-time doesn't match") - self.failUnless(mailbox['url'] == 'http://mail.google.com/mail', "url doesn't match") - self.failUnless(mailbox['matched'] == '95', "total-matched incorrect") - self.failUnless(mailbox['estimate'] == False, "total-estimate incorrect") - self.failUnless(len(mailbox['threads']) == 1, "could not extract message threads") - - thread = mailbox['threads'][0] - self.failUnless(thread['tid'] == '1172320964060972012', "thread tid doesn't match") - self.failUnless(thread['participation'] == '1', "thread participation incorrect") - self.failUnless(thread['messages'] == '28', "thread message count incorrect") - self.failUnless(thread['date'] == '1118012394209', "thread date doesn't match") - self.failUnless(thread['url'] == 'http://mail.google.com/mail?view=cv', "thread url doesn't match") - self.failUnless(thread['labels'] == 'act1scene3', "thread labels incorrect") - self.failUnless(thread['subject'] == 'Put thy rapier up.', "thread subject doesn't match") - self.failUnless(thread['snippet'] == "Ay, ay, a scratch, a scratch; marry, 'tis enough.", "snippet doesn't match") - self.failUnless(len(thread['senders']) == 3, "could not extract senders") - - sender1 = thread['senders'][0] - self.failUnless(sender1['name'] == 'Me', "sender name doesn't match") - self.failUnless(sender1['address'] == 'romeo@gmail.com', "sender address doesn't match") - self.failUnless(sender1['originator'] == True, "sender originator incorrect") - self.failUnless(sender1['unread'] == False, "sender unread incorrectly True") - - sender2 = thread['senders'][2] - self.failUnless(sender2['unread'] == True, "sender unread incorrectly False") - -suite = unittest.TestLoader().loadTestsFromTestCase(TestGmail) diff --git a/tests/test_handlers.py b/tests/test_handlers.py deleted file mode 100644 index 70eb0248..00000000 --- a/tests/test_handlers.py +++ /dev/null @@ -1,112 +0,0 @@ -from . sleektest import * -import sleekxmpp -from sleekxmpp.xmlstream.handler import * -from sleekxmpp.xmlstream.matcher import * - -class TestHandlers(SleekTest): - """ - Test using handlers and waiters. - """ - - def setUp(self): - self.stream_start() - - def tearDown(self): - self.stream_close() - - def testCallback(self): - """Test using stream callback handlers.""" - - def callback_handler(stanza): - self.xmpp.sendRaw(""" - - Success! - - """) - - callback = Callback('Test Callback', - MatchXPath('{test}tester'), - callback_handler) - - self.xmpp.registerHandler(callback) - - self.stream_recv("""""") - - msg = self.Message() - msg['body'] = 'Success!' - self.stream_send_message(msg) - - def testWaiter(self): - """Test using stream waiter handler.""" - - def waiter_handler(stanza): - iq = self.xmpp.Iq() - iq['id'] = 'test' - iq['type'] = 'set' - iq['query'] = 'test' - reply = iq.send(block=True) - if reply: - self.xmpp.sendRaw(""" - - Successful: %s - - """ % reply['query']) - - self.xmpp.add_event_handler('message', waiter_handler, threaded=True) - - # Send message to trigger waiter_handler - self.stream_recv(""" - - Testing - - """) - - # Check that Iq was sent by waiter_handler - iq = self.Iq() - iq['id'] = 'test' - iq['type'] = 'set' - iq['query'] = 'test' - self.stream_send_iq(iq) - - # Send the reply Iq - self.stream_recv(""" - - - - """) - - # Check that waiter_handler received the reply - msg = self.Message() - msg['body'] = 'Successful: test' - self.stream_send_message(msg) - - def testWaiterTimeout(self): - """Test that waiter handler is removed after timeout.""" - - def waiter_handler(stanza): - iq = self.xmpp.Iq() - iq['id'] = 'test2' - iq['type'] = 'set' - iq['query'] = 'test2' - reply = iq.send(block=True, timeout=0) - - self.xmpp.add_event_handler('message', waiter_handler, threaded=True) - - # Start test by triggerig waiter_handler - self.stream_recv("""Start Test""") - - # Check that Iq was sent to trigger start of timeout period - iq = self.Iq() - iq['id'] = 'test2' - iq['type'] = 'set' - iq['query'] = 'test2' - self.stream_send_iq(iq) - - # Check that the waiter is no longer registered - waiter_exists = self.xmpp.removeHandler('IqWait_test2') - - self.failUnless(waiter_exists == False, - "Waiter handler was not removed.") - - -suite = unittest.TestLoader().loadTestsFromTestCase(TestHandlers) diff --git a/tests/test_iqstanzas.py b/tests/test_iqstanzas.py deleted file mode 100644 index 197bc001..00000000 --- a/tests/test_iqstanzas.py +++ /dev/null @@ -1,90 +0,0 @@ -from . sleektest import * -from sleekxmpp.xmlstream.stanzabase import ET - - -class TestIqStanzas(SleekTest): - - def tearDown(self): - """Shutdown the XML stream after testing.""" - self.stream_close() - - def testSetup(self): - """Test initializing default Iq values.""" - iq = self.Iq() - self.check_iq(iq, """ - - """) - - def testPayload(self): - """Test setting Iq stanza payload.""" - iq = self.Iq() - iq.setPayload(ET.Element('{test}tester')) - self.check_iq(iq, """ - - - - """, use_values=False) - - - def testUnhandled(self): - """Test behavior for Iq.unhandled.""" - self.stream_start() - self.stream_recv(""" - - - - """) - - iq = self.Iq() - iq['id'] = 'test' - iq['error']['condition'] = 'feature-not-implemented' - iq['error']['text'] = 'No handlers registered for this request.' - - self.stream_send_iq(iq, """ - - - - - No handlers registered for this request. - - - - """) - - def testQuery(self): - """Test modifying query element of Iq stanzas.""" - iq = self.Iq() - - iq['query'] = 'query_ns' - self.check_iq(iq, """ - - - - """) - - iq['query'] = 'query_ns2' - self.check_iq(iq, """ - - - - """) - - self.failUnless(iq['query'] == 'query_ns2', "Query namespace doesn't match") - - del iq['query'] - self.check_iq(iq, """ - - """) - - def testReply(self): - """Test setting proper result type in Iq replies.""" - iq = self.Iq() - iq['to'] = 'user@localhost' - iq['type'] = 'get' - iq.reply() - - self.check_iq(iq, """ - - """) - -suite = unittest.TestLoader().loadTestsFromTestCase(TestIqStanzas) diff --git a/tests/test_jid.py b/tests/test_jid.py index cddac424..45047313 100644 --- a/tests/test_jid.py +++ b/tests/test_jid.py @@ -1,6 +1,7 @@ -from . sleektest import * +from sleekxmpp.test import * from sleekxmpp.xmlstream.jid import JID + class TestJIDClass(SleekTest): def testJIDfromfull(self): j = JID('user@someserver/some/resource') @@ -23,4 +24,5 @@ class TestJIDClass(SleekTest): self.assertEqual(j.full, 'user@someserver/some/resource', "Full does not match") self.assertEqual(str(j), 'user@someserver/some/resource', "String does not match") + suite = unittest.TestLoader().loadTestsFromTestCase(TestJIDClass) diff --git a/tests/test_messagestanzas.py b/tests/test_messagestanzas.py deleted file mode 100644 index d57f5ad4..00000000 --- a/tests/test_messagestanzas.py +++ /dev/null @@ -1,57 +0,0 @@ -from . sleektest import * -from sleekxmpp.stanza.message import Message -from sleekxmpp.stanza.htmlim import HTMLIM - - -class TestMessageStanzas(SleekTest): - - def setUp(self): - registerStanzaPlugin(Message, HTMLIM) - - def testGroupchatReplyRegression(self): - "Regression groupchat reply should be to barejid" - msg = self.Message() - msg['to'] = 'me@myserver.tld' - msg['from'] = 'room@someservice.someserver.tld/somenick' - msg['type'] = 'groupchat' - msg['body'] = "this is a message" - msg.reply() - self.failUnless(str(msg['to']) == 'room@someservice.someserver.tld') - - def testAttribProperty(self): - "Test attrib property returning self" - msg = self.Message() - msg.attrib.attrib.attrib['to'] = 'usr@server.tld' - self.failUnless(str(msg['to']) == 'usr@server.tld') - - def testHTMLPlugin(self): - "Test message/html/body stanza" - msg = self.Message() - msg['to'] = "fritzy@netflint.net/sleekxmpp" - msg['body'] = "this is the plaintext message" - msg['type'] = 'chat' - p = ET.Element('{http://www.w3.org/1999/xhtml}p') - p.text = "This is the htmlim message" - msg['html']['body'] = p - self.check_message(msg, """ - - this is the plaintext message - - -

This is the htmlim message

- - -
""") - - def testNickPlugin(self): - "Test message/nick/nick stanza." - msg = self.Message() - msg['nick']['nick'] = 'A nickname!' - self.check_message(msg, """ - - A nickname! - - """) - - -suite = unittest.TestLoader().loadTestsFromTestCase(TestMessageStanzas) diff --git a/tests/test_presencestanzas.py b/tests/test_presencestanzas.py deleted file mode 100644 index 2452df4f..00000000 --- a/tests/test_presencestanzas.py +++ /dev/null @@ -1,67 +0,0 @@ -import sleekxmpp -from . sleektest import * -from sleekxmpp.stanza.presence import Presence - - -class TestPresenceStanzas(SleekTest): - - def testPresenceShowRegression(self): - """Regression check presence['type'] = 'dnd' show value working""" - p = self.Presence() - p['type'] = 'dnd' - self.check_presence(p, "dnd") - - def testPresenceType(self): - """Test manipulating presence['type']""" - p = self.Presence() - p['type'] = 'available' - self.check_presence(p, "") - self.failUnless(p['type'] == 'available', - "Incorrect presence['type'] for type 'available'") - - for showtype in ['away', 'chat', 'dnd', 'xa']: - p['type'] = showtype - self.check_presence(p, """ - %s - """ % showtype) - self.failUnless(p['type'] == showtype, - "Incorrect presence['type'] for type '%s'" % showtype) - - p['type'] = None - self.check_presence(p, "") - - def testPresenceUnsolicitedOffline(self): - """ - Unsolicted offline presence does not spawn changed_status - or update the roster. - """ - p = self.Presence() - p['type'] = 'unavailable' - p['from'] = 'bill@chadmore.com/gmail15af' - - c = sleekxmpp.ClientXMPP('crap@wherever', 'password') - happened = [] - - def handlechangedpresence(event): - happened.append(True) - - c.add_event_handler("changed_status", handlechangedpresence) - c._handle_presence(p) - - self.failUnless(happened == [], - "changed_status event triggered for extra unavailable presence") - self.failUnless(c.roster == {}, - "Roster updated for superfulous unavailable presence") - - def testNickPlugin(self): - """Test presence/nick/nick stanza.""" - p = self.Presence() - p['nick']['nick'] = 'A nickname!' - self.check_presence(p, """ - - A nickname! - - """) - - -suite = unittest.TestLoader().loadTestsFromTestCase(TestPresenceStanzas) diff --git a/tests/test_pubsubstanzas.py b/tests/test_pubsubstanzas.py deleted file mode 100644 index 42a1b52e..00000000 --- a/tests/test_pubsubstanzas.py +++ /dev/null @@ -1,511 +0,0 @@ -from . sleektest import * -import sleekxmpp.plugins.xep_0004 as xep_0004 -import sleekxmpp.plugins.stanza_pubsub as pubsub - - -class TestPubsubStanzas(SleekTest): - - def testAffiliations(self): - "Testing iq/pubsub/affiliations/affiliation stanzas" - iq = self.Iq() - aff1 = pubsub.Affiliation() - aff1['node'] = 'testnode' - aff1['affiliation'] = 'owner' - aff2 = pubsub.Affiliation() - aff2['node'] = 'testnode2' - aff2['affiliation'] = 'publisher' - iq['pubsub']['affiliations'].append(aff1) - iq['pubsub']['affiliations'].append(aff2) - self.check_iq(iq, """ - - - - - - - - """) - - def testSubscriptions(self): - "Testing iq/pubsub/subscriptions/subscription stanzas" - iq = self.Iq() - sub1 = pubsub.Subscription() - sub1['node'] = 'testnode' - sub1['jid'] = 'steve@myserver.tld/someresource' - sub2 = pubsub.Subscription() - sub2['node'] = 'testnode2' - sub2['jid'] = 'boogers@bork.top/bill' - sub2['subscription'] = 'subscribed' - iq['pubsub']['subscriptions'].append(sub1) - iq['pubsub']['subscriptions'].append(sub2) - self.check_iq(iq, """ - - - - - - - - """) - - def testOptionalSettings(self): - "Testing iq/pubsub/subscription/subscribe-options stanzas" - iq = self.Iq() - iq['pubsub']['subscription']['suboptions']['required'] = True - iq['pubsub']['subscription']['node'] = 'testnode alsdkjfas' - iq['pubsub']['subscription']['jid'] = "fritzy@netflint.net/sleekxmpp" - iq['pubsub']['subscription']['subscription'] = 'unconfigured' - self.check_iq(iq, """ - - - - - - - - - """) - - def testItems(self): - "Testing iq/pubsub/items stanzas" - iq = self.Iq() - iq['pubsub']['items']['node'] = 'crap' - payload = ET.fromstring(""" - - - - """) - payload2 = ET.fromstring(""" - - - - """) - item = pubsub.Item() - item['id'] = 'asdf' - item['payload'] = payload - item2 = pubsub.Item() - item2['id'] = 'asdf2' - item2['payload'] = payload2 - iq['pubsub']['items'].append(item) - iq['pubsub']['items'].append(item2) - self.check_iq(iq, """ - - - - - - - - - - - - - - - - - - """) - - def testCreate(self): - "Testing iq/pubsub/create&configure stanzas" - iq = self.Iq() - iq['pubsub']['create']['node'] = 'mynode' - iq['pubsub']['configure']['form'].addField('pubsub#title', - ftype='text-single', - value='This thing is awesome') - self.check_iq(iq, """ - - - - - - - This thing is awesome - - - - - """) - - def testState(self): - "Testing iq/psstate stanzas" - iq = self.Iq() - iq['psstate']['node']= 'mynode' - iq['psstate']['item']= 'myitem' - pl = ET.Element('{http://andyet.net/protocol/pubsubqueue}claimed') - iq['psstate']['payload'] = pl - self.check_iq(iq, """ - - - - - """) - - def testDefault(self): - "Testing iq/pubsub_owner/default stanzas" - iq = self.Iq() - iq['pubsub_owner']['default'] - iq['pubsub_owner']['default']['node'] = 'mynode' - iq['pubsub_owner']['default']['type'] = 'leaf' - iq['pubsub_owner']['default']['form'].addField('pubsub#title', - ftype='text-single', - value='This thing is awesome') - self.check_iq(iq, """ - - - - - - This thing is awesome - - - - - """, use_values=False) - - def testSubscribe(self): - "testing iq/pubsub/subscribe stanzas" - iq = self.Iq() - iq['pubsub']['subscribe']['options'] - iq['pubsub']['subscribe']['node'] = 'cheese' - iq['pubsub']['subscribe']['jid'] = 'fritzy@netflint.net/sleekxmpp' - iq['pubsub']['subscribe']['options']['node'] = 'cheese' - iq['pubsub']['subscribe']['options']['jid'] = 'fritzy@netflint.net/sleekxmpp' - form = xep_0004.Form() - form.addField('pubsub#title', ftype='text-single', value='this thing is awesome') - iq['pubsub']['subscribe']['options']['options'] = form - self.check_iq(iq, """ - - - - - - - this thing is awesome - - - - - - """, use_values=False) - - def testPublish(self): - "Testing iq/pubsub/publish stanzas" - iq = self.Iq() - iq['pubsub']['publish']['node'] = 'thingers' - payload = ET.fromstring(""" - - - - """) - payload2 = ET.fromstring(""" - - - - """) - item = pubsub.Item() - item['id'] = 'asdf' - item['payload'] = payload - item2 = pubsub.Item() - item2['id'] = 'asdf2' - item2['payload'] = payload2 - iq['pubsub']['publish'].append(item) - iq['pubsub']['publish'].append(item2) - - self.check_iq(iq, """ - - - - - - - - - - - - - - - - - - """) - - def testDelete(self): - "Testing iq/pubsub_owner/delete stanzas" - iq = self.Iq() - iq['pubsub_owner']['delete']['node'] = 'thingers' - self.check_iq(iq, """ - - - - - """) - - def testCreateConfigGet(self): - """Testing getting config from full create""" - iq = self.Iq() - iq['to'] = 'pubsub.asdf' - iq['from'] = 'fritzy@asdf/87292ede-524d-4117-9076-d934ed3db8e7' - iq['type'] = 'set' - iq['id'] = 'E' - - pub = iq['pubsub'] - pub['create']['node'] = 'testnode2' - pub['configure']['form']['type'] = 'submit' - pub['configure']['form'].setFields([ - ('FORM_TYPE', {'type': 'hidden', - 'value': 'http://jabber.org/protocol/pubsub#node_config'}), - ('pubsub#node_type', {'type': 'list-single', - 'label': 'Select the node type', - 'value': 'leaf'}), - ('pubsub#title', {'type': 'text-single', - 'label': 'A friendly name for the node'}), - ('pubsub#deliver_notifications', {'type': 'boolean', - 'label': 'Deliver event notifications', - 'value': True}), - ('pubsub#deliver_payloads', {'type': 'boolean', - 'label': 'Deliver payloads with event notifications', - 'value': True}), - ('pubsub#notify_config', {'type': 'boolean', - 'label': 'Notify subscribers when the node configuration changes'}), - ('pubsub#notify_delete', {'type': 'boolean', - 'label': 'Notify subscribers when the node is deleted'}), - ('pubsub#notify_retract', {'type': 'boolean', - 'label': 'Notify subscribers when items are removed from the node', - 'value': True}), - ('pubsub#notify_sub', {'type': 'boolean', - 'label': 'Notify owners about new subscribers and unsubscribes'}), - ('pubsub#persist_items', {'type': 'boolean', - 'label': 'Persist items in storage'}), - ('pubsub#max_items', {'type': 'text-single', - 'label': 'Max # of items to persist', - 'value': '10'}), - ('pubsub#subscribe', {'type': 'boolean', - 'label': 'Whether to allow subscriptions', - 'value': True}), - ('pubsub#access_model', {'type': 'list-single', - 'label': 'Specify the subscriber model', - 'value': 'open'}), - ('pubsub#publish_model', {'type': 'list-single', - 'label': 'Specify the publisher model', - 'value': 'publishers'}), - ('pubsub#send_last_published_item', {'type': 'list-single', - 'label': 'Send last published item', - 'value': 'never'}), - ('pubsub#presence_based_delivery', {'type': 'boolean', - 'label': 'Deliver notification only to available users'}), - ]) - - self.check_iq(iq, """ - - - - - - - http://jabber.org/protocol/pubsub#node_config - - - leaf - - - - 1 - - - 1 - - - - - 1 - - - - - 10 - - - 1 - - - open - - - publishers - - - never - - - - - - """) - - def testItemEvent(self): - """Testing message/pubsub_event/items/item""" - msg = self.Message() - item = pubsub.EventItem() - pl = ET.Element('{http://netflint.net/protocol/test}test', {'failed':'3', 'passed':'24'}) - item['payload'] = pl - item['id'] = 'abc123' - msg['pubsub_event']['items'].append(item) - msg['pubsub_event']['items']['node'] = 'cheese' - msg['type'] = 'normal' - self.check_message(msg, """ - - - - - - - - - """) - - def testItemsEvent(self): - """Testing multiple message/pubsub_event/items/item""" - msg = self.Message() - item = pubsub.EventItem() - item2 = pubsub.EventItem() - pl = ET.Element('{http://netflint.net/protocol/test}test', {'failed':'3', 'passed':'24'}) - pl2 = ET.Element('{http://netflint.net/protocol/test-other}test', {'total':'27', 'failed':'3'}) - item2['payload'] = pl2 - item['payload'] = pl - item['id'] = 'abc123' - item2['id'] = '123abc' - msg['pubsub_event']['items'].append(item) - msg['pubsub_event']['items'].append(item2) - msg['pubsub_event']['items']['node'] = 'cheese' - msg['type'] = 'normal' - self.check_message(msg, """ - - - - - - - - - - - - """) - - def testItemsEvent(self): - """Testing message/pubsub_event/items/item & retract mix""" - msg = self.Message() - item = pubsub.EventItem() - item2 = pubsub.EventItem() - pl = ET.Element('{http://netflint.net/protocol/test}test', {'failed':'3', 'passed':'24'}) - pl2 = ET.Element('{http://netflint.net/protocol/test-other}test', {'total':'27', 'failed':'3'}) - item2['payload'] = pl2 - retract = pubsub.EventRetract() - retract['id'] = 'aabbcc' - item['payload'] = pl - item['id'] = 'abc123' - item2['id'] = '123abc' - msg['pubsub_event']['items'].append(item) - msg['pubsub_event']['items'].append(retract) - msg['pubsub_event']['items'].append(item2) - msg['pubsub_event']['items']['node'] = 'cheese' - msg['type'] = 'normal' - self.check_message(msg, """ - - - - - - - - - - - - """) - - def testCollectionAssociate(self): - """Testing message/pubsub_event/collection/associate""" - msg = self.Message() - msg['pubsub_event']['collection']['associate']['node'] = 'cheese' - msg['pubsub_event']['collection']['node'] = 'cheeseburger' - msg['type'] = 'headline' - self.check_message(msg, """ - - - - - - - """) - - def testCollectionDisassociate(self): - """Testing message/pubsub_event/collection/disassociate""" - msg = self.Message() - msg['pubsub_event']['collection']['disassociate']['node'] = 'cheese' - msg['pubsub_event']['collection']['node'] = 'cheeseburger' - msg['type'] = 'headline' - self.check_message(msg, """ - - - - - - - """) - - def testEventConfiguration(self): - """Testing message/pubsub_event/configuration/config""" - msg = self.Message() - msg['pubsub_event']['configuration']['node'] = 'cheese' - msg['pubsub_event']['configuration']['form'].addField('pubsub#title', - ftype='text-single', - value='This thing is awesome') - msg['type'] = 'headline' - self.check_message(msg, """ - - - - - - This thing is awesome - - - - - """) - - def testEventPurge(self): - """Testing message/pubsub_event/purge""" - msg = self.Message() - msg['pubsub_event']['purge']['node'] = 'pickles' - msg['type'] = 'headline' - self.check_message(msg, """ - - - - - """) - - def testEventSubscription(self): - """Testing message/pubsub_event/subscription""" - msg = self.Message() - msg['pubsub_event']['subscription']['node'] = 'pickles' - msg['pubsub_event']['subscription']['jid'] = 'fritzy@netflint.net/test' - msg['pubsub_event']['subscription']['subid'] = 'aabb1122' - msg['pubsub_event']['subscription']['subscription'] = 'subscribed' - msg['pubsub_event']['subscription']['expiry'] = 'presence' - msg['type'] = 'headline' - self.check_message(msg, """ - - - - - """) - -suite = unittest.TestLoader().loadTestsFromTestCase(TestPubsubStanzas) diff --git a/tests/test_roster.py b/tests/test_roster.py deleted file mode 100644 index f210551d..00000000 --- a/tests/test_roster.py +++ /dev/null @@ -1,84 +0,0 @@ -from . sleektest import * -from sleekxmpp.stanza.roster import Roster - - -class TestRosterStanzas(SleekTest): - - def testAddItems(self): - """Test adding items to a roster stanza.""" - iq = self.Iq() - iq['roster'].setItems({ - 'user@example.com': { - 'name': 'User', - 'subscription': 'both', - 'groups': ['Friends', 'Coworkers']}, - 'otheruser@example.com': { - 'name': 'Other User', - 'subscription': 'both', - 'groups': []}}) - self.check_iq(iq, """ - - - - Friends - Coworkers - - - - - """) - - def testGetItems(self): - """Test retrieving items from a roster stanza.""" - xml_string = """ - - - - Friends - Coworkers - - - - - """ - iq = self.Iq(ET.fromstring(xml_string)) - expected = { - 'user@example.com': { - 'name': 'User', - 'subscription': 'both', - 'groups': ['Friends', 'Coworkers']}, - 'otheruser@example.com': { - 'name': 'Other User', - 'subscription': 'both', - 'groups': []}} - debug = "Roster items don't match after retrieval." - debug += "\nReturned: %s" % str(iq['roster']['items']) - debug += "\nExpected: %s" % str(expected) - self.failUnless(iq['roster']['items'] == expected, debug) - - def testDelItems(self): - """Test clearing items from a roster stanza.""" - xml_string = """ - - - - Friends - Coworkers - - - - - """ - iq = self.Iq(ET.fromstring(xml_string)) - del iq['roster']['items'] - self.check_iq(iq, """ - - - - """) - - -suite = unittest.TestLoader().loadTestsFromTestCase(TestRosterStanzas) diff --git a/tests/test_stanza_base.py b/tests/test_stanza_base.py new file mode 100644 index 00000000..9bd326b6 --- /dev/null +++ b/tests/test_stanza_base.py @@ -0,0 +1,79 @@ +from sleekxmpp.test import * +from sleekxmpp.xmlstream.stanzabase import ET, StanzaBase + + +class TestStanzaBase(SleekTest): + + def testTo(self): + """Test the 'to' interface of StanzaBase.""" + stanza = StanzaBase() + stanza['to'] = 'user@example.com' + self.failUnless(str(stanza['to']) == 'user@example.com', + "Setting and retrieving stanza 'to' attribute did not work.") + + def testFrom(self): + """Test the 'from' interface of StanzaBase.""" + stanza = StanzaBase() + stanza['from'] = 'user@example.com' + self.failUnless(str(stanza['from']) == 'user@example.com', + "Setting and retrieving stanza 'from' attribute did not work.") + + def testPayload(self): + """Test the 'payload' interface of StanzaBase.""" + stanza = StanzaBase() + self.failUnless(stanza['payload'] == [], + "Empty stanza does not have an empty payload.") + + stanza['payload'] = ET.Element("{foo}foo") + self.failUnless(len(stanza['payload']) == 1, + "Stanza contents and payload do not match.") + + stanza['payload'] = ET.Element('{bar}bar') + self.failUnless(len(stanza['payload']) == 2, + "Stanza payload was not appended.") + + del stanza['payload'] + self.failUnless(stanza['payload'] == [], + "Stanza payload not cleared after deletion.") + + stanza['payload'] = [ET.Element('{foo}foo'), + ET.Element('{bar}bar')] + self.failUnless(len(stanza['payload']) == 2, + "Adding multiple elements to stanza's payload did not work.") + + def testClear(self): + """Test clearing a stanza.""" + stanza = StanzaBase() + stanza['to'] = 'user@example.com' + stanza['payload'] = ET.Element("{foo}foo") + stanza.clear() + + self.failUnless(stanza['payload'] == [], + "Stanza payload was not cleared after calling .clear()") + self.failUnless(str(stanza['to']) == "user@example.com", + "Stanza attributes were not preserved after calling .clear()") + + def testReply(self): + """Test creating a reply stanza.""" + stanza = StanzaBase() + stanza['to'] = "recipient@example.com" + stanza['from'] = "sender@example.com" + stanza['payload'] = ET.Element("{foo}foo") + + stanza.reply() + + self.failUnless(str(stanza['to'] == "sender@example.com"), + "Stanza reply did not change 'to' attribute.") + self.failUnless(stanza['payload'] == [], + "Stanza reply did not empty stanza payload.") + + def testError(self): + """Test marking a stanza as an error.""" + stanza = StanzaBase() + stanza['type'] = 'get' + stanza.error() + self.failUnless(stanza['type'] == 'error', + "Stanza type is not 'error' after calling error()") + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestStanzaBase) diff --git a/tests/test_stanza_element.py b/tests/test_stanza_element.py new file mode 100644 index 00000000..c157e0a8 --- /dev/null +++ b/tests/test_stanza_element.py @@ -0,0 +1,660 @@ +from sleekxmpp.test import * +from sleekxmpp.xmlstream.stanzabase import ElementBase + + +class TestElementBase(SleekTest): + + def testFixNs(self): + """Test fixing namespaces in an XPath expression.""" + + e = ElementBase() + ns = "http://jabber.org/protocol/disco#items" + result = e._fix_ns("{%s}foo/bar/{abc}baz/{%s}more" % (ns, ns)) + + expected = "/".join(["{%s}foo" % ns, + "{%s}bar" % ns, + "{abc}baz", + "{%s}more" % ns]) + self.failUnless(expected == result, + "Incorrect namespace fixing result: %s" % str(result)) + + + def testExtendedName(self): + """Test element names of the form tag1/tag2/tag3.""" + + class TestStanza(ElementBase): + name = "foo/bar/baz" + namespace = "test" + + stanza = TestStanza() + self.check_stanza(TestStanza, stanza, """ + + + + + + """) + + def testGetStanzaValues(self): + """Test getStanzaValues using plugins and substanzas.""" + + class TestStanzaPlugin(ElementBase): + name = "foo2" + namespace = "foo" + interfaces = set(('bar', 'baz')) + plugin_attrib = "foo2" + + class TestSubStanza(ElementBase): + name = "subfoo" + namespace = "foo" + interfaces = set(('bar', 'baz')) + + class TestStanza(ElementBase): + name = "foo" + namespace = "foo" + interfaces = set(('bar', 'baz')) + subitem = set((TestSubStanza,)) + + registerStanzaPlugin(TestStanza, TestStanzaPlugin) + + stanza = TestStanza() + stanza['bar'] = 'a' + stanza['foo2']['baz'] = 'b' + substanza = TestSubStanza() + substanza['bar'] = 'c' + stanza.append(substanza) + + values = stanza.getStanzaValues() + expected = {'bar': 'a', + 'baz': '', + 'foo2': {'bar': '', + 'baz': 'b'}, + 'substanzas': [{'__childtag__': '{foo}subfoo', + 'bar': 'c', + 'baz': ''}]} + self.failUnless(values == expected, + "Unexpected stanza values:\n%s\n%s" % (str(expected), str(values))) + + + def testSetStanzaValues(self): + """Test using setStanzaValues with substanzas and plugins.""" + + class TestStanzaPlugin(ElementBase): + name = "pluginfoo" + namespace = "foo" + interfaces = set(('bar', 'baz')) + plugin_attrib = "plugin_foo" + + class TestStanzaPlugin2(ElementBase): + name = "pluginfoo2" + namespace = "foo" + interfaces = set(('bar', 'baz')) + plugin_attrib = "plugin_foo2" + + class TestSubStanza(ElementBase): + name = "subfoo" + namespace = "foo" + interfaces = set(('bar', 'baz')) + + class TestStanza(ElementBase): + name = "foo" + namespace = "foo" + interfaces = set(('bar', 'baz')) + subitem = set((TestSubStanza,)) + + registerStanzaPlugin(TestStanza, TestStanzaPlugin) + registerStanzaPlugin(TestStanza, TestStanzaPlugin2) + + stanza = TestStanza() + values = {'bar': 'a', + 'baz': '', + 'plugin_foo': {'bar': '', + 'baz': 'b'}, + 'plugin_foo2': {'bar': 'd', + 'baz': 'e'}, + 'substanzas': [{'__childtag__': '{foo}subfoo', + 'bar': 'c', + 'baz': ''}]} + stanza.setStanzaValues(values) + + self.check_stanza(TestStanza, stanza, """ + + + + + + """) + + def testGetItem(self): + """Test accessing stanza interfaces.""" + + class TestStanza(ElementBase): + name = "foo" + namespace = "foo" + interfaces = set(('bar', 'baz', 'qux')) + sub_interfaces = set(('baz',)) + + def getQux(self): + return 'qux' + + class TestStanzaPlugin(ElementBase): + name = "foobar" + namespace = "foo" + plugin_attrib = "foobar" + interfaces = set(('fizz',)) + + TestStanza.subitem = (TestStanza,) + registerStanzaPlugin(TestStanza, TestStanzaPlugin) + + stanza = TestStanza() + substanza = TestStanza() + stanza.append(substanza) + stanza.setStanzaValues({'bar': 'a', + 'baz': 'b', + 'qux': 42, + 'foobar': {'fizz': 'c'}}) + + # Test non-plugin interfaces + expected = {'substanzas': [substanza], + 'bar': 'a', + 'baz': 'b', + 'qux': 'qux', + 'meh': ''} + for interface, value in expected.items(): + result = stanza[interface] + self.failUnless(result == value, + "Incorrect stanza interface access result: %s" % result) + + # Test plugin interfaces + self.failUnless(isinstance(stanza['foobar'], TestStanzaPlugin), + "Incorrect plugin object result.") + self.failUnless(stanza['foobar']['fizz'] == 'c', + "Incorrect plugin subvalue result.") + + def testSetItem(self): + """Test assigning to stanza interfaces.""" + + class TestStanza(ElementBase): + name = "foo" + namespace = "foo" + interfaces = set(('bar', 'baz', 'qux')) + sub_interfaces = set(('baz',)) + + def setQux(self, value): + pass + + class TestStanzaPlugin(ElementBase): + name = "foobar" + namespace = "foo" + plugin_attrib = "foobar" + interfaces = set(('foobar',)) + + registerStanzaPlugin(TestStanza, TestStanzaPlugin) + + stanza = TestStanza() + + stanza['bar'] = 'attribute!' + stanza['baz'] = 'element!' + stanza['qux'] = 'overridden' + stanza['foobar'] = 'plugin' + + self.check_stanza(TestStanza, stanza, """ + + element! + + + """) + + def testDelItem(self): + """Test deleting stanza interface values.""" + + class TestStanza(ElementBase): + name = "foo" + namespace = "foo" + interfaces = set(('bar', 'baz', 'qux')) + sub_interfaces = set(('bar',)) + + def delQux(self): + pass + + class TestStanzaPlugin(ElementBase): + name = "foobar" + namespace = "foo" + plugin_attrib = "foobar" + interfaces = set(('foobar',)) + + registerStanzaPlugin(TestStanza, TestStanzaPlugin) + + stanza = TestStanza() + stanza['bar'] = 'a' + stanza['baz'] = 'b' + stanza['qux'] = 'c' + stanza['foobar']['foobar'] = 'd' + + self.check_stanza(TestStanza, stanza, """ + + a + + + """) + + del stanza['bar'] + del stanza['baz'] + del stanza['qux'] + del stanza['foobar'] + + self.check_stanza(TestStanza, stanza, """ + + """) + + def testModifyingAttributes(self): + """Test modifying top level attributes of a stanza's XML object.""" + + class TestStanza(ElementBase): + name = "foo" + namespace = "foo" + interfaces = set(('bar', 'baz')) + + stanza = TestStanza() + + self.check_stanza(TestStanza, stanza, """ + + """) + + self.failUnless(stanza._getAttr('bar') == '', + "Incorrect value returned for an unset XML attribute.") + + stanza._setAttr('bar', 'a') + stanza._setAttr('baz', 'b') + + self.check_stanza(TestStanza, stanza, """ + + """) + + self.failUnless(stanza._getAttr('bar') == 'a', + "Retrieved XML attribute value is incorrect.") + + stanza._setAttr('bar', None) + stanza._delAttr('baz') + + self.check_stanza(TestStanza, stanza, """ + + """) + + self.failUnless(stanza._getAttr('bar', 'c') == 'c', + "Incorrect default value returned for an unset XML attribute.") + + def testGetSubText(self): + """Test retrieving the contents of a sub element.""" + + class TestStanza(ElementBase): + name = "foo" + namespace = "foo" + interfaces = set(('bar',)) + + def setBar(self, value): + wrapper = ET.Element("{foo}wrapper") + bar = ET.Element("{foo}bar") + bar.text = value + wrapper.append(bar) + self.xml.append(wrapper) + + def getBar(self): + return self._getSubText("wrapper/bar", default="not found") + + stanza = TestStanza() + self.failUnless(stanza['bar'] == 'not found', + "Default _getSubText value incorrect.") + + stanza['bar'] = 'found' + self.check_stanza(TestStanza, stanza, """ + + + found + + + """) + self.failUnless(stanza['bar'] == 'found', + "_getSubText value incorrect: %s." % stanza['bar']) + + def testSubElement(self): + """Test setting the contents of a sub element.""" + + class TestStanza(ElementBase): + name = "foo" + namespace = "foo" + interfaces = set(('bar', 'baz')) + + def setBaz(self, value): + self._setSubText("wrapper/baz", text=value) + + def getBaz(self): + return self._getSubText("wrapper/baz") + + def setBar(self, value): + self._setSubText("wrapper/bar", text=value) + + def getBar(self): + return self._getSubText("wrapper/bar") + + stanza = TestStanza() + stanza['bar'] = 'a' + stanza['baz'] = 'b' + self.check_stanza(TestStanza, stanza, """ + + + a + b + + + """) + stanza._setSubText('wrapper/bar', text='', keep=True) + self.check_stanza(TestStanza, stanza, """ + + + + b + + + """, use_values=False) + + stanza['bar'] = 'a' + stanza._setSubText('wrapper/bar', text='') + self.check_stanza(TestStanza, stanza, """ + + + b + + + """) + + def testDelSub(self): + """Test removing sub elements.""" + + class TestStanza(ElementBase): + name = "foo" + namespace = "foo" + interfaces = set(('bar', 'baz')) + + def setBar(self, value): + self._setSubText("path/to/only/bar", value); + + def getBar(self): + return self._getSubText("path/to/only/bar") + + def delBar(self): + self._delSub("path/to/only/bar") + + def setBaz(self, value): + self._setSubText("path/to/just/baz", value); + + def getBaz(self): + return self._getSubText("path/to/just/baz") + + def delBaz(self): + self._delSub("path/to/just/baz") + + stanza = TestStanza() + stanza['bar'] = 'a' + stanza['baz'] = 'b' + + self.check_stanza(TestStanza, stanza, """ + + + + + a + + + b + + + + + """) + + del stanza['bar'] + del stanza['baz'] + + self.check_stanza(TestStanza, stanza, """ + + + + + + + + + """, use_values=False) + + stanza['bar'] = 'a' + stanza['baz'] = 'b' + + stanza._delSub('path/to/only/bar', all=True) + + self.check_stanza(TestStanza, stanza, """ + + + + + b + + + + + """) + + def testMatch(self): + """Test matching a stanza against an XPath expression.""" + + class TestSubStanza(ElementBase): + name = "sub" + namespace = "baz" + interfaces = set(('attrib',)) + + class TestStanza(ElementBase): + name = "foo" + namespace = "foo" + interfaces = set(('bar','baz', 'qux')) + sub_interfaces = set(('qux',)) + subitem = (TestSubStanza,) + + def setQux(self, value): + self._setSubText('qux', text=value) + + def getQux(self): + return self._getSubText('qux') + + class TestStanzaPlugin(ElementBase): + name = "plugin" + namespace = "http://test/slash/bar" + interfaces = set(('attrib',)) + + registerStanzaPlugin(TestStanza, TestStanzaPlugin) + + stanza = TestStanza() + self.failUnless(stanza.match("foo"), + "Stanza did not match its own tag name.") + + self.failUnless(stanza.match("{foo}foo"), + "Stanza did not match its own namespaced name.") + + stanza['bar'] = 'a' + self.failUnless(stanza.match("foo@bar=a"), + "Stanza did not match its own name with attribute value check.") + + stanza['baz'] = 'b' + self.failUnless(stanza.match("foo@bar=a@baz=b"), + "Stanza did not match its own name with multiple attributes.") + + stanza['qux'] = 'c' + self.failUnless(stanza.match("foo/qux"), + "Stanza did not match with subelements.") + + stanza['qux'] = '' + self.failUnless(stanza.match("foo/qux") == False, + "Stanza matched missing subinterface element.") + + self.failUnless(stanza.match("foo/bar") == False, + "Stanza matched nonexistent element.") + + stanza['plugin']['attrib'] = 'c' + self.failUnless(stanza.match("foo/plugin@attrib=c"), + "Stanza did not match with plugin and attribute.") + + self.failUnless(stanza.match("foo/{http://test/slash/bar}plugin"), + "Stanza did not match with namespaced plugin.") + + substanza = TestSubStanza() + substanza['attrib'] = 'd' + stanza.append(substanza) + self.failUnless(stanza.match("foo/sub@attrib=d"), + "Stanza did not match with substanzas and attribute.") + + self.failUnless(stanza.match("foo/{baz}sub"), + "Stanza did not match with namespaced substanza.") + + def testComparisons(self): + """Test comparing ElementBase objects.""" + + class TestStanza(ElementBase): + name = "foo" + namespace = "foo" + interfaces = set(('bar', 'baz')) + + stanza1 = TestStanza() + stanza1['bar'] = 'a' + + self.failUnless(stanza1, + "Stanza object does not evaluate to True") + + stanza2 = TestStanza() + stanza2['baz'] = 'b' + + self.failUnless(stanza1 != stanza2, + "Different stanza objects incorrectly compared equal.") + + stanza1['baz'] = 'b' + stanza2['bar'] = 'a' + + self.failUnless(stanza1 == stanza2, + "Equal stanzas incorrectly compared inequal.") + + def testKeys(self): + """Test extracting interface names from a stanza object.""" + + class TestStanza(ElementBase): + name = "foo" + namespace = "foo" + interfaces = set(('bar', 'baz')) + plugin_attrib = 'qux' + + registerStanzaPlugin(TestStanza, TestStanza) + + stanza = TestStanza() + + self.failUnless(set(stanza.keys()) == set(('bar', 'baz')), + "Returned set of interface keys does not match expected.") + + stanza.enable('qux') + + self.failUnless(set(stanza.keys()) == set(('bar', 'baz', 'qux')), + "Incorrect set of interface and plugin keys.") + + def testGet(self): + """Test accessing stanza interfaces using get().""" + + class TestStanza(ElementBase): + name = "foo" + namespace = "foo" + interfaces = set(('bar', 'baz')) + + stanza = TestStanza() + stanza['bar'] = 'a' + + self.failUnless(stanza.get('bar') == 'a', + "Incorrect value returned by stanza.get") + + self.failUnless(stanza.get('baz', 'b') == 'b', + "Incorrect default value returned by stanza.get") + + def testSubStanzas(self): + """Test manipulating substanzas of a stanza object.""" + + class TestSubStanza(ElementBase): + name = "foobar" + namespace = "foo" + interfaces = set(('qux',)) + + class TestStanza(ElementBase): + name = "foo" + namespace = "foo" + interfaces = set(('bar', 'baz')) + subitem = (TestSubStanza,) + + stanza = TestStanza() + substanza1 = TestSubStanza() + substanza2 = TestSubStanza() + substanza1['qux'] = 'a' + substanza2['qux'] = 'b' + + # Test appending substanzas + self.failUnless(len(stanza) == 0, + "Incorrect empty stanza size.") + + stanza.append(substanza1) + self.check_stanza(TestStanza, stanza, """ + + + + """) + self.failUnless(len(stanza) == 1, + "Incorrect stanza size with 1 substanza.") + + stanza.append(substanza2) + self.check_stanza(TestStanza, stanza, """ + + + + + """) + self.failUnless(len(stanza) == 2, + "Incorrect stanza size with 2 substanzas.") + + # Test popping substanzas + stanza.pop(0) + self.check_stanza(TestStanza, stanza, """ + + + + """) + + # Test iterating over substanzas + stanza.append(substanza1) + results = [] + for substanza in stanza: + results.append(substanza['qux']) + self.failUnless(results == ['b', 'a'], + "Iteration over substanzas failed: %s." % str(results)) + + def testCopy(self): + """Test copying stanza objects.""" + + class TestStanza(ElementBase): + name = "foo" + namespace = "foo" + interfaces = set(('bar', 'baz')) + + stanza1 = TestStanza() + stanza1['bar'] = 'a' + + stanza2 = stanza1.__copy__() + + self.failUnless(stanza1 == stanza2, + "Copied stanzas are not equal to each other.") + + stanza1['baz'] = 'b' + self.failUnless(stanza1 != stanza2, + "Divergent stanza copies incorrectly compared equal.") + +suite = unittest.TestLoader().loadTestsFromTestCase(TestElementBase) diff --git a/tests/test_stanza_error.py b/tests/test_stanza_error.py new file mode 100644 index 00000000..f76ac62d --- /dev/null +++ b/tests/test_stanza_error.py @@ -0,0 +1,57 @@ +from sleekxmpp.test import * + + +class TestErrorStanzas(SleekTest): + + def testSetup(self): + """Test setting initial values in error stanza.""" + msg = self.Message() + msg.enable('error') + self.check_message(msg, """ + + + + + + """) + + def testCondition(self): + """Test modifying the error condition.""" + msg = self.Message() + msg['error']['condition'] = 'item-not-found' + + self.check_message(msg, """ + + + + + + """) + + self.failUnless(msg['error']['condition'] == 'item-not-found', "Error condition doesn't match.") + + del msg['error']['condition'] + + self.check_message(msg, """ + + + + """) + + def testDelCondition(self): + """Test that deleting error conditions doesn't remove extra elements.""" + msg = self.Message() + msg['error']['text'] = 'Error!' + msg['error']['condition'] = 'internal-server-error' + + del msg['error']['condition'] + + self.check_message(msg, """ + + + Error! + + + """) + +suite = unittest.TestLoader().loadTestsFromTestCase(TestErrorStanzas) diff --git a/tests/test_stanza_gmail.py b/tests/test_stanza_gmail.py new file mode 100644 index 00000000..5c31a4e2 --- /dev/null +++ b/tests/test_stanza_gmail.py @@ -0,0 +1,88 @@ +from sleekxmpp.test import * +import sleekxmpp.plugins.gmail_notify as gmail + + +class TestGmail(SleekTest): + + def setUp(self): + registerStanzaPlugin(Iq, gmail.GmailQuery) + registerStanzaPlugin(Iq, gmail.MailBox) + registerStanzaPlugin(Iq, gmail.NewMail) + + def testCreateQuery(self): + """Testing querying Gmail for emails.""" + + iq = self.Iq() + iq['type'] = 'get' + iq['gmail']['search'] = 'is:starred' + iq['gmail']['newer-than-time'] = '1140638252542' + iq['gmail']['newer-than-tid'] = '11134623426430234' + + self.check_iq(iq, """ + + + + """) + + def testMailBox(self): + """Testing reading from Gmail mailbox result""" + + # Use the example from Google's documentation at + # http://code.google.com/apis/talk/jep_extensions/gmail.html#notifications + xml = ET.fromstring(""" + + + + + + + + + act1scene3 + Put thy rapier up. + Ay, ay, a scratch, a scratch; marry, 'tis enough. + + + + """) + + iq = self.Iq(xml=xml) + mailbox = iq['mailbox'] + self.failUnless(mailbox['result-time'] == '1118012394209', "result-time doesn't match") + self.failUnless(mailbox['url'] == 'http://mail.google.com/mail', "url doesn't match") + self.failUnless(mailbox['matched'] == '95', "total-matched incorrect") + self.failUnless(mailbox['estimate'] == False, "total-estimate incorrect") + self.failUnless(len(mailbox['threads']) == 1, "could not extract message threads") + + thread = mailbox['threads'][0] + self.failUnless(thread['tid'] == '1172320964060972012', "thread tid doesn't match") + self.failUnless(thread['participation'] == '1', "thread participation incorrect") + self.failUnless(thread['messages'] == '28', "thread message count incorrect") + self.failUnless(thread['date'] == '1118012394209', "thread date doesn't match") + self.failUnless(thread['url'] == 'http://mail.google.com/mail?view=cv', "thread url doesn't match") + self.failUnless(thread['labels'] == 'act1scene3', "thread labels incorrect") + self.failUnless(thread['subject'] == 'Put thy rapier up.', "thread subject doesn't match") + self.failUnless(thread['snippet'] == "Ay, ay, a scratch, a scratch; marry, 'tis enough.", "snippet doesn't match") + self.failUnless(len(thread['senders']) == 3, "could not extract senders") + + sender1 = thread['senders'][0] + self.failUnless(sender1['name'] == 'Me', "sender name doesn't match") + self.failUnless(sender1['address'] == 'romeo@gmail.com', "sender address doesn't match") + self.failUnless(sender1['originator'] == True, "sender originator incorrect") + self.failUnless(sender1['unread'] == False, "sender unread incorrectly True") + + sender2 = thread['senders'][2] + self.failUnless(sender2['unread'] == True, "sender unread incorrectly False") + +suite = unittest.TestLoader().loadTestsFromTestCase(TestGmail) diff --git a/tests/test_stanza_iq.py b/tests/test_stanza_iq.py new file mode 100644 index 00000000..3f6ce8d7 --- /dev/null +++ b/tests/test_stanza_iq.py @@ -0,0 +1,90 @@ +from sleekxmpp.test import * +from sleekxmpp.xmlstream.stanzabase import ET + + +class TestIqStanzas(SleekTest): + + def tearDown(self): + """Shutdown the XML stream after testing.""" + self.stream_close() + + def testSetup(self): + """Test initializing default Iq values.""" + iq = self.Iq() + self.check_iq(iq, """ + + """) + + def testPayload(self): + """Test setting Iq stanza payload.""" + iq = self.Iq() + iq.setPayload(ET.Element('{test}tester')) + self.check_iq(iq, """ + + + + """, use_values=False) + + + def testUnhandled(self): + """Test behavior for Iq.unhandled.""" + self.stream_start() + self.stream_recv(""" + + + + """) + + iq = self.Iq() + iq['id'] = 'test' + iq['error']['condition'] = 'feature-not-implemented' + iq['error']['text'] = 'No handlers registered for this request.' + + self.stream_send_iq(iq, """ + + + + + No handlers registered for this request. + + + + """) + + def testQuery(self): + """Test modifying query element of Iq stanzas.""" + iq = self.Iq() + + iq['query'] = 'query_ns' + self.check_iq(iq, """ + + + + """) + + iq['query'] = 'query_ns2' + self.check_iq(iq, """ + + + + """) + + self.failUnless(iq['query'] == 'query_ns2', "Query namespace doesn't match") + + del iq['query'] + self.check_iq(iq, """ + + """) + + def testReply(self): + """Test setting proper result type in Iq replies.""" + iq = self.Iq() + iq['to'] = 'user@localhost' + iq['type'] = 'get' + iq.reply() + + self.check_iq(iq, """ + + """) + +suite = unittest.TestLoader().loadTestsFromTestCase(TestIqStanzas) diff --git a/tests/test_stanza_message.py b/tests/test_stanza_message.py new file mode 100644 index 00000000..d4f4d8d0 --- /dev/null +++ b/tests/test_stanza_message.py @@ -0,0 +1,57 @@ +from sleekxmpp.test import * +from sleekxmpp.stanza.message import Message +from sleekxmpp.stanza.htmlim import HTMLIM + + +class TestMessageStanzas(SleekTest): + + def setUp(self): + registerStanzaPlugin(Message, HTMLIM) + + def testGroupchatReplyRegression(self): + "Regression groupchat reply should be to barejid" + msg = self.Message() + msg['to'] = 'me@myserver.tld' + msg['from'] = 'room@someservice.someserver.tld/somenick' + msg['type'] = 'groupchat' + msg['body'] = "this is a message" + msg.reply() + self.failUnless(str(msg['to']) == 'room@someservice.someserver.tld') + + def testAttribProperty(self): + "Test attrib property returning self" + msg = self.Message() + msg.attrib.attrib.attrib['to'] = 'usr@server.tld' + self.failUnless(str(msg['to']) == 'usr@server.tld') + + def testHTMLPlugin(self): + "Test message/html/body stanza" + msg = self.Message() + msg['to'] = "fritzy@netflint.net/sleekxmpp" + msg['body'] = "this is the plaintext message" + msg['type'] = 'chat' + p = ET.Element('{http://www.w3.org/1999/xhtml}p') + p.text = "This is the htmlim message" + msg['html']['body'] = p + self.check_message(msg, """ + + this is the plaintext message + + +

This is the htmlim message

+ + +
""") + + def testNickPlugin(self): + "Test message/nick/nick stanza." + msg = self.Message() + msg['nick']['nick'] = 'A nickname!' + self.check_message(msg, """ + + A nickname! + + """) + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestMessageStanzas) diff --git a/tests/test_stanza_presence.py b/tests/test_stanza_presence.py new file mode 100644 index 00000000..ad357ec3 --- /dev/null +++ b/tests/test_stanza_presence.py @@ -0,0 +1,66 @@ +from sleekxmpp.test import * +from sleekxmpp.stanza.presence import Presence + + +class TestPresenceStanzas(SleekTest): + + def testPresenceShowRegression(self): + """Regression check presence['type'] = 'dnd' show value working""" + p = self.Presence() + p['type'] = 'dnd' + self.check_presence(p, "dnd") + + def testPresenceType(self): + """Test manipulating presence['type']""" + p = self.Presence() + p['type'] = 'available' + self.check_presence(p, "") + self.failUnless(p['type'] == 'available', + "Incorrect presence['type'] for type 'available'") + + for showtype in ['away', 'chat', 'dnd', 'xa']: + p['type'] = showtype + self.check_presence(p, """ + %s + """ % showtype) + self.failUnless(p['type'] == showtype, + "Incorrect presence['type'] for type '%s'" % showtype) + + p['type'] = None + self.check_presence(p, "") + + def testPresenceUnsolicitedOffline(self): + """ + Unsolicted offline presence does not spawn changed_status + or update the roster. + """ + p = self.Presence() + p['type'] = 'unavailable' + p['from'] = 'bill@chadmore.com/gmail15af' + + c = sleekxmpp.ClientXMPP('crap@wherever', 'password') + happened = [] + + def handlechangedpresence(event): + happened.append(True) + + c.add_event_handler("changed_status", handlechangedpresence) + c._handle_presence(p) + + self.failUnless(happened == [], + "changed_status event triggered for extra unavailable presence") + self.failUnless(c.roster == {}, + "Roster updated for superfulous unavailable presence") + + def testNickPlugin(self): + """Test presence/nick/nick stanza.""" + p = self.Presence() + p['nick']['nick'] = 'A nickname!' + self.check_presence(p, """ + + A nickname! + + """) + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestPresenceStanzas) diff --git a/tests/test_stanza_roster.py b/tests/test_stanza_roster.py new file mode 100644 index 00000000..a0e57987 --- /dev/null +++ b/tests/test_stanza_roster.py @@ -0,0 +1,84 @@ +from sleekxmpp.test import * +from sleekxmpp.stanza.roster import Roster + + +class TestRosterStanzas(SleekTest): + + def testAddItems(self): + """Test adding items to a roster stanza.""" + iq = self.Iq() + iq['roster'].setItems({ + 'user@example.com': { + 'name': 'User', + 'subscription': 'both', + 'groups': ['Friends', 'Coworkers']}, + 'otheruser@example.com': { + 'name': 'Other User', + 'subscription': 'both', + 'groups': []}}) + self.check_iq(iq, """ + + + + Friends + Coworkers + + + + + """) + + def testGetItems(self): + """Test retrieving items from a roster stanza.""" + xml_string = """ + + + + Friends + Coworkers + + + + + """ + iq = self.Iq(ET.fromstring(xml_string)) + expected = { + 'user@example.com': { + 'name': 'User', + 'subscription': 'both', + 'groups': ['Friends', 'Coworkers']}, + 'otheruser@example.com': { + 'name': 'Other User', + 'subscription': 'both', + 'groups': []}} + debug = "Roster items don't match after retrieval." + debug += "\nReturned: %s" % str(iq['roster']['items']) + debug += "\nExpected: %s" % str(expected) + self.failUnless(iq['roster']['items'] == expected, debug) + + def testDelItems(self): + """Test clearing items from a roster stanza.""" + xml_string = """ + + + + Friends + Coworkers + + + + + """ + iq = self.Iq(ET.fromstring(xml_string)) + del iq['roster']['items'] + self.check_iq(iq, """ + + + + """) + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestRosterStanzas) diff --git a/tests/test_stanza_xep_0004.py b/tests/test_stanza_xep_0004.py new file mode 100644 index 00000000..835f0dd6 --- /dev/null +++ b/tests/test_stanza_xep_0004.py @@ -0,0 +1,115 @@ +from sleekxmpp.test import * +import sleekxmpp.plugins.xep_0004 as xep_0004 + + +class TestDataForms(SleekTest): + + def setUp(self): + registerStanzaPlugin(Message, xep_0004.Form) + registerStanzaPlugin(xep_0004.Form, xep_0004.FormField) + registerStanzaPlugin(xep_0004.FormField, xep_0004.FieldOption) + + def testMultipleInstructions(self): + """Testing using multiple instructions elements in a data form.""" + msg = self.Message() + msg['form']['instructions'] = "Instructions\nSecond batch" + + self.check_message(msg, """ + + + Instructions + Second batch + + + """) + + def testAddField(self): + """Testing adding fields to a data form.""" + + msg = self.Message() + form = msg['form'] + form.addField(var='f1', + ftype='text-single', + label='Text', + desc='A text field', + required=True, + value='Some text!') + + self.check_message(msg, """ + + + + A text field + + Some text! + + + + """) + + form['fields'] = [('f1', {'type': 'text-single', + 'label': 'Username', + 'required': True}), + ('f2', {'type': 'text-private', + 'label': 'Password', + 'required': True}), + ('f3', {'type': 'text-multi', + 'label': 'Message', + 'value': 'Enter message.\nA long one even.'}), + ('f4', {'type': 'list-single', + 'label': 'Message Type', + 'options': [{'label': 'Cool!', + 'value': 'cool'}, + {'label': 'Urgh!', + 'value': 'urgh'}]})] + self.check_message(msg, """ + + + + + + + + + + Enter message. + A long one even. + + + + + + + + """) + + def testSetValues(self): + """Testing setting form values""" + + msg = self.Message() + form = msg['form'] + form.setFields([ + ('foo', {'type': 'text-single'}), + ('bar', {'type': 'list-multi'})]) + + form.setValues({'foo': 'Foo!', + 'bar': ['a', 'b']}) + + self.check_message(msg, """ + + + + Foo! + + + a + b + + + """) + +suite = unittest.TestLoader().loadTestsFromTestCase(TestDataForms) diff --git a/tests/test_stanza_xep_0030.py b/tests/test_stanza_xep_0030.py new file mode 100644 index 00000000..bb7ee6a3 --- /dev/null +++ b/tests/test_stanza_xep_0030.py @@ -0,0 +1,176 @@ +from sleekxmpp.test import * +import sleekxmpp.plugins.xep_0030 as xep_0030 + + +class TestDisco(SleekTest): + + def setUp(self): + registerStanzaPlugin(Iq, xep_0030.DiscoInfo) + registerStanzaPlugin(Iq, xep_0030.DiscoItems) + + def testCreateInfoQueryNoNode(self): + """Testing disco#info query with no node.""" + iq = self.Iq() + iq['id'] = "0" + iq['disco_info']['node'] = '' + + self.check_iq(iq, """ + + + + """) + + def testCreateInfoQueryWithNode(self): + """Testing disco#info query with a node.""" + iq = self.Iq() + iq['id'] = "0" + iq['disco_info']['node'] = 'foo' + + self.check_iq(iq, """ + + + + """) + + def testCreateInfoQueryNoNode(self): + """Testing disco#items query with no node.""" + iq = self.Iq() + iq['id'] = "0" + iq['disco_items']['node'] = '' + + self.check_iq(iq, """ + + + + """) + + def testCreateItemsQueryWithNode(self): + """Testing disco#items query with a node.""" + iq = self.Iq() + iq['id'] = "0" + iq['disco_items']['node'] = 'foo' + + self.check_iq(iq, """ + + + + """) + + def testInfoIdentities(self): + """Testing adding identities to disco#info.""" + iq = self.Iq() + iq['id'] = "0" + iq['disco_info']['node'] = 'foo' + iq['disco_info'].addIdentity('conference', 'text', 'Chatroom') + + self.check_iq(iq, """ + + + + + + """) + + def testInfoFeatures(self): + """Testing adding features to disco#info.""" + iq = self.Iq() + iq['id'] = "0" + iq['disco_info']['node'] = 'foo' + iq['disco_info'].addFeature('foo') + iq['disco_info'].addFeature('bar') + + self.check_iq(iq, """ + + + + + + + """) + + def testItems(self): + """Testing adding features to disco#info.""" + iq = self.Iq() + iq['id'] = "0" + iq['disco_items']['node'] = 'foo' + iq['disco_items'].addItem('user@localhost') + iq['disco_items'].addItem('user@localhost', 'foo') + iq['disco_items'].addItem('user@localhost', 'bar', 'Testing') + + self.check_iq(iq, """ + + + + + + + + """) + + def testAddRemoveIdentities(self): + """Test adding and removing identities to disco#info stanza""" + ids = [('automation', 'commands', 'AdHoc'), + ('conference', 'text', 'ChatRoom')] + + info = xep_0030.DiscoInfo() + info.addIdentity(*ids[0]) + self.failUnless(info.getIdentities() == [ids[0]]) + + info.delIdentity('automation', 'commands') + self.failUnless(info.getIdentities() == []) + + info.setIdentities(ids) + self.failUnless(info.getIdentities() == ids) + + info.delIdentity('automation', 'commands') + self.failUnless(info.getIdentities() == [ids[1]]) + + info.delIdentities() + self.failUnless(info.getIdentities() == []) + + def testAddRemoveFeatures(self): + """Test adding and removing features to disco#info stanza""" + features = ['foo', 'bar', 'baz'] + + info = xep_0030.DiscoInfo() + info.addFeature(features[0]) + self.failUnless(info.getFeatures() == [features[0]]) + + info.delFeature('foo') + self.failUnless(info.getFeatures() == []) + + info.setFeatures(features) + self.failUnless(info.getFeatures() == features) + + info.delFeature('bar') + self.failUnless(info.getFeatures() == ['foo', 'baz']) + + info.delFeatures() + self.failUnless(info.getFeatures() == []) + + def testAddRemoveItems(self): + """Test adding and removing items to disco#items stanza""" + items = [('user@localhost', None, None), + ('user@localhost', 'foo', None), + ('user@localhost', 'bar', 'Test')] + + info = xep_0030.DiscoItems() + self.failUnless(True, ""+str(items[0])) + + info.addItem(*(items[0])) + self.failUnless(info.getItems() == [items[0]], info.getItems()) + + info.delItem('user@localhost') + self.failUnless(info.getItems() == []) + + info.setItems(items) + self.failUnless(info.getItems() == items) + + info.delItem('user@localhost', 'foo') + self.failUnless(info.getItems() == [items[0], items[2]]) + + info.delItems() + self.failUnless(info.getItems() == []) + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestDisco) diff --git a/tests/test_stanza_xep_0033.py b/tests/test_stanza_xep_0033.py new file mode 100644 index 00000000..90f6374a --- /dev/null +++ b/tests/test_stanza_xep_0033.py @@ -0,0 +1,111 @@ +from sleekxmpp.test import * +import sleekxmpp.plugins.xep_0033 as xep_0033 + + +class TestAddresses(SleekTest): + + def setUp(self): + registerStanzaPlugin(Message, xep_0033.Addresses) + + def testAddAddress(self): + """Testing adding extended stanza address.""" + msg = self.Message() + msg['addresses'].addAddress(atype='to', jid='to@header1.org') + self.check_message(msg, """ + + +
+ + + """) + + msg = self.Message() + msg['addresses'].addAddress(atype='replyto', + jid='replyto@header1.org', + desc='Reply address') + self.check_message(msg, """ + + +
+ + + """) + + def testAddAddresses(self): + """Testing adding multiple extended stanza addresses.""" + + xmlstring = """ + + +
+
+
+ + + """ + + msg = self.Message() + msg['addresses'].setAddresses([ + {'type':'replyto', + 'jid':'replyto@header1.org', + 'desc':'Reply address'}, + {'type':'cc', + 'jid':'cc@header2.org'}, + {'type':'bcc', + 'jid':'bcc@header2.org'}]) + self.check_message(msg, xmlstring) + + msg = self.Message() + msg['addresses']['replyto'] = [{'jid':'replyto@header1.org', + 'desc':'Reply address'}] + msg['addresses']['cc'] = [{'jid':'cc@header2.org'}] + msg['addresses']['bcc'] = [{'jid':'bcc@header2.org'}] + self.check_message(msg, xmlstring) + + def testAddURI(self): + """Testing adding URI attribute to extended stanza address.""" + + msg = self.Message() + addr = msg['addresses'].addAddress(atype='to', + jid='to@header1.org', + node='foo') + self.check_message(msg, """ + + +
+ + + """) + + addr['uri'] = 'mailto:to@header2.org' + self.check_message(msg, """ + + +
+ + + """) + + def testDelivered(self): + """Testing delivered attribute of extended stanza addresses.""" + + xmlstring = """ + + +
+ + + """ + + msg = self.Message() + addr = msg['addresses'].addAddress(jid='to@header1.org', atype='to') + self.check_message(msg, xmlstring % '') + + addr['delivered'] = True + self.check_message(msg, xmlstring % 'delivered="true"') + + addr['delivered'] = False + self.check_message(msg, xmlstring % '') + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestAddresses) diff --git a/tests/test_stanza_xep_0060.py b/tests/test_stanza_xep_0060.py new file mode 100644 index 00000000..32ee37a1 --- /dev/null +++ b/tests/test_stanza_xep_0060.py @@ -0,0 +1,511 @@ +from sleekxmpp.test import * +import sleekxmpp.plugins.xep_0004 as xep_0004 +import sleekxmpp.plugins.stanza_pubsub as pubsub + + +class TestPubsubStanzas(SleekTest): + + def testAffiliations(self): + "Testing iq/pubsub/affiliations/affiliation stanzas" + iq = self.Iq() + aff1 = pubsub.Affiliation() + aff1['node'] = 'testnode' + aff1['affiliation'] = 'owner' + aff2 = pubsub.Affiliation() + aff2['node'] = 'testnode2' + aff2['affiliation'] = 'publisher' + iq['pubsub']['affiliations'].append(aff1) + iq['pubsub']['affiliations'].append(aff2) + self.check_iq(iq, """ + + + + + + + + """) + + def testSubscriptions(self): + "Testing iq/pubsub/subscriptions/subscription stanzas" + iq = self.Iq() + sub1 = pubsub.Subscription() + sub1['node'] = 'testnode' + sub1['jid'] = 'steve@myserver.tld/someresource' + sub2 = pubsub.Subscription() + sub2['node'] = 'testnode2' + sub2['jid'] = 'boogers@bork.top/bill' + sub2['subscription'] = 'subscribed' + iq['pubsub']['subscriptions'].append(sub1) + iq['pubsub']['subscriptions'].append(sub2) + self.check_iq(iq, """ + + + + + + + + """) + + def testOptionalSettings(self): + "Testing iq/pubsub/subscription/subscribe-options stanzas" + iq = self.Iq() + iq['pubsub']['subscription']['suboptions']['required'] = True + iq['pubsub']['subscription']['node'] = 'testnode alsdkjfas' + iq['pubsub']['subscription']['jid'] = "fritzy@netflint.net/sleekxmpp" + iq['pubsub']['subscription']['subscription'] = 'unconfigured' + self.check_iq(iq, """ + + + + + + + + + """) + + def testItems(self): + "Testing iq/pubsub/items stanzas" + iq = self.Iq() + iq['pubsub']['items']['node'] = 'crap' + payload = ET.fromstring(""" + + + + """) + payload2 = ET.fromstring(""" + + + + """) + item = pubsub.Item() + item['id'] = 'asdf' + item['payload'] = payload + item2 = pubsub.Item() + item2['id'] = 'asdf2' + item2['payload'] = payload2 + iq['pubsub']['items'].append(item) + iq['pubsub']['items'].append(item2) + self.check_iq(iq, """ + + + + + + + + + + + + + + + + + + """) + + def testCreate(self): + "Testing iq/pubsub/create&configure stanzas" + iq = self.Iq() + iq['pubsub']['create']['node'] = 'mynode' + iq['pubsub']['configure']['form'].addField('pubsub#title', + ftype='text-single', + value='This thing is awesome') + self.check_iq(iq, """ + + + + + + + This thing is awesome + + + + + """) + + def testState(self): + "Testing iq/psstate stanzas" + iq = self.Iq() + iq['psstate']['node']= 'mynode' + iq['psstate']['item']= 'myitem' + pl = ET.Element('{http://andyet.net/protocol/pubsubqueue}claimed') + iq['psstate']['payload'] = pl + self.check_iq(iq, """ + + + + + """) + + def testDefault(self): + "Testing iq/pubsub_owner/default stanzas" + iq = self.Iq() + iq['pubsub_owner']['default'] + iq['pubsub_owner']['default']['node'] = 'mynode' + iq['pubsub_owner']['default']['type'] = 'leaf' + iq['pubsub_owner']['default']['form'].addField('pubsub#title', + ftype='text-single', + value='This thing is awesome') + self.check_iq(iq, """ + + + + + + This thing is awesome + + + + + """, use_values=False) + + def testSubscribe(self): + "testing iq/pubsub/subscribe stanzas" + iq = self.Iq() + iq['pubsub']['subscribe']['options'] + iq['pubsub']['subscribe']['node'] = 'cheese' + iq['pubsub']['subscribe']['jid'] = 'fritzy@netflint.net/sleekxmpp' + iq['pubsub']['subscribe']['options']['node'] = 'cheese' + iq['pubsub']['subscribe']['options']['jid'] = 'fritzy@netflint.net/sleekxmpp' + form = xep_0004.Form() + form.addField('pubsub#title', ftype='text-single', value='this thing is awesome') + iq['pubsub']['subscribe']['options']['options'] = form + self.check_iq(iq, """ + + + + + + + this thing is awesome + + + + + + """, use_values=False) + + def testPublish(self): + "Testing iq/pubsub/publish stanzas" + iq = self.Iq() + iq['pubsub']['publish']['node'] = 'thingers' + payload = ET.fromstring(""" + + + + """) + payload2 = ET.fromstring(""" + + + + """) + item = pubsub.Item() + item['id'] = 'asdf' + item['payload'] = payload + item2 = pubsub.Item() + item2['id'] = 'asdf2' + item2['payload'] = payload2 + iq['pubsub']['publish'].append(item) + iq['pubsub']['publish'].append(item2) + + self.check_iq(iq, """ + + + + + + + + + + + + + + + + + + """) + + def testDelete(self): + "Testing iq/pubsub_owner/delete stanzas" + iq = self.Iq() + iq['pubsub_owner']['delete']['node'] = 'thingers' + self.check_iq(iq, """ + + + + + """) + + def testCreateConfigGet(self): + """Testing getting config from full create""" + iq = self.Iq() + iq['to'] = 'pubsub.asdf' + iq['from'] = 'fritzy@asdf/87292ede-524d-4117-9076-d934ed3db8e7' + iq['type'] = 'set' + iq['id'] = 'E' + + pub = iq['pubsub'] + pub['create']['node'] = 'testnode2' + pub['configure']['form']['type'] = 'submit' + pub['configure']['form'].setFields([ + ('FORM_TYPE', {'type': 'hidden', + 'value': 'http://jabber.org/protocol/pubsub#node_config'}), + ('pubsub#node_type', {'type': 'list-single', + 'label': 'Select the node type', + 'value': 'leaf'}), + ('pubsub#title', {'type': 'text-single', + 'label': 'A friendly name for the node'}), + ('pubsub#deliver_notifications', {'type': 'boolean', + 'label': 'Deliver event notifications', + 'value': True}), + ('pubsub#deliver_payloads', {'type': 'boolean', + 'label': 'Deliver payloads with event notifications', + 'value': True}), + ('pubsub#notify_config', {'type': 'boolean', + 'label': 'Notify subscribers when the node configuration changes'}), + ('pubsub#notify_delete', {'type': 'boolean', + 'label': 'Notify subscribers when the node is deleted'}), + ('pubsub#notify_retract', {'type': 'boolean', + 'label': 'Notify subscribers when items are removed from the node', + 'value': True}), + ('pubsub#notify_sub', {'type': 'boolean', + 'label': 'Notify owners about new subscribers and unsubscribes'}), + ('pubsub#persist_items', {'type': 'boolean', + 'label': 'Persist items in storage'}), + ('pubsub#max_items', {'type': 'text-single', + 'label': 'Max # of items to persist', + 'value': '10'}), + ('pubsub#subscribe', {'type': 'boolean', + 'label': 'Whether to allow subscriptions', + 'value': True}), + ('pubsub#access_model', {'type': 'list-single', + 'label': 'Specify the subscriber model', + 'value': 'open'}), + ('pubsub#publish_model', {'type': 'list-single', + 'label': 'Specify the publisher model', + 'value': 'publishers'}), + ('pubsub#send_last_published_item', {'type': 'list-single', + 'label': 'Send last published item', + 'value': 'never'}), + ('pubsub#presence_based_delivery', {'type': 'boolean', + 'label': 'Deliver notification only to available users'}), + ]) + + self.check_iq(iq, """ + + + + + + + http://jabber.org/protocol/pubsub#node_config + + + leaf + + + + 1 + + + 1 + + + + + 1 + + + + + 10 + + + 1 + + + open + + + publishers + + + never + + + + + + """) + + def testItemEvent(self): + """Testing message/pubsub_event/items/item""" + msg = self.Message() + item = pubsub.EventItem() + pl = ET.Element('{http://netflint.net/protocol/test}test', {'failed':'3', 'passed':'24'}) + item['payload'] = pl + item['id'] = 'abc123' + msg['pubsub_event']['items'].append(item) + msg['pubsub_event']['items']['node'] = 'cheese' + msg['type'] = 'normal' + self.check_message(msg, """ + + + + + + + + + """) + + def testItemsEvent(self): + """Testing multiple message/pubsub_event/items/item""" + msg = self.Message() + item = pubsub.EventItem() + item2 = pubsub.EventItem() + pl = ET.Element('{http://netflint.net/protocol/test}test', {'failed':'3', 'passed':'24'}) + pl2 = ET.Element('{http://netflint.net/protocol/test-other}test', {'total':'27', 'failed':'3'}) + item2['payload'] = pl2 + item['payload'] = pl + item['id'] = 'abc123' + item2['id'] = '123abc' + msg['pubsub_event']['items'].append(item) + msg['pubsub_event']['items'].append(item2) + msg['pubsub_event']['items']['node'] = 'cheese' + msg['type'] = 'normal' + self.check_message(msg, """ + + + + + + + + + + + + """) + + def testItemsEvent(self): + """Testing message/pubsub_event/items/item & retract mix""" + msg = self.Message() + item = pubsub.EventItem() + item2 = pubsub.EventItem() + pl = ET.Element('{http://netflint.net/protocol/test}test', {'failed':'3', 'passed':'24'}) + pl2 = ET.Element('{http://netflint.net/protocol/test-other}test', {'total':'27', 'failed':'3'}) + item2['payload'] = pl2 + retract = pubsub.EventRetract() + retract['id'] = 'aabbcc' + item['payload'] = pl + item['id'] = 'abc123' + item2['id'] = '123abc' + msg['pubsub_event']['items'].append(item) + msg['pubsub_event']['items'].append(retract) + msg['pubsub_event']['items'].append(item2) + msg['pubsub_event']['items']['node'] = 'cheese' + msg['type'] = 'normal' + self.check_message(msg, """ + + + + + + + + + + + + """) + + def testCollectionAssociate(self): + """Testing message/pubsub_event/collection/associate""" + msg = self.Message() + msg['pubsub_event']['collection']['associate']['node'] = 'cheese' + msg['pubsub_event']['collection']['node'] = 'cheeseburger' + msg['type'] = 'headline' + self.check_message(msg, """ + + + + + + + """) + + def testCollectionDisassociate(self): + """Testing message/pubsub_event/collection/disassociate""" + msg = self.Message() + msg['pubsub_event']['collection']['disassociate']['node'] = 'cheese' + msg['pubsub_event']['collection']['node'] = 'cheeseburger' + msg['type'] = 'headline' + self.check_message(msg, """ + + + + + + + """) + + def testEventConfiguration(self): + """Testing message/pubsub_event/configuration/config""" + msg = self.Message() + msg['pubsub_event']['configuration']['node'] = 'cheese' + msg['pubsub_event']['configuration']['form'].addField('pubsub#title', + ftype='text-single', + value='This thing is awesome') + msg['type'] = 'headline' + self.check_message(msg, """ + + + + + + This thing is awesome + + + + + """) + + def testEventPurge(self): + """Testing message/pubsub_event/purge""" + msg = self.Message() + msg['pubsub_event']['purge']['node'] = 'pickles' + msg['type'] = 'headline' + self.check_message(msg, """ + + + + + """) + + def testEventSubscription(self): + """Testing message/pubsub_event/subscription""" + msg = self.Message() + msg['pubsub_event']['subscription']['node'] = 'pickles' + msg['pubsub_event']['subscription']['jid'] = 'fritzy@netflint.net/test' + msg['pubsub_event']['subscription']['subid'] = 'aabb1122' + msg['pubsub_event']['subscription']['subscription'] = 'subscribed' + msg['pubsub_event']['subscription']['expiry'] = 'presence' + msg['type'] = 'headline' + self.check_message(msg, """ + + + + + """) + +suite = unittest.TestLoader().loadTestsFromTestCase(TestPubsubStanzas) diff --git a/tests/test_stanza_xep_0085.py b/tests/test_stanza_xep_0085.py new file mode 100644 index 00000000..5e63530f --- /dev/null +++ b/tests/test_stanza_xep_0085.py @@ -0,0 +1,44 @@ +from sleekxmpp.test import * +import sleekxmpp.plugins.xep_0085 as xep_0085 + +class TestChatStates(SleekTest): + + def setUp(self): + registerStanzaPlugin(Message, xep_0085.Active) + registerStanzaPlugin(Message, xep_0085.Composing) + registerStanzaPlugin(Message, xep_0085.Gone) + registerStanzaPlugin(Message, xep_0085.Inactive) + registerStanzaPlugin(Message, xep_0085.Paused) + + def testCreateChatState(self): + """Testing creating chat states.""" + + xmlstring = """ + + <%s xmlns="http://jabber.org/protocol/chatstates" /> + + """ + + msg = self.Message() + msg['chat_state'].active() + self.check_message(msg, xmlstring % 'active', + use_values=False) + + msg['chat_state'].composing() + self.check_message(msg, xmlstring % 'composing', + use_values=False) + + + msg['chat_state'].gone() + self.check_message(msg, xmlstring % 'gone', + use_values=False) + + msg['chat_state'].inactive() + self.check_message(msg, xmlstring % 'inactive', + use_values=False) + + msg['chat_state'].paused() + self.check_message(msg, xmlstring % 'paused', + use_values=False) + +suite = unittest.TestLoader().loadTestsFromTestCase(TestChatStates) diff --git a/tests/test_stanzabase.py b/tests/test_stanzabase.py deleted file mode 100644 index 682068d9..00000000 --- a/tests/test_stanzabase.py +++ /dev/null @@ -1,79 +0,0 @@ -from . sleektest import * -import sleekxmpp -from sleekxmpp.xmlstream.stanzabase import ET, StanzaBase - -class TestStanzaBase(SleekTest): - - def testTo(self): - """Test the 'to' interface of StanzaBase.""" - stanza = StanzaBase() - stanza['to'] = 'user@example.com' - self.failUnless(str(stanza['to']) == 'user@example.com', - "Setting and retrieving stanza 'to' attribute did not work.") - - def testFrom(self): - """Test the 'from' interface of StanzaBase.""" - stanza = StanzaBase() - stanza['from'] = 'user@example.com' - self.failUnless(str(stanza['from']) == 'user@example.com', - "Setting and retrieving stanza 'from' attribute did not work.") - - def testPayload(self): - """Test the 'payload' interface of StanzaBase.""" - stanza = StanzaBase() - self.failUnless(stanza['payload'] == [], - "Empty stanza does not have an empty payload.") - - stanza['payload'] = ET.Element("{foo}foo") - self.failUnless(len(stanza['payload']) == 1, - "Stanza contents and payload do not match.") - - stanza['payload'] = ET.Element('{bar}bar') - self.failUnless(len(stanza['payload']) == 2, - "Stanza payload was not appended.") - - del stanza['payload'] - self.failUnless(stanza['payload'] == [], - "Stanza payload not cleared after deletion.") - - stanza['payload'] = [ET.Element('{foo}foo'), - ET.Element('{bar}bar')] - self.failUnless(len(stanza['payload']) == 2, - "Adding multiple elements to stanza's payload did not work.") - - def testClear(self): - """Test clearing a stanza.""" - stanza = StanzaBase() - stanza['to'] = 'user@example.com' - stanza['payload'] = ET.Element("{foo}foo") - stanza.clear() - - self.failUnless(stanza['payload'] == [], - "Stanza payload was not cleared after calling .clear()") - self.failUnless(str(stanza['to']) == "user@example.com", - "Stanza attributes were not preserved after calling .clear()") - - def testReply(self): - """Test creating a reply stanza.""" - stanza = StanzaBase() - stanza['to'] = "recipient@example.com" - stanza['from'] = "sender@example.com" - stanza['payload'] = ET.Element("{foo}foo") - - stanza.reply() - - self.failUnless(str(stanza['to'] == "sender@example.com"), - "Stanza reply did not change 'to' attribute.") - self.failUnless(stanza['payload'] == [], - "Stanza reply did not empty stanza payload.") - - def testError(self): - """Test marking a stanza as an error.""" - stanza = StanzaBase() - stanza['type'] = 'get' - stanza.error() - self.failUnless(stanza['type'] == 'error', - "Stanza type is not 'error' after calling error()") - - -suite = unittest.TestLoader().loadTestsFromTestCase(TestStanzaBase) diff --git a/tests/test_stream.py b/tests/test_stream.py new file mode 100644 index 00000000..3fbf86e6 --- /dev/null +++ b/tests/test_stream.py @@ -0,0 +1,60 @@ +from sleekxmpp.test import * +import sleekxmpp.plugins.xep_0033 as xep_0033 + + +class TestStreamTester(SleekTest): + """ + Test that we can simulate and test a stanza stream. + """ + + def tearDown(self): + self.stream_close() + + def testClientEcho(self): + """Test that we can interact with a ClientXMPP instance.""" + self.stream_start(mode='client') + + def echo(msg): + msg.reply('Thanks for sending: %(body)s' % msg).send() + + self.xmpp.add_event_handler('message', echo) + + self.stream_recv(""" + + Hi! + + """) + + self.stream_send_message(""" + + Thanks for sending: Hi! + + """) + + def testComponentEcho(self): + """Test that we can interact with a ComponentXMPP instance.""" + self.stream_start(mode='component') + + def echo(msg): + msg.reply('Thanks for sending: %(body)s' % msg).send() + + self.xmpp.add_event_handler('message', echo) + + self.stream_recv(""" + + Hi! + + """) + + self.stream_send_message(""" + + Thanks for sending: Hi! + + """) + + def testSendStreamHeader(self): + """Test that we can check a sent stream header.""" + self.stream_start(mode='client', skip=False) + self.stream_send_header(sto='localhost') + +suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamTester) diff --git a/tests/test_stream_handlers.py b/tests/test_stream_handlers.py new file mode 100644 index 00000000..8c05572a --- /dev/null +++ b/tests/test_stream_handlers.py @@ -0,0 +1,112 @@ +from sleekxmpp.test import * +from sleekxmpp.xmlstream.handler import * +from sleekxmpp.xmlstream.matcher import * + + +class TestHandlers(SleekTest): + """ + Test using handlers and waiters. + """ + + def setUp(self): + self.stream_start() + + def tearDown(self): + self.stream_close() + + def testCallback(self): + """Test using stream callback handlers.""" + + def callback_handler(stanza): + self.xmpp.sendRaw(""" + + Success! + + """) + + callback = Callback('Test Callback', + MatchXPath('{test}tester'), + callback_handler) + + self.xmpp.registerHandler(callback) + + self.stream_recv("""""") + + msg = self.Message() + msg['body'] = 'Success!' + self.stream_send_message(msg) + + def testWaiter(self): + """Test using stream waiter handler.""" + + def waiter_handler(stanza): + iq = self.xmpp.Iq() + iq['id'] = 'test' + iq['type'] = 'set' + iq['query'] = 'test' + reply = iq.send(block=True) + if reply: + self.xmpp.sendRaw(""" + + Successful: %s + + """ % reply['query']) + + self.xmpp.add_event_handler('message', waiter_handler, threaded=True) + + # Send message to trigger waiter_handler + self.stream_recv(""" + + Testing + + """) + + # Check that Iq was sent by waiter_handler + iq = self.Iq() + iq['id'] = 'test' + iq['type'] = 'set' + iq['query'] = 'test' + self.stream_send_iq(iq) + + # Send the reply Iq + self.stream_recv(""" + + + + """) + + # Check that waiter_handler received the reply + msg = self.Message() + msg['body'] = 'Successful: test' + self.stream_send_message(msg) + + def testWaiterTimeout(self): + """Test that waiter handler is removed after timeout.""" + + def waiter_handler(stanza): + iq = self.xmpp.Iq() + iq['id'] = 'test2' + iq['type'] = 'set' + iq['query'] = 'test2' + reply = iq.send(block=True, timeout=0) + + self.xmpp.add_event_handler('message', waiter_handler, threaded=True) + + # Start test by triggerig waiter_handler + self.stream_recv("""Start Test""") + + # Check that Iq was sent to trigger start of timeout period + iq = self.Iq() + iq['id'] = 'test2' + iq['type'] = 'set' + iq['query'] = 'test2' + self.stream_send_iq(iq) + + # Check that the waiter is no longer registered + waiter_exists = self.xmpp.removeHandler('IqWait_test2') + + self.failUnless(waiter_exists == False, + "Waiter handler was not removed.") + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestHandlers) diff --git a/tests/test_streamtester.py b/tests/test_streamtester.py deleted file mode 100644 index 9bc9de67..00000000 --- a/tests/test_streamtester.py +++ /dev/null @@ -1,60 +0,0 @@ -from . sleektest import * -import sleekxmpp.plugins.xep_0033 as xep_0033 - - -class TestStreamTester(SleekTest): - """ - Test that we can simulate and test a stanza stream. - """ - - def tearDown(self): - self.stream_close() - - def testClientEcho(self): - """Test that we can interact with a ClientXMPP instance.""" - self.stream_start(mode='client') - - def echo(msg): - msg.reply('Thanks for sending: %(body)s' % msg).send() - - self.xmpp.add_event_handler('message', echo) - - self.stream_recv(""" - - Hi! - - """) - - self.stream_send_message(""" - - Thanks for sending: Hi! - - """) - - def testComponentEcho(self): - """Test that we can interact with a ComponentXMPP instance.""" - self.stream_start(mode='component') - - def echo(msg): - msg.reply('Thanks for sending: %(body)s' % msg).send() - - self.xmpp.add_event_handler('message', echo) - - self.stream_recv(""" - - Hi! - - """) - - self.stream_send_message(""" - - Thanks for sending: Hi! - - """) - - def testSendStreamHeader(self): - """Test that we can check a sent stream header.""" - self.stream_start(mode='client', skip=False) - self.streamSendHeader(sto='localhost') - -suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamTester) diff --git a/tests/test_tostring.py b/tests/test_tostring.py index 2999949a..45f405ce 100644 --- a/tests/test_tostring.py +++ b/tests/test_tostring.py @@ -1,4 +1,4 @@ -from . sleektest import * +from sleekxmpp.test import * from sleekxmpp.stanza import Message from sleekxmpp.xmlstream.stanzabase import ET from sleekxmpp.xmlstream.tostring import tostring, xml_escape -- cgit v1.2.3