From f04728a82b473e4f505d1c4931296e3eab6dbf23 Mon Sep 17 00:00:00 2001 From: mathieui Date: Sat, 3 Jan 2015 16:23:34 +0100 Subject: Rework the OTR plugin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - remove the allow_v1 and allow_v2 options (allow_v1 will now be always false, as no one would want it to true, and allow_v2 will always be true, which should also be what everyone wants until potr supports OTRv3) - add a “require_encryption” configuration option - move all the plugin-generated messages to module level, to improve readability - split up some parts and add comments --- plugins/otr.py | 597 +++++++++++++++++++++++++++++++-------------------------- 1 file changed, 320 insertions(+), 277 deletions(-) (limited to 'plugins') diff --git a/plugins/otr.py b/plugins/otr.py index fcb1640f..3fc8c65e 100644 --- a/plugins/otr.py +++ b/plugins/otr.py @@ -156,15 +156,11 @@ Configuration The directory in which you want keys and fpr to be stored. - allow_v2 - **Default:** ``true`` - - Allow OTRv2 - - allow_v1 + require_encryption **Default:** ``false`` - Allow OTRv1 + If ``true``, prevents you from sending unencrypted messages, and tries + to establish OTR sessions when receiving unencrypted messages. timeout **Default:** ``3`` @@ -174,11 +170,11 @@ Configuration value will disable this notification. log - **Default:** false + **Default:** ``false`` Log conversations (OTR start/end marker, and messages). -The :term:`allow_v1`, :term:`allow_v2`, :term:`decode_xhtml`, :term:`decode_entities` +The :term:`require_encryption`, :term:`decode_xhtml`, :term:`decode_entities` and :term:`log` configuration parameters are tab-specific. Important details @@ -216,15 +212,15 @@ from theming import get_theme, dump_tuple from decorators import command_args_parser OTR_DIR = os.path.join(os.getenv('XDG_DATA_HOME') or - '~/.local/share', 'poezio', 'otr') + '~/.local/share', 'poezio', 'otr') POLICY_FLAGS = { - 'ALLOW_V1':False, - 'ALLOW_V2':True, - 'REQUIRE_ENCRYPTION': False, - 'SEND_TAG': True, - 'WHITESPACE_START_AKE': True, - 'ERROR_START_AKE': True + 'ALLOW_V1':False, + 'ALLOW_V2':True, + 'REQUIRE_ENCRYPTION': False, + 'SEND_TAG': True, + 'WHITESPACE_START_AKE': True, + 'ERROR_START_AKE': True } log = logging.getLogger(__name__) @@ -246,7 +242,88 @@ Then use the command: /otr trust /otrsmp ask """) +OTR_NOT_ENABLED = _('%(jid_c)s%(jid)s%(info)s did not enable ' + 'OTR after %(sec)s seconds.') + +MESSAGE_NOT_SENT = _('%(info)sYour message to %(jid_c)s%(jid)s%(info)s was' + ' not sent because your configuration requires an ' + 'encrypted session.\nWait until it is established or ' + 'change your configuration.') + +OTR_REQUEST = _('%(info)sOTR request to %(jid_c)s%(jid)s%(info)s sent.') + +OTR_OWN_FPR = _('%(info)sYour OTR key fingerprint is ' + '%(normal)s%(fpr)s%(info)s.') + +OTR_REMOTE_FPR = _('%(info)sThe key fingerprint for %(jid_c)s' + '%(jid)s%(info)s is %(normal)s%(fpr)s%(info)s.') + +OTR_NO_FPR = _('%(jid_c)s%(jid)s%(info)s has no' + ' key currently in use.') + +OTR_START_TRUSTED = _('%(info)sStarted a \x19btrusted\x19o%(info)s ' + 'OTR conversation with %(jid_c)s%(jid)s') + +OTR_REFRESH_TRUSTED = _('%(info)sRefreshed \x19btrusted\x19o%(info)s' + ' OTR conversation with %(jid_c)s%(jid)s') + +OTR_START_UNTRUSTED = _('%(info)sStarted an \x19buntrusted\x19o%(info)s' + ' OTR conversation with %(jid_c)s%(jid)s') + +OTR_REFRESH_UNTRUSTED = _('%(info)sRefreshed \x19buntrusted\x19o%(info)s' + ' OTR conversation with %(jid_c)s%(jid)s') + +OTR_END = _('%(info)sEnded OTR conversation with %(jid_c)s%(jid)s') + +SMP_REQUESTED = _('%(jid_c)s%(jid)s%(info)s has requested SMP verification' + '%(q)s%(info)s.\nAnswer with: /otrsmp answer ') + +SMP_INITIATED = _('%(info)sInitiated SMP request with ' + '%(jid_c)s%(jid)s%(info)s.') + +SMP_PROGRESS = _('%(info)sSMP progressing.') + +SMP_RECIPROCATE = _('%(info)sYou may want to authenticate your peer by asking' + ' your own question: /otrsmp ask [question] ') + +SMP_SUCCESS = _('%(info)sSMP Verification \x19bsucceeded\x19o%(info)s.') + +SMP_FAIL = _('%(info)sSMP Verification \x19bfailed\x19o%(info)s.') + +SMP_ABORTED_PEER = _('%(info)sSMP aborted by peer.') + +SMP_ABORTED = _('%(info)sSMP aborted.') + +MESSAGE_UNENCRYPTED = _('%(info)sThe following message from %(jid_c)s%(jid)s' + '%(info)s was \x19bnot\x19o%(info)s encrypted:\x19o\n' + '%(msg)s') + +MESSAGE_UNREADABLE = _('%(info)sAn encrypted message from %(jid_c)s%(jid)s' + '%(info)s was received but is unreadable, as you are' + ' not currently communicating privately.') + +MESSAGE_INVALID = _('%(info)sThe message from %(jid_c)s%(jid)s%(info)s' + ' could not be decrypted.') + +OTR_ERROR = _('%(info)sReceived the following error from ' + '%(jid_c)s%(jid)s%(info)s:\x19o %(err)s') + +POTR_ERROR = _('%(info)sAn unspecified error in the OTR plugin occured:\n' + '%(exc)s') + +TRUST_ADDED = _('%(info)sYou added %(jid_c)s%(bare_jid)s%(info)s with key ' + '\x19o%(key)s%(info)s to your trusted list.') + + +TRUST_REMOVED = _('%(info)sYou removed %(jid_c)s%(bare_jid)s%(info)s with ' + 'key \x19o%(key)s%(info)s from your trusted list.') + +KEY_DROPPED = _('%(info)sPrivate key dropped.') + def hl(tab): + """ + Make a tab beep and change its status. + """ if tab.state != 'current': tab.state = 'private' @@ -256,6 +333,11 @@ def hl(tab): curses.beep() class PoezioContext(Context): + """ + OTR context, specific to a conversation with a contact + + Overrides methods from potr.context.Context + """ def __init__(self, account, peer, xmpp, core): super(PoezioContext, self).__init__(account, peer) self.xmpp = xmpp @@ -264,6 +346,7 @@ class PoezioContext(Context): self.trustName = safeJID(peer).bare self.in_smp = False self.smp_own = False + self.log = 0 def getPolicy(self, key): if key in self.flags: @@ -283,9 +366,13 @@ class PoezioContext(Context): message.send() def setState(self, newstate): - color_jid = '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID) - color_info = '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT) - color_normal = '\x19%s}' % dump_tuple(get_theme().COLOR_NORMAL_TEXT) + format_dict = { + 'jid_c': '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID), + 'info': '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT), + 'normal': '\x19%s}' % dump_tuple(get_theme().COLOR_NORMAL_TEXT), + 'jid': self.peer, + 'bare_jid': safeJID(self.peer).bare + } tab = self.core.get_tab_by_name(self.peer) if not tab: @@ -297,56 +384,25 @@ class PoezioContext(Context): if newstate == STATE_ENCRYPTED and tab: log.debug('OTR conversation with %s refreshed', self.peer) if self.getCurrentTrust(): - msg = _('%(info)sRefreshed \x19btrusted\x19o%(info)s' - ' OTR conversation with %(jid_c)s%(jid)s') % { - 'info': color_info, - 'jid_c': color_jid, - 'jid': self.peer - } + msg = OTR_REFRESH_TRUSTED % format_dict tab.add_message(msg, typ=self.log) else: - msg = _('%(info)sRefreshed \x19buntrusted\x19o%(info)s' - ' OTR conversation with %(jid_c)s%(jid)s' - '%(info)s, key: \x19o%(key)s') % { - 'jid': self.peer, - 'key': self.getCurrentKey(), - 'info': color_info, - 'jid_c': color_jid} - + msg = OTR_REFRESH_UNTRUSTED % format_dict tab.add_message(msg, typ=self.log) hl(tab) elif newstate == STATE_FINISHED or newstate == STATE_PLAINTEXT: log.debug('OTR conversation with %s finished', self.peer) if tab: - tab.add_message('%sEnded OTR conversation with %s%s' % ( - color_info, color_jid, self.peer), - typ=self.log) + tab.add_message(OTR_END % format_dict, typ=self.log) hl(tab) elif newstate == STATE_ENCRYPTED and tab: if self.getCurrentTrust(): - msg = _('%(info)sStarted a \x19btrusted\x19o%(info)s ' - 'OTR conversation with %(jid_c)s%(jid)s') % { - 'jid': self.peer, - 'info': color_info, - 'jid_c': color_jid} - tab.add_message(msg, typ=self.log) + tab.add_message(OTR_START_TRUSTED % format_dict, typ=self.log) else: - tab.add_message(OTR_TUTORIAL % { - 'jid': safeJID(self.peer).bare, - 'remote_fpr': self.getCurrentKey(), - 'our_fpr': self.user.getPrivkey(), - 'info': color_info, - 'normal': color_normal, - 'jid_c': color_jid}, - typ=0) - msg = _('%(info)sStarted an \x19buntrusted\x19o%(info)s' - ' OTR conversation with %(jid_c)s%(jid)s' - '%(info)s, key: \x19o%(key)s') % { - 'jid': self.peer, - 'key': self.getCurrentKey(), - 'info': color_info, - 'jid_c': color_jid} - tab.add_message(msg, typ=self.log) + format_dict['our_fpr'] = self.user.getPrivkey() + format_dict['remote_fpr'] = self.getCurrentKey() + tab.add_message(OTR_TUTORIAL % format_dict, typ=0) + tab.add_message(OTR_START_UNTRUSTED % format_dict, typ=self.log) hl(tab) log.debug('Set encryption state of %s to %s', self.peer, states[newstate]) @@ -356,6 +412,11 @@ class PoezioContext(Context): self.core.doupdate() class PoezioAccount(Account): + """ + OTR Account, keeps track of a specific account (ours) + + Redefines the load/save methods from potr.context.Account + """ def __init__(self, jid, key_dir): super(PoezioAccount, self).__init__(jid, 'xmpp', 1024) @@ -402,8 +463,7 @@ class PoezioAccount(Account): with open(self.key_dir + '.fpr', 'w') as fpr_fd: for uid, trusts in self.trusts.items(): for fpr, trustVal in trusts.items(): - fpr_fd.write('\t'.join( - (uid, self.name, 'xmpp', fpr, trustVal))) + fpr_fd.write('\t'.join((uid, self.name, 'xmpp', fpr, trustVal))) fpr_fd.write('\n') except: log.exception('Error in save_trusts', exc_info=True) @@ -414,32 +474,29 @@ class PoezioAccount(Account): savePrivkey = save_privkey states = { - STATE_PLAINTEXT: 'plaintext', - STATE_ENCRYPTED: 'encrypted', - STATE_FINISHED: 'finished', + STATE_PLAINTEXT: 'plaintext', + STATE_ENCRYPTED: 'encrypted', + STATE_FINISHED: 'finished', } class Plugin(BasePlugin): def init(self): # set the default values from the config - allow_v2 = self.config.get('allow_v2', True) - POLICY_FLAGS['ALLOW_V2'] = allow_v2 - allow_v1 = self.config.get('allow_v1', False) - POLICY_FLAGS['ALLOW_v1'] = allow_v1 - global OTR_DIR OTR_DIR = os.path.expanduser(self.config.get('keys_dir', '') or OTR_DIR) try: os.makedirs(OTR_DIR) except OSError as e: if e.errno != 17: - self.api.information('The OTR-specific folder could not be created' - ' poezio will be unable to save keys and trusts', 'OTR') + self.api.information('The OTR-specific folder could not ' + 'be created. Poezio will be unable ' + 'to save keys and trusts', 'OTR') except: - self.api.information('The OTR-specific folder could not be created' - ' poezio will be unable to save keys and trusts', 'OTR') + self.api.information('The OTR-specific folder could not ' + 'be created. Poezio will be unable ' + 'to save keys and trusts', 'OTR') self.api.add_event_handler('conversation_msg', self.on_conversation_msg) self.api.add_event_handler('private_msg', self.on_conversation_msg) @@ -470,26 +527,18 @@ class Plugin(BasePlugin): 'answer: Finish a verification\n') self.api.add_tab_command(ConversationTab, 'otrsmp', self.command_smp, - help=smp_desc, - usage=smp_usage, - short=smp_short, - completion=self.completion_smp) + help=smp_desc, usage=smp_usage, short=smp_short, + completion=self.completion_smp) self.api.add_tab_command(PrivateTab, 'otrsmp', self.command_smp, - help=smp_desc, - usage=smp_usage, - short=smp_short, - completion=self.completion_smp) + help=smp_desc, usage=smp_usage, short=smp_short, + completion=self.completion_smp) self.api.add_tab_command(ConversationTab, 'otr', self.command_otr, - help=desc, - usage=usage, - short=shortdesc, - completion=self.completion_otr) + help=desc, usage=usage, short=shortdesc, + completion=self.completion_otr) self.api.add_tab_command(PrivateTab, 'otr', self.command_otr, - help=desc, - usage=usage, - short=shortdesc, - completion=self.completion_otr) + help=desc, usage=usage, short=shortdesc, + completion=self.completion_otr) def cleanup(self): for context in self.contexts.values(): @@ -499,166 +548,158 @@ class Plugin(BasePlugin): PrivateTab.remove_information_element('otr') def get_context(self, jid): + """ + Retrieve or create an OTR context + """ jid = safeJID(jid).full if not jid in self.contexts: flags = POLICY_FLAGS.copy() - policy = self.config.get_by_tabname('encryption_policy', jid, default='ondemand').lower() + require = self.config.get_by_tabname('require_encryption', + jid, default=False) + flags['REQUIRE_ENCRYPTION'] = require logging_policy = self.config.get_by_tabname('log', jid, default='false').lower() - allow_v2 = self.config.get_by_tabname('allow_v2', jid, default='true').lower() - flags['ALLOW_V2'] = (allow_v2 != 'false') - allow_v1 = self.config.get_by_tabname('allow_v1', jid, default='false').lower() - flags['ALLOW_V1'] = (allow_v1 == 'true') self.contexts[jid] = PoezioContext(self.account, jid, self.core.xmpp, self.core) self.contexts[jid].log = 1 if logging_policy != 'false' else 0 self.contexts[jid].flags = flags return self.contexts[jid] def on_conversation_msg(self, msg, tab): - color_jid = '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID) - color_info = '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT) + """ + Message received + """ + format_dict = { + 'jid_c': '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID), + 'info': '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT), + 'jid': msg['from'] + } try: ctx = self.get_context(msg['from']) txt, tlvs = ctx.receiveMessage(msg["body"].encode('utf-8')) + # SMP if tlvs: - smp1q = get_tlv(tlvs, potr.proto.SMP1QTLV) - smp1 = get_tlv(tlvs, potr.proto.SMP1TLV) - smp2 = get_tlv(tlvs, potr.proto.SMP2TLV) - smp3 = get_tlv(tlvs, potr.proto.SMP3TLV) - smp4 = get_tlv(tlvs, potr.proto.SMP4TLV) - abort = get_tlv(tlvs, potr.proto.SMPABORTTLV) - if abort: - ctx.reset_smp() - tab.add_message('%sSMP aborted by peer.' % color_info, typ=0) - elif ctx.in_smp and not ctx.smpIsValid(): - ctx.reset_smp() - tab.add_message('%sSMP aborted.' % color_info, typ=0) - elif smp1 or smp1q: - if smp1q: - try: - question = ' with question: \x19o' + smp1q.msg.decode('utf-8') - except UnicodeDecodeError: - self.api.information('The peer sent a question but it had a wrong encoding', 'Error') - question = '' - else: - question = '' - ctx.in_smp = True - ctx.smp_own = False - tab.add_message('%(info)sPeer %(jid_c)s%(jid)s%(info)s has ' - 'requested SMP verification%(q)s%(info)s.\n' - 'Answer with: /otrsmp answer ' % { - 'q': question, - 'info': color_info, - 'jid': tab.name, - 'jid_c': color_jid}, typ=0) - elif smp2: - if not ctx.in_smp: - ctx.reset_smp() - else: - tab.add_message('%sSMP progressing.' % color_info, typ=0) - elif smp3 or smp4: - if ctx.smpIsSuccess(): - if not ctx.getCurrentTrust(): - tab.add_message('%sYou may want to authenticate ' - 'your peer by asking your own ' - 'question: /otrsmp ask [question]' - ' ' % color_info, - typ=0) - ctx.reset_smp() - tab.add_message('%sSMP Verification \x19bsucceeded' % color_info, - typ=0) - #self.smp_finish('SMP verification succeeded.', 'success') - else: - ctx.reset_smp() - tab.add_message('%sSMP Verification \x19bfailed' % color_info, - typ=0) - #self.smp_finish('SMP verification failed.', 'error') - self.core.refresh_window() - - + self.handle_tlvs(tlvs, ctx, tab, format_dict) except UnencryptedMessage as err: # received an unencrypted message inside an OTR session - text = _('%(info)sThe following message from %(jid_c)s%(jid)s' - '%(info)s was \x19bnot\x19o%(info)s encrypted:' - '\x19o\n%(msg)s') % { - 'info': color_info, - 'jid_c': color_jid, - 'jid': msg['from'], - 'msg': err.args[0].decode('utf-8')} - tab.add_message(text, jid=msg['from'], - typ=0) - del msg['body'] - del msg['html'] - hl(tab) - self.core.refresh_window() + self.unencrypted_message_received(err, ctx, msg, tab, format_dict) + self.command_otr('start') + return + except NotOTRMessage as err: + # ignore non-otr messages + # if we expected an OTR message, we would have + # got an UnencryptedMesssage + # but do an additional check because of a bug with potr and py3k + if ctx.state != STATE_PLAINTEXT or ctx.getPolicy('REQUIRE_ENCRYPTION'): + self.unencrypted_message_received(err, ctx, msg, tab, format_dict) + self.command_otr('start') return except ErrorReceived as err: # Received an OTR error - text = _('%(info)sReceived the following error from ' - '%(jid_c)s%(jid)s%(info)s:\x19o %(err)s') % { - 'jid': msg['from'], - 'err': err.args[0], - 'info': color_info, - 'jid_c': color_jid} - - tab.add_message(text, typ=0) + format_dict['err'] = err.args[0] + tab.add_message(OTR_ERROR % format_dict, typ=0) del msg['body'] del msg['html'] hl(tab) self.core.refresh_window() return - except NotOTRMessage as err: - # ignore non-otr messages - # if we expected an OTR message, we would have - # got an UnencryptedMesssage - # but do an additional check because of a bug with py3k - if ctx.state != STATE_PLAINTEXT or ctx.getPolicy('REQUIRE_ENCRYPTION'): - - text = _('%(info)sThe following message from ' - '%(jid_c)s%(jid)s%(info)s was \x19b' - 'not\x19o%(info)s encrypted:\x19o\n%(msg)s') % { - 'jid': msg['from'], - 'msg': err.args[0].decode('utf-8'), - 'info': color_info, - 'jid_c': color_jid} - tab.add_message(text, jid=msg['from'], - typ=ctx.log) - del msg['body'] - del msg['html'] - hl(tab) - self.core.refresh_window() - return - return except NotEncryptedError as err: - text = _('%(info)sAn encrypted message from %(jid_c)s%(jid)s' - '%(info)s was received but is unreadable, as you are' - ' not currently communicating privately.') % { - 'info': color_info, - 'jid_c': color_jid, - 'jid': msg['from']} - tab.add_message(text, jid=msg['from'], - typ=0) + # Encrypted message received, but unreadable as we do not have + # an OTR session in place. + text = MESSAGE_UNREADABLE % format_dict + tab.add_message(text, jid=msg['from'], typ=0) hl(tab) del msg['body'] del msg['html'] self.core.refresh_window() return except crypt.InvalidParameterError: - tab.add_message('%sThe message from %s%s%s could not be decrypted.' - % (color_info, color_jid, msg['from'], color_info), - jid=msg['from'], typ=0) + # Malformed OTR payload and stuff + text = MESSAGE_INVALID % format_dict + tab.add_message(text, jid=msg['from'], typ=0) hl(tab) del msg['body'] del msg['html'] self.core.refresh_window() return - except: - tab.add_message('%sAn unspecified error in the OTR plugin occured' - % color_info, - typ=0) + except Exception: + # Unexpected error + import traceback + exc = traceback.format_exc() + format_dict['exc'] = exc + tab.add_message(POTR_ERROR % format_dict, typ=0) log.error('Unspecified error in the OTR plugin', exc_info=True) return + # No error, proceed with the message + self.encrypted_message_received(msg, ctx, tab, txt) + + def handle_tlvs(self, tlvs, ctx, tab, format_dict): + """ + If the message had a TLV, it means we received part of an SMP + exchange. + """ + smp1q = get_tlv(tlvs, potr.proto.SMP1QTLV) + smp1 = get_tlv(tlvs, potr.proto.SMP1TLV) + smp2 = get_tlv(tlvs, potr.proto.SMP2TLV) + smp3 = get_tlv(tlvs, potr.proto.SMP3TLV) + smp4 = get_tlv(tlvs, potr.proto.SMP4TLV) + abort = get_tlv(tlvs, potr.proto.SMPABORTTLV) + if abort: + ctx.reset_smp() + tab.add_message(SMP_ABORTED_PEER % format_dict, typ=0) + elif ctx.in_smp and not ctx.smpIsValid(): + ctx.reset_smp() + tab.add_message(SMP_ABORTED % format_dict, typ=0) + elif smp1 or smp1q: + # Received an SMP request (with a question or not) + if smp1q: + try: + question = ' with question: \x19o' + smp1q.msg.decode('utf-8') + except UnicodeDecodeError: + self.api.information('The peer sent a question but it had a wrong encoding', 'Error') + question = '' + else: + question = '' + ctx.in_smp = True + # we did not initiate it + ctx.smp_own = False + format_dict['q'] = question + tab.add_message(SMP_REQUESTED % format_dict, typ=0) + elif smp2: + # SMP reply received + if not ctx.in_smp: + ctx.reset_smp() + else: + tab.add_message(SMP_PROGRESS % format_dict, typ=0) + elif smp3 or smp4: + # Type 4 (SMP message 3) or 5 (SMP message 4) TLVs received + # in both cases it is the final message of the SMP exchange + if ctx.smpIsSuccess(): + tab.add_message(SMP_SUCCESS % format_dict, typ=0) + if not ctx.getCurrentTrust(): + tab.add_message(SMP_RECIPROCATE % format_dict, typ=0) + else: + tab.add_message(SMP_FAIL % format_dict, typ=0) + ctx.reset_smp() + self.core.refresh_window() + def unencrypted_message_received(self, err, ctx, msg, tab, format_dict): + """ + An unencrypted message was received while we expected it to be + encrypted. Display it with a warning. + """ + format_dict['msg'] = err.args[0].decode('utf-8') + text = MESSAGE_UNENCRYPTED % format_dict + tab.add_message(text, jid=msg['from'], typ=ctx.log) + del msg['body'] + del msg['html'] + hl(tab) + self.core.refresh_window() + + def encrypted_message_received(self, msg, ctx, tab, txt): + """ + A properly encrypted message was received, so we add it to the + buffer, and try to format it according to the configuration. + """ # remove xhtml del msg['html'] del msg['body'] @@ -700,6 +741,13 @@ class Plugin(BasePlugin): del msg['body'] def find_encrypted_context_with_matching(self, bare_jid): + """ + Find an OTR session from a bare JID. + + Useful when a dynamic tab unlocks, which would lead to sending + unencrypted messages until it locks again, if we didn’t fallback + with this. + """ for ctx in self.contexts: if safeJID(ctx).bare == bare_jid and self.contexts[ctx].state == STATE_ENCRYPTED: return self.contexts[ctx] @@ -717,6 +765,12 @@ class Plugin(BasePlugin): name = tab.name jid = safeJID(tab.name) + format_dict = { + 'jid_c': '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID), + 'info': '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT), + 'jid': name, + } + ctx = self.contexts.get(name) if isinstance(tab, DynamicConversationTab) and not tab.locked_resource: log.debug('Unlocked tab %s found, falling back to the first encrypted chat we find.', name) @@ -728,17 +782,26 @@ class Plugin(BasePlugin): tab.send_chat_state('inactive', always_send=True) tab.add_message(msg['body'], - nickname=self.core.own_nick or tab.own_nick, - nick_color=get_theme().COLOR_OWN_NICK, - identifier=msg['id'], - jid=self.core.xmpp.boundjid, - typ=ctx.log) + nickname=self.core.own_nick or tab.own_nick, + nick_color=get_theme().COLOR_OWN_NICK, + identifier=msg['id'], + jid=self.core.xmpp.boundjid, + typ=ctx.log) # remove everything from the message so that it doesn’t get sent del msg['body'] del msg['replace'] del msg['html'] + elif ctx.getPolicy('REQUIRE_ENCRYPTION'): + tab.add_message(MESSAGE_NOT_SENT % format_dict, typ=0) + del msg['body'] + del msg['replace'] + del msg['html'] + self.command_otr('start') def display_encryption_status(self, jid): + """ + Returns the text to display in the infobar (the OTR status) + """ context = self.get_context(jid) if safeJID(jid).bare == jid and context.state != STATE_ENCRYPTED: ctx = self.find_encrypted_context_with_matching(jid) @@ -759,13 +822,18 @@ class Plugin(BasePlugin): action = args.pop(0) tab = self.api.current_tab() name = tab.name - color_jid = '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID) - color_info = '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT) - color_normal = '\x19%s}' % dump_tuple(get_theme().COLOR_NORMAL_TEXT) if isinstance(tab, DynamicConversationTab) and tab.locked_resource: name = safeJID(tab.name) name.resource = tab.locked_resource name = name.full + format_dict = { + 'jid_c': '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID), + 'info': '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT), + 'normal': '\x19%s}' % dump_tuple(get_theme().COLOR_NORMAL_TEXT), + 'jid': name, + 'bare_jid': safeJID(name).bare + } + if action == 'end': # close the session context = self.get_context(name) context.disconnect() @@ -788,58 +856,34 @@ class Plugin(BasePlugin): name = name.full otr = self.get_context(name) if otr.state != STATE_ENCRYPTED: - text = _('%(jid_c)s%(jid)s%(info)s did not enable' - ' OTR after %(sec)s seconds.') % { - 'jid': tab.name, - 'info': color_info, - 'jid_c': color_jid, - 'sec': secs} + format_dict['secs'] = secs + text = OTR_NOT_ENABLED % format_dict tab.add_message(text, typ=0) self.core.refresh_window() if secs > 0: event = self.api.create_delayed_event(secs, notify_otr_timeout) self.api.add_timed_event(event) - self.core.xmpp.send_message(mto=name, mtype='chat', - mbody=self.contexts[name].sendMessage(0, b'?OTRv?').decode()) - text = _('%(info)sOTR request to %(jid_c)s%(jid)s%(info)s sent.') % { - 'jid': tab.name, - 'info': color_info, - 'jid_c': color_jid} - tab.add_message(text, typ=0) + body = self.contexts[name].sendMessage(0, b'?OTRv?').decode() + self.core.xmpp.send_message(mto=name, mtype='chat', mbody=body) + tab.add_message(OTR_REQUEST % format_dict, typ=0) elif action == 'ourfpr': - fpr = self.account.getPrivkey() - text = _('%(info)sYour OTR key fingerprint is %(norm)s%(fpr)s.') % { - 'jid': tab.name, - 'info': color_info, - 'norm': color_normal, - 'fpr': fpr} - tab.add_message(text, typ=0) + format_dict['fpr'] = self.account.getPrivkey() + tab.add_message(OTR_OWN_FPR % format_dict, typ=0) elif action == 'fpr': if name in self.contexts: ctx = self.contexts[name] if ctx.getCurrentKey() is not None: - text = _('%(info)sThe key fingerprint for %(jid_c)s' - '%(jid)s%(info)s is %(norm)s%(fpr)s%(info)s.') % { - 'jid': tab.name, - 'info': color_info, - 'norm': color_normal, - 'jid_c': color_jid, - 'fpr': ctx.getCurrentKey()} - tab.add_message(text, typ=0) + format_dict['fpr'] = ctx.getCurrentKey() + tab.add_message(OTR_REMOTE_FPR % format_dict, typ=0) else: - text = _('%(jid_c)s%(jid)s%(info)s has no' - ' key currently in use.') % { - 'jid': tab.name, - 'info': color_info, - 'jid_c': color_jid} - tab.add_message(text, typ=0) + tab.add_message(OTR_NO_FPR % format_dict, typ=0) elif action == 'drop': # drop the privkey (and obviously, end the current conversations before that) for context in self.contexts.values(): if context.state not in (STATE_FINISHED, STATE_PLAINTEXT): context.disconnect() self.account.drop_privkey() - tab.add_message('%sPrivate key dropped.' % color_info, typ=0) + tab.add_message(KEY_DROPPED % format_dict, typ=0) elif action == 'trust': ctx = self.get_context(name) key = ctx.getCurrentKey() @@ -848,15 +892,10 @@ class Plugin(BasePlugin): else: return if not ctx.getCurrentTrust(): + format_dict['key'] = key ctx.setTrust(fpr, 'verified') self.account.saveTrusts() - text = _('%(info)sYou added %(jid_c)s%(jid)s%(info)s with key ' - '\x19o%(key)s%(info)s to your trusted list.') % { - 'jid': ctx.trustName, - 'key': key, - 'info': color_info, - 'jid_c': color_jid} - tab.add_message(text, typ=0) + tab.add_message(TRUST_ADDED % format_dict, typ=0) elif action == 'untrust': ctx = self.get_context(name) key = ctx.getCurrentKey() @@ -865,19 +904,17 @@ class Plugin(BasePlugin): else: return if ctx.getCurrentTrust(): + format_dict['key'] = key ctx.setTrust(fpr, '') self.account.saveTrusts() - text = _('%(info)sYou removed %(jid_c)s%(jid)s%(info)s with ' - 'key \x19o%(key)s%(info)s from your trusted list.') % { - 'jid': ctx.trustName, - 'key': key, - 'info': color_info, - 'jid_c': color_jid} - - tab.add_message(text, typ=0) + tab.add_message(TRUST_REMOVED % format_dict, typ=0) self.core.refresh_window() - def completion_otr(self, the_input): + @staticmethod + def completion_otr(the_input): + """ + Completion for /otr + """ comp = ['start', 'fpr', 'ourfpr', 'refresh', 'end', 'trust', 'untrust'] return the_input.new_completion(comp, 1, quotify=False) @@ -888,8 +925,6 @@ class Plugin(BasePlugin): """ if args is None or not args: return self.core.command_help('otrsmp') - color_jid = '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID) - color_info = '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT) length = len(args) action = args.pop(0) if length == 2: @@ -908,9 +943,18 @@ class Plugin(BasePlugin): name.resource = tab.locked_resource name = name.full + format_dict = { + 'jid_c': '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID), + 'info': '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT), + 'jid': name, + 'bare_jid': safeJID(name).bare + } + ctx = self.get_context(name) if ctx.state != STATE_ENCRYPTED: - return self.api.information('The current conversation is not encrypted', 'Error') + self.api.information('The current conversation is not encrypted', + 'Error') + return if action == 'ask': ctx.in_smp = True @@ -919,24 +963,23 @@ class Plugin(BasePlugin): ctx.smpInit(secret, question) else: ctx.smpInit(secret) - tab.add_message('%(info)sInitiated SMP request with %(jid_c)s' - '%(jid)s%(info)s.' % { - 'info': color_info, - 'jid': name, - 'jid_c': color_jid}, typ=0) + tab.add_message(SMP_INITIATED % format_dict, typ=0) elif action == 'answer': ctx.smpGotSecret(secret) elif action == 'abort': if ctx.in_smp: ctx.smpAbort() - tab.add_message('%sSMP aborted.' % color_info, typ=0) + tab.add_message(SMP_ABORTED % format_dict, typ=0) self.core.refresh_window() - def completion_smp(self, the_input): + @staticmethod + def completion_smp(the_input): + """Completion for /otrsmp""" if the_input.get_argument_position() == 1: return the_input.new_completion(['ask', 'answer', 'abort'], 1, quotify=False) def get_tlv(tlvs, cls): + """Find the instance of a class in a list""" for tlv in tlvs: if isinstance(tlv, cls): return tlv -- cgit v1.2.3