summaryrefslogtreecommitdiff
path: root/poezio/text_buffer.py
diff options
context:
space:
mode:
Diffstat (limited to 'poezio/text_buffer.py')
-rw-r--r--poezio/text_buffer.py127
1 files changed, 121 insertions, 6 deletions
diff --git a/poezio/text_buffer.py b/poezio/text_buffer.py
index 3b3ac051..ff853a67 100644
--- a/poezio/text_buffer.py
+++ b/poezio/text_buffer.py
@@ -12,6 +12,7 @@ import logging
log = logging.getLogger(__name__)
from typing import (
+ cast,
Dict,
List,
Optional,
@@ -19,9 +20,15 @@ from typing import (
Tuple,
Union,
)
+from dataclasses import dataclass
from datetime import datetime
from poezio.config import config
-from poezio.ui.types import Message, BaseMessage
+from poezio.ui.types import (
+ BaseMessage,
+ Message,
+ MucOwnJoinMessage,
+ MucOwnLeaveMessage,
+)
if TYPE_CHECKING:
from poezio.windows.text_win import TextWin
@@ -35,6 +42,15 @@ class AckError(Exception):
pass
+@dataclass
+class HistoryGap:
+ """Class representing a period of non-presence inside a MUC"""
+ leave_message: Optional[BaseMessage]
+ join_message: Optional[BaseMessage]
+ last_timestamp_before_leave: Optional[datetime]
+ first_timestamp_after_join: Optional[datetime]
+
+
class TextBuffer:
"""
This class just keep trace of messages, in a list with various
@@ -44,7 +60,7 @@ class TextBuffer:
def __init__(self, messages_nb_limit: Optional[int] = None) -> None:
if messages_nb_limit is None:
- messages_nb_limit = config.get('max_messages_in_memory')
+ messages_nb_limit = cast(int, config.get('max_messages_in_memory'))
self._messages_nb_limit = messages_nb_limit # type: int
# Message objects
self.messages = [] # type: List[BaseMessage]
@@ -58,6 +74,99 @@ class TextBuffer:
def add_window(self, win) -> None:
self._windows.append(win)
+ def find_last_gap_muc(self) -> Optional[HistoryGap]:
+ """Find the last known history gap contained in buffer"""
+ leave = None # type:Optional[Tuple[int, BaseMessage]]
+ join = None # type:Optional[Tuple[int, BaseMessage]]
+ for i, item in enumerate(reversed(self.messages)):
+ if isinstance(item, MucOwnLeaveMessage):
+ leave = (len(self.messages) - i - 1, item)
+ break
+ elif join and isinstance(item, MucOwnJoinMessage):
+ leave = (len(self.messages) - i - 1, item)
+ break
+ if isinstance(item, MucOwnJoinMessage):
+ join = (len(self.messages) - i - 1, item)
+
+ last_timestamp = None
+ first_timestamp = datetime.now()
+
+ # Identify the special case when we got disconnected from a chatroom
+ # without receiving or sending the relevant presence, therefore only
+ # having two joins with no leave, and messages in the middle.
+ if leave and join and isinstance(leave[1], MucOwnJoinMessage):
+ for i in range(join[0] - 1, leave[0], - 1):
+ if isinstance(self.messages[i], Message):
+ leave = (
+ i,
+ self.messages[i]
+ )
+ last_timestamp = self.messages[i].time
+ break
+ # If we have a normal gap but messages inbetween, it probably
+ # already has history, so abort there without returning it.
+ if join and leave:
+ for i in range(leave[0] + 1, join[0], 1):
+ if isinstance(self.messages[i], Message):
+ return None
+ elif not (join or leave):
+ return None
+
+ # If a leave message is found, get the last Message timestamp
+ # before it.
+ if leave is None:
+ leave_msg = None
+ elif last_timestamp is None:
+ leave_msg = leave[1]
+ for i in range(leave[0], 0, -1):
+ if isinstance(self.messages[i], Message):
+ last_timestamp = self.messages[i].time
+ break
+ else:
+ leave_msg = leave[1]
+ # If a join message is found, get the first Message timestamp
+ # after it, or the current time.
+ if join is None:
+ join_msg = None
+ else:
+ join_msg = join[1]
+ for i in range(join[0], len(self.messages)):
+ msg = self.messages[i]
+ if isinstance(msg, Message) and msg.time < first_timestamp:
+ first_timestamp = msg.time
+ break
+ return HistoryGap(
+ leave_message=leave_msg,
+ join_message=join_msg,
+ last_timestamp_before_leave=last_timestamp,
+ first_timestamp_after_join=first_timestamp,
+ )
+
+ def get_gap_index(self, gap: HistoryGap) -> Optional[int]:
+ """Find the first index to insert into inside a gap"""
+ if gap.leave_message is None:
+ return 0
+ for i, msg in enumerate(self.messages):
+ if msg is gap.leave_message:
+ return i + 1
+ return None
+
+ def add_history_messages(self, messages: List[BaseMessage], gap: Optional[HistoryGap] = None) -> None:
+ """Insert history messages at their correct place """
+ index = 0
+ new_index = None
+ if gap is not None:
+ new_index = self.get_gap_index(gap)
+ if new_index is None: # Not sure what happened, abort
+ return
+ index = new_index
+ for message in messages:
+ self.messages.insert(index, message)
+ index += 1
+ log.debug('inserted message: %s', message)
+ for window in self._windows: # make the associated windows
+ window.rebuild_everything(self)
+
@property
def last_message(self) -> Optional[BaseMessage]:
return self.messages[-1] if self.messages else None
@@ -72,8 +181,8 @@ class TextBuffer:
self.messages.pop(0)
ret_val = 0
- show_timestamps = config.get('show_timestamps')
- nick_size = config.get('max_nick_length')
+ show_timestamps = cast(bool, config.get('show_timestamps'))
+ nick_size = cast(int, config.get('max_nick_length'))
for window in self._windows: # make the associated windows
# build the lines from the new message
nb = window.build_new_message(
@@ -82,8 +191,7 @@ class TextBuffer:
nick_size=nick_size)
if ret_val == 0:
ret_val = nb
- top = isinstance(msg, Message) and msg.top
- if window.pos != 0 and top is False:
+ if window.pos != 0:
window.scroll_up(nb)
return min(ret_val, 1)
@@ -197,6 +305,13 @@ class TextBuffer:
def del_window(self, win) -> None:
self._windows.remove(win)
+ def find_last_message(self) -> Optional[Message]:
+ """Find the last real message received in this buffer"""
+ for message in reversed(self.messages):
+ if isinstance(message, Message):
+ return message
+ return None
+
def __del__(self):
size = len(self.messages)
log.debug('** Deleting %s messages from textbuffer', size)