diff options
Diffstat (limited to 'poezio/text_buffer.py')
-rw-r--r-- | poezio/text_buffer.py | 127 |
1 files changed, 121 insertions, 6 deletions
diff --git a/poezio/text_buffer.py b/poezio/text_buffer.py index 3b3ac051..ff853a67 100644 --- a/poezio/text_buffer.py +++ b/poezio/text_buffer.py @@ -12,6 +12,7 @@ import logging log = logging.getLogger(__name__) from typing import ( + cast, Dict, List, Optional, @@ -19,9 +20,15 @@ from typing import ( Tuple, Union, ) +from dataclasses import dataclass from datetime import datetime from poezio.config import config -from poezio.ui.types import Message, BaseMessage +from poezio.ui.types import ( + BaseMessage, + Message, + MucOwnJoinMessage, + MucOwnLeaveMessage, +) if TYPE_CHECKING: from poezio.windows.text_win import TextWin @@ -35,6 +42,15 @@ class AckError(Exception): pass +@dataclass +class HistoryGap: + """Class representing a period of non-presence inside a MUC""" + leave_message: Optional[BaseMessage] + join_message: Optional[BaseMessage] + last_timestamp_before_leave: Optional[datetime] + first_timestamp_after_join: Optional[datetime] + + class TextBuffer: """ This class just keep trace of messages, in a list with various @@ -44,7 +60,7 @@ class TextBuffer: def __init__(self, messages_nb_limit: Optional[int] = None) -> None: if messages_nb_limit is None: - messages_nb_limit = config.get('max_messages_in_memory') + messages_nb_limit = cast(int, config.get('max_messages_in_memory')) self._messages_nb_limit = messages_nb_limit # type: int # Message objects self.messages = [] # type: List[BaseMessage] @@ -58,6 +74,99 @@ class TextBuffer: def add_window(self, win) -> None: self._windows.append(win) + def find_last_gap_muc(self) -> Optional[HistoryGap]: + """Find the last known history gap contained in buffer""" + leave = None # type:Optional[Tuple[int, BaseMessage]] + join = None # type:Optional[Tuple[int, BaseMessage]] + for i, item in enumerate(reversed(self.messages)): + if isinstance(item, MucOwnLeaveMessage): + leave = (len(self.messages) - i - 1, item) + break + elif join and isinstance(item, MucOwnJoinMessage): + leave = (len(self.messages) - i - 1, item) + break + if isinstance(item, MucOwnJoinMessage): + join = (len(self.messages) - i - 1, item) + + last_timestamp = None + first_timestamp = datetime.now() + + # Identify the special case when we got disconnected from a chatroom + # without receiving or sending the relevant presence, therefore only + # having two joins with no leave, and messages in the middle. + if leave and join and isinstance(leave[1], MucOwnJoinMessage): + for i in range(join[0] - 1, leave[0], - 1): + if isinstance(self.messages[i], Message): + leave = ( + i, + self.messages[i] + ) + last_timestamp = self.messages[i].time + break + # If we have a normal gap but messages inbetween, it probably + # already has history, so abort there without returning it. + if join and leave: + for i in range(leave[0] + 1, join[0], 1): + if isinstance(self.messages[i], Message): + return None + elif not (join or leave): + return None + + # If a leave message is found, get the last Message timestamp + # before it. + if leave is None: + leave_msg = None + elif last_timestamp is None: + leave_msg = leave[1] + for i in range(leave[0], 0, -1): + if isinstance(self.messages[i], Message): + last_timestamp = self.messages[i].time + break + else: + leave_msg = leave[1] + # If a join message is found, get the first Message timestamp + # after it, or the current time. + if join is None: + join_msg = None + else: + join_msg = join[1] + for i in range(join[0], len(self.messages)): + msg = self.messages[i] + if isinstance(msg, Message) and msg.time < first_timestamp: + first_timestamp = msg.time + break + return HistoryGap( + leave_message=leave_msg, + join_message=join_msg, + last_timestamp_before_leave=last_timestamp, + first_timestamp_after_join=first_timestamp, + ) + + def get_gap_index(self, gap: HistoryGap) -> Optional[int]: + """Find the first index to insert into inside a gap""" + if gap.leave_message is None: + return 0 + for i, msg in enumerate(self.messages): + if msg is gap.leave_message: + return i + 1 + return None + + def add_history_messages(self, messages: List[BaseMessage], gap: Optional[HistoryGap] = None) -> None: + """Insert history messages at their correct place """ + index = 0 + new_index = None + if gap is not None: + new_index = self.get_gap_index(gap) + if new_index is None: # Not sure what happened, abort + return + index = new_index + for message in messages: + self.messages.insert(index, message) + index += 1 + log.debug('inserted message: %s', message) + for window in self._windows: # make the associated windows + window.rebuild_everything(self) + @property def last_message(self) -> Optional[BaseMessage]: return self.messages[-1] if self.messages else None @@ -72,8 +181,8 @@ class TextBuffer: self.messages.pop(0) ret_val = 0 - show_timestamps = config.get('show_timestamps') - nick_size = config.get('max_nick_length') + show_timestamps = cast(bool, config.get('show_timestamps')) + nick_size = cast(int, config.get('max_nick_length')) for window in self._windows: # make the associated windows # build the lines from the new message nb = window.build_new_message( @@ -82,8 +191,7 @@ class TextBuffer: nick_size=nick_size) if ret_val == 0: ret_val = nb - top = isinstance(msg, Message) and msg.top - if window.pos != 0 and top is False: + if window.pos != 0: window.scroll_up(nb) return min(ret_val, 1) @@ -197,6 +305,13 @@ class TextBuffer: def del_window(self, win) -> None: self._windows.remove(win) + def find_last_message(self) -> Optional[Message]: + """Find the last real message received in this buffer""" + for message in reversed(self.messages): + if isinstance(message, Message): + return message + return None + def __del__(self): size = len(self.messages) log.debug('** Deleting %s messages from textbuffer', size) |