From 1133baa5f01a0f1c507bed7ad4d7cafbf8292799 Mon Sep 17 00:00:00 2001 From: mathieui Date: Sun, 25 Mar 2012 23:40:22 +0200 Subject: Add the OTR plugin --- plugins/otr.py | 136 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 136 insertions(+) create mode 100644 plugins/otr.py (limited to 'plugins') diff --git a/plugins/otr.py b/plugins/otr.py new file mode 100644 index 00000000..b674c0fd --- /dev/null +++ b/plugins/otr.py @@ -0,0 +1,136 @@ +import pyotr +from sleekxmpp.xmlstream.stanzabase import JID + +import logging +log = logging.getLogger(__name__) + +from plugin import BasePlugin + +import tabs +from tabs import ConversationTab + +class Plugin(BasePlugin): + def init(self): + self.contacts = {} + # a dict of {full-JID: OTR object} + self.add_event_handler('conversation_say_after', self.on_conversation_say) + self.add_event_handler('conversation_msg', self.on_conversation_msg) + + self.add_command('otr', self.command_otr, "Usage: /otr \notr: Start or stop OTR for the current conversation", self.otr_completion) + ConversationTab.add_information_element('otr', self.display_encryption_status) + + def cleanup(self): + ConversationTab.remove_information_element('otr') + self.del_tab_command(ConversationTab, 'otr') + + def otr_special(self, tab, typ): + def helper(msg): + tab.add_message('%s: %s' % (typ, msg.decode())) + return helper + + def otr_on_state_change(self, tab): + def helper(old, new): + old = self.otr_state(old) + new = self.otr_state(new) + tab.add_message('OTR state has changed from %s to %s' % (old, new)) + return helper + + def get_otr(self, tab): + if tab not in self.contacts: + self.contacts[tab] = pyotr.OTR(on_error=self.otr_special(tab, 'Error'), on_warn=self.otr_special(tab, 'Warn'), on_state_change=self.otr_on_state_change(tab)) + return self.contacts[tab] + + def on_conversation_say(self, message, tab): + """ + Feed the message through the OTR filter + """ + to = message['to'] + if not message['body']: + # there’s nothing to encrypt if this is a chatstate, for example + return + otr_state = self.get_otr(tab) + # Not sure what to do with xhtml bodies, and I don't like them anyway ;) + del message['xhtml_im'] + say = otr_state.transform_msg(message['body'].encode()) + if say is not None: + message['body'] = say.decode() + else: + del message['body'] + + def on_conversation_msg(self, message, tab): + """ + Feed the message through the OTR filter + """ + fro = message['from'] + if not message['body']: + # there’s nothing to decrypt if this is a chatstate, for example + return + otr_state = self.get_otr(tab) + # Not sure what to do with xhtml bodies, and I don't like them anyway ;) + del message['xhtml_im'] + display, reply = otr_state.handle_msg(message['body'].encode()) + #self.core.information('D: {!r}, R: {!r}'.format(display, reply)) + if display is not None: + message['body'] = display.decode() + else: + del message['body'] + if reply is not None: + self.otr_say(tab, reply.decode()) + + @staticmethod + def otr_state(state): + if state == pyotr.MSG_STATE_PLAINTEXT: + return 'plaintext' + elif state == pyotr.MSG_STATE_ENCRYPTED: + return 'encrypted' + elif state == pyotr.MSG_STATE_FINISHED: + return 'finished' + + def display_encryption_status(self, jid): + """ + Returns the status of encryption for the associated jid. This is to be used + in the ConversationTab’s InfoWin. + """ + tab = self.core.get_tab_by_name(jid, tabs.ConversationTab) + if tab not in self.contacts: + return '' + state = self.otr_state(self.contacts[tab].state) + return ' OTR: %s' % (state,) + + def otr_say(self, tab, line): + msg = self.core.xmpp.make_message(tab.get_name()) + msg['type'] = 'chat' + msg['body'] = line + msg.send() + + def command_otr(self, args): + """ + A command to start or end OTR encryption + """ + args = args.split() + if not args: + return self.core.command_help("otr") + if isinstance(self.core.current_tab(), ConversationTab): + jid = JID(self.core.current_tab().get_name()) + command = args[0] + if command == 'start': + otr_state = self.get_otr(self.core.current_tab()) + self.otr_say(self.core.current_tab(), otr_state.start().decode()) + elif command == 'end': + otr_state = self.get_otr(self.core.current_tab()) + msg = otr_state.end() + if msg is not None: + self.otr_say(self.core.current_tab(), msg.decode()) + elif command == 'fpr': + otr_state = self.get_otr(self.core.current_tab()) + our = otr_state.our_fpr + if our: + our = hex(int.from_bytes(our, 'big'))[2:].ljust(40).upper() + their = otr_state.their_fpr + if their: + their = hex(int.from_bytes(their, 'big'))[2:].ljust(40).upper() + self.core.current_tab().add_message('Your: %s Their: %s' % (our, their)) + self.core.refresh_window() + + def otr_completion(self, the_input): + return the_input.auto_completion(['start', 'end'], ' ') -- cgit v1.2.3