diff options
Diffstat (limited to 'poezio/common.py')
-rw-r--r-- | poezio/common.py | 141 |
1 files changed, 116 insertions, 25 deletions
diff --git a/poezio/common.py b/poezio/common.py index ba179310..6b7d2bfe 100644 --- a/poezio/common.py +++ b/poezio/common.py @@ -3,12 +3,16 @@ # This file is part of Poezio. # # Poezio is free software: you can redistribute it and/or modify -# it under the terms of the zlib license. See the COPYING file. +# it under the terms of the GPL-3.0+ license. See the COPYING file. """ Various useful functions. """ -from datetime import datetime, timedelta +from datetime import ( + datetime, + timedelta, + timezone, +) from pathlib import Path from typing import Dict, List, Optional, Tuple, Union @@ -17,8 +21,9 @@ import subprocess import time import string import logging +import itertools -from slixmpp import JID, InvalidJID, Message +from slixmpp import Message from poezio.poezio_shlex import shlex log = logging.getLogger(__name__) @@ -39,7 +44,7 @@ def _get_output_of_command(command: str) -> Optional[List[str]]: return None -def _is_in_path(command: str, return_abs_path=False) -> Union[bool, str]: +def _is_in_path(command: str, return_abs_path: bool = False) -> Union[bool, str]: """ Check if *command* is in the $PATH or not. @@ -106,10 +111,12 @@ def get_os_info() -> str: stdout=subprocess.PIPE, close_fds=True) process.wait() - output = process.stdout.readline().decode('utf-8').strip() - # some distros put n/a in places, so remove those - output = output.replace('n/a', '').replace('N/A', '') - return output + if process.stdout is not None: + out = process.stdout.readline().decode('utf-8').strip() + # some distros put n/a in places, so remove those + out = out.replace('n/a', '').replace('N/A', '') + return out + return '' # lsb_release executable not available, so parse files for distro_name in DISTRO_INFO: @@ -243,7 +250,7 @@ def find_delayed_tag(message: Message) -> Tuple[bool, Optional[datetime]]: find_delay = message.xml.find delay_tag = find_delay('{urn:xmpp:delay}delay') - date = None # type: Optional[datetime] + date: Optional[datetime] = None if delay_tag is not None: delayed = True date = _datetime_tuple(delay_tag.attrib['stamp']) @@ -282,7 +289,7 @@ def shell_split(st: str) -> List[str]: return ret -def find_argument(pos: int, text: str, quoted=True) -> int: +def find_argument(pos: int, text: str, quoted: bool = True) -> int: """ Split an input into a list of arguments, return the number of the argument selected by pos. @@ -337,7 +344,7 @@ def _find_argument_unquoted(pos: int, text: str) -> int: return argnum + 1 -def parse_str_to_secs(duration='') -> int: +def parse_str_to_secs(duration: str = '') -> int: """ Parse a string of with a number of d, h, m, s. @@ -365,7 +372,7 @@ def parse_str_to_secs(duration='') -> int: return result -def parse_secs_to_str(duration=0) -> str: +def parse_secs_to_str(duration: int = 0) -> str: """ Do the reverse operation of :py:func:`parse_str_to_secs`. @@ -452,19 +459,103 @@ def format_gaming_string(infos: Dict[str, str]) -> str: return name -def safeJID(*args, **kwargs) -> JID: +def unique_prefix_of(a: str, b: str) -> str: """ - Construct a :py:class:`slixmpp.JID` object from a string. + Return the unique prefix of `a` with `b`. - Used to avoid tracebacks during is stringprep fails - (fall back to a JID with an empty string). + Corner cases: + + * If `a` and `b` share no prefix, the first letter of `a` is returned. + * If `a` and `b` are equal, `a` is returned. + * If `a` is a prefix of `b`, `a` is returned. + * If `b` is a prefix of `a`, `b` plus the first letter of `a` after the + common prefix is returned. """ - try: - return JID(*args, **kwargs) - except InvalidJID: - log.debug( - 'safeJID caught an invalidJID exception: %r, %r', - args, kwargs, - exc_info=True, - ) - return JID('') + for i, (ca, cb) in enumerate(itertools.zip_longest(a, b)): + if ca != cb: + return a[:i+1] + # both are equal, return a + return a + + +def to_utc(time_: datetime) -> datetime: + """Convert a datetime-aware time zone into raw UTC""" + if time_.tzinfo is not None: # Convert to UTC + time_ = time_.astimezone(tz=timezone.utc) + else: # Assume local tz, convert to UTC + tzone = datetime.now().astimezone().tzinfo + time_ = time_.replace(tzinfo=tzone).astimezone(tz=timezone.utc) + # Return an offset-naive datetime + return time_.replace(tzinfo=None) + + +# http://xmpp.org/extensions/xep-0045.html#errorstatus +ERROR_AND_STATUS_CODES = { + '401': 'A password is required', + '403': 'Permission denied', + '404': 'The room doesn’t exist', + '405': 'Your are not allowed to create a new room', + '406': 'A reserved nick must be used', + '407': 'You are not in the member list', + '409': 'This nickname is already in use or has been reserved', + '503': 'The maximum number of users has been reached', +} + + +# http://xmpp.org/extensions/xep-0086.html +DEPRECATED_ERRORS = { + '302': 'Redirect', + '400': 'Bad request', + '401': 'Not authorized', + '402': 'Payment required', + '403': 'Forbidden', + '404': 'Not found', + '405': 'Not allowed', + '406': 'Not acceptable', + '407': 'Registration required', + '408': 'Request timeout', + '409': 'Conflict', + '500': 'Internal server error', + '501': 'Feature not implemented', + '502': 'Remote server error', + '503': 'Service unavailable', + '504': 'Remote server timeout', + '510': 'Disconnected', +} + + +def get_error_message(stanza: Message, deprecated: bool = False) -> str: + """ + Takes a stanza of the form <message type='error'><error/></message> + and return a well formed string containing error information + """ + sender = stanza['from'] + msg = stanza['error']['type'] + condition = stanza['error']['condition'] + code = stanza['error']['code'] + body = stanza['error']['text'] + if not body: + if deprecated: + if code in DEPRECATED_ERRORS: + body = DEPRECATED_ERRORS[code] + else: + body = condition or 'Unknown error' + else: + if code in ERROR_AND_STATUS_CODES: + body = ERROR_AND_STATUS_CODES[code] + else: + body = condition or 'Unknown error' + if code: + message = '%(from)s: %(code)s - %(msg)s: %(body)s' % { + 'from': sender, + 'msg': msg, + 'body': body, + 'code': code + } + else: + message = '%(from)s: %(msg)s: %(body)s' % { + 'from': sender, + 'msg': msg, + 'body': body + } + return message |