#!/usr/bin/env python3
"""End-to-end test runner for biboumi.

Starts a mammond IRC server, then, for each scenario, spawns a biboumi
process together with a fake XMPP component that accepts biboumi's
connection on 127.0.0.1:8811, sends stanzas to it and checks the stanzas
it receives against XPath expressions.
"""

import asyncio
import atexit
import collections
import collections.abc
import io
import logging
import signal
import sys
from functools import partial

import lxml.etree
import slixmpp
from slixmpp.xmlstream.matcher.base import MatcherBase

# Namespace prefixes usable in the XPath expressions given to expect_stanza().
XPATH_NAMESPACES = {'re': 'http://exslt.org/regular-expressions',
                    'muc_user': 'http://jabber.org/protocol/muc#user'}

LOG_FORMAT = '%(levelname)-8s %(message)s'


class MatchAll(MatcherBase):
    """match everything"""

    def match(self, xml):
        return True


class StanzaError(Exception):
    """Raised by a stanza checker when a received stanza is not the expected one."""


class XMPPComponent(slixmpp.BaseXMPP):
    """
    XMPPComponent sending a “scenario” of stanzas, checking that the responses
    match the expected results.
    """

    def __init__(self, scenario, biboumi):
        """
        scenario: the Scenario whose steps are executed one after the other.
        biboumi: the runner of the biboumi process under test; it is stopped
                 once the scenario has no step left.
        """
        super().__init__(jid="biboumi.localhost", default_ns="jabber:component:accept")
        self.is_component = True
        # The header needs one placeholder per argument (an empty template
        # raises TypeError).  The attribute names follow the stream header a
        # server sends to a component in XEP-0114.
        self.stream_header = '<stream:stream %s %s from="%s" id="%s">' % (
            'xmlns="jabber:component:accept"',
            'xmlns:stream="%s"' % self.stream_ns,
            self.boundjid, self.get_id())
        # NOTE(review): an empty footer looks like stripped markup (probably
        # the closing stream tag) -- confirm against the original file.
        self.stream_footer = ""

        self.scenario = scenario
        self.biboumi = biboumi
        # A callable, taking a stanza as argument and raising a StanzaError
        # exception if the test should fail.
        self.stanza_checker = None
        self.failed = False
        self.accepting_server = None

        self.register_handler(slixmpp.Callback('Match All',
                                               MatchAll(None),
                                               self.handle_incoming_stanza))
        self.add_event_handler("session_end", self.on_end_session)

        # Keep a reference to the task so that it cannot be garbage-collected
        # before it has run.
        self._accept_task = asyncio.ensure_future(self.accept_routine())

    def error(self, message):
        """Report a failure and drop the remaining steps of the scenario."""
        print("Failure: %s" % (message,))
        self.scenario.steps = []
        self.failed = True

    def on_end_session(self, event):
        """Stop the event loop when the session ends."""
        self.loop.stop()

    def handle_incoming_stanza(self, stanza):
        """Check the received stanza with the pending checker, if any, then
        move on to the next step of the scenario."""
        if self.stanza_checker:
            try:
                self.stanza_checker(stanza)
            except StanzaError as e:
                self.error(e)
            self.stanza_checker = None
        self.run_scenario()

    def run_scenario(self):
        """Execute the next step of the scenario, or stop biboumi when no
        step is left."""
        # Use our own scenario, not whatever module-level variable happens to
        # be named "scenario".
        if self.scenario.steps:
            step = self.scenario.steps.pop(0)
            step(self, self.biboumi)
        else:
            self.biboumi.stop()

    async def accept_routine(self):
        """Listen on 127.0.0.1:8811, using this object as the protocol of
        the accepted connection."""
        self.accepting_server = await self.loop.create_server(
            lambda: self, "127.0.0.1", 8811, reuse_address=True)

    def check_stanza_against_all_expected_xpaths(self):
        pass


def check_xpath(xpaths, stanza):
    """Raise a StanzaError if `stanza` does not match every XPath in `xpaths`.

    The prefixes of XPATH_NAMESPACES can be used in the expressions.
    """
    # The stanza is the same for all the expressions: parse it only once.
    tree = lxml.etree.parse(io.StringIO(str(stanza)))
    for xpath in xpaths:
        matched = tree.xpath(xpath, namespaces=XPATH_NAMESPACES)
        if not matched:
            raise StanzaError("Received stanza “%s” did not match expected xpath “%s”" % (stanza, xpath))


class Scenario:
    """Defines a list of actions that are executed in sequence, until one of
    them throws an exception, or until the end.  An action can be something
    like “send a stanza”, “receive the next stanza and check that it matches
    the given XPath”, “send a signal”, “wait for the end of the process”, etc
    """

    def __init__(self, name, steps):
        """
        name: used in the messages and in the names of the output files.
        steps: a list whose elements are either a single step (a callable
               taking the XMPP component and the biboumi runner), or an
               iterable of steps (as returned by handshake_sequence(), for
               example), which is flattened by one level.
        """
        self.name = name
        self.steps = []
        for elem in steps:
            if isinstance(elem, collections.abc.Iterable):
                self.steps.extend(elem)
            else:
                self.steps.append(elem)


class ProcessRunner:
    """Base class starting, waiting for and interrupting a subprocess.

    Subclasses set `self.create` to the awaitable creating the process.
    """

    def __init__(self):
        self.process = None
        self.signal_sent = False
        self.create = None

    async def start(self):
        """Create the subprocess."""
        self.process = await self.create

    async def wait(self):
        """Wait for the end of the subprocess and return its exit code."""
        return await self.process.wait()

    def stop(self):
        """Send SIGINT to the subprocess, at most once."""
        if self.signal_sent:
            return
        self.signal_sent = True
        if self.process:
            try:
                self.process.send_signal(signal.SIGINT)
            except ProcessLookupError:
                # The process is already gone; this also happens when stop()
                # is called from __del__ after a normal exit.
                pass

    def __del__(self):
        self.stop()


class BiboumiRunner(ProcessRunner):
    """Runs ./biboumi with test.conf, optionally under valgrind, with its
    output redirected into biboumi_<name>_output.txt."""

    def __init__(self, name, with_valgrind):
        super().__init__()
        self.name = name
        self.fd = open("biboumi_%s_output.txt" % (name,), "w")
        command = ["./biboumi", "test.conf"]
        if with_valgrind:
            command = ["valgrind", "--leak-check=full", "--show-leak-kinds=all",
                       "--errors-for-leak-kinds=all", "--error-exitcode=16"] + command
        self.create = asyncio.create_subprocess_exec(*command,
                                                     stdin=None,
                                                     stdout=self.fd,
                                                     stderr=self.fd)

    async def wait(self):
        """Wait for the end of biboumi, then close its output file."""
        try:
            return await super().wait()
        finally:
            self.fd.close()


class IrcServerRunner(ProcessRunner):
    """Runs the mammond IRC server in the foreground, with its stderr
    available through a pipe."""

    def __init__(self):
        super().__init__()
        self.create = asyncio.create_subprocess_exec("mammond", "--debug", "--nofork",
                                                     "--config", "../tests/end_to_end/mammond.conf",
                                                     stderr=asyncio.subprocess.PIPE)


def send_stanza(stanza, xmpp, biboumi):
    """Step sending `stanza` (after substitution of the common replacements),
    then scheduling the next step."""
    xmpp.send_raw(stanza.format_map(common_replacements))
    asyncio.get_event_loop().call_soon(xmpp.run_scenario)


def expect_stanza(xpaths, xmpp, biboumi):
    """Step installing a checker for the next received stanza.

    xpaths: one XPath expression (str), or a tuple/list of expressions that
            must all match.  The common replacements are substituted in each.
    """
    if isinstance(xpaths, str):
        xpaths = (xpaths,)
    if isinstance(xpaths, (tuple, list)):
        xmpp.stanza_checker = partial(check_xpath,
                                      [xpath.format_map(common_replacements) for xpath in xpaths])
    else:
        print("Warning, wrong argument type passed to expect_stanza: %s" % (type(xpaths)))


def _redirect_logging(filename):
    """Send all the logging output (DEBUG and above) into `filename`,
    truncating it first.

    logging.basicConfig() does nothing once the root logger has a handler,
    so the previous handlers are replaced explicitly; otherwise every
    scenario would log into the file of the first one.
    """
    root = logging.getLogger()
    for handler in list(root.handlers):
        root.removeHandler(handler)
        handler.close()
    handler = logging.FileHandler(filename, mode="w")
    handler.setFormatter(logging.Formatter(LOG_FORMAT))
    root.addHandler(handler)
    root.setLevel(logging.DEBUG)


class BiboumiTest:
    """
    Spawns a biboumi process and a fake XMPP Component that will run a
    Scenario.  It redirects the outputs of the subprocess into separated
    files, and detects any failure in the running of the scenario.
    """

    def __init__(self, scenario, expected_code=0):
        """
        scenario: the Scenario to run.
        expected_code: the exit code biboumi must return for the test to pass.
        """
        self.scenario = scenario
        self.expected_code = expected_code

    def run(self, with_valgrind=True):
        """Run the scenario; return True if it succeeded, False otherwise."""
        print("Running scenario: %s%s" % (self.scenario.name, " (with valgrind)" if with_valgrind else ''))
        # Redirect the slixmpp logging into a specific file
        output_filename = "slixmpp_%s_output.txt" % (self.scenario.name,)
        _redirect_logging(output_filename)

        with open("test.conf", "w") as fd:
            fd.write(confs['basic'])

        # Start the XMPP component and biboumi
        biboumi = BiboumiRunner(self.scenario.name, with_valgrind)
        xmpp = XMPPComponent(self.scenario, biboumi)
        loop = asyncio.get_event_loop()
        loop.run_until_complete(biboumi.start())

        loop.call_soon(xmpp.run_scenario)

        xmpp.process()

        code = loop.run_until_complete(biboumi.wait())
        if not xmpp.failed:
            if code != self.expected_code:
                # error() also marks the component as failed.
                xmpp.error("Wrong return code from biboumi's process: %d" % (code,))
            else:
                print("Success!")

        if xmpp.accepting_server:
            xmpp.accepting_server.close()

        return not xmpp.failed


# Content of the configuration files written for biboumi.
confs = {
    'basic': """hostname=biboumi.localhost
password=coucou
db_name=biboumi.sqlite
port=8811""",
}

# Values substituted (with str.format_map) in the stanzas and XPaths.
common_replacements = {
    'irc_server_one': 'irc.localhost@biboumi.localhost',
    'irc_host_one': 'irc.localhost',
    'resource_one': 'resource1',
    'nick_one': 'Nick',
    'jid_one': 'first@example.com',
    'jid_two': 'second@example.com',
    'nick_two': 'Bobby',
}


def handshake_sequence():
    """Steps expecting the handshake of biboumi and answering to it."""
    # NOTE(review): the payload sent here is empty, which looks like
    # stripped markup (presumably the handshake reply) -- confirm.
    return (partial(expect_stanza, "//handshake"),
            partial(send_stanza, ""))


def connection_sequence(irc_host, jid):
    """Steps expecting the messages sent to `jid` while biboumi connects to
    the IRC server `irc_host`.

    `jid` may contain placeholders from common_replacements.
    """
    jid = jid.format_map(common_replacements)
    xpath = "/message[@to='" + jid + "'][@from='irc.localhost@biboumi.localhost']/body[text()='%s']"
    xpath_re = "/message[@to='" + jid + "'][@from='irc.localhost@biboumi.localhost']/body[re:test(text(), '%s')]"
    # Matches any one of the three messages that can come in any order.
    any_order = xpath_re % (r'^%s: (\*\*\* Found your hostname: .*|NAK multi-prefix |\*\*\* No Ident response)$' % irc_host)
    return (
        partial(expect_stanza, xpath % ('Connecting to %s:6697 (encrypted)' % irc_host)),
        partial(expect_stanza, xpath % ('Connection failed: Connection refused')),
        partial(expect_stanza, xpath % ('Connecting to %s:6670 (encrypted)' % irc_host)),
        partial(expect_stanza, xpath % ('Connection failed: Connection refused')),
        partial(expect_stanza, xpath % ('Connecting to %s:6667 (not encrypted)' % irc_host)),
        partial(expect_stanza, xpath % ('Connected to IRC server.')),
        partial(expect_stanza, xpath % ('%s: *** Looking up your hostname...' % irc_host)),
        partial(expect_stanza, xpath % ('%s: *** Checking Ident' % irc_host)),
        # These three messages can be received in any order
        partial(expect_stanza, any_order),
        partial(expect_stanza, any_order),
        partial(expect_stanza, any_order),
        partial(expect_stanza, xpath_re % (r'^%s: Your host is .*$' % irc_host)),
        partial(expect_stanza, xpath_re % (r'^%s: This server was started at .*$' % irc_host)),
        partial(expect_stanza, xpath % ("- Default MOTD\n")),
    )


def main():
    """Start mammond, run every scenario against biboumi, and exit with the
    status 1 if at least one of them failed."""
    loop = asyncio.get_event_loop()
    atexit.register(loop.close)

    # The scenarios to run.  NOTE(review): the stanzas given to send_stanza
    # below are all empty, which looks like stripped markup -- confirm their
    # content against the original file.
    scenarios = (
        Scenario("basic_handshake_success",
                 [
                     handshake_sequence()
                 ]),
        Scenario("irc_server_connection",
                 [
                     handshake_sequence(),
                     partial(send_stanza, ""),
                     connection_sequence("irc.localhost", '{jid_one}/{resource_one}'),
                 ]),
        Scenario("simple_channel_join",
                 [
                     handshake_sequence(),
                     partial(send_stanza, ""),
                     connection_sequence("irc.localhost", '{jid_one}/{resource_one}'),
                     partial(expect_stanza,
                             ("/presence[@to='{jid_one}/{resource_one}'][@from='#foo%{irc_server_one}/{nick_one}']/muc_user:x/muc_user:item[@affiliation='none'][@jid='~nick@localhost.localdomain'][@role='participant']",
                              "/presence/muc_user:x/muc_user:status[@code='110']")
                             ),
                     partial(expect_stanza, "/message[@from='#foo%{irc_server_one}'][@type='groupchat']/subject[not(text())]"),
                 ]),
        Scenario("channel_join_with_two_users",
                 [
                     handshake_sequence(),
                     # First user joins
                     partial(send_stanza, ""),
                     connection_sequence("irc.localhost", '{jid_one}/{resource_one}'),
                     partial(expect_stanza,
                             ("/presence[@to='{jid_one}/{resource_one}'][@from='#foo%{irc_server_one}/{nick_one}']/muc_user:x/muc_user:item[@affiliation='none'][@jid='~nick@localhost.localdomain'][@role='participant']",
                              "/presence/muc_user:x/muc_user:status[@code='110']")
                             ),
                     partial(expect_stanza, "/message[@from='#foo%{irc_server_one}'][@type='groupchat']/subject[not(text())]"),

                     # Second user joins
                     partial(send_stanza, ""),
                     # connection_sequence("irc.localhost", '{jid_two}/{resource_one}'),
                 ]),
    )

    failures = 0

    irc = IrcServerRunner()
    print("Starting mammond server…")
    loop.run_until_complete(irc.start())
    while True:
        res = loop.run_until_complete(irc.process.stderr.readline())
        if not res:
            # End of file: mammond is gone, the expected line will never come.
            print("mammond exited before the end of its initialization.")
            sys.exit(1)
        if b"init finished..." in res:
            break
    print("mammond server started.")

    print("Running %s checks for biboumi." % (len(scenarios)))

    for scenario in scenarios:
        test = BiboumiTest(scenario)
        if not test.run(False):
            print("You can check the files slixmpp_%s_output.txt and biboumi_%s_output.txt to help you debug." %
                  (scenario.name, scenario.name))
            failures += 1

    print("Waiting for mammond to exit…")
    irc.stop()
    loop.run_until_complete(irc.wait())

    if failures:
        print("%d test%s failed, please fix %s." % (failures, 's' if failures > 1 else '',
                                                    'them' if failures > 1 else 'it'))
        sys.exit(1)
    else:
        print("All tests passed successfully")


if __name__ == '__main__':
    main()