summaryrefslogtreecommitdiff
path: root/slixmpp/plugins/xep_0012/last_activity.py
blob: 56905de03ae0dfced69bc887d4efa446d276242c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185

# Slixmpp: The Slick XMPP Library
# Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
import logging
from asyncio import Future
from datetime import datetime, timedelta
from typing import (
    Dict,
    Optional
)

from slixmpp.plugins import BasePlugin
from slixmpp import JID
from slixmpp.stanza import Iq
from slixmpp.exceptions import XMPPError
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.xmlstream.handler import CoroutineCallback
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.plugins.xep_0012 import stanza, LastActivity


log = logging.getLogger(__name__)


class XEP_0012(BasePlugin):

    """
    XEP-0012 Last Activity
    """

    name = 'xep_0012'
    description = 'XEP-0012: Last Activity'
    dependencies = {'xep_0030'}
    stanza = stanza

    def plugin_init(self):
        register_stanza_plugin(Iq, LastActivity)

        self._last_activities = {}

        self.xmpp.register_handler(
            CoroutineCallback('Last Activity',
                 StanzaPath('iq@type=get/last_activity'),
                 self._handle_get_last_activity))

        self.api.register(self._default_get_last_activity,
                'get_last_activity',
                default=True)
        self.api.register(self._default_set_last_activity,
                'set_last_activity',
                default=True)
        self.api.register(self._default_del_last_activity,
                'del_last_activity',
                default=True)

    def plugin_end(self):
        self.xmpp.remove_handler('Last Activity')
        self.xmpp['xep_0030'].del_feature(feature='jabber:iq:last')

    def session_bind(self, jid):
        self.xmpp['xep_0030'].add_feature('jabber:iq:last')

    def begin_idle(self, jid: Optional[JID] = None, status: Optional[str] = None) -> Future:
        """Reset the last activity for the given JID.

        .. versionchanged:: 1.8.0
            This function now returns a Future.

        :param status: Optional status.
        """
        return self.set_last_activity(jid, 0, status)

    def end_idle(self, jid: Optional[JID] = None) -> Future:
        """Remove the last activity of a JID.

        .. versionchanged:: 1.8.0
            This function now returns a Future.
        """
        return self.del_last_activity(jid)

    def start_uptime(self, status: Optional[str] = None) -> Future:
        """
        .. versionchanged:: 1.8.0
            This function now returns a Future.
        """
        return self.set_last_activity(None, 0, status)

    def set_last_activity(self, jid=None, seconds=None, status=None) -> Future:
        """Set last activity for a JID.

        .. versionchanged:: 1.8.0
            This function now returns a Future.
        """
        return self.api['set_last_activity'](jid, args={
            'seconds': seconds,
            'status': status
        })

    def del_last_activity(self, jid: JID) -> Future:
        """Remove the last activity of a JID.

        .. versionchanged:: 1.8.0
            This function now returns a Future.
        """
        return self.api['del_last_activity'](jid)

    def get_last_activity(self, jid: JID, local: bool = False,
                          ifrom: Optional[JID] = None, **iqkwargs) -> Future:
        """Get last activity for a specific JID.

        :param local: Fetch the value from the local cache.
        """
        if jid is not None and not isinstance(jid, JID):
            jid = JID(jid)

        if self.xmpp.is_component:
            if jid.domain == self.xmpp.boundjid.domain:
                local = True
        else:
            if str(jid) == str(self.xmpp.boundjid):
                local = True
        jid = jid.full

        if local or jid in (None, ''):
            log.debug("Looking up local last activity data for %s", jid)
            return self.api['get_last_activity'](jid, None, ifrom, None)

        iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom)
        iq.enable('last_activity')
        return iq.send(**iqkwargs)

    async def _handle_get_last_activity(self, iq: Iq):
        log.debug("Received last activity query from " + \
                  "<%s> to <%s>.", iq['from'], iq['to'])
        reply = await self.api['get_last_activity'](iq['to'], None, iq['from'], iq)
        reply.send()

    # =================================================================
    # Default in-memory implementations for storing last activity data.
    # =================================================================

    def _default_set_last_activity(self, jid: JID, node: str, ifrom: JID, data: Dict):
        seconds = data.get('seconds', None)
        if seconds is None:
            seconds = 0

        status = data.get('status', None)
        if status is None:
            status = ''

        self._last_activities[jid] = {
            'seconds': datetime.now() - timedelta(seconds=seconds),
            'status': status}

    def _default_del_last_activity(self, jid: JID, node: str, ifrom: JID, data: Dict):
        if jid in self._last_activities:
            del self._last_activities[jid]

    def _default_get_last_activity(self, jid: JID, node: str, ifrom: JID, iq: Iq) -> Iq:
        if not isinstance(iq, Iq):
            reply = self.xmpp.Iq()
        else:
            reply = iq.reply()

        if jid not in self._last_activities:
            raise XMPPError('service-unavailable')

        bare = JID(jid).bare

        if bare != self.xmpp.boundjid.bare:
            if bare in self.xmpp.roster[jid]:
                sub = self.xmpp.roster[jid][bare]['subscription']
                if sub not in ('from', 'both'):
                    raise XMPPError('forbidden')

        td = datetime.now() - self._last_activities[jid]['seconds']
        seconds = td.seconds + td.days * 24 * 3600
        status = self._last_activities[jid]['status']

        reply['last_activity']['seconds'] = seconds
        reply['last_activity']['status'] = status

        return reply