diff options
Diffstat (limited to 'poezio/tabs/basetabs.py')
-rw-r--r-- | poezio/tabs/basetabs.py | 351 |
1 files changed, 210 insertions, 141 deletions
diff --git a/poezio/tabs/basetabs.py b/poezio/tabs/basetabs.py index 73db87f2..793eae62 100644 --- a/poezio/tabs/basetabs.py +++ b/poezio/tabs/basetabs.py @@ -13,38 +13,57 @@ This module also defines ChatTabs, the parent class for all tabs revolving around chats. """ -import copy +from __future__ import annotations + import logging import string import asyncio -import time +from copy import copy from math import ceil, log10 from datetime import datetime from xml.etree import ElementTree as ET +from xml.sax import SAXParseException from typing import ( Any, Callable, + cast, Dict, List, Optional, Union, + Tuple, TYPE_CHECKING, ) -from poezio import mam, poopt, timed_events, xhtml, windows +from poezio import ( + poopt, + timed_events, + xhtml, + windows +) from poezio.core.structs import Command, Completion, Status -from poezio.common import safeJID from poezio.config import config from poezio.decorators import command_args_parser, refresh_wrapper from poezio.logger import logger +from poezio.log_loader import MAMFiller, LogLoader from poezio.text_buffer import TextBuffer from poezio.theming import get_theme, dump_tuple -from poezio.windows.funcs import truncate_nick +from poezio.user import User +from poezio.ui.funcs import truncate_nick +from poezio.timed_events import DelayedEvent +from poezio.ui.types import ( + BaseMessage, + Message, + PersistentInfoMessage, + LoggableTrait, +) -from slixmpp import JID, InvalidJID, Message +from slixmpp import JID, InvalidJID, Message as SMessage if TYPE_CHECKING: from _curses import _CursesWindow # pylint: disable=E0611 + from poezio.size_manager import SizeManager + from poezio.core.core import Core log = logging.getLogger(__name__) @@ -102,29 +121,42 @@ SHOW_NAME = { class Tab: - plugin_commands = {} # type: Dict[str, Command] - plugin_keys = {} # type: Dict[str, Callable] + plugin_commands: Dict[str, Command] = {} + plugin_keys: Dict[str, Callable] = {} # Placeholder values, set on resize - height = 1 - width = 1 - - def __init__(self, core): + height: int = 1 + width: int = 1 + core: Core + input: Optional[windows.Input] + key_func: Dict[str, Callable[[], Any]] + commands: Dict[str, Command] + need_resize: bool + ui_config_changed: bool + + def __init__(self, core: Core): self.core = core self.nb = 0 - if not hasattr(self, 'name'): - self.name = self.__class__.__name__ + if not hasattr(self, '_name'): + self._name = self.__class__.__name__ self.input = None self.closed = False self._state = 'normal' self._prev_state = None self.need_resize = False + self.ui_config_changed = False self.key_func = {} # each tab should add their keys in there # and use them in on_input self.commands = {} # and their own commands @property - def size(self) -> int: + def name(self) -> str: + if hasattr(self, '_name'): + return self._name + return '' + + @property + def size(self) -> SizeManager: return self.core.size @staticmethod @@ -133,23 +165,27 @@ class Tab: Returns 1 or 0, depending on if we are using the vertical tab list or not. """ - if config.get('enable_vertical_tab_list'): + if config.getbool('enable_vertical_tab_list'): return 0 return 1 @property - def info_win(self): + def info_win(self) -> windows.TextWin: return self.core.information_win @property - def color(self): + def color(self) -> Union[Tuple[int, int], Tuple[int, int, 'str']]: return STATE_COLORS[self._state]() @property - def vertical_color(self): + def vertical_color(self) -> Union[Tuple[int, int], Tuple[int, int, 'str']]: return VERTICAL_STATE_COLORS[self._state]() @property + def priority(self) -> Union[int, float]: + return STATE_PRIORITY.get(self._state, -1) + + @property def state(self) -> str: return self._state @@ -187,7 +223,7 @@ class Tab: self._state = 'normal' @staticmethod - def resize(scr: '_CursesWindow'): + def initial_resize(scr: _CursesWindow): Tab.height, Tab.width = scr.getmaxyx() windows.base_wins.TAB_WIN = scr @@ -224,7 +260,7 @@ class Tab: *, desc='', shortdesc='', - completion: Optional[Callable] = None, + completion: Optional[Callable[[windows.Input], Completion]] = None, usage=''): """ Add a command @@ -276,7 +312,6 @@ class Tab: comp = command.comp(the_input) if comp: return comp.run() - return comp return False def execute_command(self, provided_text: str) -> bool: @@ -284,8 +319,10 @@ class Tab: Execute the command in the input and return False if the input didn't contain a command """ + if self.input is None: + raise NotImplementedError txt = provided_text or self.input.key_enter() - if txt.startswith('/') and not txt.startswith('//') and\ + if txt and txt.startswith('/') and not txt.startswith('//') and\ not txt.startswith('/me '): command = txt.strip().split()[0][1:] arg = txt[2 + len(command):] # jump the '/' and the ' ' @@ -313,13 +350,16 @@ class Tab: if func: if hasattr(self.input, "reset_completion"): self.input.reset_completion() - func(arg) + if asyncio.iscoroutinefunction(func): + asyncio.create_task(func(arg)) + else: + func(arg) return True else: return False - def refresh_tab_win(self): - if config.get('enable_vertical_tab_list'): + def refresh_tab_win(self) -> None: + if config.getbool('enable_vertical_tab_list'): left_tab_win = self.core.left_tab_win if left_tab_win and not self.size.core_degrade_x: left_tab_win.refresh() @@ -350,24 +390,18 @@ class Tab: """ return self.name - def get_text_window(self) -> Optional[windows.TextWin]: - """ - Returns the principal TextWin window, if there's one - """ - return None - def on_input(self, key: str, raw: bool): """ raw indicates if the key should activate the associated command or not. """ pass - def update_commands(self): + def update_commands(self) -> None: for c in self.plugin_commands: if c not in self.commands: self.commands[c] = self.plugin_commands[c] - def update_keys(self): + def update_keys(self) -> None: for k in self.plugin_keys: if k not in self.key_func: self.key_func[k] = self.plugin_keys[k] @@ -426,7 +460,7 @@ class Tab: """ pass - def on_close(self): + def on_close(self) -> None: """ Called when the tab is to be closed """ @@ -434,7 +468,7 @@ class Tab: self.input.on_delete() self.closed = True - def matching_names(self) -> List[str]: + def matching_names(self) -> List[Tuple[int, str]]: """ Returns a list of strings that are used to name a tab with the /win command. For example you could switch to a tab that returns @@ -448,6 +482,9 @@ class Tab: class GapTab(Tab): + def __init__(self): + return + def __bool__(self): return False @@ -455,7 +492,7 @@ class GapTab(Tab): return 0 @property - def name(self): + def name(self) -> str: return '' def refresh(self): @@ -470,9 +507,14 @@ class ChatTab(Tab): Also, ^M is already bound to on_enter And also, add the /say command """ - plugin_commands = {} # type: Dict[str, Command] - plugin_keys = {} # type: Dict[str, Callable] + plugin_commands: Dict[str, Command] = {} + plugin_keys: Dict[str, Callable] = {} + last_sent_message: Optional[SMessage] message_type = 'chat' + timed_event_paused: Optional[DelayedEvent] + timed_event_not_paused: Optional[DelayedEvent] + mam_filler: Optional[MAMFiller] + e2e_encryption: Optional[str] = None def __init__(self, core, jid: Union[JID, str]): Tab.__init__(self, core) @@ -483,19 +525,19 @@ class ChatTab(Tab): self._jid = jid #: Is the tab currently requesting MAM data? self.query_status = False - self.last_stanza_id = None - - self._name = jid.full # type: Optional[str] - self.text_win = None + self._name = jid.full + self.text_win = windows.TextWin() self.directed_presence = None self._text_buffer = TextBuffer() + self._text_buffer.add_window(self.text_win) + self.mam_filler = None self.chatstate = None # can be "active", "composing", "paused", "gone", "inactive" # We keep a reference of the event that will set our chatstate to "paused", so that # we can delete it or change it if we need to self.timed_event_paused = None self.timed_event_not_paused = None # Keeps the last sent message to complete it easily in completion_correct, and to replace it. - self.last_sent_message = {} + self.last_sent_message = None self.key_func['M-v'] = self.move_separator self.key_func['M-h'] = self.scroll_separator self.key_func['M-/'] = self.last_words_completion @@ -524,7 +566,7 @@ class ChatTab(Tab): desc='Fix the last message with whatever you want.', shortdesc='Correct the last message.', completion=self.completion_correct) - self.chat_state = None + self.chat_state: Optional[str] = None self.update_commands() self.update_keys() @@ -544,13 +586,18 @@ class ChatTab(Tab): if value.domain: self._jid = value except InvalidJID: - self._name = value + self._name = str(value) else: raise TypeError("Name %r must be of type JID or str." % value) @property + def log_name(self) -> str: + """Name used for the log filename""" + return self.jid.bare + + @property def jid(self) -> JID: - return copy.copy(self._jid) + return copy(self._jid) @jid.setter def jid(self, value: JID) -> None: @@ -563,53 +610,35 @@ class ChatTab(Tab): def general_jid(self) -> JID: raise NotImplementedError - def log_message(self, - txt: str, - nickname: str, - time: Optional[datetime] = None, - typ=1): + def log_message(self, message: BaseMessage): """ Log the messages in the archives. """ - name = self.jid.bare - if not logger.log_message(name, nickname, txt, date=time, typ=typ): + if not isinstance(message, LoggableTrait): + return + if not logger.log_message(self.log_name, message): self.core.information('Unable to write in the log file', 'Error') - def add_message(self, - txt, - time=None, - nickname=None, - forced_user=None, - nick_color=None, - identifier=None, - jid=None, - history=None, - typ=1, - highlight=False): - self.log_message(txt, nickname, time=time, typ=typ) - self._text_buffer.add_message( - txt, - time=time, - nickname=nickname, - highlight=highlight, - nick_color=nick_color, - history=history, - user=forced_user, - identifier=identifier, - jid=jid) + def add_message(self, message: BaseMessage): + self.log_message(message) + self._text_buffer.add_message(message) def modify_message(self, - txt, - old_id, - new_id, - user=None, - jid=None, - nickname=None): - self.log_message(txt, nickname, typ=1) + txt: str, + old_id: str, + new_id: str, + time: Optional[datetime], + delayed: bool = False, + nickname: Optional[str] = None, + user: Optional[User] = None, + jid: Optional[JID] = None, + ) -> bool: message = self._text_buffer.modify_message( - txt, old_id, new_id, time=time, user=user, jid=jid) + txt, old_id, new_id, user=user, jid=jid, time=time + ) if message: - self.text_win.modify_message(old_id, message) + self.log_message(message) + self.text_win.modify_message(message.identifier, message) self.core.refresh_window() return True return False @@ -630,16 +659,20 @@ class ChatTab(Tab): for word in txt.split(): if len(word) >= 4 and word not in words: words.append(word) - words.extend([word for word in config.get('words').split(':') if word]) + words.extend([word for word in config.getlist('words') if word]) self.input.auto_completion(words, ' ', quotify=False) def on_enter(self): + if self.input is None: + raise NotImplementedError txt = self.input.key_enter() if txt: if not self.execute_command(txt): if txt.startswith('//'): txt = txt[1:] - self.command_say(xhtml.convert_simple_to_full_colors(txt)) + asyncio.ensure_future( + self.command_say(xhtml.convert_simple_to_full_colors(txt)) + ) self.cancel_paused_delay() @command_args_parser.raw @@ -651,19 +684,19 @@ class ChatTab(Tab): if message: message.send() - def generate_xhtml_message(self, arg: str) -> Message: + def generate_xhtml_message(self, arg: str) -> Optional[SMessage]: if not arg: - return + return None try: body = xhtml.clean_text( xhtml.xhtml_to_poezio_colors(arg, force=True)) ET.fromstring(arg) - except: + except SAXParseException: self.core.information('Could not send custom xhtml', 'Error') - log.error('/xhtml: Unable to send custom xhtml', exc_info=True) - return + log.error('/xhtml: Unable to send custom xhtml') + return None - msg = self.core.xmpp.make_message(self.get_dest_jid()) + msg: SMessage = self.core.xmpp.make_message(self.get_dest_jid()) msg['body'] = body msg.enable('html') msg['html']['body'] = arg @@ -680,27 +713,31 @@ class ChatTab(Tab): self._text_buffer.messages = [] self.text_win.rebuild_everything(self._text_buffer) - def check_send_chat_state(self): + def check_send_chat_state(self) -> bool: "If we should send a chat state" return True - def send_chat_state(self, state, always_send=False): + def send_chat_state(self, state: str, always_send: bool = False) -> None: """ Send an empty chatstate message """ + from poezio.tabs import PrivateTab + if self.check_send_chat_state(): if state in ('active', 'inactive', 'gone') and self.inactive and not always_send: return if config.get_by_tabname('send_chat_states', self.general_jid): - msg = self.core.xmpp.make_message(self.get_dest_jid()) + msg: SMessage = self.core.xmpp.make_message(self.get_dest_jid()) msg['type'] = self.message_type msg['chat_state'] = state self.chat_state = state + msg['no-store'] = True + if isinstance(self, PrivateTab): + msg.enable('muc') msg.send() - return True - def send_composing_chat_state(self, empty_after): + def send_composing_chat_state(self, empty_after: bool) -> None: """ Send the "active" or "composing" chatstate, depending on the the current status of the input @@ -736,7 +773,7 @@ class ChatTab(Tab): self.core.add_timed_event(new_event) self.timed_event_not_paused = new_event - def cancel_paused_delay(self): + def cancel_paused_delay(self) -> None: """ Remove that event from the list and set it to None. Called for example when the input is emptied, or when the message @@ -745,11 +782,22 @@ class ChatTab(Tab): if self.timed_event_paused is not None: self.core.remove_timed_event(self.timed_event_paused) self.timed_event_paused = None - self.core.remove_timed_event(self.timed_event_not_paused) - self.timed_event_not_paused = None + if self.timed_event_not_paused is not None: + self.core.remove_timed_event(self.timed_event_not_paused) + self.timed_event_not_paused = None + + def set_last_sent_message(self, msg: SMessage, correct: bool = False) -> None: + """Ensure last_sent_message is set with the correct attributes""" + if correct: + # XXX: Is the copy needed. Is the object passed here reused + # afterwards? Who knows. + msg = cast(SMessage, copy(msg)) + if self.last_sent_message is not None: + msg['id'] = self.last_sent_message['id'] + self.last_sent_message = msg @command_args_parser.raw - def command_correct(self, line): + async def command_correct(self, line: str) -> None: """ /correct <fixed message> """ @@ -759,7 +807,7 @@ class ChatTab(Tab): if not self.last_sent_message: self.core.information('There is no message to correct.', 'Error') return - self.command_say(line, correct=True) + await self.command_say(line, correct=True) def completion_correct(self, the_input): if self.last_sent_message and the_input.get_argument_position() == 1: @@ -772,32 +820,37 @@ class ChatTab(Tab): @property def inactive(self) -> bool: """Whether we should send inactive or active as a chatstate""" - return self.core.status.show in ('xa', 'away') or\ - (hasattr(self, 'directed_presence') and not self.directed_presence) + return self.core.status.show in ('xa', 'away') or ( + hasattr(self, 'directed_presence') + and self.directed_presence is not None + and self.directed_presence + ) - def move_separator(self): + def move_separator(self) -> None: self.text_win.remove_line_separator() self.text_win.add_line_separator(self._text_buffer) self.text_win.refresh() - self.input.refresh() + if self.input: + self.input.refresh() def get_conversation_messages(self): return self._text_buffer.messages - def check_scrolled(self): + def check_scrolled(self) -> None: if self.text_win.pos != 0: self.state = 'scrolled' @command_args_parser.raw - def command_say(self, line, correct=False): + async def command_say(self, line: str, attention: bool = False, correct: bool = False): pass def goto_build_lines(self, new_date): text_buffer = self._text_buffer built_lines = [] message_count = 0 - timestamp = config.get('show_timestamps') - nick_size = config.get('max_nick_length') + timestamp = config.getbool('show_timestamps') + nick_size = config.getint('max_nick_length') + theme = get_theme() for message in text_buffer.messages: # Build lines of a message txt = message.txt @@ -816,12 +869,8 @@ class ChatTab(Tab): if message.me: offset += 1 if timestamp: - if message.str_time: - offset += 1 + len(message.str_time) - if theme.CHAR_TIME_LEFT and message.str_time: - offset += 1 - if theme.CHAR_TIME_RIGHT and message.str_time: - offset += 1 + if message.history: + offset += 1 + theme.LONG_TIME_FORMAT_LENGTH lines = poopt.cut_text(txt, self.text_win.width - offset - 1) for line in lines: built_lines.append(line) @@ -926,7 +975,10 @@ class ChatTab(Tab): def on_scroll_up(self): if not self.query_status: - asyncio.ensure_future(mam.on_scroll_up(tab=self)) + from poezio.log_loader import LogLoader + asyncio.create_task( + LogLoader(logger, self, config.getbool('use_log')).scroll_requested() + ) return self.text_win.scroll_up(self.text_win.height - 1) def on_scroll_down(self): @@ -944,15 +996,15 @@ class ChatTab(Tab): class OneToOneTab(ChatTab): - def __init__(self, core, jid): + def __init__(self, core, jid, initial=None): ChatTab.__init__(self, core, jid) self.__status = Status("", "") self.last_remote_message = datetime.now() + self._initial_log = asyncio.Event() # Set to true once the first disco is done self.__initial_disco = False - self.check_features() self.register_command( 'unquery', self.command_unquery, shortdesc='Close the tab.') self.register_command( @@ -964,6 +1016,30 @@ class OneToOneTab(ChatTab): shortdesc='Request the attention.', desc='Attention: Request the attention of the contact. Can also ' 'send a message along with the attention.') + asyncio.create_task(self.init_logs(initial=initial)) + + async def init_logs(self, initial: Optional[SMessage] = None) -> None: + use_log = config.get_by_tabname('use_log', self.jid) + mam_sync = config.get_by_tabname('mam_sync', self.jid) + if use_log and mam_sync: + limit = config.get_by_tabname('mam_sync_limit', self.jid) + mam_filler = MAMFiller(logger, self, limit) + self.mam_filler = mam_filler + + if initial is not None: + # If there is an initial message, throw it back into the + # text buffer if it cannot be fetched from mam + await mam_filler.done.wait() + if mam_filler.result == 0: + await self.handle_message(initial) + elif use_log and initial: + await self.handle_message(initial, display=False) + elif initial: + await self.handle_message(initial) + await LogLoader(logger, self, use_log, self._initial_log).tab_open() + + async def handle_message(self, msg: SMessage, display: bool = True): + pass def remote_user_color(self): return dump_tuple(get_theme().COLOR_REMOTE_USER) @@ -990,7 +1066,9 @@ class OneToOneTab(ChatTab): msg += 'status: %s, ' % status.message if status.show in SHOW_NAME: msg += 'show: %s, ' % SHOW_NAME[status.show] - self.add_message(msg[:-2], typ=2) + self.add_message( + PersistentInfoMessage(txt=msg[:-2]) + ) def ack_message(self, msg_id: str, msg_jid: JID): """ @@ -1022,26 +1100,21 @@ class OneToOneTab(ChatTab): message.send() body = xhtml.xhtml_to_poezio_colors(xhtml_data, force=True) self._text_buffer.add_message( - body, - nickname=self.core.own_nick, - nick_color=get_theme().COLOR_OWN_NICK, - identifier=message['id'], - jid=self.core.xmpp.boundjid) + Message( + body, + nickname=self.core.own_nick, + nick_color=get_theme().COLOR_OWN_NICK, + identifier=message['id'], + jid=self.core.xmpp.boundjid, + ) + ) self.refresh() - def check_features(self): - "check the features supported by the other party" - if safeJID(self.get_dest_jid()).resource: - self.core.xmpp.plugin['xep_0030'].get_info( - jid=self.get_dest_jid(), - timeout=5, - callback=self.features_checked) - @command_args_parser.raw - def command_attention(self, message): + async def command_attention(self, message): """/attention [message]""" if message != '': - self.command_say(message, attention=True) + await self.command_say(message, attention=True) else: msg = self.core.xmpp.make_message(self.get_dest_jid()) msg['type'] = 'chat' @@ -1049,7 +1122,7 @@ class OneToOneTab(ChatTab): msg.send() @command_args_parser.raw - def command_say(self, line, correct=False, attention=False): + async def command_say(self, line: str, attention: bool = False, correct: bool = False): pass @command_args_parser.ignored @@ -1073,7 +1146,3 @@ class OneToOneTab(ChatTab): msg = msg % (self.name, feature, command_name) self.core.information(msg, 'Info') return True - - def features_checked(self, iq): - "Features check callback" - features = iq['disco_info'].get_features() or [] |