1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
import pyotr
from sleekxmpp.xmlstream.stanzabase import JID
import logging
log = logging.getLogger(__name__)
from plugin import BasePlugin
import tabs
from tabs import ConversationTab
class Plugin(BasePlugin):
def init(self):
self.contacts = {}
# a dict of {full-JID: OTR object}
self.add_event_handler('conversation_say_after', self.on_conversation_say)
self.add_event_handler('conversation_msg', self.on_conversation_msg)
self.add_tab_command(ConversationTab, 'otr', self.command_otr, "Usage: /otr <start|end>\notr: Start or stop OTR for the current conversation", self.otr_completion)
ConversationTab.add_information_element('otr', self.display_encryption_status)
def cleanup(self):
ConversationTab.remove_information_element('otr')
self.del_tab_command(ConversationTab, 'otr')
def otr_special(self, tab, typ):
def helper(msg):
tab.add_message('%s: %s' % (typ, msg.decode()))
return helper
def otr_on_state_change(self, tab):
def helper(old, new):
old = self.otr_state(old)
new = self.otr_state(new)
tab.add_message('OTR state has changed from %s to %s' % (old, new))
return helper
def get_otr(self, tab):
if tab not in self.contacts:
self.contacts[tab] = pyotr.OTR(on_error=self.otr_special(tab, 'Error'), on_warn=self.otr_special(tab, 'Warn'), on_state_change=self.otr_on_state_change(tab))
return self.contacts[tab]
def on_conversation_say(self, message, tab):
"""
Feed the message through the OTR filter
"""
to = message['to']
if not message['body']:
# there’s nothing to encrypt if this is a chatstate, for example
return
otr_state = self.get_otr(tab)
# Not sure what to do with xhtml bodies, and I don't like them anyway ;)
del message['xhtml_im']
say = otr_state.transform_msg(message['body'].encode())
if say is not None:
message['body'] = say.decode()
else:
del message['body']
def on_conversation_msg(self, message, tab):
"""
Feed the message through the OTR filter
"""
fro = message['from']
if not message['body']:
# there’s nothing to decrypt if this is a chatstate, for example
return
otr_state = self.get_otr(tab)
# Not sure what to do with xhtml bodies, and I don't like them anyway ;)
del message['xhtml_im']
display, reply = otr_state.handle_msg(message['body'].encode())
#self.core.information('D: {!r}, R: {!r}'.format(display, reply))
if display is not None:
message['body'] = display.decode()
else:
del message['body']
if reply is not None:
self.otr_say(tab, reply.decode())
@staticmethod
def otr_state(state):
if state == pyotr.MSG_STATE_PLAINTEXT:
return 'plaintext'
elif state == pyotr.MSG_STATE_ENCRYPTED:
return 'encrypted'
elif state == pyotr.MSG_STATE_FINISHED:
return 'finished'
def display_encryption_status(self, jid):
"""
Returns the status of encryption for the associated jid. This is to be used
in the ConversationTab’s InfoWin.
"""
tab = self.core.get_tab_by_name(jid, tabs.ConversationTab)
if tab not in self.contacts:
return ''
state = self.otr_state(self.contacts[tab].state)
return ' OTR: %s' % (state,)
def otr_say(self, tab, line):
msg = self.core.xmpp.make_message(tab.get_name())
msg['type'] = 'chat'
msg['body'] = line
msg.send()
def command_otr(self, args):
"""
A command to start or end OTR encryption
"""
args = args.split()
if not args:
return self.core.command_help("otr")
if isinstance(self.core.current_tab(), ConversationTab):
jid = JID(self.core.current_tab().get_name())
command = args[0]
if command == 'start':
otr_state = self.get_otr(self.core.current_tab())
self.otr_say(self.core.current_tab(), otr_state.start().decode())
elif command == 'end':
otr_state = self.get_otr(self.core.current_tab())
msg = otr_state.end()
if msg is not None:
self.otr_say(self.core.current_tab(), msg.decode())
elif command == 'fpr':
otr_state = self.get_otr(self.core.current_tab())
our = otr_state.our_fpr
if our:
our = hex(int.from_bytes(our, 'big'))[2:].ljust(40).upper()
their = otr_state.their_fpr
if their:
their = hex(int.from_bytes(their, 'big'))[2:].ljust(40).upper()
self.core.current_tab().add_message('Your: %s Their: %s' % (our, their))
self.core.refresh_window()
def otr_completion(self, the_input):
return the_input.auto_completion(['start', 'fpr', 'end'], '', quotify=False)
|