#!/usr/bin/env python3 import collections import slixmpp import asyncio import logging import signal import atexit import lxml.etree import sys import io from functools import partial from slixmpp.xmlstream.matcher.base import MatcherBase class MatchAll(MatcherBase): """match everything""" def match(self, xml): return True class StanzaError(Exception): """ Raised when a step fails. """ pass class SkipStepError(Exception): """ Raised by a step when it needs to be skiped, by running the next available step immediately. """ pass class XMPPComponent(slixmpp.BaseXMPP): """ XMPPComponent sending a “scenario” of stanzas, checking that the responses match the expected results. """ def __init__(self, scenario, biboumi): super().__init__(jid="biboumi.localhost", default_ns="jabber:component:accept") self.is_component = True self.stream_header = '' % ( 'xmlns="jabber:component:accept"', 'xmlns:stream="%s"' % self.stream_ns, self.boundjid, self.get_id()) self.stream_footer = "" self.register_handler(slixmpp.Callback('Match All', MatchAll(None), self.handle_incoming_stanza)) self.add_event_handler("session_end", self.on_end_session) asyncio.async(self.accept_routine()) self.scenario = scenario self.biboumi = biboumi # A callable, taking a stanza as argument and raising a StanzaError # exception if the test should fail. self.stanza_checker = None self.failed = False self.accepting_server = None def error(self, message): print("Failure: %s" % (message,)) self.scenario.steps = [] self.failed = True def on_end_session(self, event): self.loop.stop() def handle_incoming_stanza(self, stanza): if self.stanza_checker: try: self.stanza_checker(stanza) except StanzaError as e: self.error(e) except SkipStepError: # Run the next step and then re-handle this same stanza self.run_scenario() return self.handle_incoming_stanza(stanza) self.stanza_checker = None self.run_scenario() def run_scenario(self): if scenario.steps: step = scenario.steps.pop(0) step(self, self.biboumi) else: self.biboumi.stop() @asyncio.coroutine def accept_routine(self): self.accepting_server = yield from self.loop.create_server(lambda: self, "127.0.0.1", "8811", reuse_address=True) def check_stanza_against_all_expected_xpaths(self): pass def check_xpath(xpaths, stanza): for xpath in xpaths: tree = lxml.etree.parse(io.StringIO(str(stanza))) matched = tree.xpath(xpath, namespaces={'re': 'http://exslt.org/regular-expressions', 'muc_user': 'http://jabber.org/protocol/muc#user'}) if not matched: raise StanzaError("Received stanza “%s” did not match expected xpath “%s”" % (stanza, xpath)) def check_xpath_optional(xpaths, stanza): try: check_xpath(xpaths, stanza) except StanzaError: raise SkipStepError() class Scenario: """Defines a list of actions that are executed in sequence, until one of them throws an exception, or until the end. An action can be something like “send a stanza”, “receive the next stanza and check that it matches the given XPath”, “send a signal”, “wait for the end of the process”, etc """ def __init__(self, name, steps): """ Steps is a list of 2-tuple: [(action, answer), (action, answer)] """ self.name = name self.steps = [] for elem in steps: if isinstance(elem, collections.Iterable): for step in elem: self.steps.append(step) else: self.steps.append(elem) class ProcessRunner: def __init__(self): self.process = None self.signal_sent = False @asyncio.coroutine def start(self): self.process = yield from self.create @asyncio.coroutine def wait(self): code = yield from self.process.wait() return code def stop(self): if not self.signal_sent: self.signal_sent = True if self.process: self.process.send_signal(signal.SIGINT) def __del__(self): self.stop() class BiboumiRunner(ProcessRunner): def __init__(self, name, with_valgrind): super().__init__() self.name = name self.fd = open("biboumi_%s_output.txt" % (name,), "w") if with_valgrind: self.create = asyncio.create_subprocess_exec("valgrind", "--leak-check=full", "--show-leak-kinds=all", "--errors-for-leak-kinds=all", "--error-exitcode=16", "./biboumi", "test.conf", stdin=None, stdout=self.fd, stderr=self.fd, loop=None, limit=None) else: self.create = asyncio.create_subprocess_exec("./biboumi", "test.conf", stdin=None, stdout=self.fd, stderr=self.fd, loop=None, limit=None) class IrcServerRunner(ProcessRunner): def __init__(self): super().__init__() self.create = asyncio.create_subprocess_exec("charybdis", "-foreground", "-configfile", "../tests/end_to_end/ircd.conf", stderr=asyncio.subprocess.PIPE) def send_stanza(stanza, xmpp, biboumi): xmpp.send_raw(stanza.format_map(common_replacements)) asyncio.get_event_loop().call_soon(xmpp.run_scenario) def expect_stanza(xpaths, xmpp, biboumi, optional=False): check_func = check_xpath if not optional else check_xpath_optional if isinstance(xpaths, str): xmpp.stanza_checker = partial(check_func, [xpaths.format_map(common_replacements)]) elif isinstance(xpaths, tuple): xmpp.stanza_checker = partial(check_func, [xpath.format_map(common_replacements) for xpath in xpaths]) else: print("Warning, from argument type passed to expect_stanza: %s" % (type(xpaths))) class BiboumiTest: """ Spawns a biboumi process and a fake XMPP Component that will run a Scenario. It redirects the outputs of the subprocess into separated files, and detects any failure in the running of the scenario. """ def __init__(self, scenario, expected_code=0): self.scenario = scenario self.expected_code = 0 def run(self, with_valgrind=True): print("Running scenario: %s%s" % (self.scenario.name, " (with valgrind)" if with_valgrind else '')) # Redirect the slixmpp logging into a specific file output_filename = "slixmpp_%s_output.txt" % (self.scenario.name,) with open(output_filename, "w"): pass logging.basicConfig(level=logging.DEBUG, format='%(levelname)-8s %(message)s', filename=output_filename) with open("test.conf", "w") as fd: fd.write(confs['basic']) # Start the XMPP component and biboumi biboumi = BiboumiRunner(scenario.name, with_valgrind) xmpp = XMPPComponent(self.scenario, biboumi) asyncio.get_event_loop().run_until_complete(biboumi.start()) asyncio.get_event_loop().call_soon(xmpp.run_scenario) xmpp.process() code = asyncio.get_event_loop().run_until_complete(biboumi.wait()) failed = False if not xmpp.failed: if code != self.expected_code: xmpp.error("Wrong return code from biboumi's process: %d" % (code,)) failed = True else: print("Success!") else: failed = True if xmpp.server: xmpp.accepting_server.close() return not failed confs = {'basic': """hostname=biboumi.localhost password=coucou db_name=biboumi.sqlite port=8811"""} common_replacements = { 'irc_server_one': 'irc.localhost@biboumi.localhost', 'irc_host_one': 'irc.localhost', 'resource_one': 'resource1', 'nick_one': 'Nick', 'jid_one': 'first@example.com', 'jid_two': 'second@example.com', 'nick_two': 'Bobby', } def handshake_sequence(): return (partial(expect_stanza, "//handshake"), partial(send_stanza, "")) def connection_sequence(irc_host, jid): jid = jid.format_map(common_replacements) xpath = "/message[@to='" + jid + "'][@from='irc.localhost@biboumi.localhost']/body[text()='%s']" xpath_re = "/message[@to='" + jid + "'][@from='irc.localhost@biboumi.localhost']/body[re:test(text(), '%s')]" return ( partial(expect_stanza, xpath % ('Connecting to %s:6697 (encrypted)' % irc_host)), partial(expect_stanza, xpath % 'Connection failed: Connection refused'), partial(expect_stanza, xpath % ('Connecting to %s:6670 (encrypted)' % irc_host)), partial(expect_stanza, xpath % 'Connection failed: Connection refused'), partial(expect_stanza, xpath % ('Connecting to %s:6667 (not encrypted)' % irc_host)), partial(expect_stanza, xpath % 'Connected to IRC server.'), # These two messages can be receive in any order partial(expect_stanza, xpath_re % (r'^%s: \*\*\* (Checking Ident|Looking up your hostname...)$' % irc_host)), partial(expect_stanza, xpath_re % (r'^%s: \*\*\* (Checking Ident|Looking up your hostname...)$' % irc_host)), # These three messages can be received in any order partial(expect_stanza, xpath_re % (r'^%s: (\*\*\* Found your hostname: .*|ACK multi-prefix|\*\*\* No Ident response)$' % irc_host)), partial(expect_stanza, xpath_re % (r'^%s: (\*\*\* Found your hostname: .*|ACK multi-prefix|\*\*\* No Ident response)$' % irc_host)), partial(expect_stanza, xpath_re % (r'^%s: (\*\*\* Found your hostname: .*|ACK multi-prefix|\*\*\* No Ident response)$' % irc_host)), partial(expect_stanza, xpath_re % (r'^%s: Your host is .*$' % irc_host)), partial(expect_stanza, xpath_re % (r'^%s: This server was created .*$' % irc_host)), partial(expect_stanza, xpath_re % (r'^%s: There are \d+ users and \d+ invisible on \d+ servers$' % irc_host)), partial(expect_stanza, xpath_re % (r'^%s: \d+ channels formed$' % irc_host), optional=True), partial(expect_stanza, xpath_re % (r'^%s: I have \d+ clients and \d+ servers$' % irc_host)), partial(expect_stanza, xpath_re % (r'^%s: \d+ \d+ Current local users \d+, max \d+$' % irc_host)), partial(expect_stanza, xpath_re % (r'^%s: \d+ \d+ Current global users \d+, max \d+$' % irc_host)), partial(expect_stanza, xpath_re % (r'^%s: Highest connection count: \d+ \(\d+ clients\) \(\d+ connections received\)$' % irc_host)), partial(expect_stanza, xpath % "- This is charybdis MOTD you might replace it, but if not your friends will\n- laugh at you.\n"), partial(expect_stanza, xpath_re % r'^User mode for \w+ is \[\+i\]$'), ) if __name__ == '__main__': atexit.register(asyncio.get_event_loop().close) # Start the test component, accepting connections on the configured # port. scenarios = ( Scenario("basic_handshake_success", [ handshake_sequence() ]), Scenario("irc_server_connection", [ handshake_sequence(), partial(send_stanza, ""), connection_sequence("irc.localhost", '{jid_one}/{resource_one}'), ]), Scenario("simple_channel_join", [ handshake_sequence(), partial(send_stanza, ""), connection_sequence("irc.localhost", '{jid_one}/{resource_one}'), partial(expect_stanza, "/message/body[text()='Mode #foo [+nt] by {irc_host_one}']"), partial(expect_stanza, ("/presence[@to='{jid_one}/{resource_one}'][@from='#foo%{irc_server_one}/{nick_one}']/muc_user:x/muc_user:item[@affiliation='admin'][@role='moderator']", "/presence/muc_user:x/muc_user:status[@code='110']") ), partial(expect_stanza, "/message[@from='#foo%{irc_server_one}'][@type='groupchat']/subject[not(text())]"), ]), Scenario("channel_join_with_two_users", [ handshake_sequence(), # First user joins partial(send_stanza, ""), connection_sequence("irc.localhost", '{jid_one}/{resource_one}'), partial(expect_stanza, "/message/body[text()='Mode #foo [+nt] by {irc_host_one}']"), partial(expect_stanza, ("/presence[@to='{jid_one}/{resource_one}'][@from='#foo%{irc_server_one}/{nick_one}']/muc_user:x/muc_user:item[@affiliation='admin'][@jid='~nick@localhost'][@role='moderator']", "/presence/muc_user:x/muc_user:status[@code='110']") ), partial(expect_stanza, "/message[@from='#foo%{irc_server_one}'][@type='groupchat']/subject[not(text())]"), # Second user joins partial(send_stanza, ""), connection_sequence("irc.localhost", '{jid_two}/{resource_one}'), # Our presence, sent to the other user partial(expect_stanza, ("/presence[@to='{jid_one}/{resource_one}'][@from='#foo%{irc_server_one}/{nick_two}']/muc_user:x/muc_user:item[@affiliation='none'][@jid='~bobby@localhost'][@role='participant']",)), # The other user presence partial(expect_stanza, "/presence[@to='{jid_second}/{resource_one}'][@from='#foo%{irc_server_one}/{nick_one}']/muc_user:x/muc_user:item[@affiliation='none'][@jid='~nick@localhost'][@role='participant']"), # Our own presence partial(expect_stanza, ("/presence[@to='{jid_two}/{resource_one}'][@from='#foo%{irc_server_one}/{nick_two}']/muc_user:x/muc_user:item[@affiliation='none'][@jid='~bobby@localhost'][@role='participant']", "/presence/muc_user:x/muc_user:status[@code='110']") ), partial(expect_stanza, "/message[@from='#foo%{irc_server_one}'][@type='groupchat']/subject[not(text())]"), ]), Scenario("channel_custom_topic", [ handshake_sequence(), # First user joins partial(send_stanza, ""), connection_sequence("irc.localhost", '{jid_one}/{resource_one}'), partial(expect_stanza, "/message/body[text()='Mode #foo [+nt] by {irc_host_one}']"), partial(expect_stanza, ("/presence[@to='{jid_one}/{resource_one}'][@from='#foo%{irc_server_one}/{nick_one}']/muc_user:x/muc_user:item[@affiliation='admin'][@jid='~nick@localhost'][@role='moderator']", "/presence/muc_user:x/muc_user:status[@code='110']") ), partial(expect_stanza, "/message[@from='#foo%{irc_server_one}'][@type='groupchat']/subject[not(text())]"), # First user sets the topic partial(send_stanza, "TOPIC TEST"), partial(expect_stanza, "/message[@from='#foo%{irc_server_one}/{nick_one}'][@type='groupchat'][@to='{jid_one}/{resource_one}']/subject[text()='TOPIC TEST']"), # Second user joins partial(send_stanza, ""), connection_sequence("irc.localhost", '{jid_two}/{resource_one}'), # Our presence, sent to the other user partial(expect_stanza, ("/presence[@to='{jid_one}/{resource_one}'][@from='#foo%{irc_server_one}/{nick_two}']/muc_user:x/muc_user:item[@affiliation='none'][@jid='~bobby@localhost'][@role='participant']",)), # The other user presence partial(expect_stanza, "/presence[@to='{jid_second}/{resource_one}'][@from='#foo%{irc_server_one}/{nick_one}']/muc_user:x/muc_user:item[@affiliation='admin'][@jid='~nick@localhost'][@role='moderator']"), # Our own presence partial(expect_stanza, ("/presence[@to='{jid_two}/{resource_one}'][@from='#foo%{irc_server_one}/{nick_two}']/muc_user:x/muc_user:item[@affiliation='none'][@jid='~bobby@localhost'][@role='participant']", "/presence/muc_user:x/muc_user:status[@code='110']") ), partial(expect_stanza, "/message[@from='#foo%{irc_server_one}/{nick_one}'][@type='groupchat']/subject[text()='TOPIC TEST']"), ]), ) failures = 0 irc = IrcServerRunner() print("Starting irc server…") asyncio.get_event_loop().run_until_complete(irc.start()) while True: res = asyncio.get_event_loop().run_until_complete(irc.process.stderr.readline()) if b"now running in foreground mode" in res: break print("irc server started.") print("Running %s checks for biboumi." % (len(scenarios))) for scenario in scenarios: test = BiboumiTest(scenario) if not test.run(False): print("You can check the files slixmpp_%s_output.txt and biboumi_%s_output.txt to help you debug." % (scenario.name, scenario.name)) failures += 1 print("Waiting for irc server to exit…") irc.stop() code = asyncio.get_event_loop().run_until_complete(irc.wait()) if failures: print("%d test%s failed, please fix %s." % (failures, 's' if failures > 1 else '', 'them' if failures > 1 else 'it')) sys.exit(1) else: print("All tests passed successfully")