diff options
Diffstat (limited to 'tests/end_to_end/functions.py')
-rw-r--r-- | tests/end_to_end/functions.py | 169 |
1 files changed, 169 insertions, 0 deletions
diff --git a/tests/end_to_end/functions.py b/tests/end_to_end/functions.py new file mode 100644 index 0000000..3a21fcf --- /dev/null +++ b/tests/end_to_end/functions.py @@ -0,0 +1,169 @@ +from functools import partial +import collections +import datetime +import asyncio +import time +import lxml.etree +import io + +common_replacements = { + 'irc_server_one': 'irc.localhost@biboumi.localhost', + 'irc_server_two': 'localhost@biboumi.localhost', + 'irc_host_one': 'irc.localhost', + 'irc_host_two': 'localhost', + 'biboumi_host': 'biboumi.localhost', + 'resource_one': 'resource1', + 'resource_two': 'resource2', + 'nick_one': 'Nick', + 'jid_one': 'first@example.com', + 'jid_two': 'second@example.com', + 'jid_admin': 'admin@example.com', + 'nick_two': 'Nick2', + 'nick_three': 'Nick3', + 'lower_nick_one': 'nick', + 'lower_nick_two': 'nick2', +} + +class SkipStepError(Exception): + """ + Raised by a step when it needs to be skiped, by running + the next available step immediately. + """ + pass + +class StanzaError(Exception): + """ + Raised when a step fails. + """ + pass + +def match(stanza, xpath): + tree = lxml.etree.parse(io.StringIO(str(stanza))) + matched = tree.xpath(xpath, namespaces={'re': 'http://exslt.org/regular-expressions', + 'muc_user': 'http://jabber.org/protocol/muc#user', + 'muc_owner': 'http://jabber.org/protocol/muc#owner', + 'muc': 'http://jabber.org/protocol/muc', + 'disco_info': 'http://jabber.org/protocol/disco#info', + 'muc_traffic': 'http://jabber.org/protocol/muc#traffic', + 'disco_items': 'http://jabber.org/protocol/disco#items', + 'commands': 'http://jabber.org/protocol/commands', + 'dataform': 'jabber:x:data', + 'version': 'jabber:iq:version', + 'mam': 'urn:xmpp:mam:2', + 'rms': 'http://jabber.org/protocol/rsm', + 'delay': 'urn:xmpp:delay', + 'forward': 'urn:xmpp:forward:0', + 'client': 'jabber:client', + 'rsm': 'http://jabber.org/protocol/rsm', + 'carbon': 'urn:xmpp:carbons:2', + 'hints': 'urn:xmpp:hints', + 'stanza': 'urn:ietf:params:xml:ns:xmpp-stanzas', + 'stable_id': 'urn:xmpp:sid:0'}) + return matched + +def check_xpath(xpaths, xmpp, after, stanza): + for xpath in xpaths: + expected = True + real_xpath = xpath + # We can check that a stanza DOESN’T match, by adding a ! before it. + if xpath.startswith('!'): + expected = False + xpath = xpath[1:] + matched = match(stanza, xpath) + if (expected and not matched) or (not expected and matched): + raise StanzaError("Received stanza\n%s\ndid not match expected xpath\n%s" % (stanza, real_xpath)) + if after: + if isinstance(after, collections.Iterable): + for af in after: + af(stanza, xmpp) + else: + after(stanza, xmpp) + +def check_xpath_optional(xpaths, xmpp, after, stanza): + try: + check_xpath(xpaths, xmpp, after, stanza) + except StanzaError: + raise SkipStepError() + +def all_xpaths_match(stanza, xpaths): + try: + check_xpath(xpaths, None, None, stanza) + except StanzaError: + return False + return True + +def check_list_of_xpath(list_of_xpaths, xmpp, stanza): + found = False + for i, xpaths in enumerate(list_of_xpaths): + if all_xpaths_match(stanza, xpaths): + found = True + list_of_xpaths.pop(i) + break + + if not found: + raise StanzaError("Received stanza “%s” did not match any of the expected xpaths:\n%s" % (stanza, list_of_xpaths)) + + if list_of_xpaths: + step = partial(expect_unordered_already_formatted, list_of_xpaths) + xmpp.scenario.steps.insert(0, step) + +def extract_attribute(xpath, name): + def f(xpath, name, stanza): + matched = match(stanza, xpath) + return matched[0].get(name) + return partial(f, xpath, name) + +def extract_text(xpath, stanza): + matched = match(stanza, xpath) + return matched[0].text + +def save_value(name, func): + def f(name, func, stanza, xmpp): + xmpp.saved_values[name] = func(stanza) + return partial(f, name, func) + +def expect_stanza(*args, optional=False, after=None): + def f(*xpaths, xmpp, biboumi, optional, after): + replacements = common_replacements + replacements.update(xmpp.saved_values) + check_func = check_xpath if not optional else check_xpath_optional + formatted_xpaths = [xpath.format_map(replacements) for xpath in xpaths] + xmpp.stanza_checker = partial(check_func, formatted_xpaths, xmpp, after) + xmpp.timeout_handler = asyncio.get_event_loop().call_later(10, partial(xmpp.on_timeout, formatted_xpaths)) + return partial(f, *args, optional=optional, after=after) + +def send_stanza(stanza): + def internal(stanza, xmpp, biboumi): + replacements = common_replacements + replacements.update(xmpp.saved_values) + xmpp.send_raw(stanza.format_map(replacements)) + asyncio.get_event_loop().call_soon(xmpp.run_scenario) + return partial(internal, stanza) + +def expect_unordered(*args): + def f(*lists_of_xpaths, xmpp, biboumi): + formatted_list_of_xpaths = [] + for list_of_xpaths in lists_of_xpaths: + formatted_xpaths = [] + for xpath in list_of_xpaths: + formatted_xpath = xpath.format_map(common_replacements) + formatted_xpaths.append(formatted_xpath) + formatted_list_of_xpaths.append(tuple(formatted_xpaths)) + expect_unordered_already_formatted(formatted_list_of_xpaths, xmpp, biboumi) + xmpp.timeout_handler = asyncio.get_event_loop().call_later(10, partial(xmpp.on_timeout, formatted_list_of_xpaths)) + return partial(f, *args) + +def expect_unordered_already_formatted(formatted_list_of_xpaths, xmpp, biboumi): + xmpp.stanza_checker = partial(check_list_of_xpath, formatted_list_of_xpaths, xmpp) + +def sleep_for(duration): + def f(duration, xmpp, biboumi): + time.sleep(duration) + asyncio.get_event_loop().call_soon(xmpp.run_scenario) + return partial(f, duration) + +def save_current_timestamp_plus_delta(key, delta): + def f(key, delta, message, xmpp): + now_plus_delta = datetime.datetime.utcnow() + delta + xmpp.saved_values[key] = now_plus_delta.strftime("%FT%T.967Z") + return partial(f, key, delta) |