diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/sleektest.py | 299 | ||||
-rw-r--r-- | tests/test_addresses.py | 110 | ||||
-rw-r--r-- | tests/test_chatstates.py | 44 | ||||
-rw-r--r-- | tests/test_disco.py | 113 | ||||
-rw-r--r-- | tests/test_forms.py | 90 | ||||
-rw-r--r-- | tests/test_gmail.py | 88 | ||||
-rw-r--r-- | tests/test_stream.py | 34 |
7 files changed, 732 insertions, 46 deletions
diff --git a/tests/sleektest.py b/tests/sleektest.py new file mode 100644 index 00000000..eef3b900 --- /dev/null +++ b/tests/sleektest.py @@ -0,0 +1,299 @@ +""" + SleekXMPP: The Sleek XMPP Library + Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout + This file is part of SleekXMPP. + + See the file license.txt for copying permission. +""" + +import unittest +import socket +try: + import queue +except ImportError: + import Queue as queue +from xml.etree import cElementTree as ET +from sleekxmpp import ClientXMPP +from sleekxmpp import Message, Iq +from sleekxmpp.stanza.presence import Presence +from sleekxmpp.xmlstream.matcher.stanzapath import StanzaPath + +class TestSocket(object): + + def __init__(self, *args, **kwargs): + self.socket = socket.socket(*args, **kwargs) + self.recv_queue = queue.Queue() + self.send_queue = queue.Queue() + + def __getattr__(self, name): + """Pass requests through to actual socket""" + # Override a few methods to prevent actual socket connections + overrides = {'connect': lambda *args: None, + 'close': lambda *args: None, + 'shutdown': lambda *args: None} + return overrides.get(name, getattr(self.socket, name)) + + # ------------------------------------------------------------------ + # Testing Interface + + def nextSent(self, timeout=None): + """Get the next stanza that has been 'sent'""" + args = {'block': False} + if timeout is not None: + args = {'block': True, 'timeout': timeout} + try: + return self.send_queue.get(**args) + except: + return None + + def recvData(self, data): + """Add data to the receiving queue""" + self.recv_queue.put(data) + + # ------------------------------------------------------------------ + # Socket Interface + + def recv(self, *args, **kwargs): + return self.read(block=True) + + def send(self, data): + self.send_queue.put(data) + + # ------------------------------------------------------------------ + # File Socket + + def makefile(self, mode='r', bufsize=-1): + """File socket version to use with ElementTree""" + return self + + def read(self, size=4096, block=True, timeout=None): + """Implement the file socket interface""" + if timeout is not None: + block = True + try: + return self.recv_queue.get(block, timeout) + except: + return None + +class TestStream(object): + """Dummy class to pass a stream object to created stanzas""" + + def __init__(self): + self.default_ns = 'jabber:client' + + +class SleekTest(unittest.TestCase): + """ + A SleekXMPP specific TestCase class that provides + methods for comparing message, iq, and presence stanzas. + """ + + def stanzaPlugin(self, stanza, plugin): + """ + Associate a stanza object as a plugin for another stanza. + """ + tag = "{%s}%s" % (plugin.namespace, plugin.name) + stanza.plugin_attrib_map[plugin.plugin_attrib] = plugin + stanza.plugin_tag_map[tag] = plugin + + # ------------------------------------------------------------------ + # Shortcut methods for creating stanza objects + + def Message(self, *args, **kwargs): + """Create a message stanza.""" + return Message(None, *args, **kwargs) + + def Iq(self, *args, **kwargs): + """Create an iq stanza.""" + return Iq(None, *args, **kwargs) + + def Presence(self, *args, **kwargs): + """Create a presence stanza.""" + return Presence(None, *args, **kwargs) + + # ------------------------------------------------------------------ + # Methods for comparing stanza objects to XML strings + + def checkMessage(self, msg, xml_string, use_values=True): + """ + Create and compare several message stanza objects to a + correct XML string. + + If use_values is False, the test using getValues() and + setValues() will not be used. + """ + + self.fix_namespaces(msg.xml, 'jabber:client') + debug = "Given Stanza:\n%s\n" % ET.tostring(msg.xml) + + xml = ET.fromstring(xml_string) + self.fix_namespaces(xml, 'jabber:client') + + debug += "XML String:\n%s\n" % ET.tostring(xml) + + msg2 = self.Message(xml) + debug += "Constructed Stanza:\n%s\n" % ET.tostring(msg2.xml) + + if use_values: + # Ugly, but need to make sure the type attribute is set. + msg['type'] = msg['type'] + if xml.attrib.get('type', None) is None: + xml.attrib['type'] = 'normal' + + values = msg2.getValues() + msg3 = self.Message() + msg3.setValues(values) + + debug += "Second Constructed Stanza:\n%s\n" % ET.tostring(msg3.xml) + debug = "Three methods for creating stanza do not match:\n" + debug + self.failUnless(self.compare([xml, msg.xml, msg2.xml, msg3.xml]), + debug) + else: + debug = "Two methods for creating stanza do not match:\n" + debug + self.failUnless(self.compare([xml, msg.xml, msg2.xml]), debug) + + def checkIq(self, iq, xml_string, use_values=True): + """ + Create and compare several iq stanza objects to a + correct XML string. + + If use_values is False, the test using getValues() and + setValues() will not be used. + """ + + self.fix_namespaces(iq.xml, 'jabber:client') + debug = "Given Stanza:\n%s\n" % ET.tostring(iq.xml) + + xml = ET.fromstring(xml_string) + self.fix_namespaces(xml, 'jabber:client') + debug += "XML String:\n%s\n" % ET.tostring(xml) + + iq2 = self.Iq(xml) + debug += "Constructed Stanza:\n%s\n" % ET.tostring(iq2.xml) + + if use_values: + values = iq.getValues() + iq3 = self.Iq() + iq3.setValues(values) + + debug += "Second Constructed Stanza:\n%s\n" % ET.tostring(iq3.xml) + debug = "Three methods for creating stanza do not match:\n" + debug + self.failUnless(self.compare([xml, iq.xml, iq2.xml, iq3.xml]), + debug) + else: + debug = "Two methods for creating stanza do not match:\n" + debug + self.failUnless(self.compare([xml, iq.xml, iq2.xml]), debug) + + def checkPresence(self, pres, xml_string, use_values=True): + """ + Create and compare several presence stanza objects to a + correct XML string. + + If use_values is False, the test using getValues() and + setValues() will not be used. + """ + pass + + # ------------------------------------------------------------------ + # Methods for simulating stanza streams. + + def streamStart(self, mode='client', skip=True): + if mode == 'client': + self.xmpp = ClientXMPP('tester@localhost', 'test') + self.xmpp.setSocket(TestSocket()) + + self.xmpp.state.set('reconnect', False) + self.xmpp.state.set('is client', True) + self.xmpp.state.set('connected', True) + + # Must have the stream header ready for xmpp.process() to work + self.xmpp.socket.recvData(self.xmpp.stream_header) + + self.xmpp.connectTCP = lambda a, b, c, d: True + self.xmpp.startTLS = lambda: True + self.xmpp.process(threaded=True) + if skip: + # Clear startup stanzas + self.xmpp.socket.nextSent(timeout=1) + + def streamRecv(self, data): + data = str(data) + self.xmpp.socket.recvData(data) + + def streamSendMessage(self, data, use_values=True, timeout=.5): + if isinstance(data, str): + data = self.Message(xml=ET.fromstring(data)) + sent = self.xmpp.socket.nextSent(timeout=1) + self.checkMessage(data, sent, use_values) + + def streamSendIq(self, data, use_values=True, timeout=.5): + if isinstance(data, str): + data = self.Iq(xml=ET.fromstring(data)) + sent = self.xmpp.socket.nextSent(timeout) + self.checkIq(data, sent, use_values) + + def streamSendPresence(self, data, use_values=True, timeout=.5): + if isinstance(data, str): + data = self.Presence(xml=ET.fromstring(data)) + sent = self.xmpp.socket.nextSent(timeout) + self.checkPresence(data, sent, use_values) + + def streamClose(self): + if self.xmpp is not None: + self.xmpp.disconnect() + self.xmpp.socket.recvData(self.xmpp.stream_footer) + + # ------------------------------------------------------------------ + # XML Comparison and Cleanup + + def fix_namespaces(self, xml, ns): + """ + Assign a namespace to an element and any children that + don't have a namespace. + """ + if xml.tag.startswith('{'): + return + xml.tag = '{%s}%s' % (ns, xml.tag) + for child in xml.getchildren(): + self.fix_namespaces(child, ns) + + def compare(self, xml1, xml2=None): + """ + Compare XML objects. + + If given a list of XML objects, then + all of the elements in the list will be + compared. + """ + + # Compare multiple objects + if type(xml1) is list: + xmls = xml1 + xml1 = xmls[0] + for xml in xmls[1:]: + xml2 = xml + if not self.compare(xml1, xml2): + return False + return True + + # Step 1: Check tags + if xml1.tag != xml2.tag: + return False + + # Step 2: Check attributes + if xml1.attrib != xml2.attrib: + return False + + # Step 3: Recursively check children + for child in xml1: + child2s = xml2.findall("%s" % child.tag) + if child2s is None: + return False + for child2 in child2s: + if self.compare(child, child2): + break + else: + return False + + # Everything matches + return True diff --git a/tests/test_addresses.py b/tests/test_addresses.py new file mode 100644 index 00000000..2718bb19 --- /dev/null +++ b/tests/test_addresses.py @@ -0,0 +1,110 @@ +from sleektest import * +import sleekxmpp.plugins.xep_0033 as xep_0033 + + +class TestAddresses(SleekTest): + + def setUp(self): + self.stanzaPlugin(Message, xep_0033.Addresses) + + def testAddAddress(self): + """Testing adding extended stanza address.""" + msg = self.Message() + msg['addresses'].addAddress(atype='to', jid='to@header1.org') + self.checkMessage(msg, """ + <message> + <addresses xmlns="http://jabber.org/protocol/address"> + <address jid="to@header1.org" type="to" /> + </addresses> + </message> + """) + + msg = self.Message() + msg['addresses'].addAddress(atype='replyto', + jid='replyto@header1.org', + desc='Reply address') + self.checkMessage(msg, """ + <message> + <addresses xmlns="http://jabber.org/protocol/address"> + <address jid="replyto@header1.org" type="replyto" desc="Reply address" /> + </addresses> + </message> + """) + + def testAddAddresses(self): + """Testing adding multiple extended stanza addresses.""" + + xmlstring = """ + <message> + <addresses xmlns="http://jabber.org/protocol/address"> + <address jid="replyto@header1.org" type="replyto" desc="Reply address" /> + <address jid="cc@header2.org" type="cc" /> + <address jid="bcc@header2.org" type="bcc" /> + </addresses> + </message> + """ + + msg = self.Message() + msg['addresses'].setAddresses([{'type':'replyto', + 'jid':'replyto@header1.org', + 'desc':'Reply address'}, + {'type':'cc', + 'jid':'cc@header2.org'}, + {'type':'bcc', + 'jid':'bcc@header2.org'}]) + self.checkMessage(msg, xmlstring) + + msg = self.Message() + msg['addresses']['replyto'] = [{'jid':'replyto@header1.org', + 'desc':'Reply address'}] + msg['addresses']['cc'] = [{'jid':'cc@header2.org'}] + msg['addresses']['bcc'] = [{'jid':'bcc@header2.org'}] + self.checkMessage(msg, xmlstring) + + def testAddURI(self): + """Testing adding URI attribute to extended stanza address.""" + + msg = self.Message() + addr = msg['addresses'].addAddress(atype='to', + jid='to@header1.org', + node='foo') + self.checkMessage(msg, """ + <message> + <addresses xmlns="http://jabber.org/protocol/address"> + <address node="foo" jid="to@header1.org" type="to" /> + </addresses> + </message> + """) + + addr['uri'] = 'mailto:to@header2.org' + self.checkMessage(msg, """ + <message> + <addresses xmlns="http://jabber.org/protocol/address"> + <address type="to" uri="mailto:to@header2.org" /> + </addresses> + </message> + """) + + def testDelivered(self): + """Testing delivered attribute of extended stanza addresses.""" + + xmlstring = """ + <message> + <addresses xmlns="http://jabber.org/protocol/address"> + <address %s jid="to@header1.org" type="to" /> + </addresses> + </message> + """ + + msg = self.Message() + addr = msg['addresses'].addAddress(jid='to@header1.org', atype='to') + self.checkMessage(msg, xmlstring % '') + + addr['delivered'] = True + self.checkMessage(msg, xmlstring % 'delivered="true"') + + addr['delivered'] = False + self.checkMessage(msg, xmlstring % '') + + +suite = unittest.TestLoader().loadTestsFromTestCase(TestAddresses) diff --git a/tests/test_chatstates.py b/tests/test_chatstates.py new file mode 100644 index 00000000..1e585be4 --- /dev/null +++ b/tests/test_chatstates.py @@ -0,0 +1,44 @@ +from sleektest import * +import sleekxmpp.plugins.xep_0085 as xep_0085 + +class TestChatStates(SleekTest): + + def setUp(self): + self.stanzaPlugin(Message, xep_0085.Active) + self.stanzaPlugin(Message, xep_0085.Composing) + self.stanzaPlugin(Message, xep_0085.Gone) + self.stanzaPlugin(Message, xep_0085.Inactive) + self.stanzaPlugin(Message, xep_0085.Paused) + + def testCreateChatState(self): + """Testing creating chat states.""" + + xmlstring = """ + <message> + <%s xmlns="http://jabber.org/protocol/chatstates" /> + </message> + """ + + msg = self.Message() + msg['chat_state'].active() + self.checkMessage(msg, xmlstring % 'active', + use_values=False) + + msg['chat_state'].composing() + self.checkMessage(msg, xmlstring % 'composing', + use_values=False) + + + msg['chat_state'].gone() + self.checkMessage(msg, xmlstring % 'gone', + use_values=False) + + msg['chat_state'].inactive() + self.checkMessage(msg, xmlstring % 'inactive', + use_values=False) + + msg['chat_state'].paused() + self.checkMessage(msg, xmlstring % 'paused', + use_values=False) + +suite = unittest.TestLoader().loadTestsFromTestCase(TestChatStates) diff --git a/tests/test_disco.py b/tests/test_disco.py index bbe285a6..6daad13e 100644 --- a/tests/test_disco.py +++ b/tests/test_disco.py @@ -1,96 +1,118 @@ -import unittest -from xml.etree import cElementTree as ET -from sleekxmpp.xmlstream.matcher.stanzapath import StanzaPath -from . import xmlcompare +from sleektest import * +import sleekxmpp.plugins.xep_0030 as xep_0030 -import sleekxmpp.plugins.xep_0030 as sd -def stanzaPlugin(stanza, plugin): - stanza.plugin_attrib_map[plugin.plugin_attrib] = plugin - stanza.plugin_tag_map["{%s}%s" % (plugin.namespace, plugin.name)] = plugin - -class testdisco(unittest.TestCase): +class TestDisco(SleekTest): def setUp(self): - self.sd = sd - stanzaPlugin(self.sd.Iq, self.sd.DiscoInfo) - stanzaPlugin(self.sd.Iq, self.sd.DiscoItems) - - def try3Methods(self, xmlstring, iq): - iq2 = self.sd.Iq(None, self.sd.ET.fromstring(xmlstring)) - values = iq2.getValues() - iq3 = self.sd.Iq() - iq3.setValues(values) - self.failUnless(xmlstring == str(iq) == str(iq2) == str(iq3), str(iq)+"3 methods for creating stanza don't match") + self.stanzaPlugin(Iq, xep_0030.DiscoInfo) + self.stanzaPlugin(Iq, xep_0030.DiscoItems) def testCreateInfoQueryNoNode(self): """Testing disco#info query with no node.""" - iq = self.sd.Iq() + iq = self.Iq() iq['id'] = "0" iq['disco_info']['node'] = '' - xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#info" /></iq>""" - self.try3Methods(xmlstring, iq) + + self.checkIq(iq, """ + <iq id="0"> + <query xmlns="http://jabber.org/protocol/disco#info" /> + </iq> + """) def testCreateInfoQueryWithNode(self): """Testing disco#info query with a node.""" - iq = self.sd.Iq() + iq = self.Iq() iq['id'] = "0" iq['disco_info']['node'] = 'foo' - xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#info" node="foo" /></iq>""" - self.try3Methods(xmlstring, iq) + + self.checkIq(iq, """ + <iq id="0"> + <query xmlns="http://jabber.org/protocol/disco#info" node="foo" /> + </iq> + """) def testCreateInfoQueryNoNode(self): """Testing disco#items query with no node.""" - iq = self.sd.Iq() + iq = self.Iq() iq['id'] = "0" iq['disco_items']['node'] = '' - xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#items" /></iq>""" - self.try3Methods(xmlstring, iq) + + self.checkIq(iq, """ + <iq id="0"> + <query xmlns="http://jabber.org/protocol/disco#items" /> + </iq> + """) def testCreateItemsQueryWithNode(self): """Testing disco#items query with a node.""" - iq = self.sd.Iq() + iq = self.Iq() iq['id'] = "0" iq['disco_items']['node'] = 'foo' - xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#items" node="foo" /></iq>""" - self.try3Methods(xmlstring, iq) + + self.checkIq(iq, """ + <iq id="0"> + <query xmlns="http://jabber.org/protocol/disco#items" node="foo" /> + </iq> + """) def testInfoIdentities(self): """Testing adding identities to disco#info.""" - iq = self.sd.Iq() + iq = self.Iq() iq['id'] = "0" iq['disco_info']['node'] = 'foo' iq['disco_info'].addIdentity('conference', 'text', 'Chatroom') - xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#info" node="foo"><identity category="conference" type="text" name="Chatroom" /></query></iq>""" - self.try3Methods(xmlstring, iq) + + self.checkIq(iq, """ + <iq id="0"> + <query xmlns="http://jabber.org/protocol/disco#info" node="foo"> + <identity category="conference" type="text" name="Chatroom" /> + </query> + </iq> + """) def testInfoFeatures(self): """Testing adding features to disco#info.""" - iq = self.sd.Iq() + iq = self.Iq() iq['id'] = "0" iq['disco_info']['node'] = 'foo' iq['disco_info'].addFeature('foo') iq['disco_info'].addFeature('bar') - xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#info" node="foo"><feature var="foo" /><feature var="bar" /></query></iq>""" - self.try3Methods(xmlstring, iq) + + self.checkIq(iq, """ + <iq id="0"> + <query xmlns="http://jabber.org/protocol/disco#info" node="foo"> + <feature var="foo" /> + <feature var="bar" /> + </query> + </iq> + """) def testItems(self): """Testing adding features to disco#info.""" - iq = self.sd.Iq() + iq = self.Iq() iq['id'] = "0" iq['disco_items']['node'] = 'foo' iq['disco_items'].addItem('user@localhost') iq['disco_items'].addItem('user@localhost', 'foo') iq['disco_items'].addItem('user@localhost', 'bar', 'Testing') - xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#items" node="foo"><item jid="user@localhost" /><item node="foo" jid="user@localhost" /><item node="bar" jid="user@localhost" name="Testing" /></query></iq>""" - self.try3Methods(xmlstring, iq) + + self.checkIq(iq, """ + <iq id="0"> + <query xmlns="http://jabber.org/protocol/disco#items" node="foo"> + <item jid="user@localhost" /> + <item node="foo" jid="user@localhost" /> + <item node="bar" jid="user@localhost" name="Testing" /> + </query> + </iq> + """) def testAddRemoveIdentities(self): """Test adding and removing identities to disco#info stanza""" ids = [('automation', 'commands', 'AdHoc'), ('conference', 'text', 'ChatRoom')] - info = self.sd.DiscoInfo() + info = xep_0030.DiscoInfo() info.addIdentity(*ids[0]) self.failUnless(info.getIdentities() == [ids[0]]) @@ -110,7 +132,7 @@ class testdisco(unittest.TestCase): """Test adding and removing features to disco#info stanza""" features = ['foo', 'bar', 'baz'] - info = self.sd.DiscoInfo() + info = xep_0030.DiscoInfo() info.addFeature(features[0]) self.failUnless(info.getFeatures() == [features[0]]) @@ -132,7 +154,7 @@ class testdisco(unittest.TestCase): ('user@localhost', 'foo', None), ('user@localhost', 'bar', 'Test')] - info = self.sd.DiscoItems() + info = xep_0030.DiscoItems() self.failUnless(True, ""+str(items[0])) info.addItem(*(items[0])) @@ -151,5 +173,4 @@ class testdisco(unittest.TestCase): self.failUnless(info.getItems() == []) - -suite = unittest.TestLoader().loadTestsFromTestCase(testdisco) +suite = unittest.TestLoader().loadTestsFromTestCase(TestDisco) diff --git a/tests/test_forms.py b/tests/test_forms.py new file mode 100644 index 00000000..981d8870 --- /dev/null +++ b/tests/test_forms.py @@ -0,0 +1,90 @@ +from sleektest import * +import sleekxmpp.plugins.alt_0004 as xep_0004 + + +class TestDataForms(SleekTest): + + def setUp(self): + self.stanzaPlugin(Message, xep_0004.Form) + self.stanzaPlugin(xep_0004.Form, xep_0004.FormField) + self.stanzaPlugin(xep_0004.FormField, xep_0004.FieldOption) + + def testMultipleInstructions(self): + """Testing using multiple instructions elements in a data form.""" + msg = self.Message() + msg['form']['instructions'] = "Instructions\nSecond batch" + + self.checkMessage(msg, """ + <message> + <x xmlns="jabber:x:data"> + <instructions>Instructions</instructions> + <instructions>Second batch</instructions> + </x> + </message> + """, use_values=False) + + def testAddField(self): + """Testing adding fields to a data form.""" + + msg = self.Message() + form = msg['form'] + form.addField('f1', + ftype='text-single', + label='Text', + desc='A text field', + required=True, + value='Some text!') + + self.checkMessage(msg, """ + <message> + <x xmlns="jabber:x:data"> + <field var="f1" type="text-single" label="Text"> + <desc>A text field</desc> + <required /> + <value>Some text!</value> + </field> + </x> + </message> + """, use_values=False) + + form['fields'] = {'f1': {'type': 'text-single', + 'label': 'Username', + 'required': True}, + 'f2': {'type': 'text-private', + 'label': 'Password', + 'required': True}, + 'f3': {'type': 'text-multi', + 'label': 'Message', + 'value': 'Enter message.\nA long one even.'}, + 'f4': {'type': 'list-single', + 'label': 'Message Type', + 'options': [{'label': 'Cool!', + 'value': 'cool'}, + {'label': 'Urgh!', + 'value': 'urgh'}]}} + self.checkMessage(msg, """ + <message> + <x xmlns="jabber:x:data"> + <field var="f1" type="text-single" label="Username"> + <required /> + </field> + <field var="f2" type="text-private" label="Password"> + <required /> + </field> + <field var="f3" type="text-multi" label="Message"> + <value>Enter message.</value> + <value>A long one even.</value> + </field> + <field var="f4" type="list-single" label="Message Type"> + <option label="Cool!"> + <value>cool</value> + </option> + <option label="Urgh!"> + <value>urgh</value> + </option> + </field> + </x> + </message> + """, use_values=False) + +suite = unittest.TestLoader().loadTestsFromTestCase(TestDataForms) diff --git a/tests/test_gmail.py b/tests/test_gmail.py new file mode 100644 index 00000000..199b76ae --- /dev/null +++ b/tests/test_gmail.py @@ -0,0 +1,88 @@ +from sleektest import * +import sleekxmpp.plugins.gmail_notify as gmail + + +class TestGmail(SleekTest): + + def setUp(self): + self.stanzaPlugin(Iq, gmail.GmailQuery) + self.stanzaPlugin(Iq, gmail.MailBox) + self.stanzaPlugin(Iq, gmail.NewMail) + + def testCreateQuery(self): + """Testing querying Gmail for emails.""" + + iq = self.Iq() + iq['type'] = 'get' + iq['gmail']['search'] = 'is:starred' + iq['gmail']['newer-than-time'] = '1140638252542' + iq['gmail']['newer-than-tid'] = '11134623426430234' + + self.checkIq(iq, """ + <iq type="get"> + <query xmlns="google:mail:notify" + newer-than-time="1140638252542" + newer-than-tid="11134623426430234" + q="is:starred" /> + </iq> + """) + + def testMailBox(self): + """Testing reading from Gmail mailbox result""" + + # Use the example from Google's documentation at + # http://code.google.com/apis/talk/jep_extensions/gmail.html#notifications + xml = ET.fromstring(""" + <iq type="result"> + <mailbox xmlns="google:mail:notify" + result-time='1118012394209' + url='http://mail.google.com/mail' + total-matched='95' + total-estimate='0'> + <mail-thread-info tid='1172320964060972012' + participation='1' + messages='28' + date='1118012394209' + url='http://mail.google.com/mail?view=cv'> + <senders> + <sender name='Me' address='romeo@gmail.com' originator='1' /> + <sender name='Benvolio' address='benvolio@gmail.com' /> + <sender name='Mercutio' address='mercutio@gmail.com' unread='1'/> + </senders> + <labels>act1scene3</labels> + <subject>Put thy rapier up.</subject> + <snippet>Ay, ay, a scratch, a scratch; marry, 'tis enough.</snippet> + </mail-thread-info> + </mailbox> + </iq> + """) + + iq = self.Iq(xml=xml) + mailbox = iq['mailbox'] + self.failUnless(mailbox['result-time'] == '1118012394209', "result-time doesn't match") + self.failUnless(mailbox['url'] == 'http://mail.google.com/mail', "url doesn't match") + self.failUnless(mailbox['matched'] == '95', "total-matched incorrect") + self.failUnless(mailbox['estimate'] == False, "total-estimate incorrect") + self.failUnless(len(mailbox['threads']) == 1, "could not extract message threads") + + thread = mailbox['threads'][0] + self.failUnless(thread['tid'] == '1172320964060972012', "thread tid doesn't match") + self.failUnless(thread['participation'] == '1', "thread participation incorrect") + self.failUnless(thread['messages'] == '28', "thread message count incorrect") + self.failUnless(thread['date'] == '1118012394209', "thread date doesn't match") + self.failUnless(thread['url'] == 'http://mail.google.com/mail?view=cv', "thread url doesn't match") + self.failUnless(thread['labels'] == 'act1scene3', "thread labels incorrect") + self.failUnless(thread['subject'] == 'Put thy rapier up.', "thread subject doesn't match") + self.failUnless(thread['snippet'] == "Ay, ay, a scratch, a scratch; marry, 'tis enough.", "snippet doesn't match") + self.failUnless(len(thread['senders']) == 3, "could not extract senders") + + sender1 = thread['senders'][0] + self.failUnless(sender1['name'] == 'Me', "sender name doesn't match") + self.failUnless(sender1['address'] == 'romeo@gmail.com', "sender address doesn't match") + self.failUnless(sender1['originator'] == True, "sender originator incorrect") + self.failUnless(sender1['unread'] == False, "sender unread incorrectly True") + + sender2 = thread['senders'][2] + self.failUnless(sender2['unread'] == True, "sender unread incorrectly False") + +suite = unittest.TestLoader().loadTestsFromTestCase(TestGmail) diff --git a/tests/test_stream.py b/tests/test_stream.py new file mode 100644 index 00000000..eb4aaa59 --- /dev/null +++ b/tests/test_stream.py @@ -0,0 +1,34 @@ +from sleektest import * +import sleekxmpp.plugins.xep_0033 as xep_0033 + + +class TestStreamTester(SleekTest): + """ + Test that we can simulate and test a stanza stream. + """ + + def setUp(self): + self.streamStart() + + def tearDown(self): + self.streamClose() + + def testEcho(self): + def echo(msg): + msg.reply('Thanks for sending: %(body)s' % msg).send() + + self.xmpp.add_event_handler('message', echo) + + self.streamRecv(""" + <message to="tester@localhost" from="user@localhost"> + <body>Hi!</body> + </message> + """) + + self.streamSendMessage(""" + <message to="user@localhost" from="tester@localhost"> + <body>Thanks for sending: Hi!</body> + </message> + """) + +suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamTester) |