summaryrefslogtreecommitdiff
path: root/poezio/log_loader.py
diff options
context:
space:
mode:
Diffstat (limited to 'poezio/log_loader.py')
-rw-r--r--poezio/log_loader.py395
1 files changed, 395 insertions, 0 deletions
diff --git a/poezio/log_loader.py b/poezio/log_loader.py
new file mode 100644
index 00000000..2e3b27c2
--- /dev/null
+++ b/poezio/log_loader.py
@@ -0,0 +1,395 @@
+"""
+This modules contains a class that loads messages into a ChatTab, either from
+MAM or the local logs, and a class that loads MUC history into the local
+logs.
+
+
+How the log loading works will depend on the poezio configuration:
+
+- if use_log is True, no logs will be fetched dynamically
+- if use_log is False, all logs will be fetched from MAM (if available)
+- if mam_sync and use_log are True, most chat tabs (all of them except the
+ static conversation tab) will try to sync the local
+ logs with the MAM history when opening them, or when joining a room.
+- all log loading/writing workflows are paused until the MAM sync is complete
+ (so that the local log loading can be up-to-date with the MAM history)
+- when use_log is False, mam_sync has no effect
+"""
+from __future__ import annotations
+import asyncio
+import logging
+from datetime import datetime, timedelta, timezone
+from typing import List, Optional
+from poezio import tabs
+from poezio.logger import (
+ build_log_message,
+ iterate_messages_reverse,
+ last_message_in_archive,
+ Logger,
+ LogDict,
+)
+from poezio.mam import (
+ fetch_history,
+ NoMAMSupportException,
+ MAMQueryException,
+ DiscoInfoException,
+ make_line,
+)
+from poezio.common import to_utc
+from poezio.ui.types import EndOfArchive, Message, BaseMessage
+from poezio.text_buffer import HistoryGap
+from slixmpp import JID
+
+
+# Max number of messages to insert when filling a gap
+HARD_LIMIT = 999
+
+
+log = logging.getLogger(__name__)
+
+
+def make_line_local(tab: tabs.ChatTab, msg: LogDict) -> Message:
+ """Create a UI message from a local log read.
+
+ :param tab: Tab in which that message will be displayed
+ :param msg: Log data
+ :returns: The UI message
+ """
+ if isinstance(tab, tabs.MucTab):
+ jid = JID(tab.jid)
+ jid.resource = msg.get('nickname') or ''
+ else:
+ jid = JID(tab.jid)
+ msg['time'] = msg['time'].astimezone(tz=timezone.utc)
+ return make_line(tab, msg['txt'], msg['time'], jid, '', msg['nickname'])
+
+
+class LogLoader:
+ """
+ An ephemeral class that loads history in a tab.
+
+ Loading from local logs is blocked until history has been fetched from
+ MAM to fill the local archive.
+ """
+ logger: Logger
+ tab: tabs.ChatTab
+ mam_only: bool
+
+ def __init__(self, logger: Logger, tab: tabs.ChatTab,
+ local_logs: bool = True,
+ done_event: Optional[asyncio.Event] = None):
+ self.mam_only = not local_logs
+ self.logger = logger
+ self.tab = tab
+ self.done_event = done_event
+
+ def _done(self) -> None:
+ """Signal end if possible"""
+ if self.done_event is not None:
+ self.done_event.set()
+
+ async def tab_open(self) -> None:
+ """Called on a tab opening or a MUC join"""
+ amount = 2 * self.tab.text_win.height
+ gap = self.tab._text_buffer.find_last_gap_muc()
+ messages = []
+ if gap is not None:
+ if self.mam_only:
+ messages = await self.mam_fill_gap(gap, amount)
+ else:
+ messages = await self.local_fill_gap(gap, amount)
+ else:
+ if self.mam_only:
+ messages = await self.mam_tab_open(amount)
+ else:
+ messages = await self.local_tab_open(amount)
+
+ log.debug(
+ 'Fetched %s messages for %s',
+ len(messages), self.tab.jid
+ )
+ if messages:
+ self.tab._text_buffer.add_history_messages(messages)
+ self.tab.core.refresh_window()
+ self._done()
+
+ async def mam_tab_open(self, nb: int) -> List[BaseMessage]:
+ """Fetch messages in MAM when opening a new tab.
+
+ :param nb: number of max messages to fetch.
+ :returns: list of ui messages to add
+ """
+ tab = self.tab
+ end = datetime.now()
+ for message in tab._text_buffer.messages:
+ time_ok = to_utc(message.time) < to_utc(end)
+ if isinstance(message, Message) and time_ok:
+ end = message.time
+ break
+ end = end - timedelta(microseconds=1)
+ try:
+ return await fetch_history(tab, end=end, amount=nb)
+ except (NoMAMSupportException, MAMQueryException, DiscoInfoException):
+ return []
+ finally:
+ tab.query_status = False
+
+ def _get_time_limit(self) -> datetime:
+ """Get the date 10 weeks ago from now."""
+ return datetime.now() - timedelta(weeks=10)
+
+ async def local_tab_open(self, nb: int) -> List[BaseMessage]:
+ """Fetch messages locally when opening a new tab.
+
+ :param nb: number of max messages to fetch.
+ :returns: list of ui messages to add
+ """
+ await self.wait_mam()
+ limit = self._get_time_limit()
+ results: List[BaseMessage] = []
+ filepath = self.logger.get_file_path(self.tab.jid)
+ count = 0
+ for msg in iterate_messages_reverse(filepath):
+ typ_ = msg.pop('type')
+ if typ_ == 'message':
+ results.append(make_line_local(self.tab, msg))
+ elif msg['time'] < limit and 'set the subject' not in msg['txt']:
+ break
+ if len(results) >= nb:
+ break
+ count += 1
+ if count % 20 == 0:
+ await asyncio.sleep(0)
+ return results[::-1]
+
+ async def mam_fill_gap(self, gap: HistoryGap, amount: Optional[int] = None) -> List[BaseMessage]:
+ """Fill a message gap in an existing tab using MAM.
+
+ :param gap: Object describing the history gap
+ :returns: list of ui messages to add
+ """
+ tab = self.tab
+ if amount is None:
+ amount = HARD_LIMIT
+
+ start = gap.last_timestamp_before_leave
+ end = gap.first_timestamp_after_join
+ if start:
+ start = start + timedelta(seconds=1)
+ if end:
+ end = end - timedelta(seconds=1)
+ try:
+ return await fetch_history(
+ tab,
+ start=start,
+ end=end,
+ amount=amount,
+ )
+ except (NoMAMSupportException, MAMQueryException, DiscoInfoException):
+ return []
+ finally:
+ tab.query_status = False
+
+ async def local_fill_gap(self, gap: HistoryGap, amount: Optional[int] = None) -> List[BaseMessage]:
+ """Fill a message gap in an existing tab using the local logs.
+ Mostly useless when not used with the MAMFiller.
+
+ :param gap: Object describing the history gap
+ :returns: list of ui messages to add
+ """
+ if amount is None:
+ amount = HARD_LIMIT
+ await self.wait_mam()
+ limit = self._get_time_limit()
+ start = gap.last_timestamp_before_leave
+ end = gap.first_timestamp_after_join
+ count = 0
+
+ results: List[BaseMessage] = []
+ filepath = self.logger.get_file_path(self.tab.jid)
+ for msg in iterate_messages_reverse(filepath):
+ typ_ = msg.pop('type')
+ if start and msg['time'] < start:
+ break
+ if typ_ == 'message' and (not end or msg['time'] < end):
+ results.append(make_line_local(self.tab, msg))
+ elif msg['time'] < limit and 'set the subject' not in msg['txt']:
+ break
+ if len(results) >= amount:
+ break
+ count += 1
+ if count % 20 == 0:
+ await asyncio.sleep(0)
+ return results[::-1]
+
+ async def scroll_requested(self):
+ """When a scroll up is requested in a chat tab.
+
+ Try to load more history if there are no more messages in the buffer.
+ """
+ tab = self.tab
+ tw = tab.text_win
+
+ # If position in the tab is < two screen pages, then fetch MAM, so that
+ # wa keep some prefetched margin. A first page should also be
+ # prefetched on join if not already available.
+ total, pos, height = len(tw.built_lines), tw.pos, tw.height
+ rest = (total - pos) // height
+
+ if rest > 1:
+ return None
+
+ if self.mam_only:
+ messages = await self.mam_scroll_requested(height)
+ else:
+ messages = await self.local_scroll_requested(height)
+ if messages:
+ tab._text_buffer.add_history_messages(messages)
+ tab.core.refresh_window()
+ self._done()
+
+ async def local_scroll_requested(self, nb: int) -> List[BaseMessage]:
+ """Fetch messages locally on scroll up.
+
+ :param nb: Number of messages to fetch
+ :returns: list of ui messages to add
+ """
+ await self.wait_mam()
+ tab = self.tab
+ count = 0
+
+ first_message = tab._text_buffer.find_first_message()
+ first_message_time = None
+ if first_message:
+ first_message_time = first_message.time - timedelta(microseconds=1)
+
+ results: List[BaseMessage] = []
+ filepath = self.logger.get_file_path(self.tab.jid)
+ for msg in iterate_messages_reverse(filepath):
+ typ_ = msg.pop('type')
+ if first_message_time is None or msg['time'] < first_message_time:
+ if typ_ == 'message':
+ results.append(make_line_local(self.tab, msg))
+ if len(results) >= nb:
+ break
+ count += 1
+ if count % 20 == 0:
+ await asyncio.sleep(0)
+ return results[::-1]
+
+ async def mam_scroll_requested(self, nb: int) -> List[BaseMessage]:
+ """Fetch messages from MAM on scroll up.
+
+ :param nb: Number of messages to fetch
+ :returns: list of ui messages to add
+ """
+ tab = self.tab
+ try:
+ messages = await fetch_history(tab, amount=nb)
+ last_message_exists = False
+ if tab._text_buffer.messages:
+ last_message = tab._text_buffer.messages[0]
+ last_message_exists = True
+ if (not messages and
+ last_message_exists
+ and not isinstance(last_message, EndOfArchive)):
+ time = tab._text_buffer.messages[0].time
+ messages = [EndOfArchive('End of archive reached', time=time)]
+ return messages
+ except NoMAMSupportException:
+ return []
+ except (MAMQueryException, DiscoInfoException):
+ tab.core.information(
+ f'An error occured when fetching MAM for {tab.jid}',
+ 'Error'
+ )
+ return []
+ finally:
+ tab.query_status = False
+
+ async def wait_mam(self) -> None:
+ """Wait for the MAM history sync before reading the local logs.
+
+ Does nothing apart from blocking.
+ """
+ if self.tab.mam_filler is None:
+ return
+ await self.tab.mam_filler.done.wait()
+
+
+class MAMFiller:
+ """Class that loads messages from MAM history into the local logs.
+ """
+ tab: tabs.ChatTab
+ logger: Logger
+ future: asyncio.Future
+ done: asyncio.Event
+ limit: int
+
+ def __init__(self, logger: Logger, tab: tabs.ChatTab, limit: int = 2000):
+ self.tab = tab
+ self.logger = logger
+ logger.fd_busy(tab.jid)
+ self.future = asyncio.create_task(self.fetch_routine())
+ self.done = asyncio.Event()
+ self.limit = limit
+ self.result = 0
+
+ def cancel(self) -> None:
+ """Cancel the routine and signal the end."""
+ self.future.cancel()
+ self.end()
+
+ async def fetch_routine(self) -> None:
+ """Load logs into the local archive, if possible."""
+ filepath = self.logger.get_file_path(self.tab.jid)
+ log.debug('Fetching logs for %s', self.tab.jid)
+ try:
+ last_msg = last_message_in_archive(filepath)
+ last_msg_time = None
+ if last_msg:
+ last_msg_time = last_msg['time'] + timedelta(seconds=1)
+ try:
+ messages = await fetch_history(
+ self.tab,
+ start=last_msg_time,
+ amount=self.limit,
+ )
+ log.debug(
+ 'Fetched %s messages to fill local logs for %s',
+ len(messages), self.tab.jid,
+ )
+ self.result = len(messages)
+ except NoMAMSupportException:
+ log.debug('The entity %s does not support MAM', self.tab.jid)
+ return
+ except (DiscoInfoException, MAMQueryException):
+ log.debug(
+ 'Failed fetching logs for %s',
+ self.tab.jid, exc_info=True
+ )
+ return
+
+ def build_message(msg) -> str:
+ return build_log_message(
+ msg.nickname,
+ msg.txt,
+ msg.time,
+ prefix='MR',
+ )
+
+ logs = ''.join(map(build_message, messages))
+ self.logger.log_raw(self.tab.jid, logs, force=True)
+ finally:
+ self.end()
+
+ def end(self) -> None:
+ """End a MAM fill (error or sucess). Remove references and signal on
+ the Event().
+ """
+ try:
+ self.logger.fd_available(self.tab.jid)
+ except Exception:
+ log.error('Error when restoring log fd:', exc_info=True)
+ self.tab.mam_filler = None
+ self.done.set()