diff options
Diffstat (limited to 'tests/sleektest.py')
-rw-r--r-- | tests/sleektest.py | 650 |
1 files changed, 0 insertions, 650 deletions
diff --git a/tests/sleektest.py b/tests/sleektest.py deleted file mode 100644 index 28901877..00000000 --- a/tests/sleektest.py +++ /dev/null @@ -1,650 +0,0 @@ -""" - SleekXMPP: The Sleek XMPP Library - Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout - This file is part of SleekXMPP. - - See the file LICENSE for copying permission. -""" - -import unittest -import socket -try: - import queue -except ImportError: - import Queue as queue - -import sleekxmpp -from sleekxmpp import ClientXMPP, ComponentXMPP -from sleekxmpp.stanza import Message, Iq, Presence -from sleekxmpp.xmlstream.stanzabase import registerStanzaPlugin, ET -from sleekxmpp.xmlstream.tostring import tostring - - -class TestSocket(object): - - """ - A dummy socket that reads and writes to queues instead - of an actual networking socket. - - Methods: - next_sent -- Return the next sent stanza. - recv_data -- Make a stanza available to read next. - recv -- Read the next stanza from the socket. - send -- Write a stanza to the socket. - makefile -- Dummy call, returns self. - read -- Read the next stanza from the socket. - """ - - def __init__(self, *args, **kwargs): - """ - Create a new test socket. - - Arguments: - Same as arguments for socket.socket - """ - self.socket = socket.socket(*args, **kwargs) - self.recv_queue = queue.Queue() - self.send_queue = queue.Queue() - - def __getattr__(self, name): - """ - Return attribute values of internal, dummy socket. - - Some attributes and methods are disabled to prevent the - socket from connecting to the network. - - Arguments: - name -- Name of the attribute requested. - """ - - def dummy(*args): - """Method to do nothing and prevent actual socket connections.""" - return None - - overrides = {'connect': dummy, - 'close': dummy, - 'shutdown': dummy} - - return overrides.get(name, getattr(self.socket, name)) - - # ------------------------------------------------------------------ - # Testing Interface - - def next_sent(self, timeout=None): - """ - Get the next stanza that has been 'sent'. - - Arguments: - timeout -- Optional timeout for waiting for a new value. - """ - args = {'block': False} - if timeout is not None: - args = {'block': True, 'timeout': timeout} - try: - return self.send_queue.get(**args) - except: - return None - - def recv_data(self, data): - """ - Add data to the receiving queue. - - Arguments: - data -- String data to 'write' to the socket to be received - by the XMPP client. - """ - self.recv_queue.put(data) - - # ------------------------------------------------------------------ - # Socket Interface - - def recv(self, *args, **kwargs): - """ - Read a value from the received queue. - - Arguments: - Placeholders. Same as for socket.Socket.recv. - """ - return self.read(block=True) - - def send(self, data): - """ - Send data by placing it in the send queue. - - Arguments: - data -- String value to write. - """ - self.send_queue.put(data) - - # ------------------------------------------------------------------ - # File Socket - - def makefile(self, *args, **kwargs): - """ - File socket version to use with ElementTree. - - Arguments: - Placeholders, same as socket.Socket.makefile() - """ - return self - - def read(self, block=True, timeout=None, **kwargs): - """ - Implement the file socket interface. - - Arguments: - block -- Indicate if the read should block until a - value is ready. - timeout -- Time in seconds a block should last before - returning None. - """ - if timeout is not None: - block = True - try: - return self.recv_queue.get(block, timeout) - except: - return None - - -class SleekTest(unittest.TestCase): - - """ - A SleekXMPP specific TestCase class that provides - methods for comparing message, iq, and presence stanzas. - - Methods: - Message -- Create a Message stanza object. - Iq -- Create an Iq stanza object. - Presence -- Create a Presence stanza object. - check_stanza -- Compare a generic stanza against an XML string. - check_message -- Compare a Message stanza against an XML string. - check_iq -- Compare an Iq stanza against an XML string. - check_presence -- Compare a Presence stanza against an XML string. - stream_start -- Initialize a dummy XMPP client. - stream_recv -- Queue data for XMPP client to receive. - stream_make_header -- Create a stream header. - stream_send_header -- Check that the given header has been sent. - stream_send_message -- Check that the XMPP client sent the given - Message stanza. - stream_send_iq -- Check that the XMPP client sent the given - Iq stanza. - stream_send_presence -- Check thatt the XMPP client sent the given - Presence stanza. - stream_send_stanza -- Check that the XMPP client sent the given - generic stanza. - stream_close -- Disconnect the XMPP client. - fix_namespaces -- Add top-level namespace to an XML object. - compare -- Compare XML objects against each other. - """ - - # ------------------------------------------------------------------ - # Shortcut methods for creating stanza objects - - def Message(self, *args, **kwargs): - """ - Create a Message stanza. - - Uses same arguments as StanzaBase.__init__ - - Arguments: - xml -- An XML object to use for the Message's values. - """ - return Message(None, *args, **kwargs) - - def Iq(self, *args, **kwargs): - """ - Create an Iq stanza. - - Uses same arguments as StanzaBase.__init__ - - Arguments: - xml -- An XML object to use for the Iq's values. - """ - return Iq(None, *args, **kwargs) - - def Presence(self, *args, **kwargs): - """ - Create a Presence stanza. - - Uses same arguments as StanzaBase.__init__ - - Arguments: - xml -- An XML object to use for the Iq's values. - """ - return Presence(None, *args, **kwargs) - - # ------------------------------------------------------------------ - # Methods for comparing stanza objects to XML strings - - def check_stanza(self, stanza_class, stanza, xml_string, - defaults=None, use_values=True): - """ - Create and compare several stanza objects to a correct XML string. - - If use_values is False, test using getStanzaValues() and - setStanzaValues() will not be used. - - Some stanzas provide default values for some interfaces, but - these defaults can be problematic for testing since they can easily - be forgotten when supplying the XML string. A list of interfaces that - use defaults may be provided and the generated stanzas will use the - default values for those interfaces if needed. - - However, correcting the supplied XML is not possible for interfaces - that add or remove XML elements. Only interfaces that map to XML - attributes may be set using the defaults parameter. The supplied XML - must take into account any extra elements that are included by default. - - Arguments: - stanza_class -- The class of the stanza being tested. - stanza -- The stanza object to test. - xml_string -- A string version of the correct XML expected. - defaults -- A list of stanza interfaces that have default - values. These interfaces will be set to their - defaults for the given and generated stanzas to - prevent unexpected test failures. - use_values -- Indicates if testing using getStanzaValues() and - setStanzaValues() should be used. Defaults to - True. - """ - xml = ET.fromstring(xml_string) - - # Ensure that top level namespaces are used, even if they - # were not provided. - self.fix_namespaces(stanza.xml, 'jabber:client') - self.fix_namespaces(xml, 'jabber:client') - - stanza2 = stanza_class(xml=xml) - - if use_values: - # Using getStanzaValues() and setStanzaValues() will add - # XML for any interface that has a default value. We need - # to set those defaults on the existing stanzas and XML - # so that they will compare correctly. - default_stanza = stanza_class() - if defaults is None: - defaults = [] - for interface in defaults: - stanza[interface] = stanza[interface] - stanza2[interface] = stanza2[interface] - # Can really only automatically add defaults for top - # level attribute values. Anything else must be accounted - # for in the provided XML string. - if interface not in xml.attrib: - if interface in default_stanza.xml.attrib: - value = default_stanza.xml.attrib[interface] - xml.attrib[interface] = value - - values = stanza2.getStanzaValues() - stanza3 = stanza_class() - stanza3.setStanzaValues(values) - - debug = "Three methods for creating stanzas do not match.\n" - debug += "Given XML:\n%s\n" % tostring(xml) - debug += "Given stanza:\n%s\n" % tostring(stanza.xml) - debug += "Generated stanza:\n%s\n" % tostring(stanza2.xml) - debug += "Second generated stanza:\n%s\n" % tostring(stanza3.xml) - result = self.compare(xml, stanza.xml, stanza2.xml, stanza3.xml) - else: - debug = "Two methods for creating stanzas do not match.\n" - debug += "Given XML:\n%s\n" % tostring(xml) - debug += "Given stanza:\n%s\n" % tostring(stanza.xml) - debug += "Generated stanza:\n%s\n" % tostring(stanza2.xml) - result = self.compare(xml, stanza.xml, stanza2.xml) - - self.failUnless(result, debug) - - def check_message(self, msg, xml_string, use_values=True): - """ - Create and compare several message stanza objects to a - correct XML string. - - If use_values is False, the test using getStanzaValues() and - setStanzaValues() will not be used. - - Arguments: - msg -- The Message stanza object to check. - xml_string -- The XML contents to compare against. - use_values -- Indicates if the test using getStanzaValues - and setStanzaValues should be used. Defaults - to True. - """ - - return self.check_stanza(Message, msg, xml_string, - defaults=['type'], - use_values=use_values) - - def check_iq(self, iq, xml_string, use_values=True): - """ - Create and compare several iq stanza objects to a - correct XML string. - - If use_values is False, the test using getStanzaValues() and - setStanzaValues() will not be used. - - Arguments: - iq -- The Iq stanza object to check. - xml_string -- The XML contents to compare against. - use_values -- Indicates if the test using getStanzaValues - and setStanzaValues should be used. Defaults - to True. - """ - return self.check_stanza(Iq, iq, xml_string, use_values=use_values) - - def check_presence(self, pres, xml_string, use_values=True): - """ - Create and compare several presence stanza objects to a - correct XML string. - - If use_values is False, the test using getStanzaValues() and - setStanzaValues() will not be used. - - Arguments: - iq -- The Iq stanza object to check. - xml_string -- The XML contents to compare against. - use_values -- Indicates if the test using getStanzaValues - and setStanzaValues should be used. Defaults - to True. - """ - return self.check_stanza(Presence, pres, xml_string, - defaults=['priority'], - use_values=use_values) - - # ------------------------------------------------------------------ - # Methods for simulating stanza streams. - - def stream_start(self, mode='client', skip=True, header=None): - """ - Initialize an XMPP client or component using a dummy XML stream. - - Arguments: - mode -- Either 'client' or 'component'. Defaults to 'client'. - skip -- Indicates if the first item in the sent queue (the - stream header) should be removed. Tests that wish - to test initializing the stream should set this to - False. Otherwise, the default of True should be used. - """ - if mode == 'client': - self.xmpp = ClientXMPP('tester@localhost', 'test') - elif mode == 'component': - self.xmpp = ComponentXMPP('tester.localhost', 'test', - 'localhost', 8888) - else: - raise ValueError("Unknown XMPP connection mode.") - - self.xmpp.setSocket(TestSocket()) - self.xmpp.state.set('reconnect', False) - self.xmpp.state.set('is client', True) - self.xmpp.state.set('connected', True) - - # Must have the stream header ready for xmpp.process() to work. - if not header: - header = self.xmpp.stream_header - self.xmpp.socket.recv_data(header) - - self.xmpp.connect = lambda a=None, b=None, c=None, d=None: True - self.xmpp.process(threaded=True) - if skip: - # Clear startup stanzas - self.xmpp.socket.next_sent(timeout=0.01) - if mode == 'component': - self.xmpp.socket.next_sent(timeout=0.01) - - def stream_recv(self, data): - """ - Pass data to the dummy XMPP client as if it came from an XMPP server. - - Arguments: - data -- String stanza XML to be received and processed by the - XMPP client or component. - """ - data = str(data) - self.xmpp.socket.recv_data(data) - - def stream_make_header(self, sto='', - sfrom='', - sid='', - stream_ns="http://etherx.jabber.org/streams", - default_ns="jabber:client", - version="1.0", - xml_header=True): - """ - Create a stream header to be received by the test XMPP agent. - - The header must be saved and passed to stream_start. - - Arguments: - sto -- The recipient of the stream header. - sfrom -- The agent sending the stream header. - sid -- The stream's id. - stream_ns -- The namespace of the stream's root element. - default_ns -- The default stanza namespace. - version -- The stream version. - xml_header -- Indicates if the XML version header should be - appended before the stream header. - """ - header = '<stream:stream %s>' - parts = [] - if xml_header: - header = '<?xml version="1.0"?>' + header - if sto: - parts.append('to="%s"' % sto) - if sfrom: - parts.append('from="%s"' % sfrom) - if sid: - parts.append('id="%s"' % sid) - parts.append('version="%s"' % version) - parts.append('xmlns:stream="%s"' % stream_ns) - parts.append('xmlns="%s"' % default_ns) - return header % ' '.join(parts) - - def stream_send_header(self, sto='', - sfrom='', - sid='', - stream_ns="http://etherx.jabber.org/streams", - default_ns="jabber:client", - version="1.0", - xml_header=False, - timeout=0.1): - """ - Check that a given stream header was sent. - - Arguments: - sto -- The recipient of the stream header. - sfrom -- The agent sending the stream header. - sid -- The stream's id. - stream_ns -- The namespace of the stream's root element. - default_ns -- The default stanza namespace. - version -- The stream version. - xml_header -- Indicates if the XML version header should be - appended before the stream header. - timeout -- Length of time to wait in seconds for a - response. - """ - header = self.stream_make_header(sto, sfrom, sid, - stream_ns=stream_ns, - default_ns=default_ns, - version=version, - xml_header=xml_header) - sent_header = self.xmpp.socket.next_sent(timeout) - if sent_header is None: - raise ValueError("Socket did not return data.") - - # Apply closing elements so that we can construct - # XML objects for comparison. - header2 = header + '</stream:stream>' - sent_header2 = sent_header + '</stream:stream>' - - xml = ET.fromstring(header2) - sent_xml = ET.fromstring(sent_header2) - - self.failUnless( - self.compare(xml, sent_xml), - "Stream headers do not match:\nDesired:\n%s\nSent:\n%s" % ( - header, sent_header)) - - def stream_send_stanza(self, stanza_class, data, defaults=None, - use_values=True, timeout=.1): - """ - Check that the XMPP client sent the given stanza XML. - - Extracts the next sent stanza and compares it with the given - XML using check_stanza. - - Arguments: - stanza_class -- The class of the sent stanza object. - data -- The XML string of the expected Message stanza, - or an equivalent stanza object. - use_values -- Modifies the type of tests used by check_message. - defaults -- A list of stanza interfaces that have defaults - values which may interfere with comparisons. - timeout -- Time in seconds to wait for a stanza before - failing the check. - """ - if isintance(data, str): - data = stanza_class(xml=ET.fromstring(data)) - sent = self.xmpp.socket.next_sent(timeout) - self.check_stanza(stanza_class, data, sent, - defaults=defaults, - use_values=use_values) - - def stream_send_message(self, data, use_values=True, timeout=.1): - """ - Check that the XMPP client sent the given stanza XML. - - Extracts the next sent stanza and compares it with the given - XML using check_message. - - Arguments: - data -- The XML string of the expected Message stanza, - or an equivalent stanza object. - use_values -- Modifies the type of tests used by check_message. - timeout -- Time in seconds to wait for a stanza before - failing the check. - """ - if isinstance(data, str): - data = self.Message(xml=ET.fromstring(data)) - sent = self.xmpp.socket.next_sent(timeout) - self.check_message(data, sent, use_values) - - def stream_send_iq(self, data, use_values=True, timeout=.1): - """ - Check that the XMPP client sent the given stanza XML. - - Extracts the next sent stanza and compares it with the given - XML using check_iq. - - Arguments: - data -- The XML string of the expected Iq stanza, - or an equivalent stanza object. - use_values -- Modifies the type of tests used by check_iq. - timeout -- Time in seconds to wait for a stanza before - failing the check. - """ - if isinstance(data, str): - data = self.Iq(xml=ET.fromstring(data)) - sent = self.xmpp.socket.next_sent(timeout) - self.check_iq(data, sent, use_values) - - def stream_send_presence(self, data, use_values=True, timeout=.1): - """ - Check that the XMPP client sent the given stanza XML. - - Extracts the next sent stanza and compares it with the given - XML using check_presence. - - Arguments: - data -- The XML string of the expected Presence stanza, - or an equivalent stanza object. - use_values -- Modifies the type of tests used by check_presence. - timeout -- Time in seconds to wait for a stanza before - failing the check. - """ - if isinstance(data, str): - data = self.Presence(xml=ET.fromstring(data)) - sent = self.xmpp.socket.next_sent(timeout) - self.check_presence(data, sent, use_values) - - def stream_close(self): - """ - Disconnect the dummy XMPP client. - - Can be safely called even if stream_start has not been called. - - Must be placed in the tearDown method of a test class to ensure - that the XMPP client is disconnected after an error. - """ - if hasattr(self, 'xmpp') and self.xmpp is not None: - self.xmpp.disconnect() - self.xmpp.socket.recv_data(self.xmpp.stream_footer) - - # ------------------------------------------------------------------ - # XML Comparison and Cleanup - - def fix_namespaces(self, xml, ns): - """ - Assign a namespace to an element and any children that - don't have a namespace. - - Arguments: - xml -- The XML object to fix. - ns -- The namespace to add to the XML object. - """ - if xml.tag.startswith('{'): - return - xml.tag = '{%s}%s' % (ns, xml.tag) - for child in xml.getchildren(): - self.fix_namespaces(child, ns) - - def compare(self, xml, *other): - """ - Compare XML objects. - - Arguments: - xml -- The XML object to compare against. - *other -- The list of XML objects to compare. - """ - if not other: - return False - - # Compare multiple objects - if len(other) > 1: - for xml2 in other: - if not self.compare(xml, xml2): - return False - return True - - other = other[0] - - # Step 1: Check tags - if xml.tag != other.tag: - return False - - # Step 2: Check attributes - if xml.attrib != other.attrib: - return False - - # Step 3: Check text - if xml.text is None: - xml.text = "" - if other.text is None: - other.text = "" - xml.text = xml.text.strip() - other.text = other.text.strip() - - if xml.text != other.text: - return False - - # Step 4: Recursively check children - for child in xml: - child2s = other.findall("%s" % child.tag) - if child2s is None: - return False - for child2 in child2s: - if self.compare(child, child2): - break - else: - return False - - # Everything matches - return True |