summaryrefslogtreecommitdiff
path: root/poezio/common.py
diff options
context:
space:
mode:
Diffstat (limited to 'poezio/common.py')
-rw-r--r--poezio/common.py141
1 files changed, 116 insertions, 25 deletions
diff --git a/poezio/common.py b/poezio/common.py
index ba179310..6b7d2bfe 100644
--- a/poezio/common.py
+++ b/poezio/common.py
@@ -3,12 +3,16 @@
# This file is part of Poezio.
#
# Poezio is free software: you can redistribute it and/or modify
-# it under the terms of the zlib license. See the COPYING file.
+# it under the terms of the GPL-3.0+ license. See the COPYING file.
"""
Various useful functions.
"""
-from datetime import datetime, timedelta
+from datetime import (
+ datetime,
+ timedelta,
+ timezone,
+)
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
@@ -17,8 +21,9 @@ import subprocess
import time
import string
import logging
+import itertools
-from slixmpp import JID, InvalidJID, Message
+from slixmpp import Message
from poezio.poezio_shlex import shlex
log = logging.getLogger(__name__)
@@ -39,7 +44,7 @@ def _get_output_of_command(command: str) -> Optional[List[str]]:
return None
-def _is_in_path(command: str, return_abs_path=False) -> Union[bool, str]:
+def _is_in_path(command: str, return_abs_path: bool = False) -> Union[bool, str]:
"""
Check if *command* is in the $PATH or not.
@@ -106,10 +111,12 @@ def get_os_info() -> str:
stdout=subprocess.PIPE,
close_fds=True)
process.wait()
- output = process.stdout.readline().decode('utf-8').strip()
- # some distros put n/a in places, so remove those
- output = output.replace('n/a', '').replace('N/A', '')
- return output
+ if process.stdout is not None:
+ out = process.stdout.readline().decode('utf-8').strip()
+ # some distros put n/a in places, so remove those
+ out = out.replace('n/a', '').replace('N/A', '')
+ return out
+ return ''
# lsb_release executable not available, so parse files
for distro_name in DISTRO_INFO:
@@ -243,7 +250,7 @@ def find_delayed_tag(message: Message) -> Tuple[bool, Optional[datetime]]:
find_delay = message.xml.find
delay_tag = find_delay('{urn:xmpp:delay}delay')
- date = None # type: Optional[datetime]
+ date: Optional[datetime] = None
if delay_tag is not None:
delayed = True
date = _datetime_tuple(delay_tag.attrib['stamp'])
@@ -282,7 +289,7 @@ def shell_split(st: str) -> List[str]:
return ret
-def find_argument(pos: int, text: str, quoted=True) -> int:
+def find_argument(pos: int, text: str, quoted: bool = True) -> int:
"""
Split an input into a list of arguments, return the number of the
argument selected by pos.
@@ -337,7 +344,7 @@ def _find_argument_unquoted(pos: int, text: str) -> int:
return argnum + 1
-def parse_str_to_secs(duration='') -> int:
+def parse_str_to_secs(duration: str = '') -> int:
"""
Parse a string of with a number of d, h, m, s.
@@ -365,7 +372,7 @@ def parse_str_to_secs(duration='') -> int:
return result
-def parse_secs_to_str(duration=0) -> str:
+def parse_secs_to_str(duration: int = 0) -> str:
"""
Do the reverse operation of :py:func:`parse_str_to_secs`.
@@ -452,19 +459,103 @@ def format_gaming_string(infos: Dict[str, str]) -> str:
return name
-def safeJID(*args, **kwargs) -> JID:
+def unique_prefix_of(a: str, b: str) -> str:
"""
- Construct a :py:class:`slixmpp.JID` object from a string.
+ Return the unique prefix of `a` with `b`.
- Used to avoid tracebacks during is stringprep fails
- (fall back to a JID with an empty string).
+ Corner cases:
+
+ * If `a` and `b` share no prefix, the first letter of `a` is returned.
+ * If `a` and `b` are equal, `a` is returned.
+ * If `a` is a prefix of `b`, `a` is returned.
+ * If `b` is a prefix of `a`, `b` plus the first letter of `a` after the
+ common prefix is returned.
"""
- try:
- return JID(*args, **kwargs)
- except InvalidJID:
- log.debug(
- 'safeJID caught an invalidJID exception: %r, %r',
- args, kwargs,
- exc_info=True,
- )
- return JID('')
+ for i, (ca, cb) in enumerate(itertools.zip_longest(a, b)):
+ if ca != cb:
+ return a[:i+1]
+ # both are equal, return a
+ return a
+
+
+def to_utc(time_: datetime) -> datetime:
+ """Convert a datetime-aware time zone into raw UTC"""
+ if time_.tzinfo is not None: # Convert to UTC
+ time_ = time_.astimezone(tz=timezone.utc)
+ else: # Assume local tz, convert to UTC
+ tzone = datetime.now().astimezone().tzinfo
+ time_ = time_.replace(tzinfo=tzone).astimezone(tz=timezone.utc)
+ # Return an offset-naive datetime
+ return time_.replace(tzinfo=None)
+
+
+# http://xmpp.org/extensions/xep-0045.html#errorstatus
+ERROR_AND_STATUS_CODES = {
+ '401': 'A password is required',
+ '403': 'Permission denied',
+ '404': 'The room doesn’t exist',
+ '405': 'Your are not allowed to create a new room',
+ '406': 'A reserved nick must be used',
+ '407': 'You are not in the member list',
+ '409': 'This nickname is already in use or has been reserved',
+ '503': 'The maximum number of users has been reached',
+}
+
+
+# http://xmpp.org/extensions/xep-0086.html
+DEPRECATED_ERRORS = {
+ '302': 'Redirect',
+ '400': 'Bad request',
+ '401': 'Not authorized',
+ '402': 'Payment required',
+ '403': 'Forbidden',
+ '404': 'Not found',
+ '405': 'Not allowed',
+ '406': 'Not acceptable',
+ '407': 'Registration required',
+ '408': 'Request timeout',
+ '409': 'Conflict',
+ '500': 'Internal server error',
+ '501': 'Feature not implemented',
+ '502': 'Remote server error',
+ '503': 'Service unavailable',
+ '504': 'Remote server timeout',
+ '510': 'Disconnected',
+}
+
+
+def get_error_message(stanza: Message, deprecated: bool = False) -> str:
+ """
+ Takes a stanza of the form <message type='error'><error/></message>
+ and return a well formed string containing error information
+ """
+ sender = stanza['from']
+ msg = stanza['error']['type']
+ condition = stanza['error']['condition']
+ code = stanza['error']['code']
+ body = stanza['error']['text']
+ if not body:
+ if deprecated:
+ if code in DEPRECATED_ERRORS:
+ body = DEPRECATED_ERRORS[code]
+ else:
+ body = condition or 'Unknown error'
+ else:
+ if code in ERROR_AND_STATUS_CODES:
+ body = ERROR_AND_STATUS_CODES[code]
+ else:
+ body = condition or 'Unknown error'
+ if code:
+ message = '%(from)s: %(code)s - %(msg)s: %(body)s' % {
+ 'from': sender,
+ 'msg': msg,
+ 'body': body,
+ 'code': code
+ }
+ else:
+ message = '%(from)s: %(msg)s: %(body)s' % {
+ 'from': sender,
+ 'msg': msg,
+ 'body': body
+ }
+ return message