summaryrefslogtreecommitdiff
path: root/poezio/tabs
diff options
context:
space:
mode:
Diffstat (limited to 'poezio/tabs')
-rw-r--r--poezio/tabs/adhoc_commands_list.py4
-rw-r--r--poezio/tabs/basetabs.py505
-rw-r--r--poezio/tabs/bookmarkstab.py48
-rw-r--r--poezio/tabs/confirmtab.py6
-rw-r--r--poezio/tabs/conversationtab.py236
-rw-r--r--poezio/tabs/data_forms.py4
-rw-r--r--poezio/tabs/listtab.py6
-rw-r--r--poezio/tabs/muclisttab.py7
-rw-r--r--poezio/tabs/muctab.py1300
-rw-r--r--poezio/tabs/privatetab.py292
-rw-r--r--poezio/tabs/rostertab.py541
-rw-r--r--poezio/tabs/xmltab.py67
12 files changed, 1740 insertions, 1276 deletions
diff --git a/poezio/tabs/adhoc_commands_list.py b/poezio/tabs/adhoc_commands_list.py
index b62166b0..3b6bc1db 100644
--- a/poezio/tabs/adhoc_commands_list.py
+++ b/poezio/tabs/adhoc_commands_list.py
@@ -16,8 +16,8 @@ log = logging.getLogger(__name__)
class AdhocCommandsListTab(ListTab):
- plugin_commands = {} # type: Dict[str, Command]
- plugin_keys = {} # type: Dict[str, Callable]
+ plugin_commands: Dict[str, Command] = {}
+ plugin_keys: Dict[str, Callable] = {}
def __init__(self, core, jid):
ListTab.__init__(
diff --git a/poezio/tabs/basetabs.py b/poezio/tabs/basetabs.py
index dbe92a32..793eae62 100644
--- a/poezio/tabs/basetabs.py
+++ b/poezio/tabs/basetabs.py
@@ -13,26 +13,57 @@ This module also defines ChatTabs, the parent class for all tabs
revolving around chats.
"""
+from __future__ import annotations
+
import logging
import string
-import time
+import asyncio
+from copy import copy
+from math import ceil, log10
from datetime import datetime
-from xml.etree import cElementTree as ET
-from typing import Any, Callable, Dict, List, Optional, Union
-
-from slixmpp import JID, InvalidJID, Message
-
+from xml.etree import ElementTree as ET
+from xml.sax import SAXParseException
+from typing import (
+ Any,
+ Callable,
+ cast,
+ Dict,
+ List,
+ Optional,
+ Union,
+ Tuple,
+ TYPE_CHECKING,
+)
+
+from poezio import (
+ poopt,
+ timed_events,
+ xhtml,
+ windows
+)
from poezio.core.structs import Command, Completion, Status
-from poezio import timed_events
-from poezio import windows
-from poezio import xhtml
-from poezio.common import safeJID
from poezio.config import config
-from poezio.decorators import refresh_wrapper
+from poezio.decorators import command_args_parser, refresh_wrapper
from poezio.logger import logger
+from poezio.log_loader import MAMFiller, LogLoader
from poezio.text_buffer import TextBuffer
from poezio.theming import get_theme, dump_tuple
-from poezio.decorators import command_args_parser
+from poezio.user import User
+from poezio.ui.funcs import truncate_nick
+from poezio.timed_events import DelayedEvent
+from poezio.ui.types import (
+ BaseMessage,
+ Message,
+ PersistentInfoMessage,
+ LoggableTrait,
+)
+
+from slixmpp import JID, InvalidJID, Message as SMessage
+
+if TYPE_CHECKING:
+ from _curses import _CursesWindow # pylint: disable=E0611
+ from poezio.size_manager import SizeManager
+ from poezio.core.core import Core
log = logging.getLogger(__name__)
@@ -90,29 +121,42 @@ SHOW_NAME = {
class Tab:
- plugin_commands = {} # type: Dict[str, Command]
- plugin_keys = {} # type: Dict[str, Callable]
+ plugin_commands: Dict[str, Command] = {}
+ plugin_keys: Dict[str, Callable] = {}
# Placeholder values, set on resize
- height = 1
- width = 1
-
- def __init__(self, core):
+ height: int = 1
+ width: int = 1
+ core: Core
+ input: Optional[windows.Input]
+ key_func: Dict[str, Callable[[], Any]]
+ commands: Dict[str, Command]
+ need_resize: bool
+ ui_config_changed: bool
+
+ def __init__(self, core: Core):
self.core = core
self.nb = 0
- if not hasattr(self, 'name'):
- self.name = self.__class__.__name__
+ if not hasattr(self, '_name'):
+ self._name = self.__class__.__name__
self.input = None
self.closed = False
self._state = 'normal'
self._prev_state = None
self.need_resize = False
+ self.ui_config_changed = False
self.key_func = {} # each tab should add their keys in there
# and use them in on_input
self.commands = {} # and their own commands
@property
- def size(self) -> int:
+ def name(self) -> str:
+ if hasattr(self, '_name'):
+ return self._name
+ return ''
+
+ @property
+ def size(self) -> SizeManager:
return self.core.size
@staticmethod
@@ -121,23 +165,27 @@ class Tab:
Returns 1 or 0, depending on if we are using the vertical tab list
or not.
"""
- if config.get('enable_vertical_tab_list'):
+ if config.getbool('enable_vertical_tab_list'):
return 0
return 1
@property
- def info_win(self):
+ def info_win(self) -> windows.TextWin:
return self.core.information_win
@property
- def color(self):
+ def color(self) -> Union[Tuple[int, int], Tuple[int, int, 'str']]:
return STATE_COLORS[self._state]()
@property
- def vertical_color(self):
+ def vertical_color(self) -> Union[Tuple[int, int], Tuple[int, int, 'str']]:
return VERTICAL_STATE_COLORS[self._state]()
@property
+ def priority(self) -> Union[int, float]:
+ return STATE_PRIORITY.get(self._state, -1)
+
+ @property
def state(self) -> str:
return self._state
@@ -175,7 +223,7 @@ class Tab:
self._state = 'normal'
@staticmethod
- def resize(scr):
+ def initial_resize(scr: _CursesWindow):
Tab.height, Tab.width = scr.getmaxyx()
windows.base_wins.TAB_WIN = scr
@@ -212,7 +260,7 @@ class Tab:
*,
desc='',
shortdesc='',
- completion: Optional[Callable] = None,
+ completion: Optional[Callable[[windows.Input], Completion]] = None,
usage=''):
"""
Add a command
@@ -264,7 +312,6 @@ class Tab:
comp = command.comp(the_input)
if comp:
return comp.run()
- return comp
return False
def execute_command(self, provided_text: str) -> bool:
@@ -272,8 +319,10 @@ class Tab:
Execute the command in the input and return False if
the input didn't contain a command
"""
+ if self.input is None:
+ raise NotImplementedError
txt = provided_text or self.input.key_enter()
- if txt.startswith('/') and not txt.startswith('//') and\
+ if txt and txt.startswith('/') and not txt.startswith('//') and\
not txt.startswith('/me '):
command = txt.strip().split()[0][1:]
arg = txt[2 + len(command):] # jump the '/' and the ' '
@@ -301,13 +350,16 @@ class Tab:
if func:
if hasattr(self.input, "reset_completion"):
self.input.reset_completion()
- func(arg)
+ if asyncio.iscoroutinefunction(func):
+ asyncio.create_task(func(arg))
+ else:
+ func(arg)
return True
else:
return False
- def refresh_tab_win(self):
- if config.get('enable_vertical_tab_list'):
+ def refresh_tab_win(self) -> None:
+ if config.getbool('enable_vertical_tab_list'):
left_tab_win = self.core.left_tab_win
if left_tab_win and not self.size.core_degrade_x:
left_tab_win.refresh()
@@ -338,24 +390,18 @@ class Tab:
"""
return self.name
- def get_text_window(self) -> Optional[windows.TextWin]:
- """
- Returns the principal TextWin window, if there's one
- """
- return None
-
def on_input(self, key: str, raw: bool):
"""
raw indicates if the key should activate the associated command or not.
"""
pass
- def update_commands(self):
+ def update_commands(self) -> None:
for c in self.plugin_commands:
if c not in self.commands:
self.commands[c] = self.plugin_commands[c]
- def update_keys(self):
+ def update_keys(self) -> None:
for k in self.plugin_keys:
if k not in self.key_func:
self.key_func[k] = self.plugin_keys[k]
@@ -414,7 +460,7 @@ class Tab:
"""
pass
- def on_close(self):
+ def on_close(self) -> None:
"""
Called when the tab is to be closed
"""
@@ -422,7 +468,7 @@ class Tab:
self.input.on_delete()
self.closed = True
- def matching_names(self) -> List[str]:
+ def matching_names(self) -> List[Tuple[int, str]]:
"""
Returns a list of strings that are used to name a tab with the /win
command. For example you could switch to a tab that returns
@@ -436,6 +482,9 @@ class Tab:
class GapTab(Tab):
+ def __init__(self):
+ return
+
def __bool__(self):
return False
@@ -443,7 +492,7 @@ class GapTab(Tab):
return 0
@property
- def name(self):
+ def name(self) -> str:
return ''
def refresh(self):
@@ -458,9 +507,14 @@ class ChatTab(Tab):
Also, ^M is already bound to on_enter
And also, add the /say command
"""
- plugin_commands = {} # type: Dict[str, Command]
- plugin_keys = {} # type: Dict[str, Callable]
+ plugin_commands: Dict[str, Command] = {}
+ plugin_keys: Dict[str, Callable] = {}
+ last_sent_message: Optional[SMessage]
message_type = 'chat'
+ timed_event_paused: Optional[DelayedEvent]
+ timed_event_not_paused: Optional[DelayedEvent]
+ mam_filler: Optional[MAMFiller]
+ e2e_encryption: Optional[str] = None
def __init__(self, core, jid: Union[JID, str]):
Tab.__init__(self, core)
@@ -469,18 +523,21 @@ class ChatTab(Tab):
jid = JID(jid)
assert jid.domain
self._jid = jid
-
- self._name = jid.full # type: Optional[str]
- self.text_win = None
+ #: Is the tab currently requesting MAM data?
+ self.query_status = False
+ self._name = jid.full
+ self.text_win = windows.TextWin()
self.directed_presence = None
self._text_buffer = TextBuffer()
+ self._text_buffer.add_window(self.text_win)
+ self.mam_filler = None
self.chatstate = None # can be "active", "composing", "paused", "gone", "inactive"
# We keep a reference of the event that will set our chatstate to "paused", so that
# we can delete it or change it if we need to
self.timed_event_paused = None
self.timed_event_not_paused = None
# Keeps the last sent message to complete it easily in completion_correct, and to replace it.
- self.last_sent_message = {}
+ self.last_sent_message = None
self.key_func['M-v'] = self.move_separator
self.key_func['M-h'] = self.scroll_separator
self.key_func['M-/'] = self.last_words_completion
@@ -491,6 +548,12 @@ class ChatTab(Tab):
usage='<message>',
shortdesc='Send the message.')
self.register_command(
+ 'scrollback',
+ self.command_scrollback,
+ usage="end home clear status goto <+|-linecount>|<linenum>|<timestamp>",
+ shortdesc='Scrollback to the given line number, message, or clear the buffer.')
+ self.commands['sb'] = self.commands['scrollback']
+ self.register_command(
'xhtml',
self.command_xhtml,
usage='<custom xhtml>',
@@ -503,18 +566,10 @@ class ChatTab(Tab):
desc='Fix the last message with whatever you want.',
shortdesc='Correct the last message.',
completion=self.completion_correct)
- self.chat_state = None
+ self.chat_state: Optional[str] = None
self.update_commands()
self.update_keys()
- # Get the logs
- log_nb = config.get('load_log')
- logs = self.load_logs(log_nb)
-
- if logs:
- for message in logs:
- self._text_buffer.add_message(**message)
-
@property
def name(self) -> str:
if self._name is not None:
@@ -531,13 +586,18 @@ class ChatTab(Tab):
if value.domain:
self._jid = value
except InvalidJID:
- self._name = value
+ self._name = str(value)
else:
raise TypeError("Name %r must be of type JID or str." % value)
@property
+ def log_name(self) -> str:
+ """Name used for the log filename"""
+ return self.jid.bare
+
+ @property
def jid(self) -> JID:
- return self._jid
+ return copy(self._jid)
@jid.setter
def jid(self, value: JID) -> None:
@@ -550,56 +610,35 @@ class ChatTab(Tab):
def general_jid(self) -> JID:
raise NotImplementedError
- def load_logs(self, log_nb: int) -> Optional[List[Dict[str, Any]]]:
- return logger.get_logs(self.jid.bare, log_nb)
-
- def log_message(self,
- txt: str,
- nickname: str,
- time: Optional[datetime] = None,
- typ=1):
+ def log_message(self, message: BaseMessage):
"""
Log the messages in the archives.
"""
- name = self.jid.bare
- if not logger.log_message(name, nickname, txt, date=time, typ=typ):
+ if not isinstance(message, LoggableTrait):
+ return
+ if not logger.log_message(self.log_name, message):
self.core.information('Unable to write in the log file', 'Error')
- def add_message(self,
- txt,
- time=None,
- nickname=None,
- forced_user=None,
- nick_color=None,
- identifier=None,
- jid=None,
- history=None,
- typ=1,
- highlight=False):
- self.log_message(txt, nickname, time=time, typ=typ)
- self._text_buffer.add_message(
- txt,
- time=time,
- nickname=nickname,
- highlight=highlight,
- nick_color=nick_color,
- history=history,
- user=forced_user,
- identifier=identifier,
- jid=jid)
+ def add_message(self, message: BaseMessage):
+ self.log_message(message)
+ self._text_buffer.add_message(message)
def modify_message(self,
- txt,
- old_id,
- new_id,
- user=None,
- jid=None,
- nickname=None):
- self.log_message(txt, nickname, typ=1)
+ txt: str,
+ old_id: str,
+ new_id: str,
+ time: Optional[datetime],
+ delayed: bool = False,
+ nickname: Optional[str] = None,
+ user: Optional[User] = None,
+ jid: Optional[JID] = None,
+ ) -> bool:
message = self._text_buffer.modify_message(
- txt, old_id, new_id, time=time, user=user, jid=jid)
+ txt, old_id, new_id, user=user, jid=jid, time=time
+ )
if message:
- self.text_win.modify_message(old_id, message)
+ self.log_message(message)
+ self.text_win.modify_message(message.identifier, message)
self.core.refresh_window()
return True
return False
@@ -620,16 +659,20 @@ class ChatTab(Tab):
for word in txt.split():
if len(word) >= 4 and word not in words:
words.append(word)
- words.extend([word for word in config.get('words').split(':') if word])
+ words.extend([word for word in config.getlist('words') if word])
self.input.auto_completion(words, ' ', quotify=False)
def on_enter(self):
+ if self.input is None:
+ raise NotImplementedError
txt = self.input.key_enter()
if txt:
if not self.execute_command(txt):
if txt.startswith('//'):
txt = txt[1:]
- self.command_say(xhtml.convert_simple_to_full_colors(txt))
+ asyncio.ensure_future(
+ self.command_say(xhtml.convert_simple_to_full_colors(txt))
+ )
self.cancel_paused_delay()
@command_args_parser.raw
@@ -641,19 +684,19 @@ class ChatTab(Tab):
if message:
message.send()
- def generate_xhtml_message(self, arg: str) -> Message:
+ def generate_xhtml_message(self, arg: str) -> Optional[SMessage]:
if not arg:
- return
+ return None
try:
body = xhtml.clean_text(
xhtml.xhtml_to_poezio_colors(arg, force=True))
ET.fromstring(arg)
- except:
+ except SAXParseException:
self.core.information('Could not send custom xhtml', 'Error')
- log.error('/xhtml: Unable to send custom xhtml', exc_info=True)
- return
+ log.error('/xhtml: Unable to send custom xhtml')
+ return None
- msg = self.core.xmpp.make_message(self.get_dest_jid())
+ msg: SMessage = self.core.xmpp.make_message(self.get_dest_jid())
msg['body'] = body
msg.enable('html')
msg['html']['body'] = arg
@@ -670,27 +713,31 @@ class ChatTab(Tab):
self._text_buffer.messages = []
self.text_win.rebuild_everything(self._text_buffer)
- def check_send_chat_state(self):
+ def check_send_chat_state(self) -> bool:
"If we should send a chat state"
return True
- def send_chat_state(self, state, always_send=False):
+ def send_chat_state(self, state: str, always_send: bool = False) -> None:
"""
Send an empty chatstate message
"""
+ from poezio.tabs import PrivateTab
+
if self.check_send_chat_state():
if state in ('active', 'inactive',
'gone') and self.inactive and not always_send:
return
if config.get_by_tabname('send_chat_states', self.general_jid):
- msg = self.core.xmpp.make_message(self.get_dest_jid())
+ msg: SMessage = self.core.xmpp.make_message(self.get_dest_jid())
msg['type'] = self.message_type
msg['chat_state'] = state
self.chat_state = state
+ msg['no-store'] = True
+ if isinstance(self, PrivateTab):
+ msg.enable('muc')
msg.send()
- return True
- def send_composing_chat_state(self, empty_after):
+ def send_composing_chat_state(self, empty_after: bool) -> None:
"""
Send the "active" or "composing" chatstate, depending
on the the current status of the input
@@ -726,7 +773,7 @@ class ChatTab(Tab):
self.core.add_timed_event(new_event)
self.timed_event_not_paused = new_event
- def cancel_paused_delay(self):
+ def cancel_paused_delay(self) -> None:
"""
Remove that event from the list and set it to None.
Called for example when the input is emptied, or when the message
@@ -735,11 +782,22 @@ class ChatTab(Tab):
if self.timed_event_paused is not None:
self.core.remove_timed_event(self.timed_event_paused)
self.timed_event_paused = None
- self.core.remove_timed_event(self.timed_event_not_paused)
- self.timed_event_not_paused = None
+ if self.timed_event_not_paused is not None:
+ self.core.remove_timed_event(self.timed_event_not_paused)
+ self.timed_event_not_paused = None
+
+ def set_last_sent_message(self, msg: SMessage, correct: bool = False) -> None:
+ """Ensure last_sent_message is set with the correct attributes"""
+ if correct:
+ # XXX: Is the copy needed. Is the object passed here reused
+ # afterwards? Who knows.
+ msg = cast(SMessage, copy(msg))
+ if self.last_sent_message is not None:
+ msg['id'] = self.last_sent_message['id']
+ self.last_sent_message = msg
@command_args_parser.raw
- def command_correct(self, line):
+ async def command_correct(self, line: str) -> None:
"""
/correct <fixed message>
"""
@@ -749,7 +807,7 @@ class ChatTab(Tab):
if not self.last_sent_message:
self.core.information('There is no message to correct.', 'Error')
return
- self.command_say(line, correct=True)
+ await self.command_say(line, correct=True)
def completion_correct(self, the_input):
if self.last_sent_message and the_input.get_argument_position() == 1:
@@ -762,26 +820,153 @@ class ChatTab(Tab):
@property
def inactive(self) -> bool:
"""Whether we should send inactive or active as a chatstate"""
- return self.core.status.show in ('xa', 'away') or\
- (hasattr(self, 'directed_presence') and not self.directed_presence)
+ return self.core.status.show in ('xa', 'away') or (
+ hasattr(self, 'directed_presence')
+ and self.directed_presence is not None
+ and self.directed_presence
+ )
- def move_separator(self):
+ def move_separator(self) -> None:
self.text_win.remove_line_separator()
self.text_win.add_line_separator(self._text_buffer)
self.text_win.refresh()
- self.input.refresh()
+ if self.input:
+ self.input.refresh()
def get_conversation_messages(self):
return self._text_buffer.messages
- def check_scrolled(self):
+ def check_scrolled(self) -> None:
if self.text_win.pos != 0:
self.state = 'scrolled'
@command_args_parser.raw
- def command_say(self, line, correct=False):
+ async def command_say(self, line: str, attention: bool = False, correct: bool = False):
pass
+ def goto_build_lines(self, new_date):
+ text_buffer = self._text_buffer
+ built_lines = []
+ message_count = 0
+ timestamp = config.getbool('show_timestamps')
+ nick_size = config.getint('max_nick_length')
+ theme = get_theme()
+ for message in text_buffer.messages:
+ # Build lines of a message
+ txt = message.txt
+ nick = truncate_nick(message.nickname, nick_size)
+ offset = 0
+ theme = get_theme()
+ if message.ack:
+ if message.ack > 0:
+ offset += poopt.wcswidth(theme.CHAR_ACK_RECEIVED) + 1
+ else:
+ offset += poopt.wcswidth(theme.CHAR_NACK) + 1
+ if nick:
+ offset += poopt.wcswidth(nick) + 2
+ if message.revisions > 0:
+ offset += ceil(log10(message.revisions + 1))
+ if message.me:
+ offset += 1
+ if timestamp:
+ if message.history:
+ offset += 1 + theme.LONG_TIME_FORMAT_LENGTH
+ lines = poopt.cut_text(txt, self.text_win.width - offset - 1)
+ for line in lines:
+ built_lines.append(line)
+ # Find the message with timestamp less than or equal to the queried
+ # timestamp and goto that location in the tab.
+ if message.time <= new_date:
+ message_count += 1
+ if len(self.text_win.built_lines) - self.text_win.height >= len(built_lines):
+ self.text_win.pos = len(self.text_win.built_lines) - self.text_win.height - len(built_lines) + 1
+ else:
+ self.text_win.pos = 0
+ if message_count == 0:
+ self.text_win.scroll_up(len(self.text_win.built_lines))
+ self.core.refresh_window()
+
+ @command_args_parser.quoted(0, 2)
+ def command_scrollback(self, args):
+ """
+ /sb clear
+ /sb home
+ /sb end
+ /sb goto <+|-linecount>|<linenum>|<timestamp>
+ The format of timestamp must be ‘[dd[.mm]-<days ago>] hh:mi[:ss]’
+ """
+ if args is None or len(args) == 0:
+ args = ['end']
+ if len(args) == 1:
+ if args[0] == 'end':
+ self.text_win.scroll_down(len(self.text_win.built_lines))
+ self.core.refresh_window()
+ return
+ elif args[0] == 'home':
+ self.text_win.scroll_up(len(self.text_win.built_lines))
+ self.core.refresh_window()
+ return
+ elif args[0] == 'clear':
+ self._text_buffer.messages = []
+ self.text_win.rebuild_everything(self._text_buffer)
+ self.core.refresh_window()
+ return
+ elif args[0] == 'status':
+ self.core.information('Total %s lines in this tab.' % len(self.text_win.built_lines), 'Info')
+ return
+ elif len(args) == 2 and args[0] == 'goto':
+ for fmt in ('%d %H:%M', '%d %H:%M:%S', '%d:%m %H:%M', '%d:%m %H:%M:%S', '%H:%M', '%H:%M:%S'):
+ try:
+ new_date = datetime.strptime(args[1], fmt)
+ if 'd' in fmt and 'm' in fmt:
+ new_date = new_date.replace(year=datetime.now().year)
+ elif 'd' in fmt:
+ new_date = new_date.replace(year=datetime.now().year, month=datetime.now().month)
+ else:
+ new_date = new_date.replace(year=datetime.now().year, month=datetime.now().month, day=datetime.now().day)
+ except ValueError:
+ pass
+ if args[1].startswith('-'):
+ # Check if the user is giving argument of type goto <-linecount> or goto [-<days ago>] hh:mi[:ss]
+ if ' ' in args[1]:
+ new_args = args[1].split(' ')
+ new_args[0] = new_args[0].strip('-')
+ new_date = datetime.now()
+ if new_args[0].isdigit():
+ new_date = new_date.replace(day=new_date.day - int(new_args[0]))
+ for fmt in ('%H:%M', '%H:%M:%S'):
+ try:
+ arg_date = datetime.strptime(new_args[1], fmt)
+ new_date = new_date.replace(hour=arg_date.hour, minute=arg_date.minute, second=arg_date.second)
+ except ValueError:
+ pass
+ else:
+ scroll_len = args[1].strip('-')
+ if scroll_len.isdigit():
+ self.text_win.scroll_down(int(scroll_len))
+ self.core.refresh_window()
+ return
+ elif args[1].startswith('+'):
+ scroll_len = args[1].strip('+')
+ if scroll_len.isdigit():
+ self.text_win.scroll_up(int(scroll_len))
+ self.core.refresh_window()
+ return
+ # Check for the argument of type goto <linenum>
+ elif args[1].isdigit():
+ if len(self.text_win.built_lines) - self.text_win.height >= int(args[1]):
+ self.text_win.pos = len(self.text_win.built_lines) - self.text_win.height - int(args[1])
+ self.core.refresh_window()
+ return
+ else:
+ self.text_win.pos = 0
+ self.core.refresh_window()
+ return
+ elif args[1] == '0':
+ args = ['home']
+ # new_date is the timestamp for which the user has queried.
+ self.goto_build_lines(new_date)
+
def on_line_up(self):
return self.text_win.scroll_up(1)
@@ -789,6 +974,11 @@ class ChatTab(Tab):
return self.text_win.scroll_down(1)
def on_scroll_up(self):
+ if not self.query_status:
+ from poezio.log_loader import LogLoader
+ asyncio.create_task(
+ LogLoader(logger, self, config.getbool('use_log')).scroll_requested()
+ )
return self.text_win.scroll_up(self.text_win.height - 1)
def on_scroll_down(self):
@@ -806,15 +996,15 @@ class ChatTab(Tab):
class OneToOneTab(ChatTab):
- def __init__(self, core, jid):
+ def __init__(self, core, jid, initial=None):
ChatTab.__init__(self, core, jid)
self.__status = Status("", "")
self.last_remote_message = datetime.now()
+ self._initial_log = asyncio.Event()
# Set to true once the first disco is done
self.__initial_disco = False
- self.check_features()
self.register_command(
'unquery', self.command_unquery, shortdesc='Close the tab.')
self.register_command(
@@ -826,6 +1016,30 @@ class OneToOneTab(ChatTab):
shortdesc='Request the attention.',
desc='Attention: Request the attention of the contact. Can also '
'send a message along with the attention.')
+ asyncio.create_task(self.init_logs(initial=initial))
+
+ async def init_logs(self, initial: Optional[SMessage] = None) -> None:
+ use_log = config.get_by_tabname('use_log', self.jid)
+ mam_sync = config.get_by_tabname('mam_sync', self.jid)
+ if use_log and mam_sync:
+ limit = config.get_by_tabname('mam_sync_limit', self.jid)
+ mam_filler = MAMFiller(logger, self, limit)
+ self.mam_filler = mam_filler
+
+ if initial is not None:
+ # If there is an initial message, throw it back into the
+ # text buffer if it cannot be fetched from mam
+ await mam_filler.done.wait()
+ if mam_filler.result == 0:
+ await self.handle_message(initial)
+ elif use_log and initial:
+ await self.handle_message(initial, display=False)
+ elif initial:
+ await self.handle_message(initial)
+ await LogLoader(logger, self, use_log, self._initial_log).tab_open()
+
+ async def handle_message(self, msg: SMessage, display: bool = True):
+ pass
def remote_user_color(self):
return dump_tuple(get_theme().COLOR_REMOTE_USER)
@@ -852,7 +1066,9 @@ class OneToOneTab(ChatTab):
msg += 'status: %s, ' % status.message
if status.show in SHOW_NAME:
msg += 'show: %s, ' % SHOW_NAME[status.show]
- self.add_message(msg[:-2], typ=2)
+ self.add_message(
+ PersistentInfoMessage(txt=msg[:-2])
+ )
def ack_message(self, msg_id: str, msg_jid: JID):
"""
@@ -884,26 +1100,21 @@ class OneToOneTab(ChatTab):
message.send()
body = xhtml.xhtml_to_poezio_colors(xhtml_data, force=True)
self._text_buffer.add_message(
- body,
- nickname=self.core.own_nick,
- nick_color=get_theme().COLOR_OWN_NICK,
- identifier=message['id'],
- jid=self.core.xmpp.boundjid)
+ Message(
+ body,
+ nickname=self.core.own_nick,
+ nick_color=get_theme().COLOR_OWN_NICK,
+ identifier=message['id'],
+ jid=self.core.xmpp.boundjid,
+ )
+ )
self.refresh()
- def check_features(self):
- "check the features supported by the other party"
- if safeJID(self.get_dest_jid()).resource:
- self.core.xmpp.plugin['xep_0030'].get_info(
- jid=self.get_dest_jid(),
- timeout=5,
- callback=self.features_checked)
-
@command_args_parser.raw
- def command_attention(self, message):
+ async def command_attention(self, message):
"""/attention [message]"""
- if message is not '':
- self.command_say(message, attention=True)
+ if message != '':
+ await self.command_say(message, attention=True)
else:
msg = self.core.xmpp.make_message(self.get_dest_jid())
msg['type'] = 'chat'
@@ -911,7 +1122,7 @@ class OneToOneTab(ChatTab):
msg.send()
@command_args_parser.raw
- def command_say(self, line, correct=False, attention=False):
+ async def command_say(self, line: str, attention: bool = False, correct: bool = False):
pass
@command_args_parser.ignored
@@ -935,7 +1146,3 @@ class OneToOneTab(ChatTab):
msg = msg % (self.name, feature, command_name)
self.core.information(msg, 'Info')
return True
-
- def features_checked(self, iq):
- "Features check callback"
- features = iq['disco_info'].get_features() or []
diff --git a/poezio/tabs/bookmarkstab.py b/poezio/tabs/bookmarkstab.py
index 816402a7..d21b5630 100644
--- a/poezio/tabs/bookmarkstab.py
+++ b/poezio/tabs/bookmarkstab.py
@@ -2,14 +2,18 @@
Defines the data-forms Tab
"""
+import asyncio
import logging
from typing import Dict, Callable, List
+from slixmpp.exceptions import IqError, IqTimeout
+
from poezio import windows
from poezio.bookmarks import Bookmark, BookmarkList
from poezio.core.structs import Command
from poezio.tabs import Tab
-from poezio.common import safeJID
+
+from slixmpp import JID, InvalidJID
log = logging.getLogger(__name__)
@@ -19,20 +23,19 @@ class BookmarksTab(Tab):
A tab displaying lines of bookmarks, each bookmark having
a 4 widgets to set the jid/password/autojoin/storage method
"""
- plugin_commands = {} # type: Dict[str, Command]
- plugin_keys = {} # type: Dict[str, Callable]
+ plugin_commands: Dict[str, Command] = {}
+ plugin_keys: Dict[str, Callable] = {}
def __init__(self, core, bookmarks: BookmarkList):
Tab.__init__(self, core)
- self.name = "Bookmarks"
+ self._name = "Bookmarks"
self.bookmarks = bookmarks
- self.new_bookmarks = [] # type: List[Bookmark]
- self.removed_bookmarks = [] # type: List[Bookmark]
+ self.new_bookmarks: List[Bookmark] = []
+ self.removed_bookmarks: List[Bookmark] = []
self.header_win = windows.ColumnHeaderWin(
- ('name', 'room@server/nickname', 'password', 'autojoin',
- 'storage'))
- self.bookmarks_win = windows.BookmarksWin(
- self.bookmarks, self.height - 4, self.width, 1, 0)
+ ['name', 'room@server/nickname', 'password', 'autojoin',
+ 'storage'])
+ self.bookmarks_win = windows.BookmarksWin(self.bookmarks)
self.help_win = windows.HelpText('Ctrl+Y: save, Ctrl+G: cancel, '
'↑↓: change lines, tab: change '
'column, M-a: add bookmark, C-k'
@@ -50,7 +53,7 @@ class BookmarksTab(Tab):
def add_bookmark(self):
new_bookmark = Bookmark(
- safeJID('room@example.tld/nick'), method='local')
+ JID('room@example.tld/nick'), method='local')
self.new_bookmarks.append(new_bookmark)
self.bookmarks_win.add_bookmark(new_bookmark)
@@ -78,26 +81,31 @@ class BookmarksTab(Tab):
'Duplicate bookmarks in list (saving aborted)', 'Error')
return
for bm in self.new_bookmarks:
- if safeJID(bm.jid):
+ try:
+ JID(bm.jid)
if not self.bookmarks[bm.jid]:
self.bookmarks.append(bm)
- else:
+ except InvalidJID:
self.core.information(
'Invalid JID for bookmark: %s/%s' % (bm.jid, bm.nick),
'Error')
return
+
for bm in self.removed_bookmarks:
if bm in self.bookmarks:
self.bookmarks.remove(bm)
- def send_cb(success):
- if success:
- self.core.information('Bookmarks saved.', 'Info')
- else:
- self.core.information('Remote bookmarks not saved.', 'Error')
+ asyncio.create_task(
+ self.save_routine()
+ )
- self.bookmarks.save(self.core.xmpp, callback=send_cb)
+ async def save_routine(self):
+ try:
+ await self.bookmarks.save(self.core.xmpp)
+ self.core.information('Bookmarks saved', 'Info')
+ except (IqError, IqTimeout):
+ self.core.information('Remote bookmarks not saved.', 'Error')
self.core.close_tab(self)
return True
@@ -108,7 +116,7 @@ class BookmarksTab(Tab):
return res
self.bookmarks_win.refresh_current_input()
else:
- self.bookmarks_win.on_input(key)
+ self.bookmarks_win.on_input(key, raw=raw)
def resize(self):
self.need_resize = False
diff --git a/poezio/tabs/confirmtab.py b/poezio/tabs/confirmtab.py
index c13de4a6..d7488de7 100644
--- a/poezio/tabs/confirmtab.py
+++ b/poezio/tabs/confirmtab.py
@@ -13,8 +13,8 @@ log = logging.getLogger(__name__)
class ConfirmTab(Tab):
- plugin_commands = {} # type: Dict[str, Command]
- plugin_keys = {} # type: Dict[str, Callable]
+ plugin_commands: Dict[str, Command] = {}
+ plugin_keys: Dict[str, Callable] = {}
def __init__(self,
core,
@@ -34,7 +34,7 @@ class ConfirmTab(Tab):
"""
Tab.__init__(self, core)
self.state = 'highlight'
- self.name = name
+ self._name = name
self.default_help_message = windows.HelpText(
"Choose with arrow keys and press enter")
self.input = self.default_help_message
diff --git a/poezio/tabs/conversationtab.py b/poezio/tabs/conversationtab.py
index f8490233..de1f988a 100644
--- a/poezio/tabs/conversationtab.py
+++ b/poezio/tabs/conversationtab.py
@@ -11,23 +11,27 @@ There are two different instances of a ConversationTab:
the time.
"""
+import asyncio
import curses
import logging
+from datetime import datetime
from typing import Dict, Callable
+from slixmpp import JID, InvalidJID, Message as SMessage
+
from poezio.tabs.basetabs import OneToOneTab, Tab
from poezio import common
from poezio import windows
from poezio import xhtml
-from poezio.common import safeJID
-from poezio.config import config
+from poezio.config import config, get_image_cache
from poezio.core.structs import Command
from poezio.decorators import refresh_wrapper
from poezio.roster import roster
-from poezio.text_buffer import CorrectionError
from poezio.theming import get_theme, dump_tuple
from poezio.decorators import command_args_parser
+from poezio.ui.types import InfoMessage, Message
+from poezio.text_buffer import CorrectionError
log = logging.getLogger(__name__)
@@ -37,18 +41,16 @@ class ConversationTab(OneToOneTab):
The tab containing a normal conversation (not from a MUC)
Must not be instantiated, use Static or Dynamic version only.
"""
- plugin_commands = {} # type: Dict[str, Command]
- plugin_keys = {} # type: Dict[str, Callable]
- additional_information = {} # type: Dict[str, Callable[[str], str]]
+ plugin_commands: Dict[str, Command] = {}
+ plugin_keys: Dict[str, Callable] = {}
+ additional_information: Dict[str, Callable[[str], str]] = {}
message_type = 'chat'
- def __init__(self, core, jid):
- OneToOneTab.__init__(self, core, jid)
+ def __init__(self, core, jid, initial=None):
+ OneToOneTab.__init__(self, core, jid, initial=initial)
self.nick = None
self.nick_sent = False
self.state = 'normal'
- self.text_win = windows.TextWin()
- self._text_buffer.add_window(self.text_win)
self.upper_bar = windows.ConversationStatusMessageWin()
self.input = windows.MessageInput()
# keys
@@ -81,8 +83,8 @@ class ConversationTab(OneToOneTab):
self.update_keys()
@property
- def general_jid(self):
- return self.jid.bare
+ def general_jid(self) -> JID:
+ return JID(self.jid.bare)
def get_info_header(self):
raise NotImplementedError
@@ -103,9 +105,88 @@ class ConversationTab(OneToOneTab):
def completion(self):
self.complete_commands(self.input)
+ async def handle_message(self, message: SMessage, display: bool = True):
+ """Handle a received message.
+
+ The message can come from us (carbon copy).
+ """
+
+ # Prevent messages coming from our own devices (1:1) to be reflected
+ if message['to'].bare == self.core.xmpp.boundjid.bare and \
+ message['from'].bare == self.core.xmpp.boundjid.bare:
+ _, index = self._text_buffer._find_message(message['id'])
+ if index != -1:
+ return
+
+ use_xhtml = config.get_by_tabname(
+ 'enable_xhtml_im',
+ message['from'].bare
+ )
+ tmp_dir = get_image_cache()
+
+ # normal message, we are the recipient
+ if message['to'].bare == self.core.xmpp.boundjid.bare:
+ conv_jid = message['from']
+ jid = conv_jid
+ color = get_theme().COLOR_REMOTE_USER
+ self.last_remote_message = datetime.now()
+ remote_nick = self.get_nick()
+ # we wrote the message (happens with carbons)
+ elif message['from'].bare == self.core.xmpp.boundjid.bare:
+ conv_jid = message['to']
+ jid = self.core.xmpp.boundjid
+ color = get_theme().COLOR_OWN_NICK
+ remote_nick = self.core.own_nick
+ # we are not part of that message, drop it
+ else:
+ return
+
+ await self.core.events.trigger_async('conversation_msg', message, self)
+
+ if not message['body']:
+ return
+ body = xhtml.get_body_from_message_stanza(
+ message, use_xhtml=use_xhtml, extract_images_to=tmp_dir)
+ delayed, date = common.find_delayed_tag(message)
+
+ replaced = False
+ if message.get_plugin('replace', check=True):
+ replaced_id = message['replace']['id']
+ if replaced_id and config.get_by_tabname('group_corrections',
+ conv_jid.bare):
+ try:
+ replaced = self.modify_message(
+ body,
+ replaced_id,
+ message['id'],
+ time=date,
+ jid=jid,
+ nickname=remote_nick)
+ except CorrectionError:
+ log.debug('Unable to correct the message: %s', message)
+ if not replaced:
+ msg = Message(
+ txt=body,
+ time=date,
+ nickname=remote_nick,
+ nick_color=color,
+ history=delayed,
+ identifier=message['id'],
+ jid=jid,
+ )
+ if display:
+ self.add_message(msg)
+ else:
+ self.log_message(msg)
+
+ @refresh_wrapper.always
@command_args_parser.raw
- def command_say(self, line, attention=False, correct=False):
- msg = self.core.xmpp.make_message(self.get_dest_jid())
+ async def command_say(self, line: str, attention: bool = False, correct: bool = False):
+ await self._initial_log.wait()
+ msg: SMessage = self.core.xmpp.make_message(
+ mto=self.get_dest_jid(),
+ mfrom=self.core.xmpp.boundjid
+ )
msg['type'] = 'chat'
msg['body'] = line
if not self.nick_sent:
@@ -117,24 +198,9 @@ class ConversationTab(OneToOneTab):
# be converted in xhtml.
self.core.events.trigger('conversation_say', msg, self)
if not msg['body']:
- self.cancel_paused_delay()
- self.text_win.refresh()
- self.input.refresh()
return
- replaced = False
if correct or msg['replace']['id']:
- msg['replace']['id'] = self.last_sent_message['id']
- if config.get_by_tabname('group_corrections', self.jid.full):
- try:
- self.modify_message(
- msg['body'],
- self.last_sent_message['id'],
- msg['id'],
- jid=self.core.xmpp.boundjid,
- nickname=self.core.own_nick)
- replaced = True
- except CorrectionError:
- log.error('Unable to correct a message', exc_info=True)
+ msg['replace']['id'] = self.last_sent_message['id'] # type: ignore
else:
del msg['replace']
if msg['body'].find('\x19') != -1:
@@ -142,31 +208,21 @@ class ConversationTab(OneToOneTab):
msg['html']['body'] = xhtml.poezio_colors_to_html(msg['body'])
msg['body'] = xhtml.clean_text(msg['body'])
if config.get_by_tabname('send_chat_states', self.general_jid):
- needed = 'inactive' if self.inactive else 'active'
- msg['chat_state'] = needed
+ if self.inactive:
+ self.send_chat_state('inactive', always_send=True)
+ else:
+ msg['chat_state'] = 'active'
if attention:
msg['attention'] = True
self.core.events.trigger('conversation_say_after', msg, self)
if not msg['body']:
- self.cancel_paused_delay()
- self.text_win.refresh()
- self.input.refresh()
return
- if not replaced:
- self.add_message(
- msg['body'],
- nickname=self.core.own_nick,
- nick_color=get_theme().COLOR_OWN_NICK,
- identifier=msg['id'],
- jid=self.core.xmpp.boundjid,
- typ=1)
-
- self.last_sent_message = msg
- msg._add_receipt = True
+ self.set_last_sent_message(msg, correct=correct)
+ msg._add_receipt = True # type: ignore
msg.send()
+ await self.core.handler.on_normal_message(msg)
+ # Our receipts slixmpp hack
self.cancel_paused_delay()
- self.text_win.refresh()
- self.input.refresh()
@command_args_parser.quoted(0, 1)
def command_last_activity(self, args):
@@ -190,7 +246,13 @@ class ConversationTab(OneToOneTab):
status = iq['last_activity']['status']
from_ = iq['from']
msg = '\x19%s}The last activity of %s was %s ago%s'
- if not safeJID(from_).user:
+ user = ''
+ try:
+ user = JID(from_).user
+ except InvalidJID:
+ pass
+
+ if not user:
msg = '\x19%s}The uptime of %s is %s.' % (
dump_tuple(get_theme().COLOR_INFORMATION_TEXT), from_,
common.parse_secs_to_str(seconds))
@@ -202,7 +264,7 @@ class ConversationTab(OneToOneTab):
(' and their last status was %s' % status)
if status else '',
)
- self.add_message(msg)
+ self.add_message(InfoMessage(msg))
self.core.refresh_window()
self.core.xmpp.plugin['xep_0012'].get_last_activity(
@@ -212,7 +274,10 @@ class ConversationTab(OneToOneTab):
@command_args_parser.ignored
def command_info(self):
contact = roster[self.get_dest_jid()]
- jid = safeJID(self.get_dest_jid())
+ try:
+ jid = JID(self.get_dest_jid())
+ except InvalidJID:
+ jid = JID('')
if contact:
if jid.resource:
resource = contact[jid.full]
@@ -221,35 +286,29 @@ class ConversationTab(OneToOneTab):
else:
resource = None
if resource:
- status = (
- 'Status: %s' % resource.status) if resource.status else ''
- self._text_buffer.add_message(
- "\x19%(info_col)s}Show: %(show)s, %(status)s\x19o" % {
- 'show': resource.presence or 'available',
- 'status': status,
- 'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT)
- })
- return True
- else:
- self._text_buffer.add_message(
- "\x19%(info_col)s}No information available\x19o" %
- {'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT)})
+ status = (f', Status: {resource.status}') if resource.status else ''
+ show = f"Show: {resource.presence or 'available'}"
+ self.add_message(InfoMessage(f'{show}{status}'))
return True
+ self.add_message(
+ InfoMessage("No information available"),
+ )
+ return True
@command_args_parser.quoted(0, 1)
- def command_version(self, args):
+ async def command_version(self, args):
"""
/version [jid]
"""
if args:
- return self.core.command.version(args[0])
+ return await self.core.command.version(args[0])
jid = self.jid
if not jid.resource:
if jid in roster:
resource = roster[jid].get_highest_priority_resource()
jid = resource.jid if resource else jid
- self.core.xmpp.plugin['xep_0092'].get_version(
- jid, callback=self.core.handler.on_version_result)
+ iq = await self.core.xmpp.plugin['xep_0092'].get_version(jid)
+ self.core.handler.on_version_result(iq)
def resize(self):
self.need_resize = False
@@ -266,8 +325,10 @@ class ConversationTab(OneToOneTab):
self.text_win.resize(
self.height - 2 - bar_height - info_win_height - tab_win_height,
- self.width, bar_height, 0)
- self.text_win.rebuild_everything(self._text_buffer)
+ self.width, bar_height, 0, self._text_buffer,
+ force=self.ui_config_changed
+ )
+ self.ui_config_changed = False
if display_bar:
self.upper_bar.resize(1, self.width, 0, 0)
self.get_info_header().resize(
@@ -308,7 +369,7 @@ class ConversationTab(OneToOneTab):
else:
if self.nick:
return self.nick
- return self.jid.user
+ return self.jid.user or self.jid.domain
def on_input(self, key, raw):
if not raw and key in self.key_func:
@@ -323,7 +384,10 @@ class ConversationTab(OneToOneTab):
def on_lose_focus(self):
contact = roster[self.get_dest_jid()]
- jid = safeJID(self.get_dest_jid())
+ try:
+ jid = JID(self.get_dest_jid())
+ except InvalidJID:
+ jid = JID('')
if contact:
if jid.resource:
resource = contact[jid.full]
@@ -344,7 +408,10 @@ class ConversationTab(OneToOneTab):
def on_gain_focus(self):
contact = roster[self.get_dest_jid()]
- jid = safeJID(self.get_dest_jid())
+ try:
+ jid = JID(self.get_dest_jid())
+ except InvalidJID:
+ jid = JID('')
if contact:
if jid.resource:
resource = contact[jid.full]
@@ -371,9 +438,6 @@ class ConversationTab(OneToOneTab):
1, self.width, self.height - 2 - self.core.information_win_size -
Tab.tab_win_height(), 0)
- def get_text_window(self):
- return self.text_win
-
def on_close(self):
Tab.on_close(self)
if config.get_by_tabname('send_chat_states', self.general_jid):
@@ -397,12 +461,12 @@ class DynamicConversationTab(ConversationTab):
bad idea so it has been removed.
Only one DynamicConversationTab can be opened for a given jid.
"""
- plugin_commands = {} # type: Dict[str, Command]
- plugin_keys = {} # type: Dict[str, Callable]
+ plugin_commands: Dict[str, Command] = {}
+ plugin_keys: Dict[str, Callable] = {}
- def __init__(self, core, jid, resource=None):
+ def __init__(self, core, jid, initial=None):
self.locked_resource = None
- ConversationTab.__init__(self, core, jid)
+ ConversationTab.__init__(self, core, jid, initial=initial)
self.jid.resource = None
self.info_header = windows.DynamicConversationInfoWin()
self.register_command(
@@ -467,16 +531,20 @@ class StaticConversationTab(ConversationTab):
A conversation tab associated with one Full JID. It cannot be locked to
an different resource or unlocked.
"""
- plugin_commands = {} # type: Dict[str, Command]
- plugin_keys = {} # type: Dict[str, Callable]
+ plugin_commands: Dict[str, Command] = {}
+ plugin_keys: Dict[str, Callable] = {}
- def __init__(self, core, jid):
- ConversationTab.__init__(self, core, jid)
+ def __init__(self, core, jid, initial=None):
+ ConversationTab.__init__(self, core, jid, initial=initial)
assert jid.resource
self.info_header = windows.ConversationInfoWin()
self.resize()
self.update_commands()
self.update_keys()
+ async def init_logs(self, initial=None) -> None:
+ # Disable local logs because…
+ pass
+
def get_info_header(self):
return self.info_header
diff --git a/poezio/tabs/data_forms.py b/poezio/tabs/data_forms.py
index f4ed63e5..8e13a84c 100644
--- a/poezio/tabs/data_forms.py
+++ b/poezio/tabs/data_forms.py
@@ -17,8 +17,8 @@ class DataFormsTab(Tab):
A tab containing various window type, displaying
a form that the user needs to fill.
"""
- plugin_commands = {} # type: Dict[str, Command]
- plugin_keys = {} # type: Dict[str, Callable]
+ plugin_commands: Dict[str, Command] = {}
+ plugin_keys: Dict[str, Callable] = {}
def __init__(self, core, form, on_cancel, on_send, kwargs):
Tab.__init__(self, core)
diff --git a/poezio/tabs/listtab.py b/poezio/tabs/listtab.py
index 87e7d9f4..049f7076 100644
--- a/poezio/tabs/listtab.py
+++ b/poezio/tabs/listtab.py
@@ -18,8 +18,8 @@ log = logging.getLogger(__name__)
class ListTab(Tab):
- plugin_commands = {} # type: Dict[str, Command]
- plugin_keys = {} # type: Dict[str, Callable]
+ plugin_commands: Dict[str, Command] = {}
+ plugin_keys: Dict[str, Callable] = {}
def __init__(self, core, name, help_message, header_text, cols):
"""Parameters:
@@ -34,7 +34,7 @@ class ListTab(Tab):
Tab.__init__(self, core)
self.state = 'normal'
self._error_message = ''
- self.name = name
+ self._name = name
columns = collections.OrderedDict()
for col, num in cols:
columns[col] = num
diff --git a/poezio/tabs/muclisttab.py b/poezio/tabs/muclisttab.py
index 4c1e492f..53fce727 100644
--- a/poezio/tabs/muclisttab.py
+++ b/poezio/tabs/muclisttab.py
@@ -4,6 +4,7 @@ A MucListTab is a tab listing the rooms on a conference server.
It has no functionality except scrolling the list, and allowing the
user to join the rooms.
"""
+import asyncio
import logging
from typing import Dict, Callable
@@ -20,8 +21,8 @@ class MucListTab(ListTab):
A tab listing rooms from a specific server, displaying various information,
scrollable, and letting the user join them, etc
"""
- plugin_commands = {} # type: Dict[str, Command]
- plugin_keys = {} # type: Dict[str, Callable]
+ plugin_commands: Dict[str, Command] = {}
+ plugin_keys: Dict[str, Callable] = {}
def __init__(self, core, server):
ListTab.__init__(self, core, server.full, "“j”: join room.",
@@ -74,4 +75,4 @@ class MucListTab(ListTab):
row = self.listview.get_selected_row()
if not row:
return
- self.core.command.join(row[1])
+ asyncio.ensure_future(self.core.command.join(row[1]))
diff --git a/poezio/tabs/muctab.py b/poezio/tabs/muctab.py
index 81bb5f0b..e2d546c9 100644
--- a/poezio/tabs/muctab.py
+++ b/poezio/tabs/muctab.py
@@ -7,6 +7,9 @@ It keeps track of many things such as part/joins, maintains an
user list, and updates private tabs when necessary.
"""
+from __future__ import annotations
+
+import asyncio
import bisect
import curses
import logging
@@ -15,76 +18,113 @@ import random
import re
import functools
from copy import copy
+from dataclasses import dataclass
from datetime import datetime
-from typing import Dict, Callable, List, Optional, Union, Set
-
-from slixmpp import InvalidJID, JID
+from typing import (
+ cast,
+ Any,
+ Dict,
+ Callable,
+ List,
+ Optional,
+ Tuple,
+ Union,
+ Set,
+ Type,
+ Pattern,
+ TYPE_CHECKING,
+)
+
+from slixmpp import InvalidJID, JID, Presence, Iq, Message as SMessage
+from slixmpp.exceptions import IqError, IqTimeout
from poezio.tabs import ChatTab, Tab, SHOW_NAME
from poezio import common
-from poezio import fixes
from poezio import multiuserchat as muc
from poezio import timed_events
from poezio import windows
from poezio import xhtml
-from poezio.common import safeJID
-from poezio.config import config
+from poezio.common import to_utc
+from poezio.config import config, get_image_cache
from poezio.core.structs import Command
from poezio.decorators import refresh_wrapper, command_args_parser
from poezio.logger import logger
+from poezio.log_loader import LogLoader, MAMFiller
from poezio.roster import roster
+from poezio.text_buffer import CorrectionError
from poezio.theming import get_theme, dump_tuple
from poezio.user import User
from poezio.core.structs import Completion, Status
+from poezio.ui.types import (
+ BaseMessage,
+ InfoMessage,
+ Message,
+ MucOwnJoinMessage,
+ MucOwnLeaveMessage,
+ PersistentInfoMessage,
+)
+
+if TYPE_CHECKING:
+ from poezio.core.core import Core
+ from slixmpp.plugins.xep_0004 import Form
log = logging.getLogger(__name__)
NS_MUC_USER = 'http://jabber.org/protocol/muc#user'
-STATUS_XPATH = '{%s}x/{%s}status' % (NS_MUC_USER, NS_MUC_USER)
COMPARE_USERS_LAST_TALKED = lambda x: x.last_talked
+@dataclass
+class MessageData:
+ message: SMessage
+ delayed: bool
+ date: Optional[datetime]
+ nick: str
+ user: Optional[User]
+ room_from: str
+ body: str
+ is_history: bool
+
+
class MucTab(ChatTab):
"""
The tab containing a multi-user-chat room.
It contains a userlist, an input, a topic, an information and a chat zone
"""
message_type = 'groupchat'
- plugin_commands = {} # type: Dict[str, Command]
- plugin_keys = {} # type: Dict[str, Callable]
- additional_information = {} # type: Dict[str, Callable[[str], str]]
- lagged = False
+ plugin_commands: Dict[str, Command] = {}
+ plugin_keys: Dict[str, Callable[..., Any]] = {}
+ additional_information: Dict[str, Callable[[str], str]] = {}
+ lagged: bool = False
- def __init__(self, core, jid, nick, password=None):
+ def __init__(self, core: Core, jid: JID, nick: str, password: Optional[str] = None) -> None:
ChatTab.__init__(self, core, jid)
self.joined = False
self._state = 'disconnected'
# our nick in the MUC
self.own_nick = nick
# self User object
- self.own_user = None # type: Optional[User]
+ self.own_user: Optional[User] = None
self.password = password
# buffered presences
- self.presence_buffer = []
+ self.presence_buffer: List[Presence] = []
# userlist
- self.users = [] # type: List[User]
+ self.users: List[User] = []
# private conversations
- self.privates = [] # type: List[Tab]
+ self.privates: List[Tab] = []
self.topic = ''
self.topic_from = ''
# Self ping event, so we can cancel it when we leave the room
- self.self_ping_event = None
+ self.self_ping_event: Optional[timed_events.DelayedEvent] = None
# UI stuff
self.topic_win = windows.Topic()
- self.text_win = windows.TextWin()
- self._text_buffer.add_window(self.text_win)
self.v_separator = windows.VerticalSeparator()
self.user_win = windows.UserList()
self.info_header = windows.MucInfoWin()
- self.input = windows.MessageInput()
+ self.input: windows.MessageInput = windows.MessageInput()
# List of ignored users
- self.ignores = [] # type: List[User]
+ self.ignores: List[User] = []
# keys
self.register_keys()
self.update_keys()
@@ -94,7 +134,7 @@ class MucTab(ChatTab):
self.resize()
@property
- def general_jid(self):
+ def general_jid(self) -> JID:
return self.jid
def check_send_chat_state(self) -> bool:
@@ -124,40 +164,49 @@ class MucTab(ChatTab):
"""
del MucTab.additional_information[plugin_name]
- def cancel_config(self, form):
+ def cancel_config(self, form: Form) -> None:
"""
The user do not want to send their config, send an iq cancel
"""
- muc.cancel_config(self.core.xmpp, self.jid.bare)
+ asyncio.create_task(self.core.xmpp['xep_0045'].cancel_config(self.jid))
self.core.close_tab()
- def send_config(self, form):
+ def send_config(self, form: Form) -> None:
"""
The user sends their config to the server
"""
- muc.configure_room(self.core.xmpp, self.jid.bare, form)
+ asyncio.create_task(self.core.xmpp['xep_0045'].set_room_config(self.jid, form))
self.core.close_tab()
- def join(self):
+ def join(self) -> None:
"""
Join the room
"""
+ seconds: Optional[int]
status = self.core.get_status()
if self.last_connection:
- delta = datetime.now() - self.last_connection
+ delta = to_utc(datetime.now()) - to_utc(self.last_connection)
seconds = delta.seconds + delta.days * 24 * 3600
else:
+ last_message = self._text_buffer.find_last_message()
seconds = None
+ if last_message is not None:
+ seconds = (datetime.now() - last_message.time).seconds
+ use_log = config.get_by_tabname('mam_sync', self.general_jid)
+ mam_sync = config.get_by_tabname('mam_sync', self.general_jid)
+ if self.mam_filler is None and use_log and mam_sync:
+ limit = config.get_by_tabname('mam_sync_limit', self.jid)
+ self.mam_filler = MAMFiller(logger, self, limit)
muc.join_groupchat(
self.core,
- self.jid.bare,
+ self.jid,
self.own_nick,
- self.password,
+ self.password or '',
status=status.message,
show=status.show,
seconds=seconds)
- def leave_room(self, message: str):
+ def leave_room(self, message: str) -> None:
if self.joined:
theme = get_theme()
info_col = dump_tuple(theme.COLOR_INFORMATION_TEXT)
@@ -192,80 +241,103 @@ class MucTab(ChatTab):
'color_spec': spec_col,
'nick': self.own_nick,
}
-
- self.add_message(msg, typ=2)
+ self.add_message(MucOwnLeaveMessage(msg))
self.disconnect()
- muc.leave_groupchat(self.core.xmpp, self.jid.bare, self.own_nick,
+ muc.leave_groupchat(self.core.xmpp, self.jid, self.own_nick,
message)
self.core.disable_private_tabs(self.jid.bare, reason=msg)
else:
- muc.leave_groupchat(self.core.xmpp, self.jid.bare, self.own_nick,
+ self.presence_buffer = []
+ self.users = []
+ muc.leave_groupchat(self.core.xmpp, self.jid, self.own_nick,
message)
- def change_affiliation(self,
- nick_or_jid: Union[str, JID],
- affiliation: str,
- reason=''):
+ async def change_affiliation(
+ self,
+ nick_or_jid: Union[str, JID],
+ affiliation: str,
+ reason: str = ''
+ ) -> None:
"""
Change the affiliation of a nick or JID
"""
-
- def callback(iq):
- if iq['type'] == 'error':
- self.core.information(
- "Could not set affiliation '%s' for '%s'." %
- (affiliation, nick_or_jid), "Warning")
-
if not self.joined:
return
valid_affiliations = ('outcast', 'none', 'member', 'admin', 'owner')
if affiliation not in valid_affiliations:
- return self.core.information(
+ self.core.information(
'The affiliation must be one of ' +
', '.join(valid_affiliations), 'Error')
- if nick_or_jid in [user.nick for user in self.users]:
- muc.set_user_affiliation(
- self.core.xmpp,
- self.jid.bare,
- affiliation,
- nick=nick_or_jid,
- callback=callback,
- reason=reason)
- else:
- muc.set_user_affiliation(
- self.core.xmpp,
- self.jid.bare,
- affiliation,
- jid=safeJID(nick_or_jid),
- callback=callback,
- reason=reason)
+ return
+ jid = None
+ nick = None
+ for user in self.users:
+ if user.nick == nick_or_jid:
+ jid = user.jid
+ nick = user.nick
+ break
+ if jid is None:
+ try:
+ jid = JID(nick_or_jid)
+ except InvalidJID:
+ self.core.information(
+ f'Invalid JID or missing occupant: {nick_or_jid}',
+ 'Error'
+ )
+ return
- def change_role(self, nick: str, role: str, reason=''):
+ try:
+ if affiliation != 'member':
+ nick = None
+ await self.core.xmpp['xep_0045'].set_affiliation(
+ self.jid,
+ jid=jid,
+ nick=nick,
+ affiliation=affiliation,
+ reason=reason
+ )
+ self.core.information(
+ f"Affiliation of {jid} set to {affiliation} successfully",
+ "Info"
+ )
+ except (IqError, IqTimeout) as exc:
+ self.core.information(
+ f"Could not set affiliation '{affiliation}' for '{jid}': {exc}",
+ "Warning",
+ )
+
+ async def change_role(self, nick: str, role: str, reason: str = '') -> None:
"""
Change the role of a nick
"""
- def callback(iq):
- if iq['type'] == 'error':
- self.core.information(
- "Could not set role '%s' for '%s'." % (role, nick),
- "Warning")
-
valid_roles = ('none', 'visitor', 'participant', 'moderator')
if not self.joined or role not in valid_roles:
- return self.core.information(
+ self.core.information(
'The role must be one of ' + ', '.join(valid_roles), 'Error')
+ return
try:
target_jid = copy(self.jid)
target_jid.resource = nick
except InvalidJID:
- return self.core.information('Invalid nick', 'Info')
+ self.core.information('Invalid nick', 'Info')
+ return
- muc.set_user_role(
- self.core.xmpp, self.jid.bare, nick, reason, role, callback=callback)
+ try:
+ await self.core.xmpp['xep_0045'].set_role(
+ self.jid, nick, role=role, reason=reason
+ )
+ self.core.information(
+ f'Role of {nick} changed to {role} successfully.'
+ 'Info'
+ )
+ except (IqError, IqTimeout) as e:
+ self.core.information(
+ "Could not set role '%s' for '%s': %s" % (role, nick, e),
+ "Warning")
@refresh_wrapper.conditional
def print_info(self, nick: str) -> bool:
@@ -296,15 +368,15 @@ class MucTab(ChatTab):
'role': user.role or 'None',
'status': '\n%s' % user.status if user.status else ''
}
- self.add_message(info, typ=0)
+ self.add_message(InfoMessage(info))
return True
- def change_topic(self, topic: str):
+ def change_topic(self, topic: str) -> None:
"""Change the current topic"""
- muc.change_subject(self.core.xmpp, self.jid.bare, topic)
+ self.core.xmpp.plugin['xep_0045'].set_subject(self.jid, topic)
@refresh_wrapper.always
- def show_topic(self):
+ def show_topic(self) -> None:
"""
Print the current topic
"""
@@ -322,42 +394,23 @@ class MucTab(ChatTab):
else:
user_string = ''
- self._text_buffer.add_message(
- "\x19%s}The subject of the room is: \x19%s}%s %s" %
- (info_text, norm_text, self.topic, user_string))
+ self.add_message(
+ InfoMessage(
+ "The subject of the room is: \x19%s}%s %s" %
+ (norm_text, self.topic, user_string),
+ ),
+ )
@refresh_wrapper.always
- def recolor(self, random_colors=False):
+ def recolor(self) -> None:
"""Recolor the current MUC users"""
- deterministic = config.get_by_tabname('deterministic_nick_colors',
- self.jid.bare)
- if deterministic:
- for user in self.users:
- if user is self.own_user:
- continue
- color = self.search_for_color(user.nick)
- if color != '':
- continue
- user.set_deterministic_color()
- return
- # Sort the user list by last talked, to avoid color conflicts
- # on active participants
- sorted_users = sorted(self.users, key=COMPARE_USERS_LAST_TALKED, reverse=True)
- full_sorted_users = sorted_users[:]
- # search our own user, to remove it from the list
- # Also remove users whose color is fixed
- for user in full_sorted_users:
- color = self.search_for_color(user.nick)
+ for user in self.users:
if user is self.own_user:
- sorted_users.remove(user)
- elif color != '':
- sorted_users.remove(user)
- user.change_color(color, deterministic)
- colors = list(get_theme().LIST_COLOR_NICKNAMES)
- if random_colors:
- random.shuffle(colors)
- for i, user in enumerate(sorted_users):
- user.color = colors[i % len(colors)]
+ continue
+ color = self.search_for_color(user.nick)
+ if color != '':
+ continue
+ user.set_deterministic_color()
self.text_win.rebuild_everything(self._text_buffer)
@refresh_wrapper.conditional
@@ -379,7 +432,7 @@ class MucTab(ChatTab):
user.change_color(color)
config.set_and_save(nick, color, 'muc_colors')
nick_color_aliases = config.get_by_tabname('nick_color_aliases',
- self.jid.bare)
+ self.jid)
if nick_color_aliases:
# if any user in the room has a nick which is an alias of the
# nick, update its color
@@ -392,7 +445,7 @@ class MucTab(ChatTab):
self.text_win.rebuild_everything(self._text_buffer)
return True
- def on_input(self, key, raw):
+ def on_input(self, key: str, raw: bool) -> bool:
if not raw and key in self.key_func:
self.key_func[key]()
return False
@@ -405,18 +458,15 @@ class MucTab(ChatTab):
return False
def get_nick(self) -> str:
- if config.get('show_muc_jid'):
- return self.jid.bare
- bookmark = self.core.bookmarks[self.jid.bare]
+ if config.getbool('show_muc_jid'):
+ return cast(str, self.jid)
+ bookmark = self.core.bookmarks[self.jid]
if bookmark is not None and bookmark.name:
return bookmark.name
# TODO: send the disco#info identity name here, if it exists.
- return self.jid.user
-
- def get_text_window(self):
- return self.text_win
+ return self.jid.node
- def on_lose_focus(self):
+ def on_lose_focus(self) -> None:
if self.joined:
if self.input.text:
self.state = 'nonempty'
@@ -432,10 +482,10 @@ class MucTab(ChatTab):
self.send_chat_state('inactive')
self.check_scrolled()
- def on_gain_focus(self):
+ def on_gain_focus(self) -> None:
self.state = 'current'
if (self.text_win.built_lines and self.text_win.built_lines[-1] is None
- and not config.get('show_useless_separator')):
+ and not config.getbool('show_useless_separator')):
self.text_win.remove_line_separator()
curses.curs_set(1)
if self.joined and config.get_by_tabname(
@@ -443,18 +493,134 @@ class MucTab(ChatTab):
self.general_jid) and not self.input.get_text():
self.send_chat_state('active')
- def handle_presence(self, presence):
+ async def handle_message(self, message: SMessage) -> bool:
+ """Parse an incoming message
+
+ Returns False if the message was dropped silently.
"""
- Handle MUC presence
+ room_from = message['from'].bare
+ nick_from = message['mucnick']
+ user = self.get_user_by_name(nick_from)
+ if user and user in self.ignores:
+ return False
+
+ await self.core.events.trigger_async('muc_msg', message, self)
+ use_xhtml = config.get_by_tabname('enable_xhtml_im', room_from)
+ tmp_dir = get_image_cache()
+ body = xhtml.get_body_from_message_stanza(
+ message, use_xhtml=use_xhtml, extract_images_to=tmp_dir)
+
+ # TODO: #3314. Is this a MUC reflection?
+ # Is this an encrypted message? Is so ignore.
+ # It is not possible in the OMEMO case to decrypt these messages
+ # since we don't encrypt for our own device (something something
+ # forward secrecy), but even for non-FS encryption schemes anyway
+ # messages shouldn't have changed after a round-trip to the room.
+ # Otherwire replace the matching message we sent.
+ if not body:
+ return False
+
+ old_state = self.state
+ delayed, date = common.find_delayed_tag(message)
+ is_history = not self.joined and delayed
+
+ mdata = MessageData(
+ message, delayed, date, nick_from, user, room_from, body,
+ is_history
+ )
+
+ replaced = False
+ if message.xml.find('{urn:xmpp:message-correct:0}replace') is not None:
+ replaced = await self._handle_correction_message(mdata)
+ if not replaced:
+ await self._handle_normal_message(mdata)
+ if mdata.nick == self.own_nick:
+ self.set_last_sent_message(message, correct=replaced)
+ self._refresh_after_message(old_state)
+ return True
+
+ def _refresh_after_message(self, old_state: str) -> None:
+ """Refresh the appropriate UI after a message is received"""
+ if self is self.core.tabs.current_tab:
+ self.refresh()
+ elif self.state != old_state:
+ self.core.refresh_tab_win()
+ current = self.core.tabs.current_tab
+ current.refresh_input()
+ self.core.doupdate()
+
+ async def _handle_correction_message(self, message: MessageData) -> bool:
+ """Process a correction message.
+
+ Returns true if a message was actually corrected.
"""
+ replaced_id = message.message['replace']['id']
+ if replaced_id != '' and config.get_by_tabname(
+ 'group_corrections', JID(message.room_from)):
+ try:
+ delayed_date = message.date or datetime.now()
+ modify_hl = self.modify_message(
+ message.body,
+ replaced_id,
+ message.message['id'],
+ time=delayed_date,
+ delayed=message.delayed,
+ nickname=message.nick,
+ user=message.user
+ )
+ if modify_hl:
+ await self.core.events.trigger_async(
+ 'highlight',
+ message.message,
+ self
+ )
+ return True
+ except CorrectionError:
+ log.debug('Unable to correct a message', exc_info=True)
+ return False
+
+ async def _handle_normal_message(self, message: MessageData) -> None:
+ """
+ Process the non-correction groupchat message.
+ """
+ ui_msg: Union[InfoMessage, Message]
+ # Messages coming from MUC barejid (Server maintenance, IRC mode
+ # changes from biboumi, etc.) have no nick/resource and are displayed
+ # as info messages.
+ highlight = False
+ if message.nick:
+ highlight = self.message_is_highlight(
+ message.body, message.nick, message.is_history
+ )
+ ui_msg = Message(
+ txt=message.body,
+ time=message.date,
+ nickname=message.nick,
+ history=message.is_history,
+ delayed=message.delayed,
+ identifier=message.message['id'],
+ jid=message.message['from'],
+ user=message.user,
+ highlight=highlight,
+ )
+ else:
+ ui_msg = InfoMessage(
+ txt=message.body,
+ time=message.date,
+ identifier=message.message['id'],
+ )
+ self.add_message(ui_msg)
+ if highlight:
+ await self.core.events.trigger_async('highlight', message, self)
+
+ def handle_presence(self, presence: Presence) -> None:
+ """Handle MUC presence"""
self.reset_lag()
- status_codes = set()
- for status_code in presence.xml.findall(STATUS_XPATH):
- status_codes.add(status_code.attrib['code'])
+ status_codes = presence['muc']['status_codes']
if presence['type'] == 'error':
self.core.room_error(presence, self.jid.bare)
elif not self.joined:
- own = '110' in status_codes or self.own_nick == presence['from'].resource
+ own = 110 in status_codes
if own or len(self.presence_buffer) >= 10:
self.process_presence_buffer(presence, own)
else:
@@ -474,20 +640,17 @@ class MucTab(ChatTab):
self.input.refresh()
self.core.doupdate()
- def process_presence_buffer(self, last_presence, own):
+ def process_presence_buffer(self, last_presence: Presence, own: bool) -> None:
"""
Batch-process all the initial presences
"""
- deterministic = config.get_by_tabname('deterministic_nick_colors',
- self.jid.bare)
-
for stanza in self.presence_buffer:
try:
- self.handle_presence_unjoined(stanza, deterministic)
+ self.handle_presence_unjoined(stanza)
except PresenceError:
self.core.room_error(stanza, stanza['from'].bare)
self.presence_buffer = []
- self.handle_presence_unjoined(last_presence, deterministic, own)
+ self.handle_presence_unjoined(last_presence, own)
self.users.sort()
# Enable the self ping event, to regularly check if we
# are still in the room.
@@ -498,34 +661,35 @@ class MucTab(ChatTab):
self.core.tabs.current_tab.refresh_input()
self.core.doupdate()
- def handle_presence_unjoined(self, presence, deterministic, own=False):
+ def handle_presence_unjoined(self, presence: Presence, own: bool = False) -> None:
"""
Presence received while we are not in the room (before code=110)
"""
- from_nick, _, affiliation, show, status, role, jid, typ = dissect_presence(
- presence)
+ # If presence is coming from MUC barejid, ignore.
+ if not presence['from'].resource:
+ return None
+ dissected_presence = dissect_presence(presence)
+ from_nick, _, affiliation, show, status, role, jid, typ = dissected_presence
if typ == 'unavailable':
return
user_color = self.search_for_color(from_nick)
new_user = User(from_nick, affiliation, show, status, role, jid,
- deterministic, user_color)
+ user_color)
self.users.append(new_user)
self.core.events.trigger('muc_join', presence, self)
if own:
- status_codes = set()
- for status_code in presence.xml.findall(STATUS_XPATH):
- status_codes.add(status_code.attrib['code'])
+ status_codes = presence['muc']['status_codes']
self.own_join(from_nick, new_user, status_codes)
- def own_join(self, from_nick: str, new_user: User, status_codes: Set[str]):
+ def own_join(self, from_nick: str, new_user: User, status_codes: Set[int]) -> None:
"""
Handle the last presence we received, entering the room
"""
self.own_nick = from_nick
self.own_user = new_user
self.joined = True
- if self.jid.bare in self.core.initial_joins:
- self.core.initial_joins.remove(self.jid.bare)
+ if self.jid in self.core.initial_joins:
+ self.core.initial_joins.remove(self.jid)
self._state = 'normal'
elif self != self.core.tabs.current_tab:
self._state = 'joined'
@@ -553,42 +717,51 @@ class MucTab(ChatTab):
'nick_col': color,
'info_col': info_col,
}
- self.add_message(enable_message, typ=2)
+ self.add_message(MucOwnJoinMessage(enable_message))
self.core.enable_private_tabs(self.jid.bare, enable_message)
- if '201' in status_codes:
+ if 201 in status_codes:
self.add_message(
- '\x19%(info_col)s}Info: The room '
- 'has been created' % {'info_col': info_col},
- typ=0)
- if '170' in status_codes:
+ PersistentInfoMessage('Info: The room has been created'),
+ )
+ if 170 in status_codes:
self.add_message(
- '\x19%(warn_col)s}Warning:\x19%(info_col)s}'
- ' This room is publicly logged' % {
- 'info_col': info_col,
- 'warn_col': warn_col
- },
- typ=0)
- if '100' in status_codes:
+ InfoMessage(
+ '\x19%(warn_col)s}Warning:\x19%(info_col)s}'
+ ' This room is publicly logged' % {
+ 'info_col': info_col,
+ 'warn_col': warn_col
+ }
+ ),
+ )
+ if 100 in status_codes:
self.add_message(
- '\x19%(warn_col)s}Warning:\x19%(info_col)s}'
- ' This room is not anonymous.' % {
- 'info_col': info_col,
- 'warn_col': warn_col
- },
- typ=0)
-
- def handle_presence_joined(self, presence, status_codes):
+ InfoMessage(
+ '\x19%(warn_col)s}Warning:\x19%(info_col)s}'
+ ' This room is not anonymous.' % {
+ 'info_col': info_col,
+ 'warn_col': warn_col
+ },
+ ),
+ )
+ asyncio.create_task(LogLoader(
+ logger, self, config.get_by_tabname('use_log', self.general_jid)
+ ).tab_open())
+
+ def handle_presence_joined(self, presence: Presence, status_codes: Set[int]) -> None:
"""
Handle new presences when we are already in the room
"""
- from_nick, from_room, affiliation, show, status, role, jid, typ = dissect_presence(
- presence)
- change_nick = '303' in status_codes
- kick = '307' in status_codes and typ == 'unavailable'
- ban = '301' in status_codes and typ == 'unavailable'
- shutdown = '332' in status_codes and typ == 'unavailable'
- server_initiated = '333' in status_codes and typ == 'unavailable'
- non_member = '322' in status_codes and typ == 'unavailable'
+ # If presence is coming from MUC barejid, ignore.
+ if not presence['from'].resource:
+ return None
+ dissected_presence = dissect_presence(presence)
+ from_nick, from_room, affiliation, show, status, role, jid, typ = dissected_presence
+ change_nick = 303 in status_codes
+ kick = 307 in status_codes and typ == 'unavailable'
+ ban = 301 in status_codes and typ == 'unavailable'
+ shutdown = 332 in status_codes and typ == 'unavailable'
+ server_initiated = 333 in status_codes and typ == 'unavailable'
+ non_member = 322 in status_codes and typ == 'unavailable'
user = self.get_user_by_name(from_nick)
# New user
if not user and typ != "unavailable":
@@ -597,11 +770,11 @@ class MucTab(ChatTab):
self.on_user_join(from_nick, affiliation, show, status, role, jid,
user_color)
elif user is None:
- log.error('BUG: User %s in %s is None', from_nick, self.jid.bare)
+ log.error('BUG: User %s in %s is None', from_nick, self.jid)
return
elif change_nick:
self.core.events.trigger('muc_nickchange', presence, self)
- self.on_user_nick_change(presence, user, from_nick, from_room)
+ self.on_user_nick_change(presence, user, from_nick)
elif ban:
self.core.events.trigger('muc_ban', presence, self)
self.core.on_user_left_private_conversation(
@@ -621,39 +794,50 @@ class MucTab(ChatTab):
# user quit
elif typ == 'unavailable':
self.on_user_leave_groupchat(user, jid, status, from_nick,
- from_room, server_initiated)
+ JID(from_room), server_initiated)
+ ns = 'http://jabber.org/protocol/muc#user'
+ if presence.xml.find(f'{{{ns}}}x/{{{ns}}}destroy') is not None:
+ info = f'Room {self.jid} was destroyed.'
+ if presence['muc']['destroy']:
+ reason = presence['muc']['destroy']['reason']
+ altroom = presence['muc']['destroy']['jid']
+ if reason:
+ info += f' “{reason}”.'
+ if altroom:
+ info += f' The new address now is {altroom}.'
+ self.core.information(info, 'Info')
# status change
else:
self.on_user_change_status(user, from_nick, from_room, affiliation,
role, show, status)
- def on_non_member_kicked(self):
+ def on_non_member_kicked(self) -> None:
"""We have been kicked because the MUC is members-only"""
self.add_message(
- '\x19%(info_col)s}You have been kicked because you '
- 'are not a member and the room is now members-only.' %
- {'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT)},
- typ=2)
+ MucOwnLeaveMessage(
+ 'You have been kicked because you '
+ 'are not a member and the room is now members-only.'
+ )
+ )
self.disconnect()
- def on_muc_shutdown(self):
+ def on_muc_shutdown(self) -> None:
"""We have been kicked because the MUC service is shutting down"""
self.add_message(
- '\x19%(info_col)s}You have been kicked because the'
- ' MUC service is shutting down.' %
- {'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT)},
- typ=2)
+ MucOwnLeaveMessage(
+ 'You have been kicked because the'
+ ' MUC service is shutting down.'
+ )
+ )
self.disconnect()
- def on_user_join(self, from_nick, affiliation, show, status, role, jid,
- color):
+ def on_user_join(self, from_nick: str, affiliation: str, show: str, status: str, role: str, jid: JID,
+ color: str) -> None:
"""
When a new user joins the groupchat
"""
- deterministic = config.get_by_tabname('deterministic_nick_colors',
- self.jid.bare)
user = User(from_nick, affiliation, show, status, role, jid,
- deterministic, color)
+ color)
bisect.insort_left(self.users, user)
hide_exit_join = config.get_by_tabname('hide_exit_join',
self.general_jid)
@@ -662,7 +846,7 @@ class MucTab(ChatTab):
self.general_jid):
color = dump_tuple(user.color)
else:
- color = 3
+ color = "3"
theme = get_theme()
info_col = dump_tuple(theme.COLOR_INFORMATION_TEXT)
spec_col = dump_tuple(theme.COLOR_JOIN_CHAR)
@@ -688,13 +872,14 @@ class MucTab(ChatTab):
'jid_color': dump_tuple(theme.COLOR_MUC_JID),
'color_spec': spec_col,
}
- self.add_message(msg, typ=2)
+ self.add_message(PersistentInfoMessage(msg))
self.core.on_user_rejoined_private_conversation(self.jid.bare, from_nick)
- def on_user_nick_change(self, presence, user, from_nick, from_room):
- new_nick = presence.xml.find(
- '{%s}x/{%s}item' % (NS_MUC_USER, NS_MUC_USER)).attrib['nick']
- old_color = user.color
+ def on_user_nick_change(self, presence: Presence, user: User, from_nick: str) -> None:
+ new_nick = presence['muc']['item']['nick']
+ if not new_nick:
+ return # should not happen
+ old_color_tuple = user.color
if user.nick == self.own_nick:
self.own_nick = new_nick
# also change our nick in all private discussions of this room
@@ -702,58 +887,56 @@ class MucTab(ChatTab):
user.change_nick(new_nick)
else:
user.change_nick(new_nick)
- deterministic = config.get_by_tabname('deterministic_nick_colors',
- self.jid.bare)
- color = config.get_by_tabname(new_nick, 'muc_colors') or None
- if color or deterministic:
- user.change_color(color, deterministic)
+ color = config.getstr(new_nick, section='muc_colors') or None
+ user.change_color(color)
self.users.remove(user)
bisect.insort_left(self.users, user)
if config.get_by_tabname('display_user_color_in_join_part',
self.general_jid):
color = dump_tuple(user.color)
- old_color = dump_tuple(old_color)
+ old_color = dump_tuple(old_color_tuple)
else:
- old_color = color = 3
+ old_color = color = "3"
info_col = dump_tuple(get_theme().COLOR_INFORMATION_TEXT)
self.add_message(
- '\x19%(old_color)s}%(old)s\x19%(info_col)s} is'
- ' now known as \x19%(color)s}%(new)s' % {
- 'old': from_nick,
- 'new': new_nick,
- 'color': color,
- 'old_color': old_color,
- 'info_col': info_col
- },
- typ=2)
+ PersistentInfoMessage(
+ '\x19%(old_color)s}%(old)s\x19%(info_col)s} is'
+ ' now known as \x19%(color)s}%(new)s' % {
+ 'old': from_nick,
+ 'new': new_nick,
+ 'color': color,
+ 'old_color': old_color,
+ 'info_col': info_col
+ },
+ )
+ )
# rename the private tabs if needed
self.core.rename_private_tabs(self.jid.bare, from_nick, user)
- def on_user_banned(self, presence, user, from_nick):
+ def on_user_banned(self, presence: Presence, user: User, from_nick: str) -> None:
"""
When someone is banned from a muc
"""
+ cls: Type[InfoMessage] = PersistentInfoMessage
self.users.remove(user)
- by = presence.xml.find('{%s}x/{%s}item/{%s}actor' %
- (NS_MUC_USER, NS_MUC_USER, NS_MUC_USER))
- reason = presence.xml.find('{%s}x/{%s}item/{%s}reason' %
- (NS_MUC_USER, NS_MUC_USER, NS_MUC_USER))
- if by:
- by = by.get('jid') or by.get('nick') or None
- else:
- by = None
+ by = presence['muc']['item'].get_plugin('actor', check=True)
+ reason = presence['muc']['item']['reason']
+ by_repr: Union[JID, str, None] = None
+ if by is not None:
+ by_repr = by['jid'] or by['nick'] or None
theme = get_theme()
info_col = dump_tuple(theme.COLOR_INFORMATION_TEXT)
char_kick = theme.CHAR_KICK
if from_nick == self.own_nick: # we are banned
+ cls = MucOwnLeaveMessage
if by:
kick_msg = ('\x191}%(spec)s \x193}You\x19%(info_col)s}'
' have been banned by \x194}%(by)s') % {
'spec': char_kick,
- 'by': by,
+ 'by': by_repr,
'info_col': info_col
}
else:
@@ -771,11 +954,11 @@ class MucTab(ChatTab):
self.general_jid)
delay = common.parse_str_to_secs(delay)
if delay <= 0:
- muc.join_groupchat(self.core, self.jid.bare, self.own_nick)
+ muc.join_groupchat(self.core, self.jid, self.own_nick)
else:
self.core.add_timed_event(
timed_events.DelayedEvent(delay, muc.join_groupchat,
- self.core, self.jid.bare,
+ self.core, self.jid,
self.own_nick))
else:
@@ -783,16 +966,16 @@ class MucTab(ChatTab):
self.general_jid):
color = dump_tuple(user.color)
else:
- color = 3
+ color = "3"
- if by:
+ if by_repr:
kick_msg = ('\x191}%(spec)s \x19%(color)s}'
'%(nick)s\x19%(info_col)s} '
'has been banned by \x194}%(by)s') % {
'spec': char_kick,
'nick': from_nick,
'color': color,
- 'by': by,
+ 'by': by_repr,
'info_col': info_col
}
else:
@@ -803,30 +986,30 @@ class MucTab(ChatTab):
'color': color,
'info_col': info_col
}
- if reason is not None and reason.text:
+ if reason:
kick_msg += ('\x19%(info_col)s} Reason: \x196}'
'%(reason)s\x19%(info_col)s}') % {
- 'reason': reason.text,
+ 'reason': reason,
'info_col': info_col
}
- self.add_message(kick_msg, typ=2)
+ self.add_message(cls(kick_msg))
- def on_user_kicked(self, presence, user, from_nick):
+ def on_user_kicked(self, presence: Presence, user: User, from_nick: str) -> None:
"""
When someone is kicked from a muc
"""
+ cls: Type[InfoMessage] = PersistentInfoMessage
self.users.remove(user)
- actor_elem = presence.xml.find('{%s}x/{%s}item/{%s}actor' %
- (NS_MUC_USER, NS_MUC_USER, NS_MUC_USER))
- reason = presence.xml.find('{%s}x/{%s}item/{%s}reason' %
- (NS_MUC_USER, NS_MUC_USER, NS_MUC_USER))
+ actor_elem = presence['muc']['item'].get_plugin('actor', check=True)
+ reason = presence['muc']['item']['reason']
by = None
theme = get_theme()
info_col = dump_tuple(theme.COLOR_INFORMATION_TEXT)
char_kick = theme.CHAR_KICK
if actor_elem is not None:
- by = actor_elem.get('nick') or actor_elem.get('jid')
+ by = actor_elem['nick'] or actor_elem.get['jid'] or None
if from_nick == self.own_nick: # we are kicked
+ cls = MucOwnLeaveMessage
if by:
kick_msg = ('\x191}%(spec)s \x193}You\x19'
'%(info_col)s} have been kicked'
@@ -851,18 +1034,18 @@ class MucTab(ChatTab):
self.general_jid)
delay = common.parse_str_to_secs(delay)
if delay <= 0:
- muc.join_groupchat(self.core, self.jid.bare, self.own_nick)
+ muc.join_groupchat(self.core, self.jid, self.own_nick)
else:
self.core.add_timed_event(
timed_events.DelayedEvent(delay, muc.join_groupchat,
- self.core, self.jid.bare,
+ self.core, self.jid,
self.own_nick))
else:
if config.get_by_tabname('display_user_color_in_join_part',
self.general_jid):
color = dump_tuple(user.color)
else:
- color = 3
+ color = "3"
if by:
kick_msg = ('\x191}%(spec)s \x19%(color)s}%(nick)s'
'\x19%(info_col)s} has been kicked by '
@@ -881,13 +1064,13 @@ class MucTab(ChatTab):
'color': color,
'info_col': info_col
}
- if reason is not None and reason.text:
+ if reason:
kick_msg += ('\x19%(info_col)s} Reason: \x196}'
'%(reason)s') % {
- 'reason': reason.text,
+ 'reason': reason,
'info_col': info_col
}
- self.add_message(kick_msg, typ=2)
+ self.add_message(cls(kick_msg))
def on_user_leave_groupchat(self,
user: User,
@@ -895,7 +1078,7 @@ class MucTab(ChatTab):
status: str,
from_nick: str,
from_room: JID,
- server_initiated=False):
+ server_initiated: bool = False) -> None:
"""
When a user leaves a groupchat
"""
@@ -904,7 +1087,7 @@ class MucTab(ChatTab):
# We are now out of the room.
# Happens with some buggy (? not sure) servers
self.disconnect()
- self.core.disable_private_tabs(from_room)
+ self.core.disable_private_tabs(from_room.bare)
self.refresh_tab_win()
hide_exit_join = config.get_by_tabname('hide_exit_join',
@@ -915,7 +1098,7 @@ class MucTab(ChatTab):
self.general_jid):
color = dump_tuple(user.color)
else:
- color = 3
+ color = "3"
theme = get_theme()
info_col = dump_tuple(theme.COLOR_INFORMATION_TEXT)
spec_col = dump_tuple(theme.COLOR_QUIT_CHAR)
@@ -952,11 +1135,11 @@ class MucTab(ChatTab):
}
if status:
leave_msg += ' (\x19o%s\x19%s})' % (status, info_col)
- self.add_message(leave_msg, typ=2)
- self.core.on_user_left_private_conversation(from_room, user, status)
+ self.add_message(PersistentInfoMessage(leave_msg))
+ self.core.on_user_left_private_conversation(from_room.bare, user, status)
- def on_user_change_status(self, user, from_nick, from_room, affiliation,
- role, show, status):
+ def on_user_change_status(self, user: User, from_nick: str, from_room: str, affiliation: str,
+ role: str, show: str, status: str) -> None:
"""
When a user changes her status
"""
@@ -967,7 +1150,7 @@ class MucTab(ChatTab):
self.general_jid):
color = dump_tuple(user.color)
else:
- color = 3
+ color = "3"
info_col = dump_tuple(get_theme().COLOR_INFORMATION_TEXT)
if from_nick == self.own_nick:
msg = '\x19%(color)s}You\x19%(info_col)s} changed: ' % {
@@ -1011,15 +1194,16 @@ class MucTab(ChatTab):
or show != user.show or status != user.status)) or (
affiliation != user.affiliation or role != user.role):
# display the message in the room
- self._text_buffer.add_message(msg)
+ self.add_message(InfoMessage(msg))
self.core.on_user_changed_status_in_private(
- '%s/%s' % (from_room, from_nick), Status(show, status))
+ JID('%s/%s' % (from_room, from_nick)), Status(show, status)
+ )
self.users.remove(user)
# finally, effectively change the user status
user.update(affiliation, show, status, role)
bisect.insort_left(self.users, user)
- def disconnect(self):
+ def disconnect(self) -> None:
"""
Set the state of the room as not joined, so
we can know if we can join it, send messages to it, etc
@@ -1031,23 +1215,13 @@ class MucTab(ChatTab):
self.joined = False
self.disable_self_ping_event()
- def get_single_line_topic(self):
+ def get_single_line_topic(self) -> str:
"""
Return the topic as a single-line string (for the window header)
"""
return self.topic.replace('\n', '|')
- def log_message(self, txt, nickname, time=None, typ=1):
- """
- Log the messages in the archives, if it needs
- to be
- """
- if time is None and self.joined: # don't log the history messages
- if not logger.log_message(self.jid.bare, nickname, txt, typ=typ):
- self.core.information('Unable to write in the log file',
- 'Error')
-
- def get_user_by_name(self, nick):
+ def get_user_by_name(self, nick: str) -> Optional[User]:
"""
Gets the user associated with the given nick, or None if not found
"""
@@ -1056,65 +1230,34 @@ class MucTab(ChatTab):
return user
return None
- def add_message(self, txt, time=None, nickname=None, **kwargs):
- """
- Note that user can be None even if nickname is not None. It happens
- when we receive an history message said by someone who is not
- in the room anymore
- Return True if the message highlighted us. False otherwise.
- """
-
+ def add_message(self, msg: BaseMessage) -> None:
+ """Add a message to the text buffer and set various tab status"""
# reset self-ping interval
if self.self_ping_event:
self.enable_self_ping_event()
-
- self.log_message(txt, nickname, time=time, typ=kwargs.get('typ', 1))
- args = dict()
- for key, value in kwargs.items():
- if key not in ('typ', 'forced_user'):
- args[key] = value
- if nickname is not None:
- user = self.get_user_by_name(nickname)
- else:
- user = None
-
- if user:
- user.set_last_talked(datetime.now())
- args['user'] = user
- if not user and kwargs.get('forced_user'):
- args['user'] = kwargs['forced_user']
-
- if (not time and nickname and nickname != self.own_nick
- and self.state != 'current'):
- if (self.state != 'highlight'
- and config.get_by_tabname('notify_messages', self.jid.bare)):
+ super().add_message(msg)
+ if not isinstance(msg, Message):
+ return
+ if msg.user:
+ msg.user.set_last_talked(msg.time)
+ if config.get_by_tabname('notify_messages', self.jid) and self.state != 'current':
+ if msg.nickname != self.own_nick and not msg.history:
self.state = 'message'
- if time and not txt.startswith('/me'):
- txt = '\x19%(info_col)s}%(txt)s' % {
- 'txt': txt,
- 'info_col': dump_tuple(get_theme().COLOR_LOG_MSG)
- }
- elif not nickname:
- txt = '\x19%(info_col)s}%(txt)s' % {
- 'txt': txt,
- 'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT)
- }
- elif not kwargs.get('highlight'): # TODO
- args['highlight'] = self.do_highlight(txt, time, nickname)
- time = time or datetime.now()
- self._text_buffer.add_message(txt, time, nickname, **args)
- return args.get('highlight', False)
+ if msg.txt and msg.nickname:
+ self.do_highlight(msg.txt, msg.nickname, msg.history)
def modify_message(self,
- txt,
- old_id,
- new_id,
- time=None,
- nickname=None,
- user=None,
- jid=None):
- self.log_message(txt, nickname, time=time, typ=1)
- highlight = self.do_highlight(txt, time, nickname, corrected=True)
+ txt: str,
+ old_id: str,
+ new_id: str,
+ time: Optional[datetime] = None,
+ delayed: bool = False,
+ nickname: Optional[str] = None,
+ user: Optional[User] = None,
+ jid: Optional[JID] = None) -> bool:
+ highlight = self.message_is_highlight(
+ txt, nickname, delayed, corrected=True
+ )
message = self._text_buffer.modify_message(
txt,
old_id,
@@ -1124,14 +1267,15 @@ class MucTab(ChatTab):
user=user,
jid=jid)
if message:
- self.text_win.modify_message(old_id, message)
+ self.log_message(message)
+ self.text_win.modify_message(message.identifier, message)
return highlight
return False
- def matching_names(self):
- return [(1, self.jid.user), (3, self.jid.full)]
+ def matching_names(self) -> List[Tuple[int, str]]:
+ return [(1, self.jid.node), (3, self.jid.full)]
- def enable_self_ping_event(self):
+ def enable_self_ping_event(self) -> None:
delay = config.get_by_tabname(
"self_ping_delay", self.general_jid, default=0)
interval = int(
@@ -1144,22 +1288,25 @@ class MucTab(ChatTab):
interval, self.send_self_ping)
self.core.add_timed_event(self.self_ping_event)
- def disable_self_ping_event(self):
+ def disable_self_ping_event(self) -> None:
if self.self_ping_event is not None:
self.core.remove_timed_event(self.self_ping_event)
self.self_ping_event = None
- def send_self_ping(self):
- timeout = config.get_by_tabname(
- "self_ping_timeout", self.general_jid, default=60)
- to = self.jid.bare + "/" + self.own_nick
- self.core.xmpp.plugin['xep_0199'].send_ping(
- jid=to,
- callback=self.on_self_ping_result,
- timeout_callback=self.on_self_ping_failed,
- timeout=timeout)
-
- def on_self_ping_result(self, iq):
+ def send_self_ping(self) -> None:
+ if self.core.xmpp.is_connected():
+ timeout = config.get_by_tabname(
+ "self_ping_timeout", self.general_jid, default=60)
+ to = self.jid.bare + "/" + self.own_nick
+ self.core.xmpp.plugin['xep_0199'].send_ping(
+ jid=JID(to),
+ callback=self.on_self_ping_result,
+ timeout_callback=self.on_self_ping_failed,
+ timeout=timeout)
+ else:
+ self.enable_self_ping_event()
+
+ def on_self_ping_result(self, iq: Iq) -> None:
if iq["type"] == "error" and iq["error"]["condition"] not in \
("feature-not-implemented", "service-unavailable", "item-not-found"):
self.command_cycle(iq["error"]["text"] or "not in this room")
@@ -1168,38 +1315,40 @@ class MucTab(ChatTab):
self.reset_lag()
self.enable_self_ping_event()
- def search_for_color(self, nick):
+ def search_for_color(self, nick: str) -> str:
"""
Search for the color of a nick in the config file.
Also, look at the colors of its possible aliases if nick_color_aliases
is set.
"""
- color = config.get_by_tabname(nick, 'muc_colors')
+ color = config.getstr(nick, section='muc_colors')
if color != '':
return color
nick_color_aliases = config.get_by_tabname('nick_color_aliases',
- self.jid.bare)
+ self.jid)
if nick_color_aliases:
nick_alias = re.sub('^_*(.*?)_*$', '\\1', nick)
- color = config.get_by_tabname(nick_alias, 'muc_colors')
+ color = config.getstr(nick_alias, section='muc_colors')
return color
- def on_self_ping_failed(self, iq):
+ def on_self_ping_failed(self, iq: Any = None) -> None:
if not self.lagged:
self.lagged = True
- info_text = dump_tuple(get_theme().COLOR_INFORMATION_TEXT)
self._text_buffer.add_message(
- "\x19%s}MUC service not responding." % info_text)
+ InfoMessage(
+ "MUC service not responding."
+ ),
+ )
self._state = 'disconnected'
self.core.refresh_window()
self.enable_self_ping_event()
- def reset_lag(self):
+ def reset_lag(self) -> None:
if self.lagged:
self.lagged = False
- info_text = dump_tuple(get_theme().COLOR_INFORMATION_TEXT)
- self._text_buffer.add_message(
- "\x19%s}MUC service is responding again." % info_text)
+ self.add_message(
+ InfoMessage("MUC service is responding again.")
+ )
if self != self.core.tabs.current_tab:
self._state = 'joined'
else:
@@ -1209,35 +1358,35 @@ class MucTab(ChatTab):
########################## UI ONLY #####################################
@refresh_wrapper.always
- def go_to_next_hl(self):
+ def go_to_next_hl(self) -> None:
"""
Go to the next HL in the room, or the last
"""
self.text_win.next_highlight()
@refresh_wrapper.always
- def go_to_prev_hl(self):
+ def go_to_prev_hl(self) -> None:
"""
Go to the previous HL in the room, or the first
"""
self.text_win.previous_highlight()
@refresh_wrapper.always
- def scroll_user_list_up(self):
+ def scroll_user_list_up(self) -> None:
"Scroll up in the userlist"
self.user_win.scroll_up()
@refresh_wrapper.always
- def scroll_user_list_down(self):
+ def scroll_user_list_down(self) -> None:
"Scroll down in the userlist"
self.user_win.scroll_down()
- def resize(self):
+ def resize(self) -> None:
"""
Resize the whole window. i.e. all its sub-windows
"""
self.need_resize = False
- if config.get('hide_user_list') or self.size.tab_degrade_x:
+ if config.getbool('hide_user_list') or self.size.tab_degrade_x:
text_width = self.width
else:
text_width = (self.width // 10) * 9
@@ -1261,18 +1410,18 @@ class MucTab(ChatTab):
self.text_win.resize(
self.height - 3 - info_win_height - tab_win_height, text_width, 1,
- 0)
- self.text_win.rebuild_everything(self._text_buffer)
+ 0, self._text_buffer, force=self.ui_config_changed)
+ self.ui_config_changed = False
self.info_header.resize(
1, self.width, self.height - 2 - info_win_height - tab_win_height,
0)
self.input.resize(1, self.width, self.height - 1, 0)
- def refresh(self):
+ def refresh(self) -> None:
if self.need_resize:
self.resize()
log.debug(' TAB Refresh: %s', self.__class__.__name__)
- if config.get('hide_user_list') or self.size.tab_degrade_x:
+ if config.getbool('hide_user_list') or self.size.tab_degrade_x:
display_user_list = False
else:
display_user_list = True
@@ -1291,10 +1440,10 @@ class MucTab(ChatTab):
self.info_win.refresh()
self.input.refresh()
- def on_info_win_size_changed(self):
+ def on_info_win_size_changed(self) -> None:
if self.core.information_win_size >= self.height - 3:
return
- if config.get("hide_user_list"):
+ if config.getbool("hide_user_list"):
text_width = self.width
else:
text_width = (self.width // 10) * 9
@@ -1307,7 +1456,7 @@ class MucTab(ChatTab):
Tab.tab_win_height(), 1, 1, 9 * (self.width // 10))
self.text_win.resize(
self.height - 3 - self.core.information_win_size -
- Tab.tab_win_height(), text_width, 1, 0)
+ Tab.tab_win_height(), text_width, 1, 0, self._text_buffer)
self.info_header.resize(
1, self.width, self.height - 2 - self.core.information_win_size -
Tab.tab_win_height(), 0)
@@ -1315,37 +1464,42 @@ class MucTab(ChatTab):
# This maxsize is kinda arbitrary, but most users won’t have that many
# nicknames anyway.
@functools.lru_cache(maxsize=8)
- def build_highlight_regex(self, nickname):
+ def build_highlight_regex(self, nickname: str) -> Pattern:
return re.compile(r"(^|\W)" + re.escape(nickname) + r"(\W|$)", re.I)
- def is_highlight(self, txt, time, nickname, own_nick, highlight_on,
- corrected=False):
+ def message_is_highlight(self, txt: str, nickname: Optional[str], history: bool,
+ corrected: bool = False) -> bool:
+ """Highlight algorithm for MUC tabs"""
+ # Don't highlight on info message or our own messages
+ if not nickname or nickname == self.own_nick:
+ return False
+ highlight_on = config.get_by_tabname(
+ 'highlight_on',
+ self.general_jid,
+ ).split(':')
highlighted = False
- if (not time or corrected) and nickname and nickname != own_nick:
- if self.build_highlight_regex(own_nick).search(txt):
+ if not history:
+ if self.build_highlight_regex(self.own_nick).search(txt):
highlighted = True
else:
- highlight_words = highlight_on.split(':')
- for word in highlight_words:
+ for word in highlight_on:
if word and word.lower() in txt.lower():
highlighted = True
break
return highlighted
- def do_highlight(self, txt, time, nickname, corrected=False):
- """
- Set the tab color and returns the nick color
- """
- own_nick = self.own_nick
- highlight_on = config.get_by_tabname('highlight_on', self.general_jid)
- highlighted = self.is_highlight(txt, time, nickname, own_nick,
- highlight_on, corrected)
- if highlighted and self.joined:
+ def do_highlight(self, txt: str, nickname: str, history: bool,
+ corrected: bool = False) -> bool:
+ """Set the tab color and returns the highlight state"""
+ highlighted = self.message_is_highlight(
+ txt, nickname, history, corrected
+ )
+ if highlighted and self.joined and not corrected:
if self.state != 'current':
self.state = 'highlight'
- beep_on = config.get('beep_on').split()
+ beep_on = config.getstr('beep_on').split()
if 'highlight' in beep_on and 'message' not in beep_on:
- if not config.get_by_tabname('disable_beep', self.jid.bare):
+ if not config.get_by_tabname('disable_beep', self.jid):
curses.beep()
return True
return False
@@ -1353,56 +1507,57 @@ class MucTab(ChatTab):
########################## COMMANDS ####################################
@command_args_parser.quoted(1, 1, [''])
- def command_invite(self, args):
+ async def command_invite(self, args: List[str]) -> None:
"""/invite <jid> [reason]"""
if args is None:
- return self.core.command.help('invite')
+ self.core.command.help('invite')
+ return
jid, reason = args
- self.core.command.invite('%s %s "%s"' % (jid, self.jid.bare, reason))
+ await self.core.command.invite('%s %s "%s"' % (jid, self.jid, reason))
@command_args_parser.quoted(1)
- def command_info(self, args):
+ def command_info(self, args: List[str]) -> None:
"""
/info <nick>
"""
if args is None:
- return self.core.command.help('info')
+ self.core.command.help('info')
+ return
nick = args[0]
if not self.print_info(nick):
self.core.information("Unknown user: %s" % nick, "Error")
@command_args_parser.quoted(0)
- def command_configure(self, ignored):
+ async def command_configure(self, ignored: Any) -> None:
"""
/configure
"""
- def on_form_received(form):
- if not form:
- self.core.information(
- 'Could not retrieve the configuration form', 'Error')
- return
+ try:
+ form = await self.core.xmpp.plugin['xep_0045'].get_room_config(
+ self.jid
+ )
self.core.open_new_form(form, self.cancel_config, self.send_config)
-
- fixes.get_room_form(self.core.xmpp, self.jid.bare, on_form_received)
+ except (IqError, IqTimeout, ValueError):
+ self.core.information(
+ 'Could not retrieve the configuration form', 'Error')
@command_args_parser.raw
- def command_cycle(self, msg):
+ def command_cycle(self, msg: str) -> None:
"""/cycle [reason]"""
self.leave_room(msg)
self.join()
- @command_args_parser.quoted(0, 1, [''])
- def command_recolor(self, args):
+ @command_args_parser.ignored
+ def command_recolor(self) -> None:
"""
/recolor [random]
Re-assigns color to the participants of the room
"""
- random_colors = args[0] == 'random'
- self.recolor(random_colors)
+ self.recolor()
@command_args_parser.quoted(2, 2, [''])
- def command_color(self, args):
+ def command_color(self, args: List[str]) -> None:
"""
/color <nick> <color>
Fix a color for a nick.
@@ -1410,58 +1565,71 @@ class MucTab(ChatTab):
User "random" to attribute a random color.
"""
if args is None:
- return self.core.command.help('color')
+ self.core.command.help('color')
+ return
nick = args[0]
color = args[1].lower()
if nick == self.own_nick:
- return self.core.information(
+ self.core.information(
"You cannot change the color of your"
- " own nick.", 'Error')
+ " own nick.", 'Error'
+ )
elif color not in xhtml.colors and color not in ('unset', 'random'):
- return self.core.information("Unknown color: %s" % color, 'Error')
- self.set_nick_color(nick, color)
+ self.core.information("Unknown color: %s" % color, 'Error')
+ else:
+ self.set_nick_color(nick, color)
@command_args_parser.quoted(1)
- def command_version(self, args):
+ async def command_version(self, args: List[str]) -> None:
"""
/version <jid or nick>
"""
if args is None:
- return self.core.command.help('version')
+ self.core.command.help('version')
+ return
nick = args[0]
try:
- if nick in [user.nick for user in self.users]:
+ if nick in {user.nick for user in self.users}:
jid = copy(self.jid)
jid.resource = nick
else:
jid = JID(nick)
except InvalidJID:
- return self.core.information('Invalid jid or nick %r' % nick, 'Error')
- self.core.xmpp.plugin['xep_0092'].get_version(
- jid, callback=self.core.handler.on_version_result)
+ self.core.information('Invalid jid or nick %r' % nick, 'Error')
+ return
+ iq = await self.core.xmpp.plugin['xep_0092'].get_version(jid)
+ self.core.handler.on_version_result(iq)
@command_args_parser.quoted(1)
- def command_nick(self, args):
+ def command_nick(self, args: List[str]) -> None:
"""
/nick <nickname>
"""
if args is None:
- return self.core.command.help('nick')
+ self.core.command.help('nick')
+ return
nick = args[0]
if not self.joined:
- return self.core.information('/nick only works in joined rooms',
+ self.core.information('/nick only works in joined rooms',
'Info')
+ return
current_status = self.core.get_status()
try:
target_jid = copy(self.jid)
target_jid.resource = nick
except InvalidJID:
- return self.core.information('Invalid nick', 'Info')
- muc.change_nick(self.core, self.jid.bare, nick, current_status.message,
- current_status.show)
+ self.core.information('Invalid nick', 'Info')
+ return
+ muc.change_nick(
+ self.core,
+ self.jid,
+ nick,
+ current_status.message,
+ current_status.show,
+ )
@command_args_parser.quoted(0, 1, [''])
- def command_part(self, args):
+ def command_part(self, args: List[str]) -> None:
"""
/part [msg]
"""
@@ -1472,24 +1640,41 @@ class MucTab(ChatTab):
self.core.doupdate()
@command_args_parser.raw
- def command_close(self, msg):
+ def command_leave(self, msg: str) -> None:
+ """
+ /leave [msg]
+ """
+ self.command_close(msg)
+
+ @command_args_parser.raw
+ def command_close(self, msg: str) -> None:
"""
/close [msg]
"""
self.leave_room(msg)
+ if config.getbool('synchronise_open_rooms'):
+ if self.jid in self.core.bookmarks:
+ bookmark = self.core.bookmarks[self.jid]
+ if bookmark:
+ bookmark.autojoin = False
+ asyncio.create_task(
+ self.core.bookmarks.save(self.core.xmpp)
+ )
self.core.close_tab(self)
- def on_close(self):
+ def on_close(self) -> None:
super().on_close()
- self.leave_room('')
+ if self.joined:
+ self.leave_room('')
@command_args_parser.quoted(1, 1)
- def command_query(self, args):
+ def command_query(self, args: List[str]) -> None:
"""
/query <nick> [message]
"""
if args is None:
- return self.core.command.help('query')
+ self.core.command.help('query')
+ return
nick = args[0]
r = None
for user in self.users:
@@ -1497,13 +1682,16 @@ class MucTab(ChatTab):
r = self.core.open_private_window(self.jid.bare, user.nick)
if r and len(args) == 2:
msg = args[1]
- self.core.tabs.current_tab.command_say(
- xhtml.convert_simple_to_full_colors(msg))
+ asyncio.ensure_future(
+ r.command_say(
+ xhtml.convert_simple_to_full_colors(msg)
+ )
+ )
if not r:
self.core.information("Cannot find user: %s" % nick, 'Error')
@command_args_parser.raw
- def command_topic(self, subject):
+ def command_topic(self, subject: str) -> None:
"""
/topic [new topic]
"""
@@ -1513,7 +1701,7 @@ class MucTab(ChatTab):
self.change_topic(subject)
@command_args_parser.quoted(0)
- def command_names(self, args):
+ def command_names(self, args: Any) -> None:
"""
/names
"""
@@ -1546,79 +1734,137 @@ class MucTab(ChatTab):
buff.append('\n')
message = ' '.join(buff)
- self._text_buffer.add_message(message)
+ self.add_message(InfoMessage(message))
self.text_win.refresh()
self.input.refresh()
@command_args_parser.quoted(1, 1)
- def command_kick(self, args):
+ async def command_kick(self, args: List[str]) -> None:
"""
/kick <nick> [reason]
"""
if args is None:
- return self.core.command.help('kick')
+ self.core.command.help('kick')
+ return
if len(args) == 2:
reason = args[1]
else:
reason = ''
nick = args[0]
- self.change_role(nick, 'none', reason)
+ await self.change_role(nick, 'none', reason)
@command_args_parser.quoted(1, 1)
- def command_ban(self, args):
+ async def command_ban(self, args: List[str]) -> None:
"""
/ban <nick> [reason]
"""
if args is None:
- return self.core.command.help('ban')
+ self.core.command.help('ban')
+ return
nick = args[0]
msg = args[1] if len(args) == 2 else ''
- self.change_affiliation(nick, 'outcast', msg)
+ await self.change_affiliation(nick, 'outcast', msg)
@command_args_parser.quoted(2, 1, [''])
- def command_role(self, args):
+ async def command_role(self, args: List[str]) -> None:
"""
/role <nick> <role> [reason]
Changes the role of a user
roles can be: none, visitor, participant, moderator
"""
-
- def callback(iq):
- if iq['type'] == 'error':
- self.core.room_error(iq, self.jid.bare)
-
if args is None:
- return self.core.command.help('role')
+ self.core.command.help('role')
+ return
nick, role, reason = args[0], args[1].lower(), args[2]
- self.change_role(nick, role, reason)
+ try:
+ await self.change_role(nick, role, reason)
+ except IqError as iq:
+ self.core.room_error(iq, self.jid.bare)
- @command_args_parser.quoted(2)
- def command_affiliation(self, args):
+ @command_args_parser.quoted(0, 2)
+ async def command_affiliation(self, args: List[str]) -> None:
"""
- /affiliation <nick or jid> <affiliation>
+ /affiliation [<nick or jid> <affiliation>]
Changes the affiliation of a user
affiliations can be: outcast, none, member, admin, owner
"""
- def callback(iq):
- if iq['type'] == 'error':
- self.core.room_error(iq, self.jid.bare)
+ room = JID(self.name)
+ if not room:
+ self.core.information('affiliation: requires a valid chat address', 'Error')
+ return
- if args is None:
- return self.core.command.help('affiliation')
+ # List affiliations
+ if not args:
+ await self.get_users_affiliations(room)
+ return None
+
+ if len(args) != 2:
+ self.core.command.help('affiliation')
+ return
nick, affiliation = args[0], args[1].lower()
- self.change_affiliation(nick, affiliation)
+ # Set affiliation
+ await self.change_affiliation(nick, affiliation)
+
+ async def get_users_affiliations(self, jid: JID) -> None:
+ owners, admins, members, outcasts = await asyncio.gather(
+ self.core.xmpp['xep_0045'].get_affiliation_list(jid, 'owner'),
+ self.core.xmpp['xep_0045'].get_affiliation_list(jid, 'admin'),
+ self.core.xmpp['xep_0045'].get_affiliation_list(jid, 'member'),
+ self.core.xmpp['xep_0045'].get_affiliation_list(jid, 'outcast'),
+ return_exceptions=True,
+ )
+
+ all_errors = functools.reduce(
+ lambda acc, iq: acc and isinstance(iq, (IqError, IqTimeout)),
+ (owners, admins, members, outcasts),
+ True,
+ )
+ if all_errors:
+ self.core.information(
+ 'Can’t access affiliations for %s' % jid.bare,
+ 'Error',
+ )
+ return None
+
+ theme = get_theme()
+ aff_colors = {
+ 'owner': theme.CHAR_AFFILIATION_OWNER,
+ 'admin': theme.CHAR_AFFILIATION_ADMIN,
+ 'member': theme.CHAR_AFFILIATION_MEMBER,
+ 'outcast': theme.CHAR_AFFILIATION_OUTCAST,
+ }
+
+
+
+ lines = ['Affiliations for %s' % jid.bare]
+ affiliation_dict = {
+ 'owner': owners,
+ 'admin': admins,
+ 'member': members,
+ 'outcast': outcasts,
+ }
+ for affiliation, items in affiliation_dict.items():
+ if isinstance(items, BaseException) or not items:
+ continue
+ aff_char = aff_colors[affiliation]
+ lines.append(' %s%s' % (aff_char, affiliation.capitalize()))
+ for ajid in sorted(items):
+ lines.append(' %s' % ajid)
+
+ self.core.information('\n'.join(lines), 'Info')
+ return None
@command_args_parser.raw
- def command_say(self, line, correct=False):
+ async def command_say(self, line: str, attention: bool = False, correct: bool = False):
"""
/say <message>
Or normal input + enter
"""
- needed = 'inactive' if self.inactive else 'active'
- msg = self.core.xmpp.make_message(self.jid.bare)
+ chatstate = 'inactive' if self.inactive else 'active'
+ msg: SMessage = self.core.xmpp.make_message(self.jid)
msg['type'] = 'groupchat'
msg['body'] = line
# trigger the event BEFORE looking for colors.
@@ -1635,9 +1881,12 @@ class MucTab(ChatTab):
msg['html']['body'] = xhtml.poezio_colors_to_html(msg['body'])
msg['body'] = xhtml.clean_text(msg['body'])
if config.get_by_tabname('send_chat_states', self.general_jid):
- msg['chat_state'] = needed
+ if chatstate == 'inactive':
+ self.send_chat_state(chatstate, always_send=True)
+ else:
+ msg['chat_state'] = chatstate
if correct:
- msg['replace']['id'] = self.last_sent_message['id']
+ msg['replace']['id'] = self.last_sent_message['id'] # type: ignore
self.cancel_paused_delay()
self.core.events.trigger('muc_say_after', msg, self)
if not msg['body']:
@@ -1645,24 +1894,26 @@ class MucTab(ChatTab):
self.text_win.refresh()
self.input.refresh()
return
- self.last_sent_message = msg
+ # TODO: #3314. Display outgoing MUC message.
+ self.set_last_sent_message(msg, correct=correct)
msg.send()
- self.chat_state = needed
+ self.chat_state = chatstate
@command_args_parser.raw
- def command_xhtml(self, msg):
+ def command_xhtml(self, msg: str) -> None:
message = self.generate_xhtml_message(msg)
if message:
message['type'] = 'groupchat'
message.send()
@command_args_parser.quoted(1)
- def command_ignore(self, args):
+ def command_ignore(self, args: List[str]) -> None:
"""
/ignore <nick>
"""
if args is None:
- return self.core.command.help('ignore')
+ self.core.command.help('ignore')
+ return
nick = args[0]
user = self.get_user_by_name(nick)
@@ -1675,12 +1926,13 @@ class MucTab(ChatTab):
self.core.information("%s is now ignored" % nick, 'info')
@command_args_parser.quoted(1)
- def command_unignore(self, args):
+ def command_unignore(self, args: List[str]) -> None:
"""
/unignore <nick>
"""
if args is None:
- return self.core.command.help('unignore')
+ self.core.command.help('unignore')
+ return
nick = args[0]
user = self.get_user_by_name(nick)
@@ -1692,9 +1944,33 @@ class MucTab(ChatTab):
self.ignores.remove(user)
self.core.information('%s is now unignored' % nick)
+ @command_args_parser.quoted(0, 1)
+ def command_request_voice(self, args: List[str]) -> None:
+ """
+ /request_voice [role]
+ Request voice in a moderated room
+ role can be: participant, moderator
+ """
+
+ room = JID(self.name)
+ if not room:
+ self.core.information('request_voice: requires a valid chat address', 'Error')
+ return
+
+ if len(args) > 1:
+ self.core.command.help('request_voice')
+ return
+
+ if args:
+ role = args[0]
+ else:
+ role = 'participant'
+
+ self.core.xmpp['xep_0045'].request_voice(room, role)
+
########################## COMPLETIONS #################################
- def completion(self):
+ def completion(self) -> None:
"""
Called when Tab is pressed, complete the nickname in the input
"""
@@ -1707,14 +1983,15 @@ class MucTab(ChatTab):
for user in sorted(self.users, key=COMPARE_USERS_LAST_TALKED, reverse=True):
if user.nick != self.own_nick:
word_list.append(user.nick)
- after = config.get('after_completion') + ' '
+ after = config.getstr('after_completion') + ' '
input_pos = self.input.pos
- if ' ' not in self.input.get_text()[:input_pos] or (
+ text_before = self.input.get_text()[:input_pos]
+ if (' ' not in text_before and '\n' not in text_before) or (
self.input.last_completion and self.input.get_text()
[:input_pos] == self.input.last_completion + after):
add_after = after
else:
- if not config.get('add_space_after_completion'):
+ if not config.getbool('add_space_after_completion'):
add_after = ''
else:
add_after = ' '
@@ -1725,7 +2002,7 @@ class MucTab(ChatTab):
and not self.input.get_text().startswith('//'))
self.send_composing_chat_state(empty_after)
- def completion_version(self, the_input):
+ def completion_version(self, the_input: windows.MessageInput) -> Completion:
"""Completion for /version"""
userlist = []
for user in sorted(self.users, key=COMPARE_USERS_LAST_TALKED, reverse=True):
@@ -1740,30 +2017,30 @@ class MucTab(ChatTab):
return Completion(the_input.auto_completion, userlist, quotify=False)
- def completion_info(self, the_input):
+ def completion_info(self, the_input: windows.MessageInput) -> Completion:
"""Completion for /info"""
userlist = []
for user in sorted(self.users, key=COMPARE_USERS_LAST_TALKED, reverse=True):
userlist.append(user.nick)
return Completion(the_input.auto_completion, userlist, quotify=False)
- def completion_nick(self, the_input):
+ def completion_nick(self, the_input: windows.MessageInput) -> Completion:
"""Completion for /nick"""
- nicks = [
+ nicks_list = [
os.environ.get('USER'),
- config.get('default_nick'),
+ config.getstr('default_nick'),
self.core.get_bookmark_nickname(self.jid.bare)
]
- nicks = [i for i in nicks if i]
+ nicks = [i for i in nicks_list if i]
return Completion(the_input.auto_completion, nicks, '', quotify=False)
- def completion_recolor(self, the_input):
+ def completion_recolor(self, the_input: windows.MessageInput) -> Optional[Completion]:
if the_input.get_argument_position() == 1:
return Completion(
the_input.new_completion, ['random'], 1, '', quotify=False)
- return True
+ return None
- def completion_color(self, the_input):
+ def completion_color(self, the_input: windows.MessageInput) -> Optional[Completion]:
"""Completion for /color"""
n = the_input.get_argument_position(quoted=True)
if n == 1:
@@ -1779,8 +2056,9 @@ class MucTab(ChatTab):
colors.append('random')
return Completion(
the_input.new_completion, colors, 2, '', quotify=False)
+ return None
- def completion_ignore(self, the_input):
+ def completion_ignore(self, the_input: windows.MessageInput) -> Completion:
"""Completion for /ignore"""
userlist = [user.nick for user in self.users]
if self.own_nick in userlist:
@@ -1788,7 +2066,7 @@ class MucTab(ChatTab):
userlist.sort()
return Completion(the_input.auto_completion, userlist, quotify=False)
- def completion_role(self, the_input):
+ def completion_role(self, the_input: windows.MessageInput) -> Optional[Completion]:
"""Completion for /role"""
n = the_input.get_argument_position(quoted=True)
if n == 1:
@@ -1801,8 +2079,9 @@ class MucTab(ChatTab):
possible_roles = ['none', 'visitor', 'participant', 'moderator']
return Completion(
the_input.new_completion, possible_roles, 2, '', quotify=True)
+ return None
- def completion_affiliation(self, the_input):
+ def completion_affiliation(self, the_input: windows.MessageInput) -> Optional[Completion]:
"""Completion for /affiliation"""
n = the_input.get_argument_position(quoted=True)
if n == 1:
@@ -1825,20 +2104,26 @@ class MucTab(ChatTab):
2,
'',
quotify=True)
+ return None
- def completion_invite(self, the_input):
+ def completion_invite(self, the_input: windows.MessageInput) -> Optional[Completion]:
"""Completion for /invite"""
n = the_input.get_argument_position(quoted=True)
if n == 1:
return Completion(
- the_input.new_completion, roster.jids(), 1, quotify=True)
+ the_input.new_completion,
+ [str(i) for i in roster.jids()],
+ argument_position=1,
+ quotify=True)
+ return None
- def completion_topic(self, the_input):
+ def completion_topic(self, the_input: windows.MessageInput) -> Optional[Completion]:
if the_input.get_argument_position() == 1:
return Completion(
the_input.auto_completion, [self.topic], '', quotify=False)
+ return None
- def completion_quoted(self, the_input):
+ def completion_quoted(self, the_input: windows.MessageInput) -> Optional[Completion]:
"""Nick completion, but with quotes"""
if the_input.get_argument_position(quoted=True) == 1:
word_list = []
@@ -1848,16 +2133,23 @@ class MucTab(ChatTab):
return Completion(
the_input.new_completion, word_list, 1, quotify=True)
+ return None
- def completion_unignore(self, the_input):
+ def completion_unignore(self, the_input: windows.MessageInput) -> Optional[Completion]:
if the_input.get_argument_position() == 1:
users = [user.nick for user in self.ignores]
return Completion(the_input.auto_completion, users, quotify=False)
+ return None
+
+ def completion_request_voice(self, the_input: windows.MessageInput) -> Optional[Completion]:
+ """Completion for /request_voice"""
+ allowed = ['participant', 'moderator']
+ return Completion(the_input.auto_completion, allowed, quotify=False)
########################## REGISTER STUFF ##############################
- def register_keys(self):
+ def register_keys(self) -> None:
"Register tab-specific keys"
self.key_func['^I'] = self.completion
self.key_func['M-u'] = self.scroll_user_list_down
@@ -1865,7 +2157,7 @@ class MucTab(ChatTab):
self.key_func['M-n'] = self.go_to_next_hl
self.key_func['M-p'] = self.go_to_prev_hl
- def register_commands(self):
+ def register_commands(self) -> None:
"Register tab-specific commands"
self.register_commands_batch([{
'name': 'ignore',
@@ -1933,7 +2225,7 @@ class MucTab(ChatTab):
'func':
self.command_affiliation,
'usage':
- '<nick or jid> <affiliation>',
+ '[<nick or jid> [<affiliation>]]',
'desc': ('Set the affiliation of a user. Affiliations can be:'
' outcast, none, member, admin, owner.'),
'shortdesc':
@@ -1993,15 +2285,23 @@ class MucTab(ChatTab):
'shortdesc':
'Leave the room.'
}, {
+ 'name': 'leave',
+ 'func': self.command_leave,
+ 'usage': '[message]',
+ 'desc': 'Deprecated alias for /close',
+ 'shortdesc': 'Leave the room.'
+ }, {
'name':
'close',
'func':
self.command_close,
'usage':
'[message]',
- 'desc': ('Disconnect from a room and close the tab.'
- ' You can specify an optional message if '
- 'you are still connected.'),
+ 'desc': ('Disconnect from a room and close the tab. '
+ 'You can specify an optional message if '
+ 'you are still connected. If synchronise_open_tabs '
+ 'is true, also disconnect you from your other '
+ 'clients.'),
'shortdesc':
'Close the tab.'
}, {
@@ -2023,12 +2323,11 @@ class MucTab(ChatTab):
'func':
self.command_recolor,
'usage':
- '[random]',
- 'desc': ('Re-assign a color to all participants of the'
- ' current room, based on the last time they talked.'
- ' Use this if the participants currently talking '
- 'have too many identical colors. Use /recolor random'
- ' for a non-deterministic result.'),
+ '',
+ 'desc': (
+ 'Re-assign a color to all participants of the room '
+ 'if the theme has changed.'
+ ),
'shortdesc':
'Change the nicks colors.',
'completion':
@@ -2045,7 +2344,7 @@ class MucTab(ChatTab):
'shortdesc':
'Fix a color for a nick.',
'completion':
- self.completion_recolor
+ self.completion_color
}, {
'name':
'cycle',
@@ -2116,6 +2415,19 @@ class MucTab(ChatTab):
'Invite a contact to this room',
'completion':
self.completion_invite
+ }, {
+ 'name':
+ 'request_voice',
+ 'func':
+ self.command_request_voice,
+ 'desc':
+ 'Request voice when we are a visitor in a moderated room',
+ 'usage':
+ '[role]',
+ 'shortdesc':
+ 'Request voice in a moderated room',
+ 'completion':
+ self.completion_request_voice
}])
@@ -2123,7 +2435,7 @@ class PresenceError(Exception):
pass
-def dissect_presence(presence):
+def dissect_presence(presence: Presence) -> Tuple[str, str, str, str, str, str, JID, str]:
"""
Extract relevant information from a presence
"""
diff --git a/poezio/tabs/privatetab.py b/poezio/tabs/privatetab.py
index b4a64ba8..1909e3c1 100644
--- a/poezio/tabs/privatetab.py
+++ b/poezio/tabs/privatetab.py
@@ -10,23 +10,30 @@ both participant’s nicks. It also has slightly different features than
the ConversationTab (such as tab-completion on nicks from the room).
"""
+import asyncio
import curses
import logging
+from datetime import datetime
from typing import Dict, Callable
from slixmpp import JID
+from slixmpp.stanza import Message as SMessage
from poezio.tabs import OneToOneTab, MucTab, Tab
+from poezio import common
from poezio import windows
from poezio import xhtml
-from poezio.common import safeJID
-from poezio.config import config
+from poezio.config import config, get_image_cache
from poezio.core.structs import Command
from poezio.decorators import refresh_wrapper
-from poezio.logger import logger
from poezio.theming import get_theme, dump_tuple
from poezio.decorators import command_args_parser
+from poezio.text_buffer import CorrectionError
+from poezio.ui.types import (
+ Message,
+ PersistentInfoMessage,
+)
log = logging.getLogger(__name__)
@@ -35,16 +42,14 @@ class PrivateTab(OneToOneTab):
"""
The tab containing a private conversation (someone from a MUC)
"""
- plugin_commands = {} # type: Dict[str, Command]
- plugin_keys = {} # type: Dict[str, Callable]
+ plugin_commands: Dict[str, Command] = {}
+ plugin_keys: Dict[str, Callable] = {}
message_type = 'chat'
- additional_information = {} # type: Dict[str, Callable[[str], str]]
+ additional_information: Dict[str, Callable[[str], str]] = {}
- def __init__(self, core, jid, nick):
- OneToOneTab.__init__(self, core, jid)
+ def __init__(self, core, jid, nick, initial=None):
+ OneToOneTab.__init__(self, core, jid, initial)
self.own_nick = nick
- self.text_win = windows.TextWin()
- self._text_buffer.add_window(self.text_win)
self.info_header = windows.PrivateInfoWin()
self.input = windows.MessageInput()
# keys
@@ -68,6 +73,11 @@ class PrivateTab(OneToOneTab):
self.update_commands()
self.update_keys()
+ @property
+ def log_name(self) -> str:
+ """Overriden from ChatTab because this is a case where we want the full JID"""
+ return self.jid.full
+
def remote_user_color(self):
user = self.parent_muc.get_user_by_name(self.jid.resource)
if user:
@@ -75,20 +85,20 @@ class PrivateTab(OneToOneTab):
return super().remote_user_color()
@property
- def general_jid(self):
+ def general_jid(self) -> JID:
return self.jid
- def get_dest_jid(self):
+ def get_dest_jid(self) -> JID:
return self.jid
@property
- def nick(self):
+ def nick(self) -> str:
return self.get_nick()
def ack_message(self, msg_id: str, msg_jid: JID):
- # special case when talking to oneself
- if msg_jid == self.core.xmpp.boundjid:
- msg_jid = self.jid.full
+ if JID(msg_jid).bare == self.core.xmpp.boundjid.bare:
+ msg_jid = JID(self.jid.bare)
+ msg_jid.resource = self.own_nick
super().ack_message(msg_id, msg_jid)
@staticmethod
@@ -104,18 +114,6 @@ class PrivateTab(OneToOneTab):
def remove_information_element(plugin_name):
del PrivateTab.additional_information[plugin_name]
- def load_logs(self, log_nb):
- logs = logger.get_logs(self.jid.full.replace('/', '\\'), log_nb)
- return logs
-
- def log_message(self, txt, nickname, time=None, typ=1):
- """
- Log the messages in the archives.
- """
- if not logger.log_message(
- self.jid.full, nickname, txt, date=time, typ=typ):
- self.core.information('Unable to write in the log file', 'Error')
-
def on_close(self):
super().on_close()
self.parent_muc.privates.remove(self)
@@ -131,7 +129,7 @@ class PrivateTab(OneToOneTab):
compare_users = lambda x: x.last_talked
word_list = [user.nick for user in sorted(self.parent_muc.users, key=compare_users, reverse=True)\
if user.nick != self.own_nick]
- after = config.get('after_completion') + ' '
+ after = config.getstr('after_completion') + ' '
input_pos = self.input.pos
if ' ' not in self.input.get_text()[:input_pos] or (self.input.last_completion and\
self.input.get_text()[:input_pos] == self.input.last_completion + after):
@@ -144,40 +142,87 @@ class PrivateTab(OneToOneTab):
and not self.input.get_text().startswith('//'))
self.send_composing_chat_state(empty_after)
+ async def handle_message(self, message: SMessage, display: bool = True):
+ sent = message['from'].bare == self.core.xmpp.boundjid.bare
+ jid = message['to'] if sent else message['from']
+ with_nick = jid.resource
+ sender_nick = with_nick
+ if sent:
+ sender_nick = (self.own_nick or self.core.own_nick)
+ room_from = jid.bare
+ use_xhtml = config.get_by_tabname(
+ 'enable_xhtml_im',
+ jid.bare
+ )
+ tmp_dir = get_image_cache()
+ if not sent:
+ await self.core.events.trigger_async('private_msg', message, self)
+ body = xhtml.get_body_from_message_stanza(
+ message, use_xhtml=use_xhtml, extract_images_to=tmp_dir)
+ if not body or not self:
+ return
+ delayed, date = common.find_delayed_tag(message)
+ replaced = False
+ user = self.parent_muc.get_user_by_name(with_nick)
+ if message.get_plugin('replace', check=True):
+ replaced_id = message['replace']['id']
+ if replaced_id != '' and config.get_by_tabname(
+ 'group_corrections', room_from):
+ try:
+ self.modify_message(
+ body,
+ replaced_id,
+ message['id'],
+ user=user,
+ time=date,
+ jid=message['from'],
+ nickname=sender_nick)
+ replaced = True
+ except CorrectionError:
+ log.debug('Unable to correct a message', exc_info=True)
+ if not replaced:
+ msg = Message(
+ txt=body,
+ time=date,
+ history=delayed,
+ nickname=sender_nick,
+ nick_color=get_theme().COLOR_OWN_NICK if sent else None,
+ user=user,
+ identifier=message['id'],
+ jid=message['from'],
+ )
+ if display:
+ self.add_message(msg)
+ else:
+ self.log_message(msg)
+ if sent:
+ self.set_last_sent_message(message, correct=replaced)
+ else:
+ self.last_remote_message = datetime.now()
+
+ @refresh_wrapper.always
@command_args_parser.raw
- def command_say(self, line, attention=False, correct=False):
+ async def command_say(self, line: str, attention: bool = False, correct: bool = False) -> None:
if not self.on:
return
- echo_message = self.jid.resource != self.own_nick
- msg = self.core.xmpp.make_message(self.jid.full)
+ await self._initial_log.wait()
+ our_jid = JID(self.jid.bare)
+ our_jid.resource = self.own_nick
+ msg: SMessage = self.core.xmpp.make_message(
+ mto=self.jid.full,
+ mfrom=our_jid,
+ )
msg['type'] = 'chat'
msg['body'] = line
+ msg.enable('muc')
# trigger the event BEFORE looking for colors.
# This lets a plugin insert \x19xxx} colors, that will
# be converted in xhtml.
self.core.events.trigger('private_say', msg, self)
if not msg['body']:
- self.cancel_paused_delay()
- self.text_win.refresh()
- self.input.refresh()
return
- user = self.parent_muc.get_user_by_name(self.own_nick)
- replaced = False
- if correct or msg['replace']['id']:
- msg['replace']['id'] = self.last_sent_message['id']
- if (config.get_by_tabname('group_corrections', self.jid.full)
- and echo_message):
- try:
- self.modify_message(
- msg['body'],
- self.last_sent_message['id'],
- msg['id'],
- user=user,
- jid=self.core.xmpp.boundjid,
- nickname=self.own_nick)
- replaced = True
- except:
- log.error('Unable to correct a message', exc_info=True)
+ if correct or msg['replace']['id'] and self.last_sent_message:
+ msg['replace']['id'] = self.last_sent_message['id'] # type: ignore
else:
del msg['replace']
@@ -186,43 +231,32 @@ class PrivateTab(OneToOneTab):
msg['html']['body'] = xhtml.poezio_colors_to_html(msg['body'])
msg['body'] = xhtml.clean_text(msg['body'])
if config.get_by_tabname('send_chat_states', self.general_jid):
- needed = 'inactive' if self.inactive else 'active'
- msg['chat_state'] = needed
+ if self.inactive:
+ self.send_chat_state('inactive', always_send=True)
+ else:
+ msg['chat_state'] = 'active'
if attention:
msg['attention'] = True
self.core.events.trigger('private_say_after', msg, self)
if not msg['body']:
- self.cancel_paused_delay()
- self.text_win.refresh()
- self.input.refresh()
return
- if not replaced and echo_message:
- self.add_message(
- msg['body'],
- nickname=self.own_nick or self.core.own_nick,
- forced_user=user,
- nick_color=get_theme().COLOR_OWN_NICK,
- identifier=msg['id'],
- jid=self.core.xmpp.boundjid,
- typ=1)
-
- self.last_sent_message = msg
- msg._add_receipt = True
+ self.set_last_sent_message(msg, correct=correct)
+ await self.core.handler.on_groupchat_private_message(msg, sent=True)
+ # Our receipts slixmpp hack
+ msg._add_receipt = True # type: ignore
msg.send()
self.cancel_paused_delay()
- self.text_win.refresh()
- self.input.refresh()
@command_args_parser.quoted(0, 1)
- def command_version(self, args):
+ async def command_version(self, args):
"""
/version
"""
if args:
- return self.core.command.version(args[0])
+ return await self.core.command.version(args[0])
jid = self.jid.full
- self.core.xmpp.plugin['xep_0092'].get_version(
- jid, callback=self.core.handler.on_version_result)
+ iq = await self.core.xmpp.plugin['xep_0092'].get_version(jid)
+ self.core.handler.on_version_result(iq)
@command_args_parser.quoted(0, 1)
def command_info(self, arg):
@@ -247,8 +281,8 @@ class PrivateTab(OneToOneTab):
self.text_win.resize(
self.height - 2 - info_win_height - tab_win_height, self.width, 0,
- 0)
- self.text_win.rebuild_everything(self._text_buffer)
+ 0, self._text_buffer, force=self.ui_config_changed)
+ self.ui_config_changed = False
self.info_header.resize(
1, self.width, self.height - 2 - info_win_height - tab_win_height,
0)
@@ -326,9 +360,6 @@ class PrivateTab(OneToOneTab):
1, self.width, self.height - 2 - self.core.information_win_size -
Tab.tab_win_height(), 0)
- def get_text_window(self):
- return self.text_win
-
@refresh_wrapper.conditional
def rename_user(self, old_nick, user):
"""
@@ -336,16 +367,18 @@ class PrivateTab(OneToOneTab):
display a message.
"""
self.add_message(
- '\x19%(nick_col)s}%(old)s\x19%(info_col)s} is now '
- 'known as \x19%(nick_col)s}%(new)s' % {
- 'old': old_nick,
- 'new': user.nick,
- 'nick_col': dump_tuple(user.color),
- 'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT)
- },
- typ=2)
+ PersistentInfoMessage(
+ '\x19%(nick_col)s}%(old)s\x19%(info_col)s} is now '
+ 'known as \x19%(nick_col)s}%(new)s' % {
+ 'old': old_nick,
+ 'new': user.nick,
+ 'nick_col': dump_tuple(user.color),
+ 'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT)
+ },
+ ),
+ )
new_jid = self.jid.bare + '/' + user.nick
- self.name = new_jid
+ self._name = new_jid
return self.core.tabs.current_tab is self
@refresh_wrapper.conditional
@@ -363,28 +396,32 @@ class PrivateTab(OneToOneTab):
if not status_message:
self.add_message(
- '\x19%(quit_col)s}%(spec)s \x19%(nick_col)s}'
- '%(nick)s\x19%(info_col)s} has left the room' % {
- 'nick': user.nick,
- 'spec': theme.CHAR_QUIT,
- 'nick_col': color,
- 'quit_col': dump_tuple(theme.COLOR_QUIT_CHAR),
- 'info_col': dump_tuple(theme.COLOR_INFORMATION_TEXT)
- },
- typ=2)
+ PersistentInfoMessage(
+ '\x19%(quit_col)s}%(spec)s \x19%(nick_col)s}'
+ '%(nick)s\x19%(info_col)s} has left the room' % {
+ 'nick': user.nick,
+ 'spec': theme.CHAR_QUIT,
+ 'nick_col': color,
+ 'quit_col': dump_tuple(theme.COLOR_QUIT_CHAR),
+ 'info_col': dump_tuple(theme.COLOR_INFORMATION_TEXT)
+ },
+ ),
+ )
else:
self.add_message(
- '\x19%(quit_col)s}%(spec)s \x19%(nick_col)s}'
- '%(nick)s\x19%(info_col)s} has left the room'
- ' (%(status)s)' % {
- 'status': status_message,
- 'nick': user.nick,
- 'spec': theme.CHAR_QUIT,
- 'nick_col': color,
- 'quit_col': dump_tuple(theme.COLOR_QUIT_CHAR),
- 'info_col': dump_tuple(theme.COLOR_INFORMATION_TEXT)
- },
- typ=2)
+ PersistentInfoMessage(
+ '\x19%(quit_col)s}%(spec)s \x19%(nick_col)s}'
+ '%(nick)s\x19%(info_col)s} has left the room'
+ ' (%(status)s)' % {
+ 'status': status_message,
+ 'nick': user.nick,
+ 'spec': theme.CHAR_QUIT,
+ 'nick_col': color,
+ 'quit_col': dump_tuple(theme.COLOR_QUIT_CHAR),
+ 'info_col': dump_tuple(theme.COLOR_INFORMATION_TEXT)
+ },
+ ),
+ )
return self.core.tabs.current_tab is self
@refresh_wrapper.conditional
@@ -393,7 +430,6 @@ class PrivateTab(OneToOneTab):
The user (or at least someone with the same nick) came back in the MUC
"""
self.activate()
- self.check_features()
tab = self.parent_muc
theme = get_theme()
color = dump_tuple(theme.COLOR_REMOTE_USER)
@@ -403,26 +439,28 @@ class PrivateTab(OneToOneTab):
if user:
color = dump_tuple(user.color)
self.add_message(
- '\x19%(join_col)s}%(spec)s \x19%(color)s}%(nick)s\x19'
- '%(info_col)s} joined the room' % {
- 'nick': nick,
- 'color': color,
- 'spec': theme.CHAR_JOIN,
- 'join_col': dump_tuple(theme.COLOR_JOIN_CHAR),
- 'info_col': dump_tuple(theme.COLOR_INFORMATION_TEXT)
- },
- typ=2)
+ PersistentInfoMessage(
+ '\x19%(join_col)s}%(spec)s \x19%(color)s}%(nick)s\x19'
+ '%(info_col)s} joined the room' % {
+ 'nick': nick,
+ 'color': color,
+ 'spec': theme.CHAR_JOIN,
+ 'join_col': dump_tuple(theme.COLOR_JOIN_CHAR),
+ 'info_col': dump_tuple(theme.COLOR_INFORMATION_TEXT)
+ },
+ ),
+ )
return self.core.tabs.current_tab is self
def activate(self, reason=None):
self.on = True
if reason:
- self.add_message(txt=reason, typ=2)
+ self.add_message(PersistentInfoMessage(reason))
def deactivate(self, reason=None):
self.on = False
if reason:
- self.add_message(txt=reason, typ=2)
+ self.add_message(PersistentInfoMessage(reason))
def matching_names(self):
return [(3, self.jid.resource), (4, self.name)]
@@ -432,9 +470,11 @@ class PrivateTab(OneToOneTab):
error = '\x19%s}%s\x19o' % (dump_tuple(theme.COLOR_CHAR_NACK),
error_message)
self.add_message(
- error,
- highlight=True,
- nickname='Error',
- nick_color=theme.COLOR_ERROR_MSG,
- typ=2)
+ Message(
+ error,
+ highlight=True,
+ nickname='Error',
+ nick_color=theme.COLOR_ERROR_MSG,
+ ),
+ )
self.core.refresh_window()
diff --git a/poezio/tabs/rostertab.py b/poezio/tabs/rostertab.py
index a5ce268b..18334c20 100644
--- a/poezio/tabs/rostertab.py
+++ b/poezio/tabs/rostertab.py
@@ -14,44 +14,36 @@ import ssl
from functools import partial
from os import getenv, path
from pathlib import Path
-from typing import Dict, Callable
+from typing import Dict, Callable, Union
+
+from slixmpp import JID, InvalidJID
+from slixmpp.exceptions import IqError, IqTimeout
-from poezio import common
from poezio import windows
-from poezio.common import safeJID, shell_split
+from poezio.common import shell_split
from poezio.config import config
from poezio.contact import Contact, Resource
from poezio.decorators import refresh_wrapper
from poezio.roster import RosterGroup, roster
from poezio.theming import get_theme, dump_tuple
-from poezio.decorators import command_args_parser
+from poezio.decorators import command_args_parser, deny_anonymous
from poezio.core.structs import Command, Completion
from poezio.tabs import Tab
+from poezio.ui.types import InfoMessage
log = logging.getLogger(__name__)
-def deny_anonymous(func: Callable) -> Callable:
- def wrap(self: 'RosterInfoTab', *args, **kwargs):
- if self.core.xmpp.anon:
- return self.core.information(
- 'This command is not available for anonymous accounts.',
- 'Info'
- )
- return func(self, *args, **kwargs)
- return wrap
-
-
class RosterInfoTab(Tab):
"""
A tab, split in two, containing the roster and infos
"""
- plugin_commands = {} # type: Dict[str, Command]
- plugin_keys = {} # type: Dict[str, Callable]
+ plugin_commands: Dict[str, Command] = {}
+ plugin_keys: Dict[str, Callable] = {}
def __init__(self, core):
Tab.__init__(self, core)
- self.name = "Roster"
+ self._name = "Roster"
self.v_separator = windows.VerticalSeparator()
self.information_win = windows.TextWin()
self.core.information_buffer.add_window(self.information_win)
@@ -83,15 +75,6 @@ class RosterInfoTab(Tab):
self.key_func["S"] = self.start_search_slow
self.key_func["n"] = self.change_contact_name
self.register_command(
- 'deny',
- self.command_deny,
- usage='[jid]',
- desc='Deny your presence to the provided JID (or the '
- 'selected contact in your roster), who is asking'
- 'you to be in their roster.',
- shortdesc='Deny a user your presence.',
- completion=self.completion_deny)
- self.register_command(
'name',
self.command_name,
usage='<jid> [name]',
@@ -119,16 +102,6 @@ class RosterInfoTab(Tab):
shortdesc='Remove a user from a group.',
completion=self.completion_groupremove)
self.register_command(
- 'remove',
- self.command_remove,
- usage='[jid]',
- desc='Remove the specified JID from your roster. This '
- 'will unsubscribe you from its presence, cancel '
- 'its subscription to yours, and remove the item '
- 'from your roster.',
- shortdesc='Remove a user from your roster.',
- completion=self.completion_remove)
- self.register_command(
'export',
self.command_export,
usage='[/path/to/file]',
@@ -171,18 +144,6 @@ class RosterInfoTab(Tab):
def check_blocking(self, features):
if 'urn:xmpp:blocking' in features and not self.core.xmpp.anon:
self.register_command(
- 'block',
- self.command_block,
- usage='[jid]',
- shortdesc='Prevent a JID from talking to you.',
- completion=self.completion_block)
- self.register_command(
- 'unblock',
- self.command_unblock,
- usage='[jid]',
- shortdesc='Allow a JID to talk to you.',
- completion=self.completion_unblock)
- self.register_command(
'list_blocks',
self.command_list_blocks,
shortdesc='Show the blocked contacts.')
@@ -238,50 +199,40 @@ class RosterInfoTab(Tab):
completion=self.completion_cert_fetch)
@property
- def selected_row(self):
+ def selected_row(self) -> Union[Contact, Resource]:
return self.roster_win.get_selected_row()
@command_args_parser.ignored
- def command_certs(self):
+ async def command_certs(self):
"""
/certs
"""
-
- def cb(iq):
- if iq['type'] == 'error':
- self.core.information(
- 'Unable to retrieve the certificate list.', 'Error')
- return
- certs = []
- for item in iq['sasl_certs']['items']:
- users = '\n'.join(item['users'])
- certs.append((item['name'], users))
-
- if not certs:
- return self.core.information('No certificates found', 'Info')
- msg = 'Certificates:\n'
- msg += '\n'.join(
- ((' %s%s' % (item[0] + (': ' if item[1] else ''), item[1]))
- for item in certs))
- self.core.information(msg, 'Info')
-
- self.core.xmpp.plugin['xep_0257'].get_certs(callback=cb, timeout=3)
+ try:
+ iq = await self.core.xmpp.plugin['xep_0257'].get_certs(timeout=3)
+ except (IqError, IqTimeout):
+ self.core.information(
+ 'Unable to retrieve the certificate list.', 'Error')
+ return
+ certs = []
+ for item in iq['sasl_certs']['items']:
+ users = '\n'.join(item['users'])
+ certs.append((item['name'], users))
+
+ if not certs:
+ return self.core.information('No certificates found', 'Info')
+ msg = 'Certificates:\n'
+ msg += '\n'.join(
+ ((' %s%s' % (item[0] + (': ' if item[1] else ''), item[1]))
+ for item in certs))
+ self.core.information(msg, 'Info')
@command_args_parser.quoted(2, 1)
- def command_cert_add(self, args):
+ async def command_cert_add(self, args):
"""
/cert_add <name> <certfile> [cert-management]
"""
if not args or len(args) < 2:
return self.core.command.help('cert_add')
-
- def cb(iq):
- if iq['type'] == 'error':
- self.core.information('Unable to add the certificate.',
- 'Error')
- else:
- self.core.information('Certificate added.', 'Info')
-
name = args[0]
try:
@@ -307,8 +258,17 @@ class RosterInfoTab(Tab):
else:
management = True
- self.core.xmpp.plugin['xep_0257'].add_cert(
- name, crt, callback=cb, allow_management=management)
+ try:
+ await self.core.xmpp.plugin['xep_0257'].add_cert(
+ name,
+ crt,
+ allow_management=management
+ )
+ self.core.information('Certificate added.', 'Info')
+ except (IqError, IqTimeout):
+ self.core.information('Unable to add the certificate.',
+ 'Error')
+
def completion_cert_add(self, the_input):
"""
@@ -324,76 +284,62 @@ class RosterInfoTab(Tab):
return Completion(the_input.new_completion, ['true', 'false'], n)
@command_args_parser.quoted(1)
- def command_cert_disable(self, args):
+ async def command_cert_disable(self, args):
"""
/cert_disable <name>
"""
if not args:
return self.core.command.help('cert_disable')
-
- def cb(iq):
- if iq['type'] == 'error':
- self.core.information('Unable to disable the certificate.',
- 'Error')
- else:
- self.core.information('Certificate disabled.', 'Info')
-
name = args[0]
-
- self.core.xmpp.plugin['xep_0257'].disable_cert(name, callback=cb)
+ try:
+ await self.core.xmpp.plugin['xep_0257'].disable_cert(name)
+ self.core.information('Certificate disabled.', 'Info')
+ except (IqError, IqTimeout):
+ self.core.information('Unable to disable the certificate.',
+ 'Error')
@command_args_parser.quoted(1)
- def command_cert_revoke(self, args):
+ async def command_cert_revoke(self, args):
"""
/cert_revoke <name>
"""
if not args:
return self.core.command.help('cert_revoke')
-
- def cb(iq):
- if iq['type'] == 'error':
- self.core.information('Unable to revoke the certificate.',
- 'Error')
- else:
- self.core.information('Certificate revoked.', 'Info')
-
name = args[0]
-
- self.core.xmpp.plugin['xep_0257'].revoke_cert(name, callback=cb)
+ try:
+ await self.core.xmpp.plugin['xep_0257'].revoke_cert(name)
+ self.core.information('Certificate revoked.', 'Info')
+ except (IqError, IqTimeout):
+ self.core.information('Unable to revoke the certificate.',
+ 'Error')
@command_args_parser.quoted(2)
- def command_cert_fetch(self, args):
+ async def command_cert_fetch(self, args):
"""
/cert_fetch <name> <path>
"""
if not args or len(args) < 2:
return self.core.command.help('cert_fetch')
-
- def cb(iq):
- if iq['type'] == 'error':
- self.core.information('Unable to fetch the certificate.',
- 'Error')
- return
-
- cert = None
- for item in iq['sasl_certs']['items']:
- if item['name'] == name:
- cert = base64.b64decode(item['x509cert'])
- break
-
- if not cert:
- return self.core.information('Certificate not found.', 'Info')
-
- cert = ssl.DER_cert_to_PEM_cert(cert)
- with open(path, 'w') as fd:
- fd.write(cert)
-
- self.core.information('File stored at %s' % path, 'Info')
-
name = args[0]
path = args[1]
- self.core.xmpp.plugin['xep_0257'].get_certs(callback=cb)
+ try:
+ iq = await self.core.xmpp.plugin['xep_0257'].get_certs()
+ except (IqError, IqTimeout):
+ self.core.information('Unable to fetch the certificate.',
+ 'Error')
+ return
+ cert = None
+ for item in iq['sasl_certs']['items']:
+ if item['name'] == name:
+ cert = base64.b64decode(item['x509cert'])
+ break
+ if not cert:
+ return self.core.information('Certificate not found.', 'Info')
+ cert = ssl.DER_cert_to_PEM_cert(cert)
+ with open(path, 'w') as fd:
+ fd.write(cert)
+ self.core.information('File stored at %s' % path, 'Info')
def completion_cert_fetch(self, the_input):
"""
@@ -414,100 +360,30 @@ class RosterInfoTab(Tab):
if not tab:
log.debug('Received message from nonexistent tab: %s',
message['from'])
- message = '\x19%(info_col)s}Cannot send message to %(jid)s: contact blocked' % {
+ message = 'Cannot send message to %(jid)s: contact blocked' % {
'info_col': dump_tuple(get_theme().COLOR_INFORMATION_TEXT),
'jid': message['from'],
}
- tab.add_message(message)
-
- @command_args_parser.quoted(0, 1)
- def command_block(self, args):
- """
- /block [jid]
- """
- item = self.roster_win.selected_row
- if args:
- jid = safeJID(args[0])
- elif isinstance(item, Contact):
- jid = item.bare_jid
- elif isinstance(item, Resource):
- jid = item.jid.bare
-
- def callback(iq):
- if iq['type'] == 'error':
- return self.core.information('Could not block %s.' % jid,
- 'Error')
- elif iq['type'] == 'result':
- return self.core.information('Blocked %s.' % jid, 'Info')
-
- self.core.xmpp.plugin['xep_0191'].block(jid, callback=callback)
-
- def completion_block(self, the_input):
- """
- Completion for /block
- """
- if the_input.get_argument_position() == 1:
- jids = roster.jids()
- return Completion(
- the_input.new_completion, jids, 1, '', quotify=False)
-
- @command_args_parser.quoted(0, 1)
- def command_unblock(self, args):
- """
- /unblock [jid]
- """
-
- def callback(iq):
- if iq['type'] == 'error':
- return self.core.information('Could not unblock the contact.',
- 'Error')
- elif iq['type'] == 'result':
- return self.core.information('Contact unblocked.', 'Info')
-
- item = self.roster_win.selected_row
- if args:
- jid = safeJID(args[0])
- elif isinstance(item, Contact):
- jid = item.bare_jid
- elif isinstance(item, Resource):
- jid = item.jid.bare
- self.core.xmpp.plugin['xep_0191'].unblock(jid, callback=callback)
-
- def completion_unblock(self, the_input):
- """
- Completion for /unblock
- """
-
- def on_result(iq):
- if iq['type'] == 'error':
- return
- l = sorted(str(item) for item in iq['blocklist']['items'])
- return Completion(the_input.new_completion, l, 1, quotify=False)
-
- if the_input.get_argument_position():
- self.core.xmpp.plugin['xep_0191'].get_blocked(callback=on_result)
- return True
+ tab.add_message(InfoMessage(message))
@command_args_parser.ignored
- def command_list_blocks(self):
+ async def command_list_blocks(self):
"""
/list_blocks
"""
-
- def callback(iq):
- if iq['type'] == 'error':
- return self.core.information(
- 'Could not retrieve the blocklist.', 'Error')
- s = 'List of blocked JIDs:\n'
- items = (str(item) for item in iq['blocklist']['items'])
- jids = '\n'.join(items)
- if jids:
- s += jids
- else:
- s = 'No blocked JIDs.'
- self.core.information(s, 'Info')
-
- self.core.xmpp.plugin['xep_0191'].get_blocked(callback=callback)
+ try:
+ iq = await self.core.xmpp.plugin['xep_0191'].get_blocked()
+ except (IqError, IqTimeout) as iq:
+ return self.core.information(
+ 'Could not retrieve the blocklist.', 'Error')
+ s = 'List of blocked JIDs:\n'
+ items = (str(item) for item in iq['blocklist']['items'])
+ jids = '\n'.join(items)
+ if jids:
+ s += jids
+ else:
+ s = 'No blocked JIDs.'
+ self.core.information(s, 'Info')
@command_args_parser.ignored
def command_disconnect(self):
@@ -558,7 +434,9 @@ class RosterInfoTab(Tab):
roster_width)
self.information_win.resize(
self.height - 1 - tab_win_height - contact_win_h, info_width,
- 0, roster_width + 1, self.core.information_buffer)
+ 0, roster_width + 1, self.core.information_buffer,
+ force=self.ui_config_changed)
+ self.ui_config_changed = False
if display_contact_win:
y = self.height - tab_win_height - contact_win_h - 1
avatar_width = contact_win_h * 2
@@ -632,64 +510,34 @@ class RosterInfoTab(Tab):
@deny_anonymous
@command_args_parser.quoted(1)
- def command_password(self, args):
+ async def command_password(self, args):
"""
/password <password>
"""
-
- def callback(iq):
- if iq['type'] == 'result':
- self.core.information('Password updated', 'Account')
- if config.get('password'):
- config.silent_set('password', args[0])
- else:
- self.core.information('Unable to change the password',
- 'Account')
-
- self.core.xmpp.plugin['xep_0077'].change_password(
- args[0], callback=callback)
-
- @deny_anonymous
- @command_args_parser.quoted(0, 1)
- def command_deny(self, args):
- """
- /deny [jid]
- Denies a JID from our roster
- """
- if not args:
- item = self.roster_win.selected_row
- if isinstance(item, Contact):
- jid = item.bare_jid
- else:
- self.core.information('No subscription to deny', 'Warning')
- return
- else:
- jid = safeJID(args[0]).bare
- if jid not in [jid for jid in roster.jids()]:
- self.core.information('No subscription to deny', 'Warning')
- return
-
- contact = roster[jid]
- if contact:
- contact.unauthorize()
- self.core.information('Subscription to %s was revoked' % jid,
- 'Roster')
+ try:
+ await self.core.xmpp.plugin['xep_0077'].change_password(
+ args[0]
+ )
+ self.core.information('Password updated', 'Account')
+ if config.getstr('password'):
+ config.silent_set('password', args[0])
+ except (IqError, IqTimeout):
+ self.core.information('Unable to change the password',
+ 'Account')
@deny_anonymous
@command_args_parser.quoted(1, 1)
- def command_name(self, args):
+ async def command_name(self, args):
"""
Set a name for the specified JID in your roster
"""
-
- def callback(iq):
- if not iq:
- self.core.information('The name could not be set.', 'Error')
- log.debug('Error in /name:\n%s', iq)
-
if args is None:
return self.core.command.help('name')
- jid = safeJID(args[0]).bare
+ try:
+ jid = JID(args[0]).bare
+ except InvalidJID:
+ self.core.information(f'Invalid JID: {args[0]}', 'Error')
+ return
name = args[1] if len(args) == 2 else ''
contact = roster[jid]
@@ -701,16 +549,19 @@ class RosterInfoTab(Tab):
if 'none' in groups:
groups.remove('none')
subscription = contact.subscription
- self.core.xmpp.update_roster(
- jid,
- name=name,
- groups=groups,
- subscription=subscription,
- callback=callback)
+ try:
+ await self.core.xmpp.update_roster(
+ jid,
+ name=name,
+ groups=groups,
+ subscription=subscription
+ )
+ except (IqError, IqTimeout):
+ self.core.information('The name could not be set.', 'Error')
@deny_anonymous
@command_args_parser.quoted(1, 1)
- def command_groupadd(self, args):
+ async def command_groupadd(self, args):
"""
Add the specified JID to the specified group
"""
@@ -726,7 +577,11 @@ class RosterInfoTab(Tab):
else:
return self.core.command.help('groupadd')
else:
- jid = safeJID(args[0]).bare
+ try:
+ jid = JID(args[0]).bare
+ except InvalidJID:
+ self.core.information(f'Invalid JID: {args[0]}', 'Error')
+ return
group = args[1]
contact = roster[jid]
@@ -749,29 +604,31 @@ class RosterInfoTab(Tab):
name = contact.name
subscription = contact.subscription
- def callback(iq):
- if iq:
- roster.update_contact_groups(jid)
- else:
- self.core.information('The group could not be set.', 'Error')
- log.debug('Error in groupadd:\n%s', iq)
- self.core.xmpp.update_roster(
- jid,
- name=name,
- groups=new_groups,
- subscription=subscription,
- callback=callback)
+ try:
+ await self.core.xmpp.update_roster(
+ jid,
+ name=name,
+ groups=new_groups,
+ subscription=subscription,
+ )
+ roster.update_contact_groups(jid)
+ except (IqError, IqTimeout):
+ self.core.information('The group could not be set.', 'Error')
@deny_anonymous
@command_args_parser.quoted(3)
- def command_groupmove(self, args):
+ async def command_groupmove(self, args):
"""
Remove the specified JID from the first specified group and add it to the second one
"""
if args is None:
return self.core.command.help('groupmove')
- jid = safeJID(args[0]).bare
+ try:
+ jid = JID(args[0]).bare
+ except InvalidJID:
+ self.core.information(f'Invalid JID: {args[0]}', 'Error')
+ return
group_from = args[1]
group_to = args[2]
@@ -808,31 +665,31 @@ class RosterInfoTab(Tab):
new_groups.remove(group_from)
name = contact.name
subscription = contact.subscription
-
- def callback(iq):
- if iq:
- roster.update_contact_groups(contact)
- else:
- self.core.information('The group could not be set', 'Error')
- log.debug('Error in groupmove:\n%s', iq)
-
- self.core.xmpp.update_roster(
- jid,
- name=name,
- groups=new_groups,
- subscription=subscription,
- callback=callback)
+ try:
+ await self.core.xmpp.update_roster(
+ jid,
+ name=name,
+ groups=new_groups,
+ subscription=subscription,
+ )
+ roster.update_contact_groups(contact)
+ except (IqError, IqTimeout):
+ self.core.information('The group could not be set', 'Error')
@deny_anonymous
@command_args_parser.quoted(2)
- def command_groupremove(self, args):
+ async def command_groupremove(self, args):
"""
Remove the specified JID from the specified group
"""
if args is None:
return self.core.command.help('groupremove')
- jid = safeJID(args[0]).bare
+ try:
+ jid = JID(args[0]).bare
+ except InvalidJID:
+ self.core.information(f'Invalid JID: {args[0]}', 'Error')
+ return
group = args[1]
contact = roster[jid]
@@ -854,39 +711,16 @@ class RosterInfoTab(Tab):
new_groups.remove(group)
name = contact.name
subscription = contact.subscription
-
- def callback(iq):
- if iq:
- roster.update_contact_groups(jid)
- else:
- self.core.information('The group could not be set')
- log.debug('Error in groupremove:\n%s', iq)
-
- self.core.xmpp.update_roster(
- jid,
- name=name,
- groups=new_groups,
- subscription=subscription,
- callback=callback)
-
- @deny_anonymous
- @command_args_parser.quoted(0, 1)
- def command_remove(self, args):
- """
- Remove the specified JID from the roster. i.e.: unsubscribe
- from its presence, and cancel its subscription to our.
- """
- if args:
- jid = safeJID(args[0]).bare
- else:
- item = self.roster_win.selected_row
- if isinstance(item, Contact):
- jid = item.bare_jid
- else:
- self.core.information('No roster item to remove', 'Error')
- return
- roster.remove(jid)
- del roster[jid]
+ try:
+ self.core.xmpp.update_roster(
+ jid,
+ name=name,
+ groups=new_groups,
+ subscription=subscription,
+ )
+ roster.update_contact_groups(jid)
+ except (IqError, IqTimeout):
+ self.core.information('The group could not be set')
@deny_anonymous
@command_args_parser.quoted(0, 1)
@@ -914,7 +748,7 @@ class RosterInfoTab(Tab):
log.error('Unable to correct a message', exc_info=True)
return
for jid in lines:
- self.command.command_add(jid.lstrip('\n'))
+ self.core.command.command_add(jid.lstrip('\n'))
self.core.information('Contacts imported from %s' % filepath, 'Info')
@deny_anonymous
@@ -1012,16 +846,6 @@ class RosterInfoTab(Tab):
the_input.new_completion, groups, n, '', quotify=True)
return False
- def completion_deny(self, the_input):
- """
- Complete the first argument from the list of the
- contact with ask=='subscribe'
- """
- jids = sorted(
- str(contact.bare_jid) for contact in roster.contacts.values()
- if contact.pending_in)
- return Completion(the_input.new_completion, jids, 1, '', quotify=False)
-
def refresh(self):
if self.need_resize:
self.resize()
@@ -1062,7 +886,7 @@ class RosterInfoTab(Tab):
Show or hide offline contacts
"""
option = 'roster_show_offline'
- value = config.get(option)
+ value = config.getbool(option)
success = config.silent_set(option, str(not value))
roster.modified()
if not success:
@@ -1206,15 +1030,6 @@ class RosterInfoTab(Tab):
'%s connected resource%s' % (len(cont), ''
if len(cont) == 1 else 's'))
acc.append('Current status: %s' % res.status)
- if cont.tune:
- acc.append('Tune: %s' % common.format_tune_string(cont.tune))
- if cont.mood:
- acc.append('Mood: %s' % cont.mood)
- if cont.activity:
- acc.append('Activity: %s' % cont.activity)
- if cont.gaming:
- acc.append(
- 'Game: %s' % (common.format_gaming_string(cont.gaming)))
msg = '\n'.join(acc)
elif isinstance(selected_row, Resource):
res = selected_row
@@ -1240,7 +1055,7 @@ class RosterInfoTab(Tab):
if isinstance(selected_row, Contact):
jid = selected_row.bare_jid
elif isinstance(selected_row, Resource):
- jid = safeJID(selected_row.jid).bare
+ jid = JID(selected_row.jid).bare
else:
return
self.on_slash()
@@ -1322,8 +1137,11 @@ def jid_and_name_match(contact, txt):
if not txt:
return True
txt = txt.lower()
- if txt in safeJID(contact.bare_jid).bare.lower():
- return True
+ try:
+ if txt in JID(contact.bare_jid).bare.lower():
+ return True
+ except InvalidJID:
+ pass
if txt in contact.name.lower():
return True
return False
@@ -1336,9 +1154,12 @@ def jid_and_name_match_slow(contact, txt):
"""
if not txt:
return True # Everything matches when search is empty
- user = safeJID(contact.bare_jid).bare
- if diffmatch(txt, user):
- return True
+ try:
+ user = JID(contact.bare_jid).bare
+ if diffmatch(txt, user):
+ return True
+ except InvalidJID:
+ pass
if contact.name and diffmatch(txt, contact.name):
return True
return False
diff --git a/poezio/tabs/xmltab.py b/poezio/tabs/xmltab.py
index c4a50df8..939af67d 100644
--- a/poezio/tabs/xmltab.py
+++ b/poezio/tabs/xmltab.py
@@ -10,7 +10,8 @@ log = logging.getLogger(__name__)
import curses
import os
-from slixmpp.xmlstream import matcher
+from slixmpp import JID, InvalidJID
+from slixmpp.xmlstream import matcher, StanzaBase
from slixmpp.xmlstream.tostring import tostring
from slixmpp.xmlstream.stanzabase import ElementBase
from xml.etree import ElementTree as ET
@@ -21,17 +22,16 @@ from poezio import text_buffer
from poezio import windows
from poezio.xhtml import clean_text
from poezio.decorators import command_args_parser, refresh_wrapper
-from poezio.common import safeJID
class MatchJID:
- def __init__(self, jid, dest=''):
+ def __init__(self, jid: JID, dest: str = ''):
self.jid = jid
self.dest = dest
- def match(self, xml):
- from_ = safeJID(xml['from'])
- to_ = safeJID(xml['to'])
+ def match(self, xml: StanzaBase):
+ from_ = xml['from']
+ to_ = xml['to']
if self.jid.full == self.jid.bare:
from_ = from_.bare
to_ = to_.bare
@@ -58,14 +58,14 @@ class XMLTab(Tab):
def __init__(self, core):
Tab.__init__(self, core)
self.state = 'normal'
- self.name = 'XMLTab'
+ self._name = 'XMLTab'
self.filters = []
self.core_buffer = self.core.xml_buffer
self.filtered_buffer = text_buffer.TextBuffer()
self.info_header = windows.XMLInfoWin()
- self.text_win = windows.XMLTextWin()
+ self.text_win = windows.TextWin()
self.core_buffer.add_window(self.text_win)
self.default_help_message = windows.HelpText("/ to enter a command")
@@ -120,7 +120,7 @@ class XMLTab(Tab):
usage='<filename>',
desc='Writes the content of the XML buffer into a file.',
shortdesc='Write in a file.')
- self.input = self.default_help_message
+ self.input = self.default_help_message # type: ignore
self.key_func['^T'] = self.close
self.key_func['^I'] = self.completion
self.key_func["KEY_DOWN"] = self.on_scroll_down
@@ -173,7 +173,7 @@ class XMLTab(Tab):
self.text_win.toggle_lock()
self.refresh()
- def match_stanza(self, stanza):
+ def match_stanza(self, stanza) -> bool:
for matcher_ in self.filters:
if not matcher_.match(stanza):
return False
@@ -190,33 +190,36 @@ class XMLTab(Tab):
self.command_filter_reset()
@command_args_parser.raw
- def command_filter_to(self, jid):
+ def command_filter_to(self, jid_str: str):
"""/filter_jid_to <jid>"""
- jid_obj = safeJID(jid)
- if not jid_obj:
+ try:
+ jid = JID(jid_str)
+ except InvalidJID:
return self.core.information('Invalid JID: %s' % jid, 'Error')
- self.update_filters(MatchJID(jid_obj, dest='to'))
+ self.update_filters(MatchJID(jid, dest='to'))
self.refresh()
@command_args_parser.raw
- def command_filter_from(self, jid):
+ def command_filter_from(self, jid_str: str):
"""/filter_jid_from <jid>"""
- jid_obj = safeJID(jid)
- if not jid_obj:
+ try:
+ jid = JID(jid_str)
+ except InvalidJID:
return self.core.information('Invalid JID: %s' % jid, 'Error')
- self.update_filters(MatchJID(jid_obj, dest='from'))
+ self.update_filters(MatchJID(jid, dest='from'))
self.refresh()
@command_args_parser.raw
- def command_filter_jid(self, jid):
+ def command_filter_jid(self, jid_str: str):
"""/filter_jid <jid>"""
- jid_obj = safeJID(jid)
- if not jid_obj:
+ try:
+ jid = JID(jid_str)
+ except InvalidJID:
return self.core.information('Invalid JID: %s' % jid, 'Error')
- self.update_filters(MatchJID(jid_obj))
+ self.update_filters(MatchJID(jid))
self.refresh()
@command_args_parser.quoted(1)
@@ -229,7 +232,7 @@ class XMLTab(Tab):
self.refresh()
@command_args_parser.raw
- def command_filter_xpath(self, xpath):
+ def command_filter_xpath(self, xpath: str):
"""/filter_xpath <xpath>"""
try:
self.update_filters(
@@ -262,7 +265,10 @@ class XMLTab(Tab):
else:
xml = self.core_buffer.messages[:]
text = '\n'.join(
- ('%s %s %s' % (msg.str_time, msg.nickname, clean_text(msg.txt))
+ ('%s %s %s' % (
+ msg.time.strftime('%H:%M:%S'),
+ 'IN' if msg.incoming else 'OUT',
+ clean_text(msg.txt))
for msg in xml))
filename = os.path.expandvars(os.path.expanduser(args[0]))
try:
@@ -283,7 +289,7 @@ class XMLTab(Tab):
self.input.do_command("/") # we add the slash
@refresh_wrapper.always
- def reset_help_message(self, _=None):
+ def reset_help_message(self, _=None) -> bool:
if self.closed:
return True
if self.core.tabs.current_tab is self:
@@ -291,10 +297,10 @@ class XMLTab(Tab):
self.input = self.default_help_message
return True
- def on_scroll_up(self):
+ def on_scroll_up(self) -> bool:
return self.text_win.scroll_up(self.text_win.height - 1)
- def on_scroll_down(self):
+ def on_scroll_down(self) -> bool:
return self.text_win.scroll_down(self.text_win.height - 1)
@command_args_parser.ignored
@@ -308,10 +314,11 @@ class XMLTab(Tab):
self.refresh()
self.core.doupdate()
- def execute_slash_command(self, txt):
+ def execute_slash_command(self, txt: str) -> bool:
if txt.startswith('/'):
- self.input.key_enter()
- self.execute_command(txt)
+ if isinstance(self.input, windows.CommandInput):
+ self.input.key_enter()
+ self.execute_command(txt)
return self.reset_help_message()
def completion(self):