From d174e1fa352dd5b8c479a71123ab25a7371dd5bd Mon Sep 17 00:00:00 2001 From: mathieui Date: Fri, 22 May 2020 01:36:13 +0200 Subject: MAM: many changes - Fix color & nicks in one to one chats - Make poezio-facing functions "schedules" to avoid races on tab query state - Rename functions - Use a different behavior when filling a history gap and populating a new tab in a MUC --- poezio/mam.py | 244 +++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 146 insertions(+), 98 deletions(-) (limited to 'poezio/mam.py') diff --git a/poezio/mam.py b/poezio/mam.py index 50dad4a3..05275975 100644 --- a/poezio/mam.py +++ b/poezio/mam.py @@ -6,34 +6,43 @@ XEP-0313: Message Archive Management(MAM). """ +import asyncio +import logging import random from datetime import datetime, timedelta, timezone from hashlib import md5 -from typing import Optional, Callable +from typing import ( + AsyncIterable, + Callable, + Dict, + List, + Optional, +) -from slixmpp import JID +from slixmpp import JID, Message as SMessage from slixmpp.exceptions import IqError, IqTimeout from poezio.theming import get_theme from poezio import tabs from poezio import xhtml, colors from poezio.config import config -from poezio.text_buffer import TextBuffer -from poezio.ui.types import Message +from poezio.text_buffer import TextBuffer, HistoryGap +from poezio.ui.types import BaseMessage, Message +log = logging.getLogger(__name__) + class DiscoInfoException(Exception): pass class MAMQueryException(Exception): pass class NoMAMSupportException(Exception): pass -def add_line( - tab, - text_buffer: TextBuffer, +def make_line( + tab: tabs.Tab, text: str, time: datetime, nick: str, - top: bool, - ) -> None: + identifier: str = '', + ) -> Message: """Adds a textual entry in the TextBuffer""" # Convert to local timezone @@ -61,39 +70,40 @@ def add_line( color = xhtml.colors.get(color) color = (color, -1) else: - nick = nick.split('/')[0] - color = get_theme().COLOR_OWN_NICK - text_buffer.add_message( - Message( - txt=text, - time=time, - nickname=nick, - nick_color=color, - history=True, - user=None, - top=top, - ) + if nick.split('/')[0] == tab.core.xmpp.boundjid.bare: + color = get_theme().COLOR_OWN_NICK + else: + color = get_theme().COLOR_REMOTE_USER + nick = tab.get_nick() + return Message( + txt=text, + identifier=identifier, + time=time, + nickname=nick, + nick_color=color, + history=True, + user=None, ) -async def query( +async def get_mam_iterator( core, groupchat: bool, remote_jid: JID, amount: int, - reverse: bool, + reverse: bool = True, start: Optional[datetime] = None, end: Optional[datetime] = None, before: Optional[str] = None, - callback: Optional[Callable] = None, - ) -> None: + ) -> AsyncIterable[Message]: + """Get an async iterator for this mam query""" try: query_jid = remote_jid if groupchat else JID(core.xmpp.boundjid.bare) iq = await core.xmpp.plugin['xep_0030'].get_info(jid=query_jid) except (IqError, IqTimeout): - raise DiscoInfoException + raise DiscoInfoException() if 'urn:xmpp:mam:2' not in iq['disco_info'].get_features(): - raise NoMAMSupportException + raise NoMAMSupportException() args = { 'iterator': True, @@ -105,64 +115,66 @@ async def query( else: args['with_jid'] = remote_jid - args['rsm'] = {'max': amount} - if reverse: - if before is not None: - args['rsm']['before'] = before - else: - args['end'] = end - else: - args['rsm']['start'] = start - if before is not None: - args['rsm']['end'] = end - try: - results = core.xmpp['xep_0313'].retrieve(**args) - except (IqError, IqTimeout): - raise MAMQueryException - if callback is not None: - callback(results) + if amount > 0: + args['rsm'] = {'max': amount} + args['start'] = start + args['end'] = end + return core.xmpp['xep_0313'].retrieve(**args) - return results +def _parse_message(msg: SMessage) -> Dict: + """Parse info inside a MAM forwarded message""" + forwarded = msg['mam_result']['forwarded'] + message = forwarded['stanza'] + return { + 'time': forwarded['delay']['stamp'], + 'nick': str(message['from']), + 'text': message['body'], + 'identifier': message['origin-id'] + } -async def add_messages_to_buffer(tab, top: bool, results, amount: int) -> bool: - """Prepends or appends messages to the tab text_buffer""" +async def retrieve_messages(tab: tabs.Tab, + results: AsyncIterable[SMessage], + amount: int = 100) -> List[Message]: + """Run the MAM query and put messages in order""" text_buffer = tab._text_buffer msg_count = 0 msgs = [] - async for rsm in results: - if top: + to_add = [] + last_stanza_id = tab.last_stanza_id + try: + async for rsm in results: for msg in rsm['mam']['results']: if msg['mam_result']['forwarded']['stanza'] \ - .xml.find('{%s}%s' % ('jabber:client', 'body')) is not None: - msgs.append(msg) - if msg_count == amount: - tab.core.refresh_window() - return False + .xml.find('{%s}%s' % ('jabber:client', 'body')) is not None: + args = _parse_message(msg) + msgs.append(make_line(tab, **args)) + for msg in reversed(msgs): + to_add.append(msg) msg_count += 1 - msgs.reverse() - for msg in msgs: - forwarded = msg['mam_result']['forwarded'] - timestamp = forwarded['delay']['stamp'] - message = forwarded['stanza'] - tab.last_stanza_id = msg['mam_result']['id'] - nick = str(message['from']) - add_line(tab, text_buffer, message['body'], timestamp, nick, top) - else: - for msg in rsm['mam']['results']: - forwarded = msg['mam_result']['forwarded'] - timestamp = forwarded['delay']['stamp'] - message = forwarded['stanza'] - nick = str(message['from']) - add_line(tab, text_buffer, message['body'], timestamp, nick, top) - tab.core.refresh_window() - return False + if msg_count == amount: + to_add.reverse() + return to_add + msgs = [] + to_add.reverse() + return to_add + except (IqError, IqTimeout) as exc: + log.debug('Unable to complete MAM query: %s', exc, exc_info=True) + raise MAMQueryException('Query interrupted') -async def fetch_history(tab, end: Optional[datetime] = None, amount: Optional[int] = None): +async def fetch_history(tab: tabs.Tab, + start: Optional[datetime] = None, + end: Optional[datetime] = None, + amount: Optional[int] = None) -> None: remote_jid = tab.jid - before = tab.last_stanza_id + if not end: + for msg in tab._text_buffer.messages: + if isinstance(msg, Message): + end = msg.time + end -= timedelta(microseconds=1) + break if end is None: end = datetime.now() tzone = datetime.now().astimezone().tzinfo @@ -170,38 +182,74 @@ async def fetch_history(tab, end: Optional[datetime] = None, amount: Optional[in end = end.replace(tzinfo=None) end = datetime.strftime(end, '%Y-%m-%dT%H:%M:%SZ') - if amount >= 100: - amount = 99 - - groupchat = isinstance(tab, tabs.MucTab) + if start is not None: + start = start.replace(tzinfo=tzone).astimezone(tz=timezone.utc) + start = start.replace(tzinfo=None) + start = datetime.strftime(start, '%Y-%m-%dT%H:%M:%SZ') - results = await query( - tab.core, - groupchat, - remote_jid, - amount, - reverse=True, + mam_iterator = await get_mam_iterator( + core=tab.core, + groupchat=isinstance(tab, tabs.MucTab), + remote_jid=remote_jid, + amount=amount, end=end, - before=before, + start=start, + reverse=True, ) - query_status = await add_messages_to_buffer(tab, True, results, amount) - tab.query_status = query_status + return await retrieve_messages(tab, mam_iterator, amount) +async def fill_missing_history(tab: tabs.Tab, gap: HistoryGap) -> None: + start = gap.last_timestamp_before_leave + end = gap.first_timestamp_after_join + if start: + start = start + timedelta(seconds=1) + if end: + end = end - timedelta(seconds=1) + try: + messages = await fetch_history(tab, start=start, end=end, amount=999) + tab._text_buffer.add_history_messages(messages, gap=gap) + tab.core.refresh_window() + except (NoMAMSupportException, MAMQueryException, DiscoInfoException): + return + finally: + tab.query_status = False -async def on_tab_open(tab) -> None: +async def on_new_tab_open(tab: tabs.Tab) -> None: + """Called when opening a new tab""" amount = 2 * tab.text_win.height end = datetime.now() - tab.query_status = True for message in tab._text_buffer.messages: - time = message.time - if time < end: - end = time - end = end + timedelta(seconds=-1) + if isinstance(message, Message) and message.time < end: + end = message.time + break + end = end - timedelta(microseconds=1) try: - await fetch_history(tab, end=end, amount=amount) + messages = await fetch_history(tab, end=end, amount=amount) + tab._text_buffer.add_history_messages(messages) except (NoMAMSupportException, MAMQueryException, DiscoInfoException): - tab.query_status = False return None + finally: + tab.query_status = False + + +def schedule_tab_open(tab: tabs.Tab) -> None: + """Set the query status and schedule a MAM query""" + tab.query_status = True + asyncio.ensure_future(on_tab_open(tab)) + + +async def on_tab_open(tab: tabs.Tab) -> None: + gap = tab._text_buffer.find_last_gap_muc() + if gap is not None: + await fill_missing_history(tab, gap) + else: + await on_new_tab_open(tab) + + +def schedule_scroll_up(tab: tabs.Tab) -> None: + """Set query status and schedule a scroll up""" + tab.query_status = True + asyncio.ensure_future(on_scroll_up(tab)) async def on_scroll_up(tab) -> None: @@ -212,22 +260,22 @@ async def on_scroll_up(tab) -> None: # join if not already available. total, pos, height = len(tw.built_lines), tw.pos, tw.height rest = (total - pos) // height - # Not resetting the state of query_status here, it is changed only after the - # query is complete (in fetch_history) - # This is done to stop message repetition, eg: if the user presses PageUp continuously. - tab.query_status = True if rest > 1: + tab.query_status = False return None try: # XXX: Do we want to fetch a possibly variable number of messages? # (InfoTab changes height depending on the type of messages, see # `information_buffer_popup_on`). - await fetch_history(tab, amount=height) + messages = await fetch_history(tab, amount=height) + tab._text_buffer.add_history_messages(messages) except NoMAMSupportException: tab.core.information('MAM not supported for %r' % tab.jid, 'Info') return None except (MAMQueryException, DiscoInfoException): tab.core.information('An error occured when fetching MAM for %r' % tab.jid, 'Error') return None + finally: + tab.query_status = False -- cgit v1.2.3 From 36c85a5df4fc4650956bc3aa9d4e8474268a06a9 Mon Sep 17 00:00:00 2001 From: mathieui Date: Fri, 22 May 2020 01:55:32 +0200 Subject: Add an "end of archive" message type --- poezio/mam.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) (limited to 'poezio/mam.py') diff --git a/poezio/mam.py b/poezio/mam.py index 05275975..6882ef7a 100644 --- a/poezio/mam.py +++ b/poezio/mam.py @@ -26,7 +26,11 @@ from poezio import tabs from poezio import xhtml, colors from poezio.config import config from poezio.text_buffer import TextBuffer, HistoryGap -from poezio.ui.types import BaseMessage, Message +from poezio.ui.types import ( + BaseMessage, + EndOfArchive, + Message, +) log = logging.getLogger(__name__) @@ -270,6 +274,13 @@ async def on_scroll_up(tab) -> None: # (InfoTab changes height depending on the type of messages, see # `information_buffer_popup_on`). messages = await fetch_history(tab, amount=height) + if tab._text_buffer.messages: + last_message = tab._text_buffer.messages[0] + else: + last_message = None + if not messages and not isinstance(last_message, EndOfArchive): + time = tab._text_buffer.messages[0].time + messages = [EndOfArchive('End of archive reached', time=time)] tab._text_buffer.add_history_messages(messages) except NoMAMSupportException: tab.core.information('MAM not supported for %r' % tab.jid, 'Info') -- cgit v1.2.3 From d3655c4c3520f1a86af5ccf7b816076ae1d18312 Mon Sep 17 00:00:00 2001 From: mathieui Date: Fri, 22 May 2020 01:58:03 +0200 Subject: Fix some remaining refresh issues --- poezio/mam.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'poezio/mam.py') diff --git a/poezio/mam.py b/poezio/mam.py index 6882ef7a..edd1e462 100644 --- a/poezio/mam.py +++ b/poezio/mam.py @@ -212,7 +212,8 @@ async def fill_missing_history(tab: tabs.Tab, gap: HistoryGap) -> None: try: messages = await fetch_history(tab, start=start, end=end, amount=999) tab._text_buffer.add_history_messages(messages, gap=gap) - tab.core.refresh_window() + if messages: + tab.core.refresh_window() except (NoMAMSupportException, MAMQueryException, DiscoInfoException): return finally: @@ -230,6 +231,8 @@ async def on_new_tab_open(tab: tabs.Tab) -> None: try: messages = await fetch_history(tab, end=end, amount=amount) tab._text_buffer.add_history_messages(messages) + if messages: + tab.core.refresh_window() except (NoMAMSupportException, MAMQueryException, DiscoInfoException): return None finally: @@ -282,6 +285,8 @@ async def on_scroll_up(tab) -> None: time = tab._text_buffer.messages[0].time messages = [EndOfArchive('End of archive reached', time=time)] tab._text_buffer.add_history_messages(messages) + if messages: + tab.core.refresh_window() except NoMAMSupportException: tab.core.information('MAM not supported for %r' % tab.jid, 'Info') return None -- cgit v1.2.3 From 29eef159d50c836fbd7a27770775d63700dc7f19 Mon Sep 17 00:00:00 2001 From: mathieui Date: Fri, 22 May 2020 17:09:17 +0200 Subject: Fix some edge cases of MAM history fetch - Wait until we receive our own MUC presence to fetch history - Fix /reconnect weirdness --- poezio/mam.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'poezio/mam.py') diff --git a/poezio/mam.py b/poezio/mam.py index edd1e462..ee5b1be8 100644 --- a/poezio/mam.py +++ b/poezio/mam.py @@ -247,10 +247,10 @@ def schedule_tab_open(tab: tabs.Tab) -> None: async def on_tab_open(tab: tabs.Tab) -> None: gap = tab._text_buffer.find_last_gap_muc() - if gap is not None: - await fill_missing_history(tab, gap) - else: + if gap is None or not gap.leave_message: await on_new_tab_open(tab) + else: + await fill_missing_history(tab, gap) def schedule_scroll_up(tab: tabs.Tab) -> None: -- cgit v1.2.3 From 4210f5c776dab2439cecb1e19dc3997109cab400 Mon Sep 17 00:00:00 2001 From: mathieui Date: Fri, 22 May 2020 17:13:11 +0200 Subject: Convert all datetimes to UTC when doing comparisons --- poezio/mam.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) (limited to 'poezio/mam.py') diff --git a/poezio/mam.py b/poezio/mam.py index ee5b1be8..4171de5e 100644 --- a/poezio/mam.py +++ b/poezio/mam.py @@ -25,6 +25,7 @@ from poezio.theming import get_theme from poezio import tabs from poezio import xhtml, colors from poezio.config import config +from poezio.common import to_utc from poezio.text_buffer import TextBuffer, HistoryGap from poezio.ui.types import ( BaseMessage, @@ -89,7 +90,6 @@ def make_line( user=None, ) - async def get_mam_iterator( core, groupchat: bool, @@ -181,14 +181,11 @@ async def fetch_history(tab: tabs.Tab, break if end is None: end = datetime.now() - tzone = datetime.now().astimezone().tzinfo - end = end.replace(tzinfo=tzone).astimezone(tz=timezone.utc) - end = end.replace(tzinfo=None) + end = to_utc(end) end = datetime.strftime(end, '%Y-%m-%dT%H:%M:%SZ') if start is not None: - start = start.replace(tzinfo=tzone).astimezone(tz=timezone.utc) - start = start.replace(tzinfo=None) + start = to_utc(start) start = datetime.strftime(start, '%Y-%m-%dT%H:%M:%SZ') mam_iterator = await get_mam_iterator( @@ -224,7 +221,7 @@ async def on_new_tab_open(tab: tabs.Tab) -> None: amount = 2 * tab.text_win.height end = datetime.now() for message in tab._text_buffer.messages: - if isinstance(message, Message) and message.time < end: + if isinstance(message, Message) and to_utc(message.time) < to_utc(end): end = message.time break end = end - timedelta(microseconds=1) -- cgit v1.2.3 From f282b14e8d8314b3ec95268571093c6e2295458c Mon Sep 17 00:00:00 2001 From: mathieui Date: Fri, 22 May 2020 18:07:54 +0200 Subject: Fix typing in mam.py --- poezio/mam.py | 44 +++++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 21 deletions(-) (limited to 'poezio/mam.py') diff --git a/poezio/mam.py b/poezio/mam.py index 4171de5e..49ccf017 100644 --- a/poezio/mam.py +++ b/poezio/mam.py @@ -12,6 +12,7 @@ import random from datetime import datetime, timedelta, timezone from hashlib import md5 from typing import ( + Any, AsyncIterable, Callable, Dict, @@ -42,7 +43,7 @@ class NoMAMSupportException(Exception): pass def make_line( - tab: tabs.Tab, + tab: tabs.ChatTab, text: str, time: datetime, nick: str, @@ -96,8 +97,8 @@ async def get_mam_iterator( remote_jid: JID, amount: int, reverse: bool = True, - start: Optional[datetime] = None, - end: Optional[datetime] = None, + start: Optional[str] = None, + end: Optional[str] = None, before: Optional[str] = None, ) -> AsyncIterable[Message]: """Get an async iterator for this mam query""" @@ -112,7 +113,7 @@ async def get_mam_iterator( args = { 'iterator': True, 'reverse': reverse, - } + } # type: Dict[str, Any] if groupchat: args['jid'] = remote_jid @@ -138,9 +139,9 @@ def _parse_message(msg: SMessage) -> Dict: } -async def retrieve_messages(tab: tabs.Tab, +async def retrieve_messages(tab: tabs.ChatTab, results: AsyncIterable[SMessage], - amount: int = 100) -> List[Message]: + amount: int = 100) -> List[BaseMessage]: """Run the MAM query and put messages in order""" text_buffer = tab._text_buffer msg_count = 0 @@ -168,10 +169,10 @@ async def retrieve_messages(tab: tabs.Tab, raise MAMQueryException('Query interrupted') -async def fetch_history(tab: tabs.Tab, +async def fetch_history(tab: tabs.ChatTab, start: Optional[datetime] = None, end: Optional[datetime] = None, - amount: Optional[int] = None) -> None: + amount: int = 100) -> List[BaseMessage]: remote_jid = tab.jid if not end: for msg in tab._text_buffer.messages: @@ -182,24 +183,25 @@ async def fetch_history(tab: tabs.Tab, if end is None: end = datetime.now() end = to_utc(end) - end = datetime.strftime(end, '%Y-%m-%dT%H:%M:%SZ') + end_str = datetime.strftime(end, '%Y-%m-%dT%H:%M:%SZ') + start_str = None if start is not None: start = to_utc(start) - start = datetime.strftime(start, '%Y-%m-%dT%H:%M:%SZ') + start_str = datetime.strftime(start, '%Y-%m-%dT%H:%M:%SZ') mam_iterator = await get_mam_iterator( core=tab.core, groupchat=isinstance(tab, tabs.MucTab), remote_jid=remote_jid, amount=amount, - end=end, - start=start, + end=end_str, + start=start_str, reverse=True, ) return await retrieve_messages(tab, mam_iterator, amount) -async def fill_missing_history(tab: tabs.Tab, gap: HistoryGap) -> None: +async def fill_missing_history(tab: tabs.ChatTab, gap: HistoryGap) -> None: start = gap.last_timestamp_before_leave end = gap.first_timestamp_after_join if start: @@ -216,7 +218,7 @@ async def fill_missing_history(tab: tabs.Tab, gap: HistoryGap) -> None: finally: tab.query_status = False -async def on_new_tab_open(tab: tabs.Tab) -> None: +async def on_new_tab_open(tab: tabs.ChatTab) -> None: """Called when opening a new tab""" amount = 2 * tab.text_win.height end = datetime.now() @@ -236,13 +238,13 @@ async def on_new_tab_open(tab: tabs.Tab) -> None: tab.query_status = False -def schedule_tab_open(tab: tabs.Tab) -> None: +def schedule_tab_open(tab: tabs.ChatTab) -> None: """Set the query status and schedule a MAM query""" tab.query_status = True asyncio.ensure_future(on_tab_open(tab)) -async def on_tab_open(tab: tabs.Tab) -> None: +async def on_tab_open(tab: tabs.ChatTab) -> None: gap = tab._text_buffer.find_last_gap_muc() if gap is None or not gap.leave_message: await on_new_tab_open(tab) @@ -250,13 +252,13 @@ async def on_tab_open(tab: tabs.Tab) -> None: await fill_missing_history(tab, gap) -def schedule_scroll_up(tab: tabs.Tab) -> None: +def schedule_scroll_up(tab: tabs.ChatTab) -> None: """Set query status and schedule a scroll up""" tab.query_status = True asyncio.ensure_future(on_scroll_up(tab)) -async def on_scroll_up(tab) -> None: +async def on_scroll_up(tab: tabs.ChatTab) -> None: tw = tab.text_win # If position in the tab is < two screen pages, then fetch MAM, so that we @@ -274,11 +276,11 @@ async def on_scroll_up(tab) -> None: # (InfoTab changes height depending on the type of messages, see # `information_buffer_popup_on`). messages = await fetch_history(tab, amount=height) + last_message_exists = False if tab._text_buffer.messages: last_message = tab._text_buffer.messages[0] - else: - last_message = None - if not messages and not isinstance(last_message, EndOfArchive): + last_message_exists = True + if not messages and last_message_exists and not isinstance(last_message, EndOfArchive): time = tab._text_buffer.messages[0].time messages = [EndOfArchive('End of archive reached', time=time)] tab._text_buffer.add_history_messages(messages) -- cgit v1.2.3 From faeab78c7e3c9f125cfbfe3dce0fb18c9b8649c4 Mon Sep 17 00:00:00 2001 From: mathieui Date: Fri, 22 May 2020 18:23:51 +0200 Subject: Remove remaining occurences of tab.last_stanza_id --- poezio/mam.py | 1 - 1 file changed, 1 deletion(-) (limited to 'poezio/mam.py') diff --git a/poezio/mam.py b/poezio/mam.py index 49ccf017..371b34dd 100644 --- a/poezio/mam.py +++ b/poezio/mam.py @@ -147,7 +147,6 @@ async def retrieve_messages(tab: tabs.ChatTab, msg_count = 0 msgs = [] to_add = [] - last_stanza_id = tab.last_stanza_id try: async for rsm in results: for msg in rsm['mam']['results']: -- cgit v1.2.3