summaryrefslogtreecommitdiff
path: root/poezio/mam.py
diff options
context:
space:
mode:
Diffstat (limited to 'poezio/mam.py')
-rw-r--r--poezio/mam.py276
1 files changed, 169 insertions, 107 deletions
diff --git a/poezio/mam.py b/poezio/mam.py
index 50dad4a3..371b34dd 100644
--- a/poezio/mam.py
+++ b/poezio/mam.py
@@ -6,34 +6,49 @@
XEP-0313: Message Archive Management(MAM).
"""
+import asyncio
+import logging
import random
from datetime import datetime, timedelta, timezone
from hashlib import md5
-from typing import Optional, Callable
+from typing import (
+ Any,
+ AsyncIterable,
+ Callable,
+ Dict,
+ List,
+ Optional,
+)
-from slixmpp import JID
+from slixmpp import JID, Message as SMessage
from slixmpp.exceptions import IqError, IqTimeout
from poezio.theming import get_theme
from poezio import tabs
from poezio import xhtml, colors
from poezio.config import config
-from poezio.text_buffer import TextBuffer
-from poezio.ui.types import Message
+from poezio.common import to_utc
+from poezio.text_buffer import TextBuffer, HistoryGap
+from poezio.ui.types import (
+ BaseMessage,
+ EndOfArchive,
+ Message,
+)
+log = logging.getLogger(__name__)
+
class DiscoInfoException(Exception): pass
class MAMQueryException(Exception): pass
class NoMAMSupportException(Exception): pass
-def add_line(
- tab,
- text_buffer: TextBuffer,
+def make_line(
+ tab: tabs.ChatTab,
text: str,
time: datetime,
nick: str,
- top: bool,
- ) -> None:
+ identifier: str = '',
+ ) -> Message:
"""Adds a textual entry in the TextBuffer"""
# Convert to local timezone
@@ -61,150 +76,188 @@ def add_line(
color = xhtml.colors.get(color)
color = (color, -1)
else:
- nick = nick.split('/')[0]
- color = get_theme().COLOR_OWN_NICK
- text_buffer.add_message(
- Message(
- txt=text,
- time=time,
- nickname=nick,
- nick_color=color,
- history=True,
- user=None,
- top=top,
- )
+ if nick.split('/')[0] == tab.core.xmpp.boundjid.bare:
+ color = get_theme().COLOR_OWN_NICK
+ else:
+ color = get_theme().COLOR_REMOTE_USER
+ nick = tab.get_nick()
+ return Message(
+ txt=text,
+ identifier=identifier,
+ time=time,
+ nickname=nick,
+ nick_color=color,
+ history=True,
+ user=None,
)
-
-async def query(
+async def get_mam_iterator(
core,
groupchat: bool,
remote_jid: JID,
amount: int,
- reverse: bool,
- start: Optional[datetime] = None,
- end: Optional[datetime] = None,
+ reverse: bool = True,
+ start: Optional[str] = None,
+ end: Optional[str] = None,
before: Optional[str] = None,
- callback: Optional[Callable] = None,
- ) -> None:
+ ) -> AsyncIterable[Message]:
+ """Get an async iterator for this mam query"""
try:
query_jid = remote_jid if groupchat else JID(core.xmpp.boundjid.bare)
iq = await core.xmpp.plugin['xep_0030'].get_info(jid=query_jid)
except (IqError, IqTimeout):
- raise DiscoInfoException
+ raise DiscoInfoException()
if 'urn:xmpp:mam:2' not in iq['disco_info'].get_features():
- raise NoMAMSupportException
+ raise NoMAMSupportException()
args = {
'iterator': True,
'reverse': reverse,
- }
+ } # type: Dict[str, Any]
if groupchat:
args['jid'] = remote_jid
else:
args['with_jid'] = remote_jid
- args['rsm'] = {'max': amount}
- if reverse:
- if before is not None:
- args['rsm']['before'] = before
- else:
- args['end'] = end
- else:
- args['rsm']['start'] = start
- if before is not None:
- args['rsm']['end'] = end
- try:
- results = core.xmpp['xep_0313'].retrieve(**args)
- except (IqError, IqTimeout):
- raise MAMQueryException
- if callback is not None:
- callback(results)
+ if amount > 0:
+ args['rsm'] = {'max': amount}
+ args['start'] = start
+ args['end'] = end
+ return core.xmpp['xep_0313'].retrieve(**args)
- return results
+def _parse_message(msg: SMessage) -> Dict:
+ """Parse info inside a MAM forwarded message"""
+ forwarded = msg['mam_result']['forwarded']
+ message = forwarded['stanza']
+ return {
+ 'time': forwarded['delay']['stamp'],
+ 'nick': str(message['from']),
+ 'text': message['body'],
+ 'identifier': message['origin-id']
+ }
-async def add_messages_to_buffer(tab, top: bool, results, amount: int) -> bool:
- """Prepends or appends messages to the tab text_buffer"""
+async def retrieve_messages(tab: tabs.ChatTab,
+ results: AsyncIterable[SMessage],
+ amount: int = 100) -> List[BaseMessage]:
+ """Run the MAM query and put messages in order"""
text_buffer = tab._text_buffer
msg_count = 0
msgs = []
- async for rsm in results:
- if top:
+ to_add = []
+ try:
+ async for rsm in results:
for msg in rsm['mam']['results']:
if msg['mam_result']['forwarded']['stanza'] \
- .xml.find('{%s}%s' % ('jabber:client', 'body')) is not None:
- msgs.append(msg)
- if msg_count == amount:
- tab.core.refresh_window()
- return False
+ .xml.find('{%s}%s' % ('jabber:client', 'body')) is not None:
+ args = _parse_message(msg)
+ msgs.append(make_line(tab, **args))
+ for msg in reversed(msgs):
+ to_add.append(msg)
msg_count += 1
- msgs.reverse()
- for msg in msgs:
- forwarded = msg['mam_result']['forwarded']
- timestamp = forwarded['delay']['stamp']
- message = forwarded['stanza']
- tab.last_stanza_id = msg['mam_result']['id']
- nick = str(message['from'])
- add_line(tab, text_buffer, message['body'], timestamp, nick, top)
- else:
- for msg in rsm['mam']['results']:
- forwarded = msg['mam_result']['forwarded']
- timestamp = forwarded['delay']['stamp']
- message = forwarded['stanza']
- nick = str(message['from'])
- add_line(tab, text_buffer, message['body'], timestamp, nick, top)
- tab.core.refresh_window()
- return False
+ if msg_count == amount:
+ to_add.reverse()
+ return to_add
+ msgs = []
+ to_add.reverse()
+ return to_add
+ except (IqError, IqTimeout) as exc:
+ log.debug('Unable to complete MAM query: %s', exc, exc_info=True)
+ raise MAMQueryException('Query interrupted')
-async def fetch_history(tab, end: Optional[datetime] = None, amount: Optional[int] = None):
+async def fetch_history(tab: tabs.ChatTab,
+ start: Optional[datetime] = None,
+ end: Optional[datetime] = None,
+ amount: int = 100) -> List[BaseMessage]:
remote_jid = tab.jid
- before = tab.last_stanza_id
+ if not end:
+ for msg in tab._text_buffer.messages:
+ if isinstance(msg, Message):
+ end = msg.time
+ end -= timedelta(microseconds=1)
+ break
if end is None:
end = datetime.now()
- tzone = datetime.now().astimezone().tzinfo
- end = end.replace(tzinfo=tzone).astimezone(tz=timezone.utc)
- end = end.replace(tzinfo=None)
- end = datetime.strftime(end, '%Y-%m-%dT%H:%M:%SZ')
-
- if amount >= 100:
- amount = 99
+ end = to_utc(end)
+ end_str = datetime.strftime(end, '%Y-%m-%dT%H:%M:%SZ')
- groupchat = isinstance(tab, tabs.MucTab)
+ start_str = None
+ if start is not None:
+ start = to_utc(start)
+ start_str = datetime.strftime(start, '%Y-%m-%dT%H:%M:%SZ')
- results = await query(
- tab.core,
- groupchat,
- remote_jid,
- amount,
+ mam_iterator = await get_mam_iterator(
+ core=tab.core,
+ groupchat=isinstance(tab, tabs.MucTab),
+ remote_jid=remote_jid,
+ amount=amount,
+ end=end_str,
+ start=start_str,
reverse=True,
- end=end,
- before=before,
)
- query_status = await add_messages_to_buffer(tab, True, results, amount)
- tab.query_status = query_status
+ return await retrieve_messages(tab, mam_iterator, amount)
+async def fill_missing_history(tab: tabs.ChatTab, gap: HistoryGap) -> None:
+ start = gap.last_timestamp_before_leave
+ end = gap.first_timestamp_after_join
+ if start:
+ start = start + timedelta(seconds=1)
+ if end:
+ end = end - timedelta(seconds=1)
+ try:
+ messages = await fetch_history(tab, start=start, end=end, amount=999)
+ tab._text_buffer.add_history_messages(messages, gap=gap)
+ if messages:
+ tab.core.refresh_window()
+ except (NoMAMSupportException, MAMQueryException, DiscoInfoException):
+ return
+ finally:
+ tab.query_status = False
-async def on_tab_open(tab) -> None:
+async def on_new_tab_open(tab: tabs.ChatTab) -> None:
+ """Called when opening a new tab"""
amount = 2 * tab.text_win.height
end = datetime.now()
- tab.query_status = True
for message in tab._text_buffer.messages:
- time = message.time
- if time < end:
- end = time
- end = end + timedelta(seconds=-1)
+ if isinstance(message, Message) and to_utc(message.time) < to_utc(end):
+ end = message.time
+ break
+ end = end - timedelta(microseconds=1)
try:
- await fetch_history(tab, end=end, amount=amount)
+ messages = await fetch_history(tab, end=end, amount=amount)
+ tab._text_buffer.add_history_messages(messages)
+ if messages:
+ tab.core.refresh_window()
except (NoMAMSupportException, MAMQueryException, DiscoInfoException):
- tab.query_status = False
return None
+ finally:
+ tab.query_status = False
+
+
+def schedule_tab_open(tab: tabs.ChatTab) -> None:
+ """Set the query status and schedule a MAM query"""
+ tab.query_status = True
+ asyncio.ensure_future(on_tab_open(tab))
+
+async def on_tab_open(tab: tabs.ChatTab) -> None:
+ gap = tab._text_buffer.find_last_gap_muc()
+ if gap is None or not gap.leave_message:
+ await on_new_tab_open(tab)
+ else:
+ await fill_missing_history(tab, gap)
+
+
+def schedule_scroll_up(tab: tabs.ChatTab) -> None:
+ """Set query status and schedule a scroll up"""
+ tab.query_status = True
+ asyncio.ensure_future(on_scroll_up(tab))
-async def on_scroll_up(tab) -> None:
+
+async def on_scroll_up(tab: tabs.ChatTab) -> None:
tw = tab.text_win
# If position in the tab is < two screen pages, then fetch MAM, so that we
@@ -212,22 +265,31 @@ async def on_scroll_up(tab) -> None:
# join if not already available.
total, pos, height = len(tw.built_lines), tw.pos, tw.height
rest = (total - pos) // height
- # Not resetting the state of query_status here, it is changed only after the
- # query is complete (in fetch_history)
- # This is done to stop message repetition, eg: if the user presses PageUp continuously.
- tab.query_status = True
if rest > 1:
+ tab.query_status = False
return None
try:
# XXX: Do we want to fetch a possibly variable number of messages?
# (InfoTab changes height depending on the type of messages, see
# `information_buffer_popup_on`).
- await fetch_history(tab, amount=height)
+ messages = await fetch_history(tab, amount=height)
+ last_message_exists = False
+ if tab._text_buffer.messages:
+ last_message = tab._text_buffer.messages[0]
+ last_message_exists = True
+ if not messages and last_message_exists and not isinstance(last_message, EndOfArchive):
+ time = tab._text_buffer.messages[0].time
+ messages = [EndOfArchive('End of archive reached', time=time)]
+ tab._text_buffer.add_history_messages(messages)
+ if messages:
+ tab.core.refresh_window()
except NoMAMSupportException:
tab.core.information('MAM not supported for %r' % tab.jid, 'Info')
return None
except (MAMQueryException, DiscoInfoException):
tab.core.information('An error occured when fetching MAM for %r' % tab.jid, 'Error')
return None
+ finally:
+ tab.query_status = False