summaryrefslogtreecommitdiff
path: root/sleekxmpp/test
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/test')
-rw-r--r--sleekxmpp/test/__init__.py11
-rw-r--r--sleekxmpp/test/livesocket.py172
-rw-r--r--sleekxmpp/test/mocksocket.py153
-rw-r--r--sleekxmpp/test/sleektest.py774
4 files changed, 0 insertions, 1110 deletions
diff --git a/sleekxmpp/test/__init__.py b/sleekxmpp/test/__init__.py
deleted file mode 100644
index 54d4dc57..00000000
--- a/sleekxmpp/test/__init__.py
+++ /dev/null
@@ -1,11 +0,0 @@
-"""
- SleekXMPP: The Sleek XMPP Library
- Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
- This file is part of SleekXMPP.
-
- See the file LICENSE for copying permission.
-"""
-
-from sleekxmpp.test.mocksocket import TestSocket
-from sleekxmpp.test.livesocket import TestLiveSocket
-from sleekxmpp.test.sleektest import *
diff --git a/sleekxmpp/test/livesocket.py b/sleekxmpp/test/livesocket.py
deleted file mode 100644
index d70ee4eb..00000000
--- a/sleekxmpp/test/livesocket.py
+++ /dev/null
@@ -1,172 +0,0 @@
-"""
- SleekXMPP: The Sleek XMPP Library
- Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
- This file is part of SleekXMPP.
-
- See the file LICENSE for copying permission.
-"""
-
-import socket
-import threading
-
-from sleekxmpp.util import Queue
-
-
-class TestLiveSocket(object):
-
- """
- A live test socket that reads and writes to queues in
- addition to an actual networking socket.
-
- Methods:
- next_sent -- Return the next sent stanza.
- next_recv -- Return the next received stanza.
- recv_data -- Dummy method to have same interface as TestSocket.
- recv -- Read the next stanza from the socket.
- send -- Write a stanza to the socket.
- makefile -- Dummy call, returns self.
- read -- Read the next stanza from the socket.
- """
-
- def __init__(self, *args, **kwargs):
- """
- Create a new, live test socket.
-
- Arguments:
- Same as arguments for socket.socket
- """
- self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.recv_buffer = []
- self.recv_queue = Queue()
- self.send_queue = Queue()
- self.send_queue_lock = threading.Lock()
- self.recv_queue_lock = threading.Lock()
- self.is_live = True
-
- def __getattr__(self, name):
- """
- Return attribute values of internal, live socket.
-
- Arguments:
- name -- Name of the attribute requested.
- """
-
- return getattr(self.socket, name)
-
- # ------------------------------------------------------------------
- # Testing Interface
-
- def disconnect_errror(self):
- """
- Used to simulate a socket disconnection error.
-
- Not used by live sockets.
- """
- try:
- self.socket.shutdown()
- self.socket.close()
- except:
- pass
-
- def next_sent(self, timeout=None):
- """
- Get the next stanza that has been sent.
-
- Arguments:
- timeout -- Optional timeout for waiting for a new value.
- """
- args = {'block': False}
- if timeout is not None:
- args = {'block': True, 'timeout': timeout}
- try:
- return self.send_queue.get(**args)
- except:
- return None
-
- def next_recv(self, timeout=None):
- """
- Get the next stanza that has been received.
-
- Arguments:
- timeout -- Optional timeout for waiting for a new value.
- """
- args = {'block': False}
- if timeout is not None:
- args = {'block': True, 'timeout': timeout}
- try:
- if self.recv_buffer:
- return self.recv_buffer.pop(0)
- else:
- return self.recv_queue.get(**args)
- except:
- return None
-
- def recv_data(self, data):
- """
- Add data to a receive buffer for cases when more than a single stanza
- was received.
- """
- self.recv_buffer.append(data)
-
- # ------------------------------------------------------------------
- # Socket Interface
-
- def recv(self, *args, **kwargs):
- """
- Read data from the socket.
-
- Store a copy in the receive queue.
-
- Arguments:
- Placeholders. Same as for socket.recv.
- """
- data = self.socket.recv(*args, **kwargs)
- with self.recv_queue_lock:
- self.recv_queue.put(data)
- return data
-
- def send(self, data):
- """
- Send data on the socket.
-
- Store a copy in the send queue.
-
- Arguments:
- data -- String value to write.
- """
- with self.send_queue_lock:
- self.send_queue.put(data)
- return self.socket.send(data)
-
- # ------------------------------------------------------------------
- # File Socket
-
- def makefile(self, *args, **kwargs):
- """
- File socket version to use with ElementTree.
-
- Arguments:
- Placeholders, same as socket.makefile()
- """
- return self
-
- def read(self, *args, **kwargs):
- """
- Implement the file socket read interface.
-
- Arguments:
- Placeholders, same as socket.recv()
- """
- return self.recv(*args, **kwargs)
-
- def clear(self):
- """
- Empty the send queue, typically done once the session has started to
- remove the feature negotiation and log in stanzas.
- """
- with self.send_queue_lock:
- for i in range(0, self.send_queue.qsize()):
- self.send_queue.get(block=False)
- with self.recv_queue_lock:
- for i in range(0, self.recv_queue.qsize()):
- self.recv_queue.get(block=False)
diff --git a/sleekxmpp/test/mocksocket.py b/sleekxmpp/test/mocksocket.py
deleted file mode 100644
index 4c9d1699..00000000
--- a/sleekxmpp/test/mocksocket.py
+++ /dev/null
@@ -1,153 +0,0 @@
-"""
- SleekXMPP: The Sleek XMPP Library
- Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
- This file is part of SleekXMPP.
-
- See the file LICENSE for copying permission.
-"""
-
-import socket
-
-from sleekxmpp.util import Queue
-
-
-class TestSocket(object):
-
- """
- A dummy socket that reads and writes to queues instead
- of an actual networking socket.
-
- Methods:
- next_sent -- Return the next sent stanza.
- recv_data -- Make a stanza available to read next.
- recv -- Read the next stanza from the socket.
- send -- Write a stanza to the socket.
- makefile -- Dummy call, returns self.
- read -- Read the next stanza from the socket.
- """
-
- def __init__(self, *args, **kwargs):
- """
- Create a new test socket.
-
- Arguments:
- Same as arguments for socket.socket
- """
- self.socket = socket.socket(*args, **kwargs)
- self.recv_queue = Queue()
- self.send_queue = Queue()
- self.is_live = False
- self.disconnected = False
-
- def __getattr__(self, name):
- """
- Return attribute values of internal, dummy socket.
-
- Some attributes and methods are disabled to prevent the
- socket from connecting to the network.
-
- Arguments:
- name -- Name of the attribute requested.
- """
-
- def dummy(*args):
- """Method to do nothing and prevent actual socket connections."""
- return None
-
- overrides = {'connect': dummy,
- 'close': dummy,
- 'shutdown': dummy}
-
- return overrides.get(name, getattr(self.socket, name))
-
- # ------------------------------------------------------------------
- # Testing Interface
-
- def next_sent(self, timeout=None):
- """
- Get the next stanza that has been 'sent'.
-
- Arguments:
- timeout -- Optional timeout for waiting for a new value.
- """
- args = {'block': False}
- if timeout is not None:
- args = {'block': True, 'timeout': timeout}
- try:
- return self.send_queue.get(**args)
- except:
- return None
-
- def recv_data(self, data):
- """
- Add data to the receiving queue.
-
- Arguments:
- data -- String data to 'write' to the socket to be received
- by the XMPP client.
- """
- self.recv_queue.put(data)
-
- def disconnect_error(self):
- """
- Simulate a disconnect error by raising a socket.error exception
- for any current or further socket operations.
- """
- self.disconnected = True
-
- # ------------------------------------------------------------------
- # Socket Interface
-
- def recv(self, *args, **kwargs):
- """
- Read a value from the received queue.
-
- Arguments:
- Placeholders. Same as for socket.Socket.recv.
- """
- if self.disconnected:
- raise socket.error
- return self.read(block=True)
-
- def send(self, data):
- """
- Send data by placing it in the send queue.
-
- Arguments:
- data -- String value to write.
- """
- if self.disconnected:
- raise socket.error
- self.send_queue.put(data)
- return len(data)
-
- # ------------------------------------------------------------------
- # File Socket
-
- def makefile(self, *args, **kwargs):
- """
- File socket version to use with ElementTree.
-
- Arguments:
- Placeholders, same as socket.Socket.makefile()
- """
- return self
-
- def read(self, block=True, timeout=None, **kwargs):
- """
- Implement the file socket interface.
-
- Arguments:
- block -- Indicate if the read should block until a
- value is ready.
- timeout -- Time in seconds a block should last before
- returning None.
- """
- if self.disconnected:
- raise socket.error
- if timeout is not None:
- block = True
- try:
- return self.recv_queue.get(block, timeout)
- except:
- return None
diff --git a/sleekxmpp/test/sleektest.py b/sleekxmpp/test/sleektest.py
deleted file mode 100644
index e26f99ce..00000000
--- a/sleekxmpp/test/sleektest.py
+++ /dev/null
@@ -1,774 +0,0 @@
-"""
- SleekXMPP: The Sleek XMPP Library
- Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
- This file is part of SleekXMPP.
-
- See the file LICENSE for copying permission.
-"""
-
-import unittest
-from xml.parsers.expat import ExpatError
-
-from sleekxmpp import ClientXMPP, ComponentXMPP
-from sleekxmpp.util import Queue
-from sleekxmpp.stanza import Message, Iq, Presence
-from sleekxmpp.test import TestSocket, TestLiveSocket
-from sleekxmpp.xmlstream import ET
-from sleekxmpp.xmlstream import ElementBase
-from sleekxmpp.xmlstream.tostring import tostring
-from sleekxmpp.xmlstream.matcher import StanzaPath, MatcherId, MatchIDSender
-from sleekxmpp.xmlstream.matcher import MatchXMLMask, MatchXPath
-
-
-class SleekTest(unittest.TestCase):
-
- """
- A SleekXMPP specific TestCase class that provides
- methods for comparing message, iq, and presence stanzas.
-
- Methods:
- Message -- Create a Message stanza object.
- Iq -- Create an Iq stanza object.
- Presence -- Create a Presence stanza object.
- check_jid -- Check a JID and its component parts.
- check -- Compare a stanza against an XML string.
- stream_start -- Initialize a dummy XMPP client.
- stream_close -- Disconnect the XMPP client.
- make_header -- Create a stream header.
- send_header -- Check that the given header has been sent.
- send_feature -- Send a raw XML element.
- send -- Check that the XMPP client sent the given
- generic stanza.
- recv -- Queue data for XMPP client to receive, or
- verify the data that was received from a
- live connection.
- recv_header -- Check that a given stream header
- was received.
- recv_feature -- Check that a given, raw XML element
- was recveived.
- fix_namespaces -- Add top-level namespace to an XML object.
- compare -- Compare XML objects against each other.
- """
-
- def __init__(self, *args, **kwargs):
- unittest.TestCase.__init__(self, *args, **kwargs)
- self.xmpp = None
-
- def parse_xml(self, xml_string):
- try:
- xml = ET.fromstring(xml_string)
- return xml
- except (SyntaxError, ExpatError) as e:
- msg = e.msg if hasattr(e, 'msg') else e.message
- if 'unbound' in msg:
- known_prefixes = {
- 'stream': 'http://etherx.jabber.org/streams'}
-
- prefix = xml_string.split('<')[1].split(':')[0]
- if prefix in known_prefixes:
- xml_string = '<fixns xmlns:%s="%s">%s</fixns>' % (
- prefix,
- known_prefixes[prefix],
- xml_string)
- xml = self.parse_xml(xml_string)
- xml = list(xml)[0]
- return xml
- else:
- self.fail("XML data was mal-formed:\n%s" % xml_string)
-
- # ------------------------------------------------------------------
- # Shortcut methods for creating stanza objects
-
- def Message(self, *args, **kwargs):
- """
- Create a Message stanza.
-
- Uses same arguments as StanzaBase.__init__
-
- Arguments:
- xml -- An XML object to use for the Message's values.
- """
- return Message(self.xmpp, *args, **kwargs)
-
- def Iq(self, *args, **kwargs):
- """
- Create an Iq stanza.
-
- Uses same arguments as StanzaBase.__init__
-
- Arguments:
- xml -- An XML object to use for the Iq's values.
- """
- return Iq(self.xmpp, *args, **kwargs)
-
- def Presence(self, *args, **kwargs):
- """
- Create a Presence stanza.
-
- Uses same arguments as StanzaBase.__init__
-
- Arguments:
- xml -- An XML object to use for the Iq's values.
- """
- return Presence(self.xmpp, *args, **kwargs)
-
- def check_jid(self, jid, user=None, domain=None, resource=None,
- bare=None, full=None, string=None):
- """
- Verify the components of a JID.
-
- Arguments:
- jid -- The JID object to test.
- user -- Optional. The user name portion of the JID.
- domain -- Optional. The domain name portion of the JID.
- resource -- Optional. The resource portion of the JID.
- bare -- Optional. The bare JID.
- full -- Optional. The full JID.
- string -- Optional. The string version of the JID.
- """
- if user is not None:
- self.assertEqual(jid.user, user,
- "User does not match: %s" % jid.user)
- if domain is not None:
- self.assertEqual(jid.domain, domain,
- "Domain does not match: %s" % jid.domain)
- if resource is not None:
- self.assertEqual(jid.resource, resource,
- "Resource does not match: %s" % jid.resource)
- if bare is not None:
- self.assertEqual(jid.bare, bare,
- "Bare JID does not match: %s" % jid.bare)
- if full is not None:
- self.assertEqual(jid.full, full,
- "Full JID does not match: %s" % jid.full)
- if string is not None:
- self.assertEqual(str(jid), string,
- "String does not match: %s" % str(jid))
-
- def check_roster(self, owner, jid, name=None, subscription=None,
- afrom=None, ato=None, pending_out=None, pending_in=None,
- groups=None):
- roster = self.xmpp.roster[owner][jid]
- if name is not None:
- self.assertEqual(roster['name'], name,
- "Incorrect name value: %s" % roster['name'])
- if subscription is not None:
- self.assertEqual(roster['subscription'], subscription,
- "Incorrect subscription: %s" % roster['subscription'])
- if afrom is not None:
- self.assertEqual(roster['from'], afrom,
- "Incorrect from state: %s" % roster['from'])
- if ato is not None:
- self.assertEqual(roster['to'], ato,
- "Incorrect to state: %s" % roster['to'])
- if pending_out is not None:
- self.assertEqual(roster['pending_out'], pending_out,
- "Incorrect pending_out state: %s" % roster['pending_out'])
- if pending_in is not None:
- self.assertEqual(roster['pending_in'], pending_out,
- "Incorrect pending_in state: %s" % roster['pending_in'])
- if groups is not None:
- self.assertEqual(roster['groups'], groups,
- "Incorrect groups: %s" % roster['groups'])
-
- # ------------------------------------------------------------------
- # Methods for comparing stanza objects to XML strings
-
- def check(self, stanza, criteria, method='exact',
- defaults=None, use_values=True):
- """
- Create and compare several stanza objects to a correct XML string.
-
- If use_values is False, tests using stanza.values will not be used.
-
- Some stanzas provide default values for some interfaces, but
- these defaults can be problematic for testing since they can easily
- be forgotten when supplying the XML string. A list of interfaces that
- use defaults may be provided and the generated stanzas will use the
- default values for those interfaces if needed.
-
- However, correcting the supplied XML is not possible for interfaces
- that add or remove XML elements. Only interfaces that map to XML
- attributes may be set using the defaults parameter. The supplied XML
- must take into account any extra elements that are included by default.
-
- Arguments:
- stanza -- The stanza object to test.
- criteria -- An expression the stanza must match against.
- method -- The type of matching to use; one of:
- 'exact', 'mask', 'id', 'xpath', and 'stanzapath'.
- Defaults to the value of self.match_method.
- defaults -- A list of stanza interfaces that have default
- values. These interfaces will be set to their
- defaults for the given and generated stanzas to
- prevent unexpected test failures.
- use_values -- Indicates if testing using stanza.values should
- be used. Defaults to True.
- """
- if method is None and hasattr(self, 'match_method'):
- method = getattr(self, 'match_method')
-
- if method != 'exact':
- matchers = {'stanzapath': StanzaPath,
- 'xpath': MatchXPath,
- 'mask': MatchXMLMask,
- 'idsender': MatchIDSender,
- 'id': MatcherId}
- Matcher = matchers.get(method, None)
- if Matcher is None:
- raise ValueError("Unknown matching method.")
- test = Matcher(criteria)
- self.failUnless(test.match(stanza),
- "Stanza did not match using %s method:\n" % method + \
- "Criteria:\n%s\n" % str(criteria) + \
- "Stanza:\n%s" % str(stanza))
- else:
- stanza_class = stanza.__class__
- if not isinstance(criteria, ElementBase):
- xml = self.parse_xml(criteria)
- else:
- xml = criteria.xml
-
- # Ensure that top level namespaces are used, even if they
- # were not provided.
- self.fix_namespaces(stanza.xml, 'jabber:client')
- self.fix_namespaces(xml, 'jabber:client')
-
- stanza2 = stanza_class(xml=xml)
-
- if use_values:
- # Using stanza.values will add XML for any interface that
- # has a default value. We need to set those defaults on
- # the existing stanzas and XML so that they will compare
- # correctly.
- default_stanza = stanza_class()
- if defaults is None:
- known_defaults = {
- Message: ['type'],
- Presence: ['priority']
- }
- defaults = known_defaults.get(stanza_class, [])
- for interface in defaults:
- stanza[interface] = stanza[interface]
- stanza2[interface] = stanza2[interface]
- # Can really only automatically add defaults for top
- # level attribute values. Anything else must be accounted
- # for in the provided XML string.
- if interface not in xml.attrib:
- if interface in default_stanza.xml.attrib:
- value = default_stanza.xml.attrib[interface]
- xml.attrib[interface] = value
-
- values = stanza2.values
- stanza3 = stanza_class()
- stanza3.values = values
-
- debug = "Three methods for creating stanzas do not match.\n"
- debug += "Given XML:\n%s\n" % tostring(xml)
- debug += "Given stanza:\n%s\n" % tostring(stanza.xml)
- debug += "Generated stanza:\n%s\n" % tostring(stanza2.xml)
- debug += "Second generated stanza:\n%s\n" % tostring(stanza3.xml)
- result = self.compare(xml, stanza.xml, stanza2.xml, stanza3.xml)
- else:
- debug = "Two methods for creating stanzas do not match.\n"
- debug += "Given XML:\n%s\n" % tostring(xml)
- debug += "Given stanza:\n%s\n" % tostring(stanza.xml)
- debug += "Generated stanza:\n%s\n" % tostring(stanza2.xml)
- result = self.compare(xml, stanza.xml, stanza2.xml)
-
- self.failUnless(result, debug)
-
- # ------------------------------------------------------------------
- # Methods for simulating stanza streams.
-
- def stream_disconnect(self):
- """
- Simulate a stream disconnection.
- """
- if self.xmpp:
- self.xmpp.socket.disconnect_error()
-
- def stream_start(self, mode='client', skip=True, header=None, socket='mock', jid='tester@localhost',
- password='test', server='localhost', port=5222, sasl_mech=None, plugins=None, plugin_config=None):
- """
- Initialize an XMPP client or component using a dummy XML stream.
-
- Arguments:
- mode -- Either 'client' or 'component'. Defaults to 'client'.
- skip -- Indicates if the first item in the sent queue (the
- stream header) should be removed. Tests that wish
- to test initializing the stream should set this to
- False. Otherwise, the default of True should be used.
- socket -- Either 'mock' or 'live' to indicate if the socket
- should be a dummy, mock socket or a live, functioning
- socket. Defaults to 'mock'.
- jid -- The JID to use for the connection.
- Defaults to 'tester@localhost'.
- password -- The password to use for the connection.
- Defaults to 'test'.
- server -- The name of the XMPP server. Defaults to 'localhost'.
- port -- The port to use when connecting to the server.
- Defaults to 5222.
- plugins -- List of plugins to register. By default, all plugins
- are loaded.
- """
- if not plugin_config:
- plugin_config = {}
-
- if mode == 'client':
- self.xmpp = ClientXMPP(jid, password,
- sasl_mech=sasl_mech,
- plugin_config=plugin_config)
- elif mode == 'component':
- self.xmpp = ComponentXMPP(jid, password,
- server, port,
- plugin_config=plugin_config)
- else:
- raise ValueError("Unknown XMPP connection mode.")
-
- # Remove unique ID prefix to make it easier to test
- self.xmpp._id_prefix = ''
- self.xmpp._disconnect_wait_for_threads = False
- self.xmpp.default_lang = None
- self.xmpp.peer_default_lang = None
-
- # We will use this to wait for the session_start event
- # for live connections.
- skip_queue = Queue()
-
- if socket == 'mock':
- self.xmpp.set_socket(TestSocket())
-
- # Simulate connecting for mock sockets.
- self.xmpp.auto_reconnect = False
- self.xmpp.state._set_state('connected')
-
- # Must have the stream header ready for xmpp.process() to work.
- if not header:
- header = self.xmpp.stream_header
- self.xmpp.socket.recv_data(header)
- elif socket == 'live':
- self.xmpp.socket_class = TestLiveSocket
-
- def wait_for_session(x):
- self.xmpp.socket.clear()
- skip_queue.put('started')
-
- self.xmpp.add_event_handler('session_start', wait_for_session)
- if server is not None:
- self.xmpp.connect((server, port))
- else:
- self.xmpp.connect()
- else:
- raise ValueError("Unknown socket type.")
-
- if plugins is None:
- self.xmpp.register_plugins()
- else:
- for plugin in plugins:
- self.xmpp.register_plugin(plugin)
-
- # Some plugins require messages to have ID values. Set
- # this to True in tests related to those plugins.
- self.xmpp.use_message_ids = False
-
- self.xmpp.process(threaded=True)
- if skip:
- if socket != 'live':
- # Mark send queue as usable
- self.xmpp.session_bind_event.set()
- self.xmpp.session_started_event.set()
- # Clear startup stanzas
- self.xmpp.socket.next_sent(timeout=1)
- if mode == 'component':
- self.xmpp.socket.next_sent(timeout=1)
- else:
- skip_queue.get(block=True, timeout=10)
-
- def make_header(self, sto='',
- sfrom='',
- sid='',
- stream_ns="http://etherx.jabber.org/streams",
- default_ns="jabber:client",
- default_lang="en",
- version="1.0",
- xml_header=True):
- """
- Create a stream header to be received by the test XMPP agent.
-
- The header must be saved and passed to stream_start.
-
- Arguments:
- sto -- The recipient of the stream header.
- sfrom -- The agent sending the stream header.
- sid -- The stream's id.
- stream_ns -- The namespace of the stream's root element.
- default_ns -- The default stanza namespace.
- version -- The stream version.
- xml_header -- Indicates if the XML version header should be
- appended before the stream header.
- """
- header = '<stream:stream %s>'
- parts = []
- if xml_header:
- header = '<?xml version="1.0"?>' + header
- if sto:
- parts.append('to="%s"' % sto)
- if sfrom:
- parts.append('from="%s"' % sfrom)
- if sid:
- parts.append('id="%s"' % sid)
- if default_lang:
- parts.append('xml:lang="%s"' % default_lang)
- parts.append('version="%s"' % version)
- parts.append('xmlns:stream="%s"' % stream_ns)
- parts.append('xmlns="%s"' % default_ns)
- return header % ' '.join(parts)
-
- def recv(self, data, defaults=None, method='exact', use_values=True, timeout=1):
- """
- Pass data to the dummy XMPP client as if it came from an XMPP server.
-
- If using a live connection, verify what the server has sent.
-
- Arguments:
- data -- If a dummy socket is being used, the XML that is to
- be received next. Otherwise it is the criteria used
- to match against live data that is received.
- defaults -- A list of stanza interfaces with default values that
- may interfere with comparisons.
- method -- Select the type of comparison to use for
- verifying the received stanza. Options are 'exact',
- 'id', 'stanzapath', 'xpath', and 'mask'.
- Defaults to the value of self.match_method.
- use_values -- Indicates if stanza comparisons should test using
- stanza.values. Defaults to True.
- timeout -- Time to wait in seconds for data to be received by
- a live connection.
- """
- if not defaults:
- defaults = []
-
- if self.xmpp.socket.is_live:
- # we are working with a live connection, so we should
- # verify what has been received instead of simulating
- # receiving data.
- recv_data = self.xmpp.socket.next_recv(timeout)
- if recv_data is None:
- self.fail("No stanza was received.")
- xml = self.parse_xml(recv_data)
- self.fix_namespaces(xml, 'jabber:client')
- stanza = self.xmpp._build_stanza(xml, 'jabber:client')
- self.check(stanza, data,
- method=method,
- defaults=defaults,
- use_values=use_values)
- else:
- # place the data in the dummy socket receiving queue.
- data = str(data)
- self.xmpp.socket.recv_data(data)
-
- def recv_header(self, sto='',
- sfrom='',
- sid='',
- stream_ns="http://etherx.jabber.org/streams",
- default_ns="jabber:client",
- version="1.0",
- xml_header=False,
- timeout=1):
- """
- Check that a given stream header was received.
-
- Arguments:
- sto -- The recipient of the stream header.
- sfrom -- The agent sending the stream header.
- sid -- The stream's id. Set to None to ignore.
- stream_ns -- The namespace of the stream's root element.
- default_ns -- The default stanza namespace.
- version -- The stream version.
- xml_header -- Indicates if the XML version header should be
- appended before the stream header.
- timeout -- Length of time to wait in seconds for a
- response.
- """
- header = self.make_header(sto, sfrom, sid,
- stream_ns=stream_ns,
- default_ns=default_ns,
- version=version,
- xml_header=xml_header)
- recv_header = self.xmpp.socket.next_recv(timeout)
- if recv_header is None:
- raise ValueError("Socket did not return data.")
-
- # Apply closing elements so that we can construct
- # XML objects for comparison.
- header2 = header + '</stream:stream>'
- recv_header2 = recv_header + '</stream:stream>'
-
- xml = self.parse_xml(header2)
- recv_xml = self.parse_xml(recv_header2)
-
- if sid is None:
- # Ignore the id sent by the server since
- # we can't know in advance what it will be.
- if 'id' in recv_xml.attrib:
- del recv_xml.attrib['id']
-
- # Ignore the xml:lang attribute for now.
- if 'xml:lang' in recv_xml.attrib:
- del recv_xml.attrib['xml:lang']
- xml_ns = 'http://www.w3.org/XML/1998/namespace'
- if '{%s}lang' % xml_ns in recv_xml.attrib:
- del recv_xml.attrib['{%s}lang' % xml_ns]
-
- if list(recv_xml):
- # We received more than just the header
- for xml in recv_xml:
- self.xmpp.socket.recv_data(tostring(xml))
-
- attrib = recv_xml.attrib
- recv_xml.clear()
- recv_xml.attrib = attrib
-
- self.failUnless(
- self.compare(xml, recv_xml),
- "Stream headers do not match:\nDesired:\n%s\nReceived:\n%s" % (
- '%s %s' % (xml.tag, xml.attrib),
- '%s %s' % (recv_xml.tag, recv_xml.attrib)))
-
- def recv_feature(self, data, method='mask', use_values=True, timeout=1):
- """
- """
- if method is None and hasattr(self, 'match_method'):
- method = getattr(self, 'match_method')
-
- if self.xmpp.socket.is_live:
- # we are working with a live connection, so we should
- # verify what has been received instead of simulating
- # receiving data.
- recv_data = self.xmpp.socket.next_recv(timeout)
- xml = self.parse_xml(data)
- recv_xml = self.parse_xml(recv_data)
- if recv_data is None:
- self.fail("No stanza was received.")
- if method == 'exact':
- self.failUnless(self.compare(xml, recv_xml),
- "Features do not match.\nDesired:\n%s\nReceived:\n%s" % (
- tostring(xml), tostring(recv_xml)))
- elif method == 'mask':
- matcher = MatchXMLMask(xml)
- self.failUnless(matcher.match(recv_xml),
- "Stanza did not match using %s method:\n" % method + \
- "Criteria:\n%s\n" % tostring(xml) + \
- "Stanza:\n%s" % tostring(recv_xml))
- else:
- raise ValueError("Uknown matching method: %s" % method)
- else:
- # place the data in the dummy socket receiving queue.
- data = str(data)
- self.xmpp.socket.recv_data(data)
-
- def send_header(self, sto='',
- sfrom='',
- sid='',
- stream_ns="http://etherx.jabber.org/streams",
- default_ns="jabber:client",
- default_lang="en",
- version="1.0",
- xml_header=False,
- timeout=1):
- """
- Check that a given stream header was sent.
-
- Arguments:
- sto -- The recipient of the stream header.
- sfrom -- The agent sending the stream header.
- sid -- The stream's id.
- stream_ns -- The namespace of the stream's root element.
- default_ns -- The default stanza namespace.
- version -- The stream version.
- xml_header -- Indicates if the XML version header should be
- appended before the stream header.
- timeout -- Length of time to wait in seconds for a
- response.
- """
- header = self.make_header(sto, sfrom, sid,
- stream_ns=stream_ns,
- default_ns=default_ns,
- default_lang=default_lang,
- version=version,
- xml_header=xml_header)
- sent_header = self.xmpp.socket.next_sent(timeout)
- if sent_header is None:
- raise ValueError("Socket did not return data.")
-
- # Apply closing elements so that we can construct
- # XML objects for comparison.
- header2 = header + '</stream:stream>'
- sent_header2 = sent_header + b'</stream:stream>'
-
- xml = self.parse_xml(header2)
- sent_xml = self.parse_xml(sent_header2)
-
- self.failUnless(
- self.compare(xml, sent_xml),
- "Stream headers do not match:\nDesired:\n%s\nSent:\n%s" % (
- header, sent_header))
-
- def send_feature(self, data, method='mask', use_values=True, timeout=1):
- """
- """
- sent_data = self.xmpp.socket.next_sent(timeout)
- xml = self.parse_xml(data)
- sent_xml = self.parse_xml(sent_data)
- if sent_data is None:
- self.fail("No stanza was sent.")
- if method == 'exact':
- self.failUnless(self.compare(xml, sent_xml),
- "Features do not match.\nDesired:\n%s\nReceived:\n%s" % (
- tostring(xml), tostring(sent_xml)))
- elif method == 'mask':
- matcher = MatchXMLMask(xml)
- self.failUnless(matcher.match(sent_xml),
- "Stanza did not match using %s method:\n" % method + \
- "Criteria:\n%s\n" % tostring(xml) + \
- "Stanza:\n%s" % tostring(sent_xml))
- else:
- raise ValueError("Uknown matching method: %s" % method)
-
- def send(self, data, defaults=None, use_values=True,
- timeout=.5, method='exact'):
- """
- Check that the XMPP client sent the given stanza XML.
-
- Extracts the next sent stanza and compares it with the given
- XML using check.
-
- Arguments:
- stanza_class -- The class of the sent stanza object.
- data -- The XML string of the expected Message stanza,
- or an equivalent stanza object.
- use_values -- Modifies the type of tests used by check_message.
- defaults -- A list of stanza interfaces that have defaults
- values which may interfere with comparisons.
- timeout -- Time in seconds to wait for a stanza before
- failing the check.
- method -- Select the type of comparison to use for
- verifying the sent stanza. Options are 'exact',
- 'id', 'stanzapath', 'xpath', and 'mask'.
- Defaults to the value of self.match_method.
- """
- sent = self.xmpp.socket.next_sent(timeout)
- if data is None and sent is None:
- return
- if data is None and sent is not None:
- self.fail("Stanza data was sent: %s" % sent)
- if sent is None:
- self.fail("No stanza was sent.")
-
- xml = self.parse_xml(sent)
- self.fix_namespaces(xml, 'jabber:client')
- sent = self.xmpp._build_stanza(xml, 'jabber:client')
- self.check(sent, data,
- method=method,
- defaults=defaults,
- use_values=use_values)
-
- def stream_close(self):
- """
- Disconnect the dummy XMPP client.
-
- Can be safely called even if stream_start has not been called.
-
- Must be placed in the tearDown method of a test class to ensure
- that the XMPP client is disconnected after an error.
- """
- if hasattr(self, 'xmpp') and self.xmpp is not None:
- self.xmpp.socket.recv_data(self.xmpp.stream_footer)
- self.xmpp.disconnect()
-
- # ------------------------------------------------------------------
- # XML Comparison and Cleanup
-
- def fix_namespaces(self, xml, ns):
- """
- Assign a namespace to an element and any children that
- don't have a namespace.
-
- Arguments:
- xml -- The XML object to fix.
- ns -- The namespace to add to the XML object.
- """
- if xml.tag.startswith('{'):
- return
- xml.tag = '{%s}%s' % (ns, xml.tag)
- for child in xml:
- self.fix_namespaces(child, ns)
-
- def compare(self, xml, *other):
- """
- Compare XML objects.
-
- Arguments:
- xml -- The XML object to compare against.
- *other -- The list of XML objects to compare.
- """
- if not other:
- return False
-
- # Compare multiple objects
- if len(other) > 1:
- for xml2 in other:
- if not self.compare(xml, xml2):
- return False
- return True
-
- other = other[0]
-
- # Step 1: Check tags
- if xml.tag != other.tag:
- return False
-
- # Step 2: Check attributes
- if xml.attrib != other.attrib:
- return False
-
- # Step 3: Check text
- if xml.text is None:
- xml.text = ""
- if other.text is None:
- other.text = ""
- xml.text = xml.text.strip()
- other.text = other.text.strip()
-
- if xml.text != other.text:
- return False
-
- # Step 4: Check children count
- if len(list(xml)) != len(list(other)):
- return False
-
- # Step 5: Recursively check children
- for child in xml:
- child2s = other.findall("%s" % child.tag)
- if child2s is None:
- return False
- for child2 in child2s:
- if self.compare(child, child2):
- break
- else:
- return False
-
- # Step 6: Recursively check children the other way.
- for child in other:
- child2s = xml.findall("%s" % child.tag)
- if child2s is None:
- return False
- for child2 in child2s:
- if self.compare(child, child2):
- break
- else:
- return False
-
- # Everything matches
- return True