1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
|
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2012 Nathanael C. Fritz,
Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
This file is part of Slixmpp.
See the file LICENSE for copying permission.
"""
import logging
import hashlib
from slixmpp.stanza import Iq, Message, Presence
from slixmpp.exceptions import XMPPError
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.plugins.base import BasePlugin
from slixmpp.plugins.xep_0231 import stanza, BitsOfBinary
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class XEP_0231(BasePlugin):

    """
    XEP-0231 Bits of Binary

    Registers the BitsOfBinary stanza plugin on Iq, Message and Presence
    stanzas, stores elements through the 'get_bob' / 'set_bob' / 'del_bob'
    API handlers (the defaults keep them in an in-memory dict keyed by
    cid), and triggers the 'bob' event when such an element is received.
    """

    name = 'xep_0231'
    description = 'XEP-0231: Bits of Binary'
    dependencies = {'xep_0030'}

    def plugin_init(self):
        """Register stanza plugins, stanza handlers and default API handlers."""
        # Backing store used by the default API handlers: cid -> element.
        self._cids = {}

        register_stanza_plugin(Iq, BitsOfBinary)
        register_stanza_plugin(Message, BitsOfBinary)
        register_stanza_plugin(Presence, BitsOfBinary)

        self.xmpp.register_handler(
            Callback('Bits of Binary - Iq',
                     StanzaPath('iq/bob'),
                     self._handle_bob_iq))

        self.xmpp.register_handler(
            Callback('Bits of Binary - Message',
                     StanzaPath('message/bob'),
                     self._handle_bob))

        self.xmpp.register_handler(
            Callback('Bits of Binary - Presence',
                     StanzaPath('presence/bob'),
                     self._handle_bob))

        self.api.register(self._get_bob, 'get_bob', default=True)
        self.api.register(self._set_bob, 'set_bob', default=True)
        self.api.register(self._del_bob, 'del_bob', default=True)

    def plugin_end(self):
        """Remove the disco feature and the stanza handlers added by this plugin."""
        self.xmpp['xep_0030'].del_feature(feature='urn:xmpp:bob')
        self.xmpp.remove_handler('Bits of Binary - Iq')
        self.xmpp.remove_handler('Bits of Binary - Message')
        self.xmpp.remove_handler('Bits of Binary - Presence')

    def session_bind(self, jid):
        """Advertise the 'urn:xmpp:bob' feature through xep_0030."""
        self.xmpp['xep_0030'].add_feature('urn:xmpp:bob')

    def set_bob(self, data, mtype, cid=None, max_age=None):
        """
        Build a BitsOfBinary element and hand it to the 'set_bob' API handler.

        Arguments:
            data    -- The payload. When cid is None it is passed to
                       hashlib.sha1(), so it must be bytes-like.
            mtype   -- Value stored in the element's 'type' field.
            cid     -- Optional content id. Defaults to
                       'sha1+<hex digest of data>@bob.xmpp.org'.
            max_age -- Value stored in the element's 'max_age' field.

        Returns the cid the element was stored under.
        """
        if cid is None:
            cid = 'sha1+%s@bob.xmpp.org' % hashlib.sha1(data).hexdigest()

        bob = BitsOfBinary()
        bob['data'] = data
        bob['type'] = mtype
        bob['cid'] = cid
        bob['max_age'] = max_age

        self.api['set_bob'](args=bob)

        return cid

    def get_bob(self, jid=None, cid=None, cached=True, ifrom=None,
                block=True, timeout=None, callback=None):
        """
        Retrieve the element identified by cid.

        When cached is true the 'get_bob' API handler is consulted first;
        if it has nothing for cid, or cached is false, an Iq 'get' request
        is sent to jid instead.

        Arguments:
            jid      -- Recipient of the Iq request.
            cid      -- Content id of the requested element.
            cached   -- Whether to try the 'get_bob' API handler first.
            ifrom    -- Sender of the Iq request; also passed to the handler.
            block    -- Forwarded to Iq.send().
            timeout  -- Forwarded to Iq.send().
            callback -- Forwarded to Iq.send().

        Returns an Iq on a cache hit (the handler's own Iq, or a new Iq
        wrapping the handler's result), otherwise whatever Iq.send() returns.
        """
        if cached:
            try:
                data = self.api['get_bob'](None, None, ifrom, args=cid)
            except XMPPError as err:
                # The default handler reports a miss by raising
                # item-not-found. That must not abort the lookup: a miss
                # simply means the element has to be requested from jid.
                if err.condition != 'item-not-found':
                    raise
                data = None

            if data is not None:
                if isinstance(data, Iq):
                    return data
                wrapper = self.xmpp.Iq()
                wrapper.append(data)
                return wrapper

        iq = self.xmpp.Iq()
        iq['to'] = jid
        iq['from'] = ifrom
        iq['type'] = 'get'
        iq['bob']['cid'] = cid
        return iq.send(block=block, timeout=timeout, callback=callback)

    def del_bob(self, cid):
        """Ask the 'del_bob' API handler to drop the element stored under cid."""
        self.api['del_bob'](args=cid)

    def _handle_bob_iq(self, iq):
        """
        Handle an incoming Iq that carries a bob element.

        A 'result' is stored through the 'set_bob' API handler and announced
        with the 'bob' event. A 'get' is answered with what the 'get_bob'
        API handler returns for the requested cid. Other Iq types are
        ignored.
        """
        cid = iq['bob']['cid']

        if iq['type'] == 'result':
            self.api['set_bob'](iq['from'], None, iq['to'], args=iq['bob'])
            self.xmpp.event('bob', iq)
        elif iq['type'] == 'get':
            data = self.api['get_bob'](iq['to'], None, iq['from'], args=cid)
            if isinstance(data, Iq):
                # The handler produced a complete Iq; only the id is
                # copied from the request before sending it.
                data['id'] = iq['id']
                data.send()
                return

            reply = iq.reply()
            reply.append(data)
            reply.send()

    def _handle_bob(self, stanza):
        """Store the bob element of a message/presence and fire the 'bob' event."""
        self.api['set_bob'](stanza['from'], None,
                            stanza['to'], args=stanza['bob'])
        self.xmpp.event('bob', stanza)

    # =================================================================
    # Default API handlers, backed by the in-memory self._cids dict.

    def _set_bob(self, jid, node, ifrom, bob):
        """Store bob under its 'cid'; jid, node and ifrom are not used."""
        self._cids[bob['cid']] = bob

    def _get_bob(self, jid, node, ifrom, cid):
        """
        Return the element stored under cid.

        Raises XMPPError('item-not-found') when nothing is stored for cid.
        jid, node and ifrom are not used.
        """
        if cid not in self._cids:
            raise XMPPError('item-not-found')
        return self._cids[cid]

    def _del_bob(self, jid, node, ifrom, cid):
        """Forget the element stored under cid, if any."""
        self._cids.pop(cid, None)
|