summaryrefslogtreecommitdiff
path: root/poezio/text_buffer.py
diff options
context:
space:
mode:
Diffstat (limited to 'poezio/text_buffer.py')
-rw-r--r--poezio/text_buffer.py339
1 files changed, 191 insertions, 148 deletions
diff --git a/poezio/text_buffer.py b/poezio/text_buffer.py
index d9347527..bcee5989 100644
--- a/poezio/text_buffer.py
+++ b/poezio/text_buffer.py
@@ -8,96 +8,35 @@ Each text buffer can be linked to multiple windows, that will be rendered
independently by their TextWins.
"""
+from __future__ import annotations
+
import logging
-log = logging.getLogger(__name__)
-from typing import Union, Optional, List, Tuple
+from typing import (
+ Dict,
+ List,
+ Optional,
+ TYPE_CHECKING,
+ Tuple,
+ Union,
+)
+from dataclasses import dataclass
from datetime import datetime
from poezio.config import config
-from poezio.theming import get_theme, dump_tuple
-
-
-class Message:
- __slots__ = ('txt', 'nick_color', 'time', 'str_time', 'nickname', 'user',
- 'identifier', 'top', 'highlight', 'me', 'old_message', 'revisions',
- 'jid', 'ack')
-
- def __init__(self,
- txt: str,
- time: Optional[datetime],
- nickname: Optional[str],
- nick_color: Optional[Tuple],
- history: bool,
- user: Optional[str],
- identifier: Optional[str],
- top: Optional[bool] = False,
- str_time: Optional[str] = None,
- highlight: bool = False,
- old_message: Optional['Message'] = None,
- revisions: int = 0,
- jid: Optional[str] = None,
- ack: int = 0) -> None:
- """
- Create a new Message object with parameters, check for /me messages,
- and delayed messages
- """
- time = time if time is not None else datetime.now()
- if txt.startswith('/me '):
- me = True
- txt = '\x19%s}%s\x19o' % (dump_tuple(get_theme().COLOR_ME_MESSAGE),
- txt[4:])
- else:
- me = False
- str_time = time.strftime("%H:%M:%S")
- if history:
- txt = txt.replace(
- '\x19o',
- '\x19o\x19%s}' % dump_tuple(get_theme().COLOR_LOG_MSG))
- str_time = time.strftime("%Y-%m-%d %H:%M:%S")
-
- self.txt = txt.replace('\t', ' ') + '\x19o'
- self.nick_color = nick_color
- self.time = time
- self.str_time = str_time
- self.nickname = nickname
- self.user = user
- self.identifier = identifier
- self.top = top
- self.highlight = highlight
- self.me = me
- self.old_message = old_message
- self.revisions = revisions
- self.jid = jid
- self.ack = ack
-
- def _other_elems(self) -> str:
- "Helper for the repr_message function"
- acc = []
- fields = list(self.__slots__)
- fields.remove('old_message')
- for field in fields:
- acc.append('%s=%s' % (field, repr(getattr(self, field))))
- return 'Message(%s, %s' % (', '.join(acc), 'old_message=')
-
- def __repr__(self) -> str:
- """
- repr() for the Message class, for debug purposes, since the default
- repr() is recursive, so it can stack overflow given too many revisions
- of a message
- """
- init = self._other_elems()
- acc = [init]
- next_message = self.old_message
- rev = 1
- while next_message is not None:
- acc.append(next_message._other_elems())
- next_message = next_message.old_message
- rev += 1
- acc.append('None')
- while rev:
- acc.append(')')
- rev -= 1
- return ''.join(acc)
+from poezio.ui.types import (
+ BaseMessage,
+ Message,
+ MucOwnJoinMessage,
+ MucOwnLeaveMessage,
+)
+
+if TYPE_CHECKING:
+ from poezio.windows.text_win import TextWin
+ from poezio.user import User
+ from slixmpp import JID
+
+
+log = logging.getLogger(__name__)
class CorrectionError(Exception):
@@ -108,6 +47,15 @@ class AckError(Exception):
pass
+@dataclass
+class HistoryGap:
+ """Class representing a period of non-presence inside a MUC"""
+ leave_message: Optional[BaseMessage]
+ join_message: Optional[BaseMessage]
+ last_timestamp_before_leave: Optional[datetime]
+ first_timestamp_after_join: Optional[datetime]
+
+
class TextBuffer:
"""
This class just keep trace of messages, in a list with various
@@ -117,104 +65,178 @@ class TextBuffer:
def __init__(self, messages_nb_limit: Optional[int] = None) -> None:
if messages_nb_limit is None:
- messages_nb_limit = config.get('max_messages_in_memory')
- self._messages_nb_limit = messages_nb_limit # type: int
+ messages_nb_limit = config.getint('max_messages_in_memory')
+ self._messages_nb_limit: int = messages_nb_limit
# Message objects
- self.messages = [] # type: List[Message]
+ self.messages: List[BaseMessage] = []
+ # COMPAT: Correction id -> Original message id.
+ self.correction_ids: Dict[str, str] = {}
# we keep track of one or more windows
# so we can pass the new messages to them, as they are added, so
# they (the windows) can build the lines from the new message
- self._windows = []
+ self._windows: List[TextWin] = []
def add_window(self, win) -> None:
self._windows.append(win)
+ def find_last_gap_muc(self) -> Optional[HistoryGap]:
+ """Find the last known history gap contained in buffer"""
+ leave: Optional[Tuple[int, BaseMessage]] = None
+ join: Optional[Tuple[int, BaseMessage]] = None
+ for i, item in enumerate(reversed(self.messages)):
+ if isinstance(item, MucOwnLeaveMessage):
+ leave = (len(self.messages) - i - 1, item)
+ break
+ elif join and isinstance(item, MucOwnJoinMessage):
+ leave = (len(self.messages) - i - 1, item)
+ break
+ if isinstance(item, MucOwnJoinMessage):
+ join = (len(self.messages) - i - 1, item)
+
+ last_timestamp = None
+ first_timestamp = datetime.now()
+
+ # Identify the special case when we got disconnected from a chatroom
+ # without receiving or sending the relevant presence, therefore only
+ # having two joins with no leave, and messages in the middle.
+ if leave and join and isinstance(leave[1], MucOwnJoinMessage):
+ for i in range(join[0] - 1, leave[0], - 1):
+ if isinstance(self.messages[i], Message):
+ leave = (
+ i,
+ self.messages[i]
+ )
+ last_timestamp = self.messages[i].time
+ break
+ # If we have a normal gap but messages inbetween, it probably
+ # already has history, so abort there without returning it.
+ if join and leave:
+ for i in range(leave[0] + 1, join[0], 1):
+ if isinstance(self.messages[i], Message):
+ return None
+ elif not (join or leave):
+ return None
+
+ # If a leave message is found, get the last Message timestamp
+ # before it.
+ if leave is None:
+ leave_msg = None
+ elif last_timestamp is None:
+ leave_msg = leave[1]
+ for i in range(leave[0], 0, -1):
+ if isinstance(self.messages[i], Message):
+ last_timestamp = self.messages[i].time
+ break
+ else:
+ leave_msg = leave[1]
+ # If a join message is found, get the first Message timestamp
+ # after it, or the current time.
+ if join is None:
+ join_msg = None
+ else:
+ join_msg = join[1]
+ for i in range(join[0], len(self.messages)):
+ msg = self.messages[i]
+ if isinstance(msg, Message) and msg.time < first_timestamp:
+ first_timestamp = msg.time
+ break
+ return HistoryGap(
+ leave_message=leave_msg,
+ join_message=join_msg,
+ last_timestamp_before_leave=last_timestamp,
+ first_timestamp_after_join=first_timestamp,
+ )
+
+ def get_gap_index(self, gap: HistoryGap) -> Optional[int]:
+ """Find the first index to insert into inside a gap"""
+ if gap.leave_message is None:
+ return 0
+ for i, msg in enumerate(self.messages):
+ if msg is gap.leave_message:
+ return i + 1
+ return None
+
+ def add_history_messages(self, messages: List[BaseMessage], gap: Optional[HistoryGap] = None) -> None:
+ """Insert history messages at their correct place """
+ index = 0
+ new_index = None
+ if gap is not None:
+ new_index = self.get_gap_index(gap)
+ if new_index is None: # Not sure what happened, abort
+ return
+ index = new_index
+ for message in messages:
+ self.messages.insert(index, message)
+ index += 1
+ log.debug('inserted message: %s', message)
+ for window in self._windows: # make the associated windows
+ window.rebuild_everything(self)
+
@property
- def last_message(self) -> Optional[Message]:
+ def last_message(self) -> Optional[BaseMessage]:
return self.messages[-1] if self.messages else None
- def add_message(self,
- txt: str,
- time: Optional[datetime] = None,
- nickname: Optional[str] = None,
- nick_color: Optional[Tuple] = None,
- history: bool = False,
- user: Optional[str] = None,
- highlight: bool = False,
- top: Optional[bool] = False,
- identifier: Optional[str] = None,
- str_time: Optional[str] = None,
- jid: Optional[str] = None,
- ack: int = 0) -> int:
+ def add_message(self, msg: BaseMessage):
"""
Create a message and add it to the text buffer
"""
- msg = Message(
- txt,
- time,
- nickname,
- nick_color,
- history,
- user,
- identifier,
- top,
- str_time=str_time,
- highlight=highlight,
- jid=jid,
- ack=ack)
self.messages.append(msg)
while len(self.messages) > self._messages_nb_limit:
self.messages.pop(0)
ret_val = 0
- show_timestamps = config.get('show_timestamps')
- nick_size = config.get('max_nick_length')
+ show_timestamps = config.getbool('show_timestamps')
+ nick_size = config.getbool('max_nick_length')
for window in self._windows: # make the associated windows
# build the lines from the new message
nb = window.build_new_message(
msg,
- history=history,
- highlight=highlight,
timestamp=show_timestamps,
- top=top,
nick_size=nick_size)
if ret_val == 0:
ret_val = nb
- if window.pos != 0 and top is False:
+ if window.pos != 0:
window.scroll_up(nb)
return min(ret_val, 1)
- def _find_message(self, old_id: str) -> int:
+ def _find_message(self, orig_id: str) -> Tuple[str, int]:
"""
Find a message in the text buffer from its message id
"""
+ # When looking for a message, ensure the id doesn't appear in a
+ # message we've removed from our message list. If so return the index
+ # of the corresponding id for the original message instead.
+ orig_id = self.correction_ids.get(orig_id, orig_id)
+
for i in range(len(self.messages) - 1, -1, -1):
msg = self.messages[i]
- if msg.identifier == old_id:
- return i
- return -1
+ if msg.identifier == orig_id:
+ return (orig_id, i)
+ return (orig_id, -1)
- def ack_message(self, old_id: str, jid: str) -> Union[None, bool, Message]:
+ def ack_message(self, old_id: str, jid: JID) -> Union[None, bool, Message]:
"""Mark a message as acked"""
return self._edit_ack(1, old_id, jid)
def nack_message(self, error: str, old_id: str,
- jid: str) -> Union[None, bool, Message]:
+ jid: JID) -> Union[None, bool, Message]:
"""Mark a message as errored"""
return self._edit_ack(-1, old_id, jid, append=error)
- def _edit_ack(self, value: int, old_id: str, jid: str,
+ def _edit_ack(self, value: int, old_id: str, jid: JID,
append: str = '') -> Union[None, bool, Message]:
"""
Edit the ack status of a message, and optionally
append some text.
"""
- i = self._find_message(old_id)
+ _, i = self._find_message(old_id)
if i == -1:
return None
msg = self.messages[i]
+ if not isinstance(msg, Message):
+ return None
if msg.ack == 1: # Message was already acked
return False
if msg.jid != jid:
@@ -228,29 +250,35 @@ class TextBuffer:
def modify_message(self,
txt: str,
- old_id: str,
+ orig_id: str,
new_id: str,
highlight: bool = False,
time: Optional[datetime] = None,
- user: Optional[str] = None,
- jid: Optional[str] = None):
+ user: Optional[User] = None,
+ jid: Optional[JID] = None) -> Message:
"""
Correct a message in a text buffer.
+
+ Version 1.1.0 of Last Message Correction (0308) added clarifications
+ that break the way poezio handles corrections. Instead of linking
+ corrections to the previous correction/message as we were doing, we
+ are now required to link all corrections to the original messages.
"""
- i = self._find_message(old_id)
+ orig_id, i = self._find_message(orig_id)
if i == -1:
log.debug(
'Message %s not found in text_buffer, abort replacement.',
- old_id)
+ orig_id)
raise CorrectionError("nothing to replace")
msg = self.messages[i]
-
+ if not isinstance(msg, Message):
+ raise CorrectionError('Wrong message type')
if msg.user and msg.user is not user:
raise CorrectionError("Different users")
- elif len(msg.str_time) > 8: # ugly
+ elif msg.delayed:
raise CorrectionError("Delayed message")
elif not msg.user and (msg.jid is None or jid is None):
raise CorrectionError('Could not check the '
@@ -258,29 +286,44 @@ class TextBuffer:
elif not msg.user and msg.jid != jid:
raise CorrectionError(
'Messages %s and %s have not been '
- 'sent by the same fullJID' % (old_id, new_id))
+ 'sent by the same fullJID' % (orig_id, new_id))
if not time:
- time = msg.time
+ time = datetime.now()
+
+ self.correction_ids[new_id] = orig_id
message = Message(
- txt,
- time,
- msg.nickname,
- msg.nick_color,
- False,
- msg.user,
- new_id,
+ txt=txt,
+ time=time,
+ nickname=msg.nickname,
+ nick_color=msg.nick_color,
+ user=msg.user,
+ identifier=orig_id,
highlight=highlight,
old_message=msg,
revisions=msg.revisions + 1,
jid=jid)
self.messages[i] = message
- log.debug('Replacing message %s with %s.', old_id, new_id)
+ log.debug('Replacing message %s with %s.', orig_id, new_id)
return message
def del_window(self, win) -> None:
self._windows.remove(win)
+ def find_last_message(self) -> Optional[Message]:
+ """Find the last real message received in this buffer"""
+ for message in reversed(self.messages):
+ if isinstance(message, Message):
+ return message
+ return None
+
+ def find_first_message(self) -> Optional[Message]:
+ """Find the first real message received in this buffer"""
+ for message in self.messages:
+ if isinstance(message, Message):
+ return message
+ return None
+
def __del__(self):
size = len(self.messages)
log.debug('** Deleting %s messages from textbuffer', size)