#!/usr/bin/env python3
import collections
import slixmpp
import asyncio
import logging
import signal
import atexit
import lxml.etree
import sys
import io
from functools import partial
from slixmpp.xmlstream.matcher.base import MatcherBase
class MatchAll(MatcherBase):
    """Stanza matcher that accepts every stanza it is asked about."""

    def match(self, xml):
        """Return True unconditionally; *xml* is never inspected."""
        return True
class StanzaError(Exception):
    """Raised by a stanza checker when a received stanza is not the expected one."""
class XMPPComponent(slixmpp.BaseXMPP):
    """
    XMPPComponent sending a “scenario” of stanzas, checking that the responses
    match the expected results.

    The object accepts the component connection on 127.0.0.1:8811 (it is its
    own protocol instance), hands every received stanza to the checker
    installed by the current scenario step, then runs the next step.
    """

    def __init__(self, scenario, biboumi):
        """
        scenario -- object whose ``steps`` list holds callables invoked as
                    ``step(xmpp, biboumi)``.
        biboumi  -- runner of the process under test; its ``stop()`` is
                    called once the scenario has no step left.
        """
        super().__init__(jid="biboumi.localhost", default_ns="jabber:component:accept")
        self.is_component = True
        self.scenario = scenario
        self.biboumi = biboumi
        # A callable, taking a stanza as argument and raising a StanzaError
        # exception if the test should fail.
        self.stanza_checker = None
        self.failed = False
        self.accepting_server = None

        # The header template needs one placeholder per argument; an empty
        # template makes the %-formatting raise TypeError.
        self.stream_header = '<stream:stream %s %s from="%s" id="%s">' % (
            'xmlns="jabber:component:accept"',
            'xmlns:stream="%s"' % self.stream_ns,
            self.boundjid, self.get_id())
        self.stream_footer = "</stream:stream>"

        self.register_handler(slixmpp.Callback('Match All',
                                               MatchAll(None),
                                               self.handle_incoming_stanza))
        self.add_event_handler("session_end", self.on_end_session)
        # ``asyncio.async`` is a syntax error since ``async`` became a
        # keyword; ``ensure_future`` is its replacement.
        asyncio.ensure_future(self.accept_routine())

    def error(self, message):
        """Report a failure and drop the remaining steps of the scenario."""
        print("\033[31;1mFailure\033[0m: %s" % (message,))
        self.scenario.steps = []
        self.failed = True

    def on_end_session(self, event):
        """Stop the event loop when the XMPP session ends."""
        self.loop.stop()

    def handle_incoming_stanza(self, stanza):
        """Check *stanza* with the pending checker, if any, then run the next step."""
        if self.stanza_checker:
            try:
                self.stanza_checker(stanza)
            except StanzaError as e:
                self.error(e)
            # A checker is valid for a single stanza only.
            self.stanza_checker = None
        self.run_scenario()

    def run_scenario(self):
        """Run the next step, or stop biboumi when no step is left."""
        # Use this component's own scenario, not a module-level global.
        if self.scenario.steps:
            step = self.scenario.steps.pop(0)
            step(self, self.biboumi)
        else:
            self.biboumi.stop()

    async def accept_routine(self):
        """Start listening for the component connection and keep the server object."""
        self.accepting_server = await self.loop.create_server(lambda: self,
                                                              "127.0.0.1", "8811", reuse_address=True)

    def check_stanza_against_all_expected_xpaths(self):
        """Placeholder; does nothing."""
        pass
def check_xpath(xpaths, stanza):
    """
    Check that *stanza* matches every XPath expression in *xpaths*.

    xpaths -- iterable of XPath strings; the ``re`` (EXSLT regular
              expressions) and ``muc_user`` prefixes are available in them.
    stanza -- object whose ``str()`` is the XML to check.

    Raises StanzaError for the first expression that yields a falsy result.
    """
    namespaces = {'re': 'http://exslt.org/regular-expressions',
                  'muc_user': 'http://jabber.org/protocol/muc#user'}
    # The stanza does not change between expressions: parse it only once.
    tree = lxml.etree.parse(io.StringIO(str(stanza)))
    for xpath in xpaths:
        if not tree.xpath(xpath, namespaces=namespaces):
            raise StanzaError("Received stanza “%s” did not match expected xpath “%s”" % (stanza, xpath))
class Scenario:
    """Defines a list of actions that are executed in sequence, until one of
    them throws an exception, or until the end. An action can be something
    like “send a stanza”, “receive the next stanza and check that it matches
    the given XPath”, “send a signal”, “wait for the end of the process”,
    etc
    """

    def __init__(self, name, steps):
        """
        name  -- name of the scenario.
        steps -- list whose elements are either a single step or an
                 iterable of steps (for example the tuple returned by a
                 "sequence" helper). Iterables are flattened one level, so
                 ``self.steps`` ends up as one flat list.
        """
        # ``collections.Iterable`` no longer exists in recent Python
        # versions; the ABC lives in ``collections.abc``.
        from collections.abc import Iterable

        self.name = name
        self.steps = []
        for elem in steps:
            if isinstance(elem, Iterable):
                self.steps.extend(elem)
            else:
                self.steps.append(elem)
class ProcessRunner:
    """
    Base class driving one subprocess.

    Subclasses must set ``self.create`` to an awaitable that yields the
    process object (``start()`` awaits it exactly once).
    """

    def __init__(self):
        self.process = None
        # Makes stop() a no-op after its first call.
        self.signal_sent = False

    async def start(self):
        """Spawn the process by awaiting ``self.create``."""
        self.process = await self.create

    async def wait(self):
        """Wait for the process to exit and return its exit code."""
        return await self.process.wait()

    def stop(self):
        """Send SIGINT to the process, at most once."""
        if self.signal_sent:
            return
        self.signal_sent = True
        if self.process is not None:
            try:
                self.process.send_signal(signal.SIGINT)
            except ProcessLookupError:
                # The process is already gone; nothing left to interrupt.
                # This matters because stop() also runs from __del__.
                pass

    def __del__(self):
        self.stop()
class BiboumiRunner(ProcessRunner):
    """Runs ``./biboumi test.conf``, optionally under valgrind."""

    def __init__(self, name, with_valgrind):
        """
        name          -- used to build the output file name,
                         ``biboumi_<name>_output.txt``.
        with_valgrind -- when true, wrap the command in valgrind, which
                         is asked to exit with code 16 on errors.
        """
        super().__init__()
        self.name = name
        # Receives both stdout and stderr of the process.
        self.fd = open("biboumi_%s_output.txt" % (name,), "w")
        command = ["./biboumi", "test.conf"]
        if with_valgrind:
            command = ["valgrind", "--leak-check=full", "--show-leak-kinds=all",
                       "--errors-for-leak-kinds=all", "--error-exitcode=16"] + command
        # No ``loop``/``limit`` arguments: recent asyncio versions reject
        # ``loop``, and ``limit`` only concerns piped streams, which are
        # not used here.
        self.create = asyncio.create_subprocess_exec(*command, stdin=None,
                                                     stdout=self.fd, stderr=self.fd)
class IrcServerRunner(ProcessRunner):
    """Runs the mammond IRC server in the foreground, with its stderr piped."""

    def __init__(self):
        super().__init__()
        command = ("mammond", "--debug", "--nofork",
                   "--config", "../tests/end_to_end/mammond.conf")
        # stderr is a pipe so that the output of the server can be read
        # through ``self.process.stderr`` once it is started.
        self.create = asyncio.create_subprocess_exec(*command,
                                                     stderr=asyncio.subprocess.PIPE)
def send_stanza(stanza, xmpp, biboumi):
    """
    Scenario step: send *stanza* through *xmpp*, then schedule the next step.

    The ``{placeholders}`` of *stanza* are filled from common_replacements.
    *biboumi* is unused; it is part of the common step signature.
    """
    payload = stanza.format_map(common_replacements)
    xmpp.send_raw(payload)
    # Sending does not wait for any answer: go on with the scenario as
    # soon as the event loop gets control back.
    loop = asyncio.get_event_loop()
    loop.call_soon(xmpp.run_scenario)
def expect_stanza(xpaths, xmpp, biboumi):
    """
    Scenario step: make *xmpp* check the next received stanza.

    xpaths -- one XPath string, or a tuple/list of them; all of them must
              match. ``{placeholders}`` are filled from common_replacements.

    Any other type only prints a warning and installs no checker.
    *biboumi* is unused; it is part of the common step signature.
    """
    if isinstance(xpaths, str):
        xpaths = (xpaths,)
    elif not isinstance(xpaths, (tuple, list)):
        print("Warning, wrong argument type passed to expect_stanza: %s" % (type(xpaths),))
        return
    expected = [xpath.format_map(common_replacements) for xpath in xpaths]
    xmpp.stanza_checker = partial(check_xpath, expected)
class BiboumiTest:
    """
    Spawns a biboumi process and a fake XMPP Component that will run a
    Scenario. It redirects the outputs of the subprocess into separated
    files, and detects any failure in the running of the scenario.
    """

    def __init__(self, scenario, expected_code=0):
        """
        scenario      -- the Scenario to run.
        expected_code -- exit code biboumi must return for the test to pass.
        """
        self.scenario = scenario
        self.expected_code = expected_code

    def run(self, with_valgrind=True):
        """Run the scenario once; return True on success, False on failure."""
        print("Running scenario: \033[33;1m%s\033[0m%s" % (self.scenario.name, " (with valgrind)" if with_valgrind else ''))
        # Redirect the slixmpp logging into a specific file.
        # basicConfig() does nothing once the root logger has handlers, so
        # the handlers left by a previous run are removed first; otherwise
        # every scenario would keep logging into the first scenario's file.
        output_filename = "slixmpp_%s_output.txt" % (self.scenario.name,)
        root_logger = logging.getLogger()
        for handler in list(root_logger.handlers):
            root_logger.removeHandler(handler)
            handler.close()
        logging.basicConfig(level=logging.DEBUG,
                            format='%(levelname)-8s %(message)s',
                            filename=output_filename,
                            filemode="w")

        with open("test.conf", "w") as fd:
            fd.write(confs['basic'])

        # Start the XMPP component and biboumi
        loop = asyncio.get_event_loop()
        biboumi = BiboumiRunner(self.scenario.name, with_valgrind)
        xmpp = XMPPComponent(self.scenario, biboumi)
        loop.run_until_complete(biboumi.start())
        loop.call_soon(xmpp.run_scenario)
        # NOTE(review): presumably runs the event loop until the component
        # stops it at the end of the session -- confirm against slixmpp.
        xmpp.process()

        code = loop.run_until_complete(biboumi.wait())
        if not xmpp.failed:
            if code != self.expected_code:
                # error() marks the component as failed.
                xmpp.error("Wrong return code from biboumi's process: %d" % (code,))
            else:
                print("\033[32;1mSuccess!\033[0m")

        # Test the object that is actually closed; it is still None when
        # the listening socket was never created.
        if xmpp.accepting_server:
            xmpp.accepting_server.close()

        return not xmpp.failed
# Contents of the configuration files written for biboumi, by name.
confs = {
    'basic': "\n".join((
        "hostname=biboumi.localhost",
        "password=coucou",
        "db_name=biboumi.sqlite",
        "port=8811",
    )),
}

# Values substituted for the {placeholders} of the stanzas and xpaths.
common_replacements = dict(
    irc_server_one='irc.localhost@biboumi.localhost',
    irc_host_one='irc.localhost',
    resource_one='resource1',
    nick_one='Nick',
    jid_one='first@example.com',
    jid_two='second@example.com',
    nick_two='Bobby',
)
def handshake_sequence():
    """
    Return the steps of the component handshake: expect the handshake
    element of the component, then answer it.
    """
    # The answer must be an actual (empty) handshake element: sending an
    # empty string sends nothing at all.
    return (partial(expect_stanza, "//handshake"),
            partial(send_stanza, "<handshake xmlns='jabber:component:accept'/>"))
def connection_sequence(irc_host, jid):
    """
    Return the steps expecting the messages sent to *jid* while biboumi
    connects to the IRC server *irc_host*.

    irc_host -- hostname of the IRC server, e.g. "irc.localhost".
    jid      -- recipient JID; may contain ``{placeholders}`` filled from
                common_replacements.

    Every step expects a message from ``<irc_host>@biboumi.localhost``.
    """
    jid = jid.format_map(common_replacements)
    # The sender is derived from irc_host rather than hard-coded, and the
    # xpaths are concatenated so that a '%' in a JID cannot break them.
    body = "/message[@to='" + jid + "'][@from='" + irc_host + "@biboumi.localhost']/body"

    def exact(text):
        """Step expecting a body whose text is exactly *text*."""
        return partial(expect_stanza, body + "[text()='" + text + "']")

    def regex(pattern):
        """Step expecting a body whose text matches the regex *pattern*."""
        return partial(expect_stanza, body + "[re:test(text(), '" + pattern + "')]")

    # These three messages can be received in any order, so each of the
    # three steps accepts any of them.
    any_order = regex(r'^%s: (\*\*\* Found your hostname: .*|NAK multi-prefix |\*\*\* No Ident response)$' % irc_host)

    return (
        exact('Connecting to %s:6697 (encrypted)' % irc_host),
        exact('Connection failed: Connection refused'),
        exact('Connecting to %s:6670 (encrypted)' % irc_host),
        exact('Connection failed: Connection refused'),
        exact('Connecting to %s:6667 (not encrypted)' % irc_host),
        exact('Connected to IRC server.'),
        exact('%s: *** Looking up your hostname...' % irc_host),
        exact('%s: *** Checking Ident' % irc_host),
        any_order,
        any_order,
        any_order,
        regex(r'^%s: Your host is .*$' % irc_host),
        regex(r'^%s: This server was started at .*$' % irc_host),
        exact("- Default MOTD\n"),
    )
if __name__ == '__main__':
    # Install the event loop explicitly instead of relying on
    # get_event_loop() creating one implicitly, which is deprecated.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    atexit.register(loop.close)

    # Presence stanzas joining the #foo channel. They mirror the presences
    # expected in return below (same JIDs, with from and to swapped).
    join_one = "<presence from='{jid_one}/{resource_one}' to='#foo%{irc_server_one}/{nick_one}' />"
    join_two = "<presence from='{jid_two}/{resource_one}' to='#foo%{irc_server_one}/{nick_two}' />"

    # Start the test component, accepting connections on the configured
    # port.
    scenarios = (
        Scenario("basic_handshake_success",
                 [
                     handshake_sequence()
                 ]),
        Scenario("irc_server_connection",
                 [
                     handshake_sequence(),
                     partial(send_stanza, join_one),
                     connection_sequence("irc.localhost", '{jid_one}/{resource_one}'),
                 ]),
        Scenario("simple_channel_join",
                 [
                     handshake_sequence(),
                     partial(send_stanza, join_one),
                     connection_sequence("irc.localhost", '{jid_one}/{resource_one}'),
                     partial(expect_stanza,
                             ("/presence[@to='{jid_one}/{resource_one}'][@from='#foo%{irc_server_one}/{nick_one}']/muc_user:x/muc_user:item[@affiliation='none'][@jid='~nick@localhost.localdomain'][@role='participant']",
                              "/presence/muc_user:x/muc_user:status[@code='110']")
                             ),
                     partial(expect_stanza, "/message[@from='#foo%{irc_server_one}'][@type='groupchat']/subject[not(text())]"),
                 ]),
        Scenario("channel_join_with_two_users",
                 [
                     handshake_sequence(),
                     # First user joins
                     partial(send_stanza, join_one),
                     connection_sequence("irc.localhost", '{jid_one}/{resource_one}'),
                     partial(expect_stanza,
                             ("/presence[@to='{jid_one}/{resource_one}'][@from='#foo%{irc_server_one}/{nick_one}']/muc_user:x/muc_user:item[@affiliation='none'][@jid='~nick@localhost.localdomain'][@role='participant']",
                              "/presence/muc_user:x/muc_user:status[@code='110']")
                             ),
                     partial(expect_stanza, "/message[@from='#foo%{irc_server_one}'][@type='groupchat']/subject[not(text())]"),
                     # Second user joins
                     partial(send_stanza, join_two),
                     # connection_sequence("irc.localhost", '{jid_two}/{resource_one}'),
                 ]),
    )

    failures = 0

    irc = IrcServerRunner()
    print("Starting mammond server…")
    loop.run_until_complete(irc.start())
    while True:
        res = loop.run_until_complete(irc.process.stderr.readline())
        if not res:
            # An empty read means end of file: mammond closed its stderr
            # without ever reporting that it is ready. Waiting longer
            # would loop forever.
            print("mammond stopped before finishing its initialization.")
            sys.exit(1)
        if b"init finished..." in res:
            break
    print("mammond server started.")

    print("Running %s checks for biboumi." % (len(scenarios)))

    # The loop variable stays a module-level global named `scenario`:
    # renaming it or moving this loop into a function could break code
    # that looks that name up.
    for scenario in scenarios:
        test = BiboumiTest(scenario)
        if not test.run(False):
            print("You can check the files slixmpp_%s_output.txt and biboumi_%s_output.txt to help you debug." %
                  (scenario.name, scenario.name))
            failures += 1

    print("Waiting for mammond to exit…")
    irc.stop()
    loop.run_until_complete(irc.wait())

    if failures:
        print("%d test%s failed, please fix %s." % (failures, 's' if failures > 1 else '',
                                                    'them' if failures > 1 else 'it'))
        sys.exit(1)
    else:
        print("All tests passed successfully")