"""Step builders and stanza checkers for the end-to-end test scenarios.

Every public ``expect_*``/``send_*``/``sleep_for`` function returns a
``functools.partial`` "step".  A step is later called with the ``xmpp``
and ``biboumi`` objects and either installs a stanza checker on ``xmpp``
or performs an action and schedules the next step.
"""

from functools import partial
import asyncio
import collections
import collections.abc
import datetime
import io
import time

import lxml.etree

common_replacements = {
    'irc_server_one': 'irc.localhost@biboumi.localhost',
    'irc_server_two': 'localhost@biboumi.localhost',
    'irc_host_one': 'irc.localhost',
    'irc_host_two': 'localhost',
    'biboumi_host': 'biboumi.localhost',
    'resource_one': 'resource1',
    'resource_two': 'resource2',
    'nick_one': 'Nick',
    'jid_one': 'first@example.com',
    'jid_two': 'second@example.com',
    'jid_admin': 'admin@example.com',
    'nick_two': 'Nick2',
    'nick_three': 'Nick3',
    'lower_nick_one': 'nick',
    'lower_nick_two': 'nick2',
}

# Namespace prefixes usable inside the XPath expressions given to match().
# Built once instead of on every call.  'rms' maps to the same URI as 'rsm';
# it is kept so that expressions already written with that prefix still work.
_XPATH_NAMESPACES = {
    're': 'http://exslt.org/regular-expressions',
    'muc_user': 'http://jabber.org/protocol/muc#user',
    'muc_owner': 'http://jabber.org/protocol/muc#owner',
    'muc': 'http://jabber.org/protocol/muc',
    'disco_info': 'http://jabber.org/protocol/disco#info',
    'muc_traffic': 'http://jabber.org/protocol/muc#traffic',
    'disco_items': 'http://jabber.org/protocol/disco#items',
    'commands': 'http://jabber.org/protocol/commands',
    'dataform': 'jabber:x:data',
    'version': 'jabber:iq:version',
    'mam': 'urn:xmpp:mam:2',
    'rms': 'http://jabber.org/protocol/rsm',
    'delay': 'urn:xmpp:delay',
    'forward': 'urn:xmpp:forward:0',
    'client': 'jabber:client',
    'rsm': 'http://jabber.org/protocol/rsm',
    'carbon': 'urn:xmpp:carbons:2',
    'hints': 'urn:xmpp:hints',
    'stanza': 'urn:ietf:params:xml:ns:xmpp-stanzas',
    'stable_id': 'urn:xmpp:sid:0',
}

# Delay, in seconds, before xmpp.on_timeout is called for an expectation.
_STEP_TIMEOUT_SECONDS = 10


class SkipStepError(Exception):
    """
    Raised by a step when it needs to be skipped, by running
    the next available step immediately.
    """


class StanzaError(Exception):
    """
    Raised when a step fails.
    """


def _replacements_for(xmpp):
    """Return a new dict of ``common_replacements`` plus ``xmpp.saved_values``.

    Saved values take precedence over the common ones.  A copy is returned
    so that the module-level ``common_replacements`` is never modified.
    """
    merged = dict(common_replacements)
    merged.update(xmpp.saved_values)
    return merged


def match(stanza, xpath):
    """Evaluate ``xpath`` against ``stanza`` and return the lxml result.

    ``stanza`` is converted with ``str()`` and parsed as XML.  The prefixes
    declared in ``_XPATH_NAMESPACES`` are available inside ``xpath``.
    """
    tree = lxml.etree.parse(io.StringIO(str(stanza)))
    return tree.xpath(xpath, namespaces=_XPATH_NAMESPACES)


def check_xpath(xpaths, xmpp, after, stanza):
    """Check ``stanza`` against every expression in ``xpaths``.

    An expression starting with ``!`` is negated: the stanza must NOT match
    the remainder of the expression.

    When all expressions pass and ``after`` is truthy, ``after`` is called
    as ``after(stanza, xmpp)``; if it is an iterable, each of its elements
    is called that way instead.

    Raises:
        StanzaError: on the first expression whose outcome is not the
            expected one.
    """
    for real_xpath in xpaths:
        # A leading '!' means "the stanza must not match what follows".
        negated = real_xpath.startswith('!')
        xpath = real_xpath[1:] if negated else real_xpath
        matched = match(stanza, xpath)
        if bool(matched) == negated:
            raise StanzaError("Received stanza\n%s\ndid not match expected xpath\n%s" % (stanza, real_xpath))
    if after:
        # collections.Iterable was removed in Python 3.10; the ABC lives in
        # collections.abc.
        if isinstance(after, collections.abc.Iterable):
            for af in after:
                af(stanza, xmpp)
        else:
            after(stanza, xmpp)


def check_xpath_optional(xpaths, xmpp, after, stanza):
    """Like ``check_xpath``, but a mismatch raises ``SkipStepError``."""
    try:
        check_xpath(xpaths, xmpp, after, stanza)
    except StanzaError:
        raise SkipStepError()


def all_xpaths_match(stanza, xpaths):
    """Return True if ``stanza`` passes ``check_xpath`` for ``xpaths``."""
    try:
        check_xpath(xpaths, None, None, stanza)
    except StanzaError:
        return False
    return True


def check_list_of_xpath(list_of_xpaths, xmpp, stanza):
    """Match ``stanza`` against one of several still-expected xpath groups.

    The first group in ``list_of_xpaths`` that fully matches is removed from
    the list (the list is modified in place).  If groups remain afterwards,
    a step re-installing this checker with the remaining groups is inserted
    at the front of ``xmpp.scenario.steps``.

    Raises:
        StanzaError: if no group matches the stanza.
    """
    for i, xpaths in enumerate(list_of_xpaths):
        if all_xpaths_match(stanza, xpaths):
            # Safe to pop while iterating: the loop stops right away.
            list_of_xpaths.pop(i)
            break
    else:
        raise StanzaError("Received stanza “%s” did not match any of the expected xpaths:\n%s" % (stanza, list_of_xpaths))

    if list_of_xpaths:
        step = partial(expect_unordered_already_formatted, list_of_xpaths)
        xmpp.scenario.steps.insert(0, step)


def extract_attribute(xpath, name):
    """Return a callable ``f(stanza)`` giving attribute ``name`` of the first
    element matched by ``xpath``.

    The callable indexes the first match directly, so it raises IndexError
    when the expression matches nothing.
    """
    def f(xpath, name, stanza):
        matched = match(stanza, xpath)
        return matched[0].get(name)
    return partial(f, xpath, name)


def extract_text(xpath, stanza):
    """Return the ``.text`` of the first element of ``stanza`` matched by
    ``xpath``.

    Raises IndexError when the expression matches nothing.
    """
    matched = match(stanza, xpath)
    return matched[0].text


def save_value(name, func):
    """Return a callable ``f(stanza, xmpp)`` that stores ``func(stanza)`` in
    ``xmpp.saved_values[name]``.

    The returned callable has the ``(stanza, xmpp)`` signature used for
    ``after`` callbacks by ``check_xpath``.
    """
    def f(name, func, stanza, xmpp):
        xmpp.saved_values[name] = func(stanza)
    return partial(f, name, func)


def expect_stanza(*args, optional=False, after=None):
    """Build a step expecting a stanza that matches all xpaths in ``args``.

    When run, the step formats each xpath with the common replacements and
    ``xmpp.saved_values``, installs the checker as ``xmpp.stanza_checker``
    and schedules ``xmpp.on_timeout`` after ``_STEP_TIMEOUT_SECONDS``.

    Args:
        *args: xpath templates, formatted with ``str.format_map``.
        optional: if true, a mismatch raises ``SkipStepError`` instead of
            ``StanzaError`` (see ``check_xpath_optional``).
        after: callable, or iterable of callables, invoked with
            ``(stanza, xmpp)`` once the stanza has matched.
    """
    def f(*xpaths, xmpp, biboumi, optional, after):
        replacements = _replacements_for(xmpp)
        check_func = check_xpath_optional if optional else check_xpath
        formatted_xpaths = [xpath.format_map(replacements) for xpath in xpaths]
        xmpp.stanza_checker = partial(check_func, formatted_xpaths, xmpp, after)
        xmpp.timeout_handler = asyncio.get_event_loop().call_later(
            _STEP_TIMEOUT_SECONDS,
            partial(xmpp.on_timeout, formatted_xpaths))
    return partial(f, *args, optional=optional, after=after)


def send_stanza(stanza):
    """Build a step that sends ``stanza`` and schedules the next step.

    ``stanza`` is a template formatted with the common replacements and
    ``xmpp.saved_values`` before being passed to ``xmpp.send_raw``.
    """
    def internal(stanza, xmpp, biboumi):
        replacements = _replacements_for(xmpp)
        xmpp.send_raw(stanza.format_map(replacements))
        asyncio.get_event_loop().call_soon(xmpp.run_scenario)
    return partial(internal, stanza)


def expect_unordered(*args):
    """Build a step expecting several stanzas, in any order.

    Each positional argument is an iterable of xpath templates describing
    one expected stanza.  Templates are formatted with the common
    replacements and ``xmpp.saved_values``.
    """
    def f(*lists_of_xpaths, xmpp, biboumi):
        replacements = _replacements_for(xmpp)
        formatted_list_of_xpaths = [
            tuple(xpath.format_map(replacements) for xpath in list_of_xpaths)
            for list_of_xpaths in lists_of_xpaths
        ]
        expect_unordered_already_formatted(formatted_list_of_xpaths, xmpp, biboumi)
        # The same list object goes to the checker and to the timeout
        # callback; the checker pops matched groups from it, so on timeout
        # the callback receives only the groups still unmatched.
        xmpp.timeout_handler = asyncio.get_event_loop().call_later(
            _STEP_TIMEOUT_SECONDS,
            partial(xmpp.on_timeout, formatted_list_of_xpaths))
    return partial(f, *args)


def expect_unordered_already_formatted(formatted_list_of_xpaths, xmpp, biboumi):
    """Install ``check_list_of_xpath`` for the given, already formatted,
    groups of xpaths as ``xmpp.stanza_checker``."""
    xmpp.stanza_checker = partial(check_list_of_xpath, formatted_list_of_xpaths, xmpp)


def sleep_for(duration):
    """Build a step that sleeps ``duration`` seconds, then schedules the
    next step."""
    def f(duration, xmpp, biboumi):
        # NOTE(review): time.sleep blocks the calling thread; if steps run
        # on the event-loop thread nothing else is processed meanwhile.
        # Presumably intentional -- confirm before switching to call_later.
        time.sleep(duration)
        asyncio.get_event_loop().call_soon(xmpp.run_scenario)
    return partial(f, duration)


def save_current_timestamp_plus_delta(key, delta):
    """Return a callable ``f(message, xmpp)`` that stores the current UTC
    time plus ``delta`` in ``xmpp.saved_values[key]``.

    The value is formatted as ``YYYY-MM-DDTHH:MM:SS.967Z``; the ``.967``
    fraction is a fixed literal, not the real sub-second part.
    """
    def f(key, delta, message, xmpp):
        now_plus_delta = datetime.datetime.now(datetime.timezone.utc) + delta
        # Explicit codes instead of the platform-specific %F / %T shortcuts.
        xmpp.saved_values[key] = now_plus_delta.strftime("%Y-%m-%dT%H:%M:%S.967Z")
    return partial(f, key, delta)