summaryrefslogtreecommitdiff
path: root/poezio/tabs/basetabs.py
diff options
context:
space:
mode:
Diffstat (limited to 'poezio/tabs/basetabs.py')
-rw-r--r--poezio/tabs/basetabs.py351
1 files changed, 210 insertions, 141 deletions
diff --git a/poezio/tabs/basetabs.py b/poezio/tabs/basetabs.py
index 73db87f2..793eae62 100644
--- a/poezio/tabs/basetabs.py
+++ b/poezio/tabs/basetabs.py
@@ -13,38 +13,57 @@ This module also defines ChatTabs, the parent class for all tabs
revolving around chats.
"""
-import copy
+from __future__ import annotations
+
import logging
import string
import asyncio
-import time
+from copy import copy
from math import ceil, log10
from datetime import datetime
from xml.etree import ElementTree as ET
+from xml.sax import SAXParseException
from typing import (
Any,
Callable,
+ cast,
Dict,
List,
Optional,
Union,
+ Tuple,
TYPE_CHECKING,
)
-from poezio import mam, poopt, timed_events, xhtml, windows
+from poezio import (
+ poopt,
+ timed_events,
+ xhtml,
+ windows
+)
from poezio.core.structs import Command, Completion, Status
-from poezio.common import safeJID
from poezio.config import config
from poezio.decorators import command_args_parser, refresh_wrapper
from poezio.logger import logger
+from poezio.log_loader import MAMFiller, LogLoader
from poezio.text_buffer import TextBuffer
from poezio.theming import get_theme, dump_tuple
-from poezio.windows.funcs import truncate_nick
+from poezio.user import User
+from poezio.ui.funcs import truncate_nick
+from poezio.timed_events import DelayedEvent
+from poezio.ui.types import (
+ BaseMessage,
+ Message,
+ PersistentInfoMessage,
+ LoggableTrait,
+)
-from slixmpp import JID, InvalidJID, Message
+from slixmpp import JID, InvalidJID, Message as SMessage
if TYPE_CHECKING:
from _curses import _CursesWindow # pylint: disable=E0611
+ from poezio.size_manager import SizeManager
+ from poezio.core.core import Core
log = logging.getLogger(__name__)
@@ -102,29 +121,42 @@ SHOW_NAME = {
class Tab:
- plugin_commands = {} # type: Dict[str, Command]
- plugin_keys = {} # type: Dict[str, Callable]
+ plugin_commands: Dict[str, Command] = {}
+ plugin_keys: Dict[str, Callable] = {}
# Placeholder values, set on resize
- height = 1
- width = 1
-
- def __init__(self, core):
+ height: int = 1
+ width: int = 1
+ core: Core
+ input: Optional[windows.Input]
+ key_func: Dict[str, Callable[[], Any]]
+ commands: Dict[str, Command]
+ need_resize: bool
+ ui_config_changed: bool
+
+ def __init__(self, core: Core):
self.core = core
self.nb = 0
- if not hasattr(self, 'name'):
- self.name = self.__class__.__name__
+ if not hasattr(self, '_name'):
+ self._name = self.__class__.__name__
self.input = None
self.closed = False
self._state = 'normal'
self._prev_state = None
self.need_resize = False
+ self.ui_config_changed = False
self.key_func = {} # each tab should add their keys in there
# and use them in on_input
self.commands = {} # and their own commands
@property
- def size(self) -> int:
+ def name(self) -> str:
+ if hasattr(self, '_name'):
+ return self._name
+ return ''
+
+ @property
+ def size(self) -> SizeManager:
return self.core.size
@staticmethod
@@ -133,23 +165,27 @@ class Tab:
Returns 1 or 0, depending on if we are using the vertical tab list
or not.
"""
- if config.get('enable_vertical_tab_list'):
+ if config.getbool('enable_vertical_tab_list'):
return 0
return 1
@property
- def info_win(self):
+ def info_win(self) -> windows.TextWin:
return self.core.information_win
@property
- def color(self):
+ def color(self) -> Union[Tuple[int, int], Tuple[int, int, 'str']]:
return STATE_COLORS[self._state]()
@property
- def vertical_color(self):
+ def vertical_color(self) -> Union[Tuple[int, int], Tuple[int, int, 'str']]:
return VERTICAL_STATE_COLORS[self._state]()
@property
+ def priority(self) -> Union[int, float]:
+ return STATE_PRIORITY.get(self._state, -1)
+
+ @property
def state(self) -> str:
return self._state
@@ -187,7 +223,7 @@ class Tab:
self._state = 'normal'
@staticmethod
- def resize(scr: '_CursesWindow'):
+ def initial_resize(scr: _CursesWindow):
Tab.height, Tab.width = scr.getmaxyx()
windows.base_wins.TAB_WIN = scr
@@ -224,7 +260,7 @@ class Tab:
*,
desc='',
shortdesc='',
- completion: Optional[Callable] = None,
+ completion: Optional[Callable[[windows.Input], Completion]] = None,
usage=''):
"""
Add a command
@@ -276,7 +312,6 @@ class Tab:
comp = command.comp(the_input)
if comp:
return comp.run()
- return comp
return False
def execute_command(self, provided_text: str) -> bool:
@@ -284,8 +319,10 @@ class Tab:
Execute the command in the input and return False if
the input didn't contain a command
"""
+ if self.input is None:
+ raise NotImplementedError
txt = provided_text or self.input.key_enter()
- if txt.startswith('/') and not txt.startswith('//') and\
+ if txt and txt.startswith('/') and not txt.startswith('//') and\
not txt.startswith('/me '):
command = txt.strip().split()[0][1:]
arg = txt[2 + len(command):] # jump the '/' and the ' '
@@ -313,13 +350,16 @@ class Tab:
if func:
if hasattr(self.input, "reset_completion"):
self.input.reset_completion()
- func(arg)
+ if asyncio.iscoroutinefunction(func):
+ asyncio.create_task(func(arg))
+ else:
+ func(arg)
return True
else:
return False
- def refresh_tab_win(self):
- if config.get('enable_vertical_tab_list'):
+ def refresh_tab_win(self) -> None:
+ if config.getbool('enable_vertical_tab_list'):
left_tab_win = self.core.left_tab_win
if left_tab_win and not self.size.core_degrade_x:
left_tab_win.refresh()
@@ -350,24 +390,18 @@ class Tab:
"""
return self.name
- def get_text_window(self) -> Optional[windows.TextWin]:
- """
- Returns the principal TextWin window, if there's one
- """
- return None
-
def on_input(self, key: str, raw: bool):
"""
raw indicates if the key should activate the associated command or not.
"""
pass
- def update_commands(self):
+ def update_commands(self) -> None:
for c in self.plugin_commands:
if c not in self.commands:
self.commands[c] = self.plugin_commands[c]
- def update_keys(self):
+ def update_keys(self) -> None:
for k in self.plugin_keys:
if k not in self.key_func:
self.key_func[k] = self.plugin_keys[k]
@@ -426,7 +460,7 @@ class Tab:
"""
pass
- def on_close(self):
+ def on_close(self) -> None:
"""
Called when the tab is to be closed
"""
@@ -434,7 +468,7 @@ class Tab:
self.input.on_delete()
self.closed = True
- def matching_names(self) -> List[str]:
+ def matching_names(self) -> List[Tuple[int, str]]:
"""
Returns a list of strings that are used to name a tab with the /win
command. For example you could switch to a tab that returns
@@ -448,6 +482,9 @@ class Tab:
class GapTab(Tab):
+ def __init__(self):
+ return
+
def __bool__(self):
return False
@@ -455,7 +492,7 @@ class GapTab(Tab):
return 0
@property
- def name(self):
+ def name(self) -> str:
return ''
def refresh(self):
@@ -470,9 +507,14 @@ class ChatTab(Tab):
Also, ^M is already bound to on_enter
And also, add the /say command
"""
- plugin_commands = {} # type: Dict[str, Command]
- plugin_keys = {} # type: Dict[str, Callable]
+ plugin_commands: Dict[str, Command] = {}
+ plugin_keys: Dict[str, Callable] = {}
+ last_sent_message: Optional[SMessage]
message_type = 'chat'
+ timed_event_paused: Optional[DelayedEvent]
+ timed_event_not_paused: Optional[DelayedEvent]
+ mam_filler: Optional[MAMFiller]
+ e2e_encryption: Optional[str] = None
def __init__(self, core, jid: Union[JID, str]):
Tab.__init__(self, core)
@@ -483,19 +525,19 @@ class ChatTab(Tab):
self._jid = jid
#: Is the tab currently requesting MAM data?
self.query_status = False
- self.last_stanza_id = None
-
- self._name = jid.full # type: Optional[str]
- self.text_win = None
+ self._name = jid.full
+ self.text_win = windows.TextWin()
self.directed_presence = None
self._text_buffer = TextBuffer()
+ self._text_buffer.add_window(self.text_win)
+ self.mam_filler = None
self.chatstate = None # can be "active", "composing", "paused", "gone", "inactive"
# We keep a reference of the event that will set our chatstate to "paused", so that
# we can delete it or change it if we need to
self.timed_event_paused = None
self.timed_event_not_paused = None
# Keeps the last sent message to complete it easily in completion_correct, and to replace it.
- self.last_sent_message = {}
+ self.last_sent_message = None
self.key_func['M-v'] = self.move_separator
self.key_func['M-h'] = self.scroll_separator
self.key_func['M-/'] = self.last_words_completion
@@ -524,7 +566,7 @@ class ChatTab(Tab):
desc='Fix the last message with whatever you want.',
shortdesc='Correct the last message.',
completion=self.completion_correct)
- self.chat_state = None
+ self.chat_state: Optional[str] = None
self.update_commands()
self.update_keys()
@@ -544,13 +586,18 @@ class ChatTab(Tab):
if value.domain:
self._jid = value
except InvalidJID:
- self._name = value
+ self._name = str(value)
else:
raise TypeError("Name %r must be of type JID or str." % value)
@property
+ def log_name(self) -> str:
+ """Name used for the log filename"""
+ return self.jid.bare
+
+ @property
def jid(self) -> JID:
- return copy.copy(self._jid)
+ return copy(self._jid)
@jid.setter
def jid(self, value: JID) -> None:
@@ -563,53 +610,35 @@ class ChatTab(Tab):
def general_jid(self) -> JID:
raise NotImplementedError
- def log_message(self,
- txt: str,
- nickname: str,
- time: Optional[datetime] = None,
- typ=1):
+ def log_message(self, message: BaseMessage):
"""
Log the messages in the archives.
"""
- name = self.jid.bare
- if not logger.log_message(name, nickname, txt, date=time, typ=typ):
+ if not isinstance(message, LoggableTrait):
+ return
+ if not logger.log_message(self.log_name, message):
self.core.information('Unable to write in the log file', 'Error')
- def add_message(self,
- txt,
- time=None,
- nickname=None,
- forced_user=None,
- nick_color=None,
- identifier=None,
- jid=None,
- history=None,
- typ=1,
- highlight=False):
- self.log_message(txt, nickname, time=time, typ=typ)
- self._text_buffer.add_message(
- txt,
- time=time,
- nickname=nickname,
- highlight=highlight,
- nick_color=nick_color,
- history=history,
- user=forced_user,
- identifier=identifier,
- jid=jid)
+ def add_message(self, message: BaseMessage):
+ self.log_message(message)
+ self._text_buffer.add_message(message)
def modify_message(self,
- txt,
- old_id,
- new_id,
- user=None,
- jid=None,
- nickname=None):
- self.log_message(txt, nickname, typ=1)
+ txt: str,
+ old_id: str,
+ new_id: str,
+ time: Optional[datetime],
+ delayed: bool = False,
+ nickname: Optional[str] = None,
+ user: Optional[User] = None,
+ jid: Optional[JID] = None,
+ ) -> bool:
message = self._text_buffer.modify_message(
- txt, old_id, new_id, time=time, user=user, jid=jid)
+ txt, old_id, new_id, user=user, jid=jid, time=time
+ )
if message:
- self.text_win.modify_message(old_id, message)
+ self.log_message(message)
+ self.text_win.modify_message(message.identifier, message)
self.core.refresh_window()
return True
return False
@@ -630,16 +659,20 @@ class ChatTab(Tab):
for word in txt.split():
if len(word) >= 4 and word not in words:
words.append(word)
- words.extend([word for word in config.get('words').split(':') if word])
+ words.extend([word for word in config.getlist('words') if word])
self.input.auto_completion(words, ' ', quotify=False)
def on_enter(self):
+ if self.input is None:
+ raise NotImplementedError
txt = self.input.key_enter()
if txt:
if not self.execute_command(txt):
if txt.startswith('//'):
txt = txt[1:]
- self.command_say(xhtml.convert_simple_to_full_colors(txt))
+ asyncio.ensure_future(
+ self.command_say(xhtml.convert_simple_to_full_colors(txt))
+ )
self.cancel_paused_delay()
@command_args_parser.raw
@@ -651,19 +684,19 @@ class ChatTab(Tab):
if message:
message.send()
- def generate_xhtml_message(self, arg: str) -> Message:
+ def generate_xhtml_message(self, arg: str) -> Optional[SMessage]:
if not arg:
- return
+ return None
try:
body = xhtml.clean_text(
xhtml.xhtml_to_poezio_colors(arg, force=True))
ET.fromstring(arg)
- except:
+ except SAXParseException:
self.core.information('Could not send custom xhtml', 'Error')
- log.error('/xhtml: Unable to send custom xhtml', exc_info=True)
- return
+ log.error('/xhtml: Unable to send custom xhtml')
+ return None
- msg = self.core.xmpp.make_message(self.get_dest_jid())
+ msg: SMessage = self.core.xmpp.make_message(self.get_dest_jid())
msg['body'] = body
msg.enable('html')
msg['html']['body'] = arg
@@ -680,27 +713,31 @@ class ChatTab(Tab):
self._text_buffer.messages = []
self.text_win.rebuild_everything(self._text_buffer)
- def check_send_chat_state(self):
+ def check_send_chat_state(self) -> bool:
"If we should send a chat state"
return True
- def send_chat_state(self, state, always_send=False):
+ def send_chat_state(self, state: str, always_send: bool = False) -> None:
"""
Send an empty chatstate message
"""
+ from poezio.tabs import PrivateTab
+
if self.check_send_chat_state():
if state in ('active', 'inactive',
'gone') and self.inactive and not always_send:
return
if config.get_by_tabname('send_chat_states', self.general_jid):
- msg = self.core.xmpp.make_message(self.get_dest_jid())
+ msg: SMessage = self.core.xmpp.make_message(self.get_dest_jid())
msg['type'] = self.message_type
msg['chat_state'] = state
self.chat_state = state
+ msg['no-store'] = True
+ if isinstance(self, PrivateTab):
+ msg.enable('muc')
msg.send()
- return True
- def send_composing_chat_state(self, empty_after):
+ def send_composing_chat_state(self, empty_after: bool) -> None:
"""
Send the "active" or "composing" chatstate, depending
on the the current status of the input
@@ -736,7 +773,7 @@ class ChatTab(Tab):
self.core.add_timed_event(new_event)
self.timed_event_not_paused = new_event
- def cancel_paused_delay(self):
+ def cancel_paused_delay(self) -> None:
"""
Remove that event from the list and set it to None.
Called for example when the input is emptied, or when the message
@@ -745,11 +782,22 @@ class ChatTab(Tab):
if self.timed_event_paused is not None:
self.core.remove_timed_event(self.timed_event_paused)
self.timed_event_paused = None
- self.core.remove_timed_event(self.timed_event_not_paused)
- self.timed_event_not_paused = None
+ if self.timed_event_not_paused is not None:
+ self.core.remove_timed_event(self.timed_event_not_paused)
+ self.timed_event_not_paused = None
+
+ def set_last_sent_message(self, msg: SMessage, correct: bool = False) -> None:
+ """Ensure last_sent_message is set with the correct attributes"""
+ if correct:
+ # XXX: Is the copy needed. Is the object passed here reused
+ # afterwards? Who knows.
+ msg = cast(SMessage, copy(msg))
+ if self.last_sent_message is not None:
+ msg['id'] = self.last_sent_message['id']
+ self.last_sent_message = msg
@command_args_parser.raw
- def command_correct(self, line):
+ async def command_correct(self, line: str) -> None:
"""
/correct <fixed message>
"""
@@ -759,7 +807,7 @@ class ChatTab(Tab):
if not self.last_sent_message:
self.core.information('There is no message to correct.', 'Error')
return
- self.command_say(line, correct=True)
+ await self.command_say(line, correct=True)
def completion_correct(self, the_input):
if self.last_sent_message and the_input.get_argument_position() == 1:
@@ -772,32 +820,37 @@ class ChatTab(Tab):
@property
def inactive(self) -> bool:
"""Whether we should send inactive or active as a chatstate"""
- return self.core.status.show in ('xa', 'away') or\
- (hasattr(self, 'directed_presence') and not self.directed_presence)
+ return self.core.status.show in ('xa', 'away') or (
+ hasattr(self, 'directed_presence')
+ and self.directed_presence is not None
+ and self.directed_presence
+ )
- def move_separator(self):
+ def move_separator(self) -> None:
self.text_win.remove_line_separator()
self.text_win.add_line_separator(self._text_buffer)
self.text_win.refresh()
- self.input.refresh()
+ if self.input:
+ self.input.refresh()
def get_conversation_messages(self):
return self._text_buffer.messages
- def check_scrolled(self):
+ def check_scrolled(self) -> None:
if self.text_win.pos != 0:
self.state = 'scrolled'
@command_args_parser.raw
- def command_say(self, line, correct=False):
+ async def command_say(self, line: str, attention: bool = False, correct: bool = False):
pass
def goto_build_lines(self, new_date):
text_buffer = self._text_buffer
built_lines = []
message_count = 0
- timestamp = config.get('show_timestamps')
- nick_size = config.get('max_nick_length')
+ timestamp = config.getbool('show_timestamps')
+ nick_size = config.getint('max_nick_length')
+ theme = get_theme()
for message in text_buffer.messages:
# Build lines of a message
txt = message.txt
@@ -816,12 +869,8 @@ class ChatTab(Tab):
if message.me:
offset += 1
if timestamp:
- if message.str_time:
- offset += 1 + len(message.str_time)
- if theme.CHAR_TIME_LEFT and message.str_time:
- offset += 1
- if theme.CHAR_TIME_RIGHT and message.str_time:
- offset += 1
+ if message.history:
+ offset += 1 + theme.LONG_TIME_FORMAT_LENGTH
lines = poopt.cut_text(txt, self.text_win.width - offset - 1)
for line in lines:
built_lines.append(line)
@@ -926,7 +975,10 @@ class ChatTab(Tab):
def on_scroll_up(self):
if not self.query_status:
- asyncio.ensure_future(mam.on_scroll_up(tab=self))
+ from poezio.log_loader import LogLoader
+ asyncio.create_task(
+ LogLoader(logger, self, config.getbool('use_log')).scroll_requested()
+ )
return self.text_win.scroll_up(self.text_win.height - 1)
def on_scroll_down(self):
@@ -944,15 +996,15 @@ class ChatTab(Tab):
class OneToOneTab(ChatTab):
- def __init__(self, core, jid):
+ def __init__(self, core, jid, initial=None):
ChatTab.__init__(self, core, jid)
self.__status = Status("", "")
self.last_remote_message = datetime.now()
+ self._initial_log = asyncio.Event()
# Set to true once the first disco is done
self.__initial_disco = False
- self.check_features()
self.register_command(
'unquery', self.command_unquery, shortdesc='Close the tab.')
self.register_command(
@@ -964,6 +1016,30 @@ class OneToOneTab(ChatTab):
shortdesc='Request the attention.',
desc='Attention: Request the attention of the contact. Can also '
'send a message along with the attention.')
+ asyncio.create_task(self.init_logs(initial=initial))
+
+ async def init_logs(self, initial: Optional[SMessage] = None) -> None:
+ use_log = config.get_by_tabname('use_log', self.jid)
+ mam_sync = config.get_by_tabname('mam_sync', self.jid)
+ if use_log and mam_sync:
+ limit = config.get_by_tabname('mam_sync_limit', self.jid)
+ mam_filler = MAMFiller(logger, self, limit)
+ self.mam_filler = mam_filler
+
+ if initial is not None:
+ # If there is an initial message, throw it back into the
+ # text buffer if it cannot be fetched from mam
+ await mam_filler.done.wait()
+ if mam_filler.result == 0:
+ await self.handle_message(initial)
+ elif use_log and initial:
+ await self.handle_message(initial, display=False)
+ elif initial:
+ await self.handle_message(initial)
+ await LogLoader(logger, self, use_log, self._initial_log).tab_open()
+
+ async def handle_message(self, msg: SMessage, display: bool = True):
+ pass
def remote_user_color(self):
return dump_tuple(get_theme().COLOR_REMOTE_USER)
@@ -990,7 +1066,9 @@ class OneToOneTab(ChatTab):
msg += 'status: %s, ' % status.message
if status.show in SHOW_NAME:
msg += 'show: %s, ' % SHOW_NAME[status.show]
- self.add_message(msg[:-2], typ=2)
+ self.add_message(
+ PersistentInfoMessage(txt=msg[:-2])
+ )
def ack_message(self, msg_id: str, msg_jid: JID):
"""
@@ -1022,26 +1100,21 @@ class OneToOneTab(ChatTab):
message.send()
body = xhtml.xhtml_to_poezio_colors(xhtml_data, force=True)
self._text_buffer.add_message(
- body,
- nickname=self.core.own_nick,
- nick_color=get_theme().COLOR_OWN_NICK,
- identifier=message['id'],
- jid=self.core.xmpp.boundjid)
+ Message(
+ body,
+ nickname=self.core.own_nick,
+ nick_color=get_theme().COLOR_OWN_NICK,
+ identifier=message['id'],
+ jid=self.core.xmpp.boundjid,
+ )
+ )
self.refresh()
- def check_features(self):
- "check the features supported by the other party"
- if safeJID(self.get_dest_jid()).resource:
- self.core.xmpp.plugin['xep_0030'].get_info(
- jid=self.get_dest_jid(),
- timeout=5,
- callback=self.features_checked)
-
@command_args_parser.raw
- def command_attention(self, message):
+ async def command_attention(self, message):
"""/attention [message]"""
if message != '':
- self.command_say(message, attention=True)
+ await self.command_say(message, attention=True)
else:
msg = self.core.xmpp.make_message(self.get_dest_jid())
msg['type'] = 'chat'
@@ -1049,7 +1122,7 @@ class OneToOneTab(ChatTab):
msg.send()
@command_args_parser.raw
- def command_say(self, line, correct=False, attention=False):
+ async def command_say(self, line: str, attention: bool = False, correct: bool = False):
pass
@command_args_parser.ignored
@@ -1073,7 +1146,3 @@ class OneToOneTab(ChatTab):
msg = msg % (self.name, feature, command_name)
self.core.information(msg, 'Info')
return True
-
- def features_checked(self, iq):
- "Features check callback"
- features = iq['disco_info'].get_features() or []