import unittest
import logging
from slixmpp import JID
from slixmpp.test import SlixTest
from slixmpp.plugins import xep_0100
from slixmpp.plugins.xep_0100 import LegacyError
class TestStreamGateway(SlixTest):
def setUp(self):
self.stream_start(
mode="component",
plugins=["xep_0077", "xep_0100"],
jid="aim.shakespeare.lit",
server="shakespeare.lit",
plugin_config={
"xep_0100": {"component_name": "AIM Gateway", "type": "aim"}
},
)
def next_sent(self):
self.wait_for_send_queue()
sent = self.xmpp.socket.next_sent(timeout=0.5)
if sent is None:
return None
xml = self.parse_xml(sent)
self.fix_namespaces(xml, "jabber:component:accept")
sent = self.xmpp._build_stanza(xml, "jabber:component:accept")
return sent
def testDisco(self):
# https://xmpp.org/extensions/xep-0100.html#example-3
self.recv(
"""
"""
)
self.send(
"""
"""
)
def testRegister(self):
event_result = {}
def legacy_login(iq):
event_result["user"] = iq["from"]
self.xmpp.add_event_handler("legacy_login", legacy_login)
# Jabber User sends IQ-set qualified by the 'jabber:iq:register' namespace to Gateway,
# containing information required to register.
# https://xmpp.org/extensions/xep-0100.html#example-7
self.recv(
"""
RomeoMyRomeo
ILoveJuliet
"""
)
# Gateway verifies that registration information provided by Jabber User is valid
# (using whatever means appropriate for the Legacy Service) and informs Jabber User of success [A1].
# https://xmpp.org/extensions/xep-0100.html#example-8
self.send(
"""
"""
)
# Gateway sends subscription request to Jabber User (i.e., by sending a presence stanza
# of type "subscribe" to Jabber User's bare JID).
# https://xmpp.org/extensions/xep-0100.html#example-11
sent = self.next_sent()
self.check(
sent, "/presence@type=subscribe@from=aim.shakespeare.lit", "stanzapath"
)
self.assertTrue(
sent["to"] == "romeo@montague.lit"
) # cannot use stanzapath because of @
# Jabber User's client SHOULD approve the subscription request (i.e., by sending a presence stanza
# of type "subscribed" to Gateway).
self.recv(
"""
"""
)
# Jabber User sends subscription request to Gateway (i.e., by sending a presence stanza
# of type "subscribe" to Gateway).
self.recv(
"""
"""
)
# Gateway sends approves subscription request (i.e., by sending a presence stanza of type
# "subscribed" to Jabber User's bare JID).
sent = self.next_sent()
self.check(
sent, "/presence@type=subscribed@from=aim.shakespeare.lit", "stanzapath"
)
self.assertTrue(
sent["to"] == "romeo@montague.lit"
) # cannot use stanzapath because of @
self.assertTrue(
self.xmpp.client_roster["romeo@montague.lit"]["subscription"] == "both"
)
self.recv(
"""
"""
)
self.assertTrue(event_result["user"] == "romeo@montague.lit/orchard")
def testBadCredentials(self):
def raise_v(*a, **kwa):
raise ValueError("Not good")
self.xmpp["xep_0077"].api.register(raise_v, "user_validate")
self.recv(
"""
RomeoMyRomeo
ILoveJuliet
"""
)
# xmlns="jabber:client" in error substanza, bug in XEP-0077 plugin or OK?
self.send(
"""
RomeoMyRomeo
ILoveJuliet
Not good
""",
use_values=False,
)
def testLogin(self):
event_result = {}
def legacy_login(presence):
event_result["user"] = presence["from"]
self.xmpp.add_event_handler("legacy_login", legacy_login)
self.xmpp["xep_0077"].api["user_validate"](
None,
None,
JID("romeo@montague.lit"),
{"username": "RomeoMyRomeo", "password": "ILoveJuliet"},
)
# Jabber User sends available presence broadcast to Server or sends
# directed presence to Gateway or a Legacy User.
# https://xmpp.org/extensions/xep-0100.html#example-26
self.recv(
"""
"""
)
# Gateway sends presence stanza to Jabber User expressing availability.
# https://xmpp.org/extensions/xep-0100.html#example-27
self.send(
"""
0
"""
)
self.assertTrue(event_result["user"] == "romeo@montague.lit/orchard")
def testLogout(self):
self.add_user()
event_result = {}
def legacy_logout(presence):
event_result["user"] = presence["from"]
self.xmpp.add_event_handler("legacy_logout", legacy_logout)
# Jabber User sends available presence broadcast to Server or sends
# directed presence to Gateway or a Legacy User.
# https://xmpp.org/extensions/xep-0100.html#example-32
self.recv(
"""
"""
)
# Gateway sends presence stanza of type "unavailable" to Jabber User.
# https://xmpp.org/extensions/xep-0100.html#example-33
self.send(
"""
0
"""
)
self.assertTrue(event_result["user"] == "romeo@montague.lit/orchard")
def testAddContact(self):
self.add_user()
# Had to lowercase capuletnurse everywhere
# Jabber User sends presence stanza of type "subscribe" to Legacy User.
self.recv(
"""
"""
)
# If Legacy User approves subscription request, Gateway sends presence stanza of
# type "subscribed" to Jabber User on behalf of Legacy User. [A1]
self.send(
"""
"""
)
# Had to remove the resource here
self.send(
"""
"""
)
self.send(
"""
"""
)
self.recv(
"""
"""
)
def testAddContactFail(self):
self.add_user()
res = {}
async def legacy_contact_add(jid, node, ifrom, contact_jid):
res.update(**locals())
raise LegacyError
self.xmpp["xep_0100"].api.register(
legacy_contact_add, "legacy_contact_add"
)
self.recv(
"""
"""
)
self.send(
"""
"""
)
self.assertTrue(res["ifrom"] == "romeo@montague.lit")
self.assertTrue(res["contact_jid"] == "juliet@aim.shakespeare.lit")
def testRemoveContact(self):
self.add_user()
result = {}
# Jabber User sends IQ-set qualified by the 'jabber:iq:roster' namespace, containing subscription
# attribute with value of "remove".
async def legacy_contact_remove(jid, node, ifrom, contact_jid):
result.update(**locals())
self.xmpp["xep_0100"].api.register(
legacy_contact_remove, "legacy_contact_remove"
)
# Jabber User sends IQ-set qualified by the 'jabber:iq:roster' namespace, containing subscription
# attribute with value of "remove".
self.recv( # server sends this
"""
"""
)
for ptype in "unsubscribe", "unsubscribed", "unavailable":
self.send( # server sends this
f"""
"""
)
self.assertTrue(result["ifrom"] == "romeo@montague.lit")
self.assertTrue(
result["contact_jid"] == JID("CapuletNurse@aim.shakespeare.lit")
)
def testSendMessage(self):
self.xmpp["xep_0100"].transform_legacy_message(
jabber_user_jid="romeo@montague.lit",
legacy_contact_id="juliet",
body="Art thou not Romeo, and a Montague?",
)
self.send(
"""
Art thou not Romeo, and a Montague?
"""
)
def testLegacyMessage(self):
self.add_user()
result = {}
def legacy_message(msg):
result["msg"] = msg
self.xmpp.add_event_handler("legacy_message", legacy_message)
self.recv(
"""
Something shakespearian
"""
)
self.wait_for_send_queue()
self.assertTrue(result["msg"]["from"] == "romeo@montague.lit")
self.assertTrue(result["msg"]["to"] == "juliet@aim.shakespeare.lit")
def testPluginEnd(self):
exc = False
try:
self.xmpp.plugin.disable("xep_0100")
except Exception:
exc = True
self.assertFalse(exc)
def add_user(self):
self.xmpp.loop.run_until_complete(
self.xmpp["xep_0077"].api["user_validate"](
None,
None,
JID("romeo@montague.lit"),
{"username": "RomeoMyRomeo", "password": "ILoveJuliet"},
)
)
# TODO: edit reg
# TODO: unregister
# TODO: login fails
logging.basicConfig(level=logging.DEBUG)
suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamGateway)