summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--plugins/otr.py597
1 files changed, 320 insertions, 277 deletions
diff --git a/plugins/otr.py b/plugins/otr.py
index fcb1640f..3fc8c65e 100644
--- a/plugins/otr.py
+++ b/plugins/otr.py
@@ -156,15 +156,11 @@ Configuration
The directory in which you want keys and fpr to be stored.
- allow_v2
- **Default:** ``true``
-
- Allow OTRv2
-
- allow_v1
+ require_encryption
**Default:** ``false``
- Allow OTRv1
+ If ``true``, prevents you from sending unencrypted messages, and tries
+ to establish OTR sessions when receiving unencrypted messages.
timeout
**Default:** ``3``
@@ -174,11 +170,11 @@ Configuration
value will disable this notification.
log
- **Default:** false
+ **Default:** ``false``
Log conversations (OTR start/end marker, and messages).
-The :term:`allow_v1`, :term:`allow_v2`, :term:`decode_xhtml`, :term:`decode_entities`
+The :term:`require_encryption`, :term:`decode_xhtml`, :term:`decode_entities`
and :term:`log` configuration parameters are tab-specific.
Important details
@@ -216,15 +212,15 @@ from theming import get_theme, dump_tuple
from decorators import command_args_parser
OTR_DIR = os.path.join(os.getenv('XDG_DATA_HOME') or
- '~/.local/share', 'poezio', 'otr')
+ '~/.local/share', 'poezio', 'otr')
POLICY_FLAGS = {
- 'ALLOW_V1':False,
- 'ALLOW_V2':True,
- 'REQUIRE_ENCRYPTION': False,
- 'SEND_TAG': True,
- 'WHITESPACE_START_AKE': True,
- 'ERROR_START_AKE': True
+ 'ALLOW_V1':False,
+ 'ALLOW_V2':True,
+ 'REQUIRE_ENCRYPTION': False,
+ 'SEND_TAG': True,
+ 'WHITESPACE_START_AKE': True,
+ 'ERROR_START_AKE': True
}
log = logging.getLogger(__name__)
@@ -246,7 +242,88 @@ Then use the command: /otr trust
/otrsmp ask <question> <secret>
""")
+OTR_NOT_ENABLED = _('%(jid_c)s%(jid)s%(info)s did not enable '
+ 'OTR after %(sec)s seconds.')
+
+MESSAGE_NOT_SENT = _('%(info)sYour message to %(jid_c)s%(jid)s%(info)s was'
+ ' not sent because your configuration requires an '
+ 'encrypted session.\nWait until it is established or '
+ 'change your configuration.')
+
+OTR_REQUEST = _('%(info)sOTR request to %(jid_c)s%(jid)s%(info)s sent.')
+
+OTR_OWN_FPR = _('%(info)sYour OTR key fingerprint is '
+ '%(normal)s%(fpr)s%(info)s.')
+
+OTR_REMOTE_FPR = _('%(info)sThe key fingerprint for %(jid_c)s'
+ '%(jid)s%(info)s is %(normal)s%(fpr)s%(info)s.')
+
+OTR_NO_FPR = _('%(jid_c)s%(jid)s%(info)s has no'
+ ' key currently in use.')
+
+OTR_START_TRUSTED = _('%(info)sStarted a \x19btrusted\x19o%(info)s '
+ 'OTR conversation with %(jid_c)s%(jid)s')
+
+OTR_REFRESH_TRUSTED = _('%(info)sRefreshed \x19btrusted\x19o%(info)s'
+ ' OTR conversation with %(jid_c)s%(jid)s')
+
+OTR_START_UNTRUSTED = _('%(info)sStarted an \x19buntrusted\x19o%(info)s'
+ ' OTR conversation with %(jid_c)s%(jid)s')
+
+OTR_REFRESH_UNTRUSTED = _('%(info)sRefreshed \x19buntrusted\x19o%(info)s'
+ ' OTR conversation with %(jid_c)s%(jid)s')
+
+OTR_END = _('%(info)sEnded OTR conversation with %(jid_c)s%(jid)s')
+
+SMP_REQUESTED = _('%(jid_c)s%(jid)s%(info)s has requested SMP verification'
+ '%(q)s%(info)s.\nAnswer with: /otrsmp answer <secret>')
+
+SMP_INITIATED = _('%(info)sInitiated SMP request with '
+ '%(jid_c)s%(jid)s%(info)s.')
+
+SMP_PROGRESS = _('%(info)sSMP progressing.')
+
+SMP_RECIPROCATE = _('%(info)sYou may want to authenticate your peer by asking'
+ ' your own question: /otrsmp ask [question] <secret>')
+
+SMP_SUCCESS = _('%(info)sSMP Verification \x19bsucceeded\x19o%(info)s.')
+
+SMP_FAIL = _('%(info)sSMP Verification \x19bfailed\x19o%(info)s.')
+
+SMP_ABORTED_PEER = _('%(info)sSMP aborted by peer.')
+
+SMP_ABORTED = _('%(info)sSMP aborted.')
+
+MESSAGE_UNENCRYPTED = _('%(info)sThe following message from %(jid_c)s%(jid)s'
+ '%(info)s was \x19bnot\x19o%(info)s encrypted:\x19o\n'
+ '%(msg)s')
+
+MESSAGE_UNREADABLE = _('%(info)sAn encrypted message from %(jid_c)s%(jid)s'
+ '%(info)s was received but is unreadable, as you are'
+ ' not currently communicating privately.')
+
+MESSAGE_INVALID = _('%(info)sThe message from %(jid_c)s%(jid)s%(info)s'
+ ' could not be decrypted.')
+
+OTR_ERROR = _('%(info)sReceived the following error from '
+ '%(jid_c)s%(jid)s%(info)s:\x19o %(err)s')
+
+POTR_ERROR = _('%(info)sAn unspecified error in the OTR plugin occured:\n'
+ '%(exc)s')
+
+TRUST_ADDED = _('%(info)sYou added %(jid_c)s%(bare_jid)s%(info)s with key '
+ '\x19o%(key)s%(info)s to your trusted list.')
+
+
+TRUST_REMOVED = _('%(info)sYou removed %(jid_c)s%(bare_jid)s%(info)s with '
+ 'key \x19o%(key)s%(info)s from your trusted list.')
+
+KEY_DROPPED = _('%(info)sPrivate key dropped.')
+
def hl(tab):
+ """
+ Make a tab beep and change its status.
+ """
if tab.state != 'current':
tab.state = 'private'
@@ -256,6 +333,11 @@ def hl(tab):
curses.beep()
class PoezioContext(Context):
+ """
+ OTR context, specific to a conversation with a contact
+
+ Overrides methods from potr.context.Context
+ """
def __init__(self, account, peer, xmpp, core):
super(PoezioContext, self).__init__(account, peer)
self.xmpp = xmpp
@@ -264,6 +346,7 @@ class PoezioContext(Context):
self.trustName = safeJID(peer).bare
self.in_smp = False
self.smp_own = False
+ self.log = 0
def getPolicy(self, key):
if key in self.flags:
@@ -283,9 +366,13 @@ class PoezioContext(Context):
message.send()
def setState(self, newstate):
- color_jid = '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID)
- color_info = '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT)
- color_normal = '\x19%s}' % dump_tuple(get_theme().COLOR_NORMAL_TEXT)
+ format_dict = {
+ 'jid_c': '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID),
+ 'info': '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT),
+ 'normal': '\x19%s}' % dump_tuple(get_theme().COLOR_NORMAL_TEXT),
+ 'jid': self.peer,
+ 'bare_jid': safeJID(self.peer).bare
+ }
tab = self.core.get_tab_by_name(self.peer)
if not tab:
@@ -297,56 +384,25 @@ class PoezioContext(Context):
if newstate == STATE_ENCRYPTED and tab:
log.debug('OTR conversation with %s refreshed', self.peer)
if self.getCurrentTrust():
- msg = _('%(info)sRefreshed \x19btrusted\x19o%(info)s'
- ' OTR conversation with %(jid_c)s%(jid)s') % {
- 'info': color_info,
- 'jid_c': color_jid,
- 'jid': self.peer
- }
+ msg = OTR_REFRESH_TRUSTED % format_dict
tab.add_message(msg, typ=self.log)
else:
- msg = _('%(info)sRefreshed \x19buntrusted\x19o%(info)s'
- ' OTR conversation with %(jid_c)s%(jid)s'
- '%(info)s, key: \x19o%(key)s') % {
- 'jid': self.peer,
- 'key': self.getCurrentKey(),
- 'info': color_info,
- 'jid_c': color_jid}
-
+ msg = OTR_REFRESH_UNTRUSTED % format_dict
tab.add_message(msg, typ=self.log)
hl(tab)
elif newstate == STATE_FINISHED or newstate == STATE_PLAINTEXT:
log.debug('OTR conversation with %s finished', self.peer)
if tab:
- tab.add_message('%sEnded OTR conversation with %s%s' % (
- color_info, color_jid, self.peer),
- typ=self.log)
+ tab.add_message(OTR_END % format_dict, typ=self.log)
hl(tab)
elif newstate == STATE_ENCRYPTED and tab:
if self.getCurrentTrust():
- msg = _('%(info)sStarted a \x19btrusted\x19o%(info)s '
- 'OTR conversation with %(jid_c)s%(jid)s') % {
- 'jid': self.peer,
- 'info': color_info,
- 'jid_c': color_jid}
- tab.add_message(msg, typ=self.log)
+ tab.add_message(OTR_START_TRUSTED % format_dict, typ=self.log)
else:
- tab.add_message(OTR_TUTORIAL % {
- 'jid': safeJID(self.peer).bare,
- 'remote_fpr': self.getCurrentKey(),
- 'our_fpr': self.user.getPrivkey(),
- 'info': color_info,
- 'normal': color_normal,
- 'jid_c': color_jid},
- typ=0)
- msg = _('%(info)sStarted an \x19buntrusted\x19o%(info)s'
- ' OTR conversation with %(jid_c)s%(jid)s'
- '%(info)s, key: \x19o%(key)s') % {
- 'jid': self.peer,
- 'key': self.getCurrentKey(),
- 'info': color_info,
- 'jid_c': color_jid}
- tab.add_message(msg, typ=self.log)
+ format_dict['our_fpr'] = self.user.getPrivkey()
+ format_dict['remote_fpr'] = self.getCurrentKey()
+ tab.add_message(OTR_TUTORIAL % format_dict, typ=0)
+ tab.add_message(OTR_START_UNTRUSTED % format_dict, typ=self.log)
hl(tab)
log.debug('Set encryption state of %s to %s', self.peer, states[newstate])
@@ -356,6 +412,11 @@ class PoezioContext(Context):
self.core.doupdate()
class PoezioAccount(Account):
+ """
+ OTR Account, keeps track of a specific account (ours)
+
+ Redefines the load/save methods from potr.context.Account
+ """
def __init__(self, jid, key_dir):
super(PoezioAccount, self).__init__(jid, 'xmpp', 1024)
@@ -402,8 +463,7 @@ class PoezioAccount(Account):
with open(self.key_dir + '.fpr', 'w') as fpr_fd:
for uid, trusts in self.trusts.items():
for fpr, trustVal in trusts.items():
- fpr_fd.write('\t'.join(
- (uid, self.name, 'xmpp', fpr, trustVal)))
+ fpr_fd.write('\t'.join((uid, self.name, 'xmpp', fpr, trustVal)))
fpr_fd.write('\n')
except:
log.exception('Error in save_trusts', exc_info=True)
@@ -414,32 +474,29 @@ class PoezioAccount(Account):
savePrivkey = save_privkey
states = {
- STATE_PLAINTEXT: 'plaintext',
- STATE_ENCRYPTED: 'encrypted',
- STATE_FINISHED: 'finished',
+ STATE_PLAINTEXT: 'plaintext',
+ STATE_ENCRYPTED: 'encrypted',
+ STATE_FINISHED: 'finished',
}
class Plugin(BasePlugin):
def init(self):
# set the default values from the config
- allow_v2 = self.config.get('allow_v2', True)
- POLICY_FLAGS['ALLOW_V2'] = allow_v2
- allow_v1 = self.config.get('allow_v1', False)
- POLICY_FLAGS['ALLOW_v1'] = allow_v1
-
global OTR_DIR
OTR_DIR = os.path.expanduser(self.config.get('keys_dir', '') or OTR_DIR)
try:
os.makedirs(OTR_DIR)
except OSError as e:
if e.errno != 17:
- self.api.information('The OTR-specific folder could not be created'
- ' poezio will be unable to save keys and trusts', 'OTR')
+ self.api.information('The OTR-specific folder could not '
+ 'be created. Poezio will be unable '
+ 'to save keys and trusts', 'OTR')
except:
- self.api.information('The OTR-specific folder could not be created'
- ' poezio will be unable to save keys and trusts', 'OTR')
+ self.api.information('The OTR-specific folder could not '
+ 'be created. Poezio will be unable '
+ 'to save keys and trusts', 'OTR')
self.api.add_event_handler('conversation_msg', self.on_conversation_msg)
self.api.add_event_handler('private_msg', self.on_conversation_msg)
@@ -470,26 +527,18 @@ class Plugin(BasePlugin):
'answer: Finish a verification\n')
self.api.add_tab_command(ConversationTab, 'otrsmp', self.command_smp,
- help=smp_desc,
- usage=smp_usage,
- short=smp_short,
- completion=self.completion_smp)
+ help=smp_desc, usage=smp_usage, short=smp_short,
+ completion=self.completion_smp)
self.api.add_tab_command(PrivateTab, 'otrsmp', self.command_smp,
- help=smp_desc,
- usage=smp_usage,
- short=smp_short,
- completion=self.completion_smp)
+ help=smp_desc, usage=smp_usage, short=smp_short,
+ completion=self.completion_smp)
self.api.add_tab_command(ConversationTab, 'otr', self.command_otr,
- help=desc,
- usage=usage,
- short=shortdesc,
- completion=self.completion_otr)
+ help=desc, usage=usage, short=shortdesc,
+ completion=self.completion_otr)
self.api.add_tab_command(PrivateTab, 'otr', self.command_otr,
- help=desc,
- usage=usage,
- short=shortdesc,
- completion=self.completion_otr)
+ help=desc, usage=usage, short=shortdesc,
+ completion=self.completion_otr)
def cleanup(self):
for context in self.contexts.values():
@@ -499,166 +548,158 @@ class Plugin(BasePlugin):
PrivateTab.remove_information_element('otr')
def get_context(self, jid):
+ """
+ Retrieve or create an OTR context
+ """
jid = safeJID(jid).full
if not jid in self.contexts:
flags = POLICY_FLAGS.copy()
- policy = self.config.get_by_tabname('encryption_policy', jid, default='ondemand').lower()
+ require = self.config.get_by_tabname('require_encryption',
+ jid, default=False)
+ flags['REQUIRE_ENCRYPTION'] = require
logging_policy = self.config.get_by_tabname('log', jid, default='false').lower()
- allow_v2 = self.config.get_by_tabname('allow_v2', jid, default='true').lower()
- flags['ALLOW_V2'] = (allow_v2 != 'false')
- allow_v1 = self.config.get_by_tabname('allow_v1', jid, default='false').lower()
- flags['ALLOW_V1'] = (allow_v1 == 'true')
self.contexts[jid] = PoezioContext(self.account, jid, self.core.xmpp, self.core)
self.contexts[jid].log = 1 if logging_policy != 'false' else 0
self.contexts[jid].flags = flags
return self.contexts[jid]
def on_conversation_msg(self, msg, tab):
- color_jid = '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID)
- color_info = '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT)
+ """
+ Message received
+ """
+ format_dict = {
+ 'jid_c': '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID),
+ 'info': '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT),
+ 'jid': msg['from']
+ }
try:
ctx = self.get_context(msg['from'])
txt, tlvs = ctx.receiveMessage(msg["body"].encode('utf-8'))
+ # SMP
if tlvs:
- smp1q = get_tlv(tlvs, potr.proto.SMP1QTLV)
- smp1 = get_tlv(tlvs, potr.proto.SMP1TLV)
- smp2 = get_tlv(tlvs, potr.proto.SMP2TLV)
- smp3 = get_tlv(tlvs, potr.proto.SMP3TLV)
- smp4 = get_tlv(tlvs, potr.proto.SMP4TLV)
- abort = get_tlv(tlvs, potr.proto.SMPABORTTLV)
- if abort:
- ctx.reset_smp()
- tab.add_message('%sSMP aborted by peer.' % color_info, typ=0)
- elif ctx.in_smp and not ctx.smpIsValid():
- ctx.reset_smp()
- tab.add_message('%sSMP aborted.' % color_info, typ=0)
- elif smp1 or smp1q:
- if smp1q:
- try:
- question = ' with question: \x19o' + smp1q.msg.decode('utf-8')
- except UnicodeDecodeError:
- self.api.information('The peer sent a question but it had a wrong encoding', 'Error')
- question = ''
- else:
- question = ''
- ctx.in_smp = True
- ctx.smp_own = False
- tab.add_message('%(info)sPeer %(jid_c)s%(jid)s%(info)s has '
- 'requested SMP verification%(q)s%(info)s.\n'
- 'Answer with: /otrsmp answer <secret>' % {
- 'q': question,
- 'info': color_info,
- 'jid': tab.name,
- 'jid_c': color_jid}, typ=0)
- elif smp2:
- if not ctx.in_smp:
- ctx.reset_smp()
- else:
- tab.add_message('%sSMP progressing.' % color_info, typ=0)
- elif smp3 or smp4:
- if ctx.smpIsSuccess():
- if not ctx.getCurrentTrust():
- tab.add_message('%sYou may want to authenticate '
- 'your peer by asking your own '
- 'question: /otrsmp ask [question]'
- ' <secret>' % color_info,
- typ=0)
- ctx.reset_smp()
- tab.add_message('%sSMP Verification \x19bsucceeded' % color_info,
- typ=0)
- #self.smp_finish('SMP verification succeeded.', 'success')
- else:
- ctx.reset_smp()
- tab.add_message('%sSMP Verification \x19bfailed' % color_info,
- typ=0)
- #self.smp_finish('SMP verification failed.', 'error')
- self.core.refresh_window()
-
-
+ self.handle_tlvs(tlvs, ctx, tab, format_dict)
except UnencryptedMessage as err:
# received an unencrypted message inside an OTR session
- text = _('%(info)sThe following message from %(jid_c)s%(jid)s'
- '%(info)s was \x19bnot\x19o%(info)s encrypted:'
- '\x19o\n%(msg)s') % {
- 'info': color_info,
- 'jid_c': color_jid,
- 'jid': msg['from'],
- 'msg': err.args[0].decode('utf-8')}
- tab.add_message(text, jid=msg['from'],
- typ=0)
- del msg['body']
- del msg['html']
- hl(tab)
- self.core.refresh_window()
+ self.unencrypted_message_received(err, ctx, msg, tab, format_dict)
+ self.command_otr('start')
+ return
+ except NotOTRMessage as err:
+ # ignore non-otr messages
+ # if we expected an OTR message, we would have
+ # got an UnencryptedMesssage
+ # but do an additional check because of a bug with potr and py3k
+ if ctx.state != STATE_PLAINTEXT or ctx.getPolicy('REQUIRE_ENCRYPTION'):
+ self.unencrypted_message_received(err, ctx, msg, tab, format_dict)
+ self.command_otr('start')
return
except ErrorReceived as err:
# Received an OTR error
- text = _('%(info)sReceived the following error from '
- '%(jid_c)s%(jid)s%(info)s:\x19o %(err)s') % {
- 'jid': msg['from'],
- 'err': err.args[0],
- 'info': color_info,
- 'jid_c': color_jid}
-
- tab.add_message(text, typ=0)
+ format_dict['err'] = err.args[0]
+ tab.add_message(OTR_ERROR % format_dict, typ=0)
del msg['body']
del msg['html']
hl(tab)
self.core.refresh_window()
return
- except NotOTRMessage as err:
- # ignore non-otr messages
- # if we expected an OTR message, we would have
- # got an UnencryptedMesssage
- # but do an additional check because of a bug with py3k
- if ctx.state != STATE_PLAINTEXT or ctx.getPolicy('REQUIRE_ENCRYPTION'):
-
- text = _('%(info)sThe following message from '
- '%(jid_c)s%(jid)s%(info)s was \x19b'
- 'not\x19o%(info)s encrypted:\x19o\n%(msg)s') % {
- 'jid': msg['from'],
- 'msg': err.args[0].decode('utf-8'),
- 'info': color_info,
- 'jid_c': color_jid}
- tab.add_message(text, jid=msg['from'],
- typ=ctx.log)
- del msg['body']
- del msg['html']
- hl(tab)
- self.core.refresh_window()
- return
- return
except NotEncryptedError as err:
- text = _('%(info)sAn encrypted message from %(jid_c)s%(jid)s'
- '%(info)s was received but is unreadable, as you are'
- ' not currently communicating privately.') % {
- 'info': color_info,
- 'jid_c': color_jid,
- 'jid': msg['from']}
- tab.add_message(text, jid=msg['from'],
- typ=0)
+ # Encrypted message received, but unreadable as we do not have
+ # an OTR session in place.
+ text = MESSAGE_UNREADABLE % format_dict
+ tab.add_message(text, jid=msg['from'], typ=0)
hl(tab)
del msg['body']
del msg['html']
self.core.refresh_window()
return
except crypt.InvalidParameterError:
- tab.add_message('%sThe message from %s%s%s could not be decrypted.'
- % (color_info, color_jid, msg['from'], color_info),
- jid=msg['from'], typ=0)
+ # Malformed OTR payload and stuff
+ text = MESSAGE_INVALID % format_dict
+ tab.add_message(text, jid=msg['from'], typ=0)
hl(tab)
del msg['body']
del msg['html']
self.core.refresh_window()
return
- except:
- tab.add_message('%sAn unspecified error in the OTR plugin occured'
- % color_info,
- typ=0)
+ except Exception:
+ # Unexpected error
+ import traceback
+ exc = traceback.format_exc()
+ format_dict['exc'] = exc
+ tab.add_message(POTR_ERROR % format_dict, typ=0)
log.error('Unspecified error in the OTR plugin', exc_info=True)
return
+ # No error, proceed with the message
+ self.encrypted_message_received(msg, ctx, tab, txt)
+
+ def handle_tlvs(self, tlvs, ctx, tab, format_dict):
+ """
+ If the message had a TLV, it means we received part of an SMP
+ exchange.
+ """
+ smp1q = get_tlv(tlvs, potr.proto.SMP1QTLV)
+ smp1 = get_tlv(tlvs, potr.proto.SMP1TLV)
+ smp2 = get_tlv(tlvs, potr.proto.SMP2TLV)
+ smp3 = get_tlv(tlvs, potr.proto.SMP3TLV)
+ smp4 = get_tlv(tlvs, potr.proto.SMP4TLV)
+ abort = get_tlv(tlvs, potr.proto.SMPABORTTLV)
+ if abort:
+ ctx.reset_smp()
+ tab.add_message(SMP_ABORTED_PEER % format_dict, typ=0)
+ elif ctx.in_smp and not ctx.smpIsValid():
+ ctx.reset_smp()
+ tab.add_message(SMP_ABORTED % format_dict, typ=0)
+ elif smp1 or smp1q:
+ # Received an SMP request (with a question or not)
+ if smp1q:
+ try:
+ question = ' with question: \x19o' + smp1q.msg.decode('utf-8')
+ except UnicodeDecodeError:
+ self.api.information('The peer sent a question but it had a wrong encoding', 'Error')
+ question = ''
+ else:
+ question = ''
+ ctx.in_smp = True
+ # we did not initiate it
+ ctx.smp_own = False
+ format_dict['q'] = question
+ tab.add_message(SMP_REQUESTED % format_dict, typ=0)
+ elif smp2:
+ # SMP reply received
+ if not ctx.in_smp:
+ ctx.reset_smp()
+ else:
+ tab.add_message(SMP_PROGRESS % format_dict, typ=0)
+ elif smp3 or smp4:
+ # Type 4 (SMP message 3) or 5 (SMP message 4) TLVs received
+ # in both cases it is the final message of the SMP exchange
+ if ctx.smpIsSuccess():
+ tab.add_message(SMP_SUCCESS % format_dict, typ=0)
+ if not ctx.getCurrentTrust():
+ tab.add_message(SMP_RECIPROCATE % format_dict, typ=0)
+ else:
+ tab.add_message(SMP_FAIL % format_dict, typ=0)
+ ctx.reset_smp()
+ self.core.refresh_window()
+ def unencrypted_message_received(self, err, ctx, msg, tab, format_dict):
+ """
+ An unencrypted message was received while we expected it to be
+ encrypted. Display it with a warning.
+ """
+ format_dict['msg'] = err.args[0].decode('utf-8')
+ text = MESSAGE_UNENCRYPTED % format_dict
+ tab.add_message(text, jid=msg['from'], typ=ctx.log)
+ del msg['body']
+ del msg['html']
+ hl(tab)
+ self.core.refresh_window()
+
+ def encrypted_message_received(self, msg, ctx, tab, txt):
+ """
+ A properly encrypted message was received, so we add it to the
+ buffer, and try to format it according to the configuration.
+ """
# remove xhtml
del msg['html']
del msg['body']
@@ -700,6 +741,13 @@ class Plugin(BasePlugin):
del msg['body']
def find_encrypted_context_with_matching(self, bare_jid):
+ """
+ Find an OTR session from a bare JID.
+
+ Useful when a dynamic tab unlocks, which would lead to sending
+ unencrypted messages until it locks again, if we didn’t fallback
+ with this.
+ """
for ctx in self.contexts:
if safeJID(ctx).bare == bare_jid and self.contexts[ctx].state == STATE_ENCRYPTED:
return self.contexts[ctx]
@@ -717,6 +765,12 @@ class Plugin(BasePlugin):
name = tab.name
jid = safeJID(tab.name)
+ format_dict = {
+ 'jid_c': '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID),
+ 'info': '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT),
+ 'jid': name,
+ }
+
ctx = self.contexts.get(name)
if isinstance(tab, DynamicConversationTab) and not tab.locked_resource:
log.debug('Unlocked tab %s found, falling back to the first encrypted chat we find.', name)
@@ -728,17 +782,26 @@ class Plugin(BasePlugin):
tab.send_chat_state('inactive', always_send=True)
tab.add_message(msg['body'],
- nickname=self.core.own_nick or tab.own_nick,
- nick_color=get_theme().COLOR_OWN_NICK,
- identifier=msg['id'],
- jid=self.core.xmpp.boundjid,
- typ=ctx.log)
+ nickname=self.core.own_nick or tab.own_nick,
+ nick_color=get_theme().COLOR_OWN_NICK,
+ identifier=msg['id'],
+ jid=self.core.xmpp.boundjid,
+ typ=ctx.log)
# remove everything from the message so that it doesn’t get sent
del msg['body']
del msg['replace']
del msg['html']
+ elif ctx.getPolicy('REQUIRE_ENCRYPTION'):
+ tab.add_message(MESSAGE_NOT_SENT % format_dict, typ=0)
+ del msg['body']
+ del msg['replace']
+ del msg['html']
+ self.command_otr('start')
def display_encryption_status(self, jid):
+ """
+ Returns the text to display in the infobar (the OTR status)
+ """
context = self.get_context(jid)
if safeJID(jid).bare == jid and context.state != STATE_ENCRYPTED:
ctx = self.find_encrypted_context_with_matching(jid)
@@ -759,13 +822,18 @@ class Plugin(BasePlugin):
action = args.pop(0)
tab = self.api.current_tab()
name = tab.name
- color_jid = '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID)
- color_info = '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT)
- color_normal = '\x19%s}' % dump_tuple(get_theme().COLOR_NORMAL_TEXT)
if isinstance(tab, DynamicConversationTab) and tab.locked_resource:
name = safeJID(tab.name)
name.resource = tab.locked_resource
name = name.full
+ format_dict = {
+ 'jid_c': '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID),
+ 'info': '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT),
+ 'normal': '\x19%s}' % dump_tuple(get_theme().COLOR_NORMAL_TEXT),
+ 'jid': name,
+ 'bare_jid': safeJID(name).bare
+ }
+
if action == 'end': # close the session
context = self.get_context(name)
context.disconnect()
@@ -788,58 +856,34 @@ class Plugin(BasePlugin):
name = name.full
otr = self.get_context(name)
if otr.state != STATE_ENCRYPTED:
- text = _('%(jid_c)s%(jid)s%(info)s did not enable'
- ' OTR after %(sec)s seconds.') % {
- 'jid': tab.name,
- 'info': color_info,
- 'jid_c': color_jid,
- 'sec': secs}
+ format_dict['secs'] = secs
+ text = OTR_NOT_ENABLED % format_dict
tab.add_message(text, typ=0)
self.core.refresh_window()
if secs > 0:
event = self.api.create_delayed_event(secs, notify_otr_timeout)
self.api.add_timed_event(event)
- self.core.xmpp.send_message(mto=name, mtype='chat',
- mbody=self.contexts[name].sendMessage(0, b'?OTRv?').decode())
- text = _('%(info)sOTR request to %(jid_c)s%(jid)s%(info)s sent.') % {
- 'jid': tab.name,
- 'info': color_info,
- 'jid_c': color_jid}
- tab.add_message(text, typ=0)
+ body = self.contexts[name].sendMessage(0, b'?OTRv?').decode()
+ self.core.xmpp.send_message(mto=name, mtype='chat', mbody=body)
+ tab.add_message(OTR_REQUEST % format_dict, typ=0)
elif action == 'ourfpr':
- fpr = self.account.getPrivkey()
- text = _('%(info)sYour OTR key fingerprint is %(norm)s%(fpr)s.') % {
- 'jid': tab.name,
- 'info': color_info,
- 'norm': color_normal,
- 'fpr': fpr}
- tab.add_message(text, typ=0)
+ format_dict['fpr'] = self.account.getPrivkey()
+ tab.add_message(OTR_OWN_FPR % format_dict, typ=0)
elif action == 'fpr':
if name in self.contexts:
ctx = self.contexts[name]
if ctx.getCurrentKey() is not None:
- text = _('%(info)sThe key fingerprint for %(jid_c)s'
- '%(jid)s%(info)s is %(norm)s%(fpr)s%(info)s.') % {
- 'jid': tab.name,
- 'info': color_info,
- 'norm': color_normal,
- 'jid_c': color_jid,
- 'fpr': ctx.getCurrentKey()}
- tab.add_message(text, typ=0)
+ format_dict['fpr'] = ctx.getCurrentKey()
+ tab.add_message(OTR_REMOTE_FPR % format_dict, typ=0)
else:
- text = _('%(jid_c)s%(jid)s%(info)s has no'
- ' key currently in use.') % {
- 'jid': tab.name,
- 'info': color_info,
- 'jid_c': color_jid}
- tab.add_message(text, typ=0)
+ tab.add_message(OTR_NO_FPR % format_dict, typ=0)
elif action == 'drop':
# drop the privkey (and obviously, end the current conversations before that)
for context in self.contexts.values():
if context.state not in (STATE_FINISHED, STATE_PLAINTEXT):
context.disconnect()
self.account.drop_privkey()
- tab.add_message('%sPrivate key dropped.' % color_info, typ=0)
+ tab.add_message(KEY_DROPPED % format_dict, typ=0)
elif action == 'trust':
ctx = self.get_context(name)
key = ctx.getCurrentKey()
@@ -848,15 +892,10 @@ class Plugin(BasePlugin):
else:
return
if not ctx.getCurrentTrust():
+ format_dict['key'] = key
ctx.setTrust(fpr, 'verified')
self.account.saveTrusts()
- text = _('%(info)sYou added %(jid_c)s%(jid)s%(info)s with key '
- '\x19o%(key)s%(info)s to your trusted list.') % {
- 'jid': ctx.trustName,
- 'key': key,
- 'info': color_info,
- 'jid_c': color_jid}
- tab.add_message(text, typ=0)
+ tab.add_message(TRUST_ADDED % format_dict, typ=0)
elif action == 'untrust':
ctx = self.get_context(name)
key = ctx.getCurrentKey()
@@ -865,19 +904,17 @@ class Plugin(BasePlugin):
else:
return
if ctx.getCurrentTrust():
+ format_dict['key'] = key
ctx.setTrust(fpr, '')
self.account.saveTrusts()
- text = _('%(info)sYou removed %(jid_c)s%(jid)s%(info)s with '
- 'key \x19o%(key)s%(info)s from your trusted list.') % {
- 'jid': ctx.trustName,
- 'key': key,
- 'info': color_info,
- 'jid_c': color_jid}
-
- tab.add_message(text, typ=0)
+ tab.add_message(TRUST_REMOVED % format_dict, typ=0)
self.core.refresh_window()
- def completion_otr(self, the_input):
+ @staticmethod
+ def completion_otr(the_input):
+ """
+ Completion for /otr
+ """
comp = ['start', 'fpr', 'ourfpr', 'refresh', 'end', 'trust', 'untrust']
return the_input.new_completion(comp, 1, quotify=False)
@@ -888,8 +925,6 @@ class Plugin(BasePlugin):
"""
if args is None or not args:
return self.core.command_help('otrsmp')
- color_jid = '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID)
- color_info = '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT)
length = len(args)
action = args.pop(0)
if length == 2:
@@ -908,9 +943,18 @@ class Plugin(BasePlugin):
name.resource = tab.locked_resource
name = name.full
+ format_dict = {
+ 'jid_c': '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID),
+ 'info': '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT),
+ 'jid': name,
+ 'bare_jid': safeJID(name).bare
+ }
+
ctx = self.get_context(name)
if ctx.state != STATE_ENCRYPTED:
- return self.api.information('The current conversation is not encrypted', 'Error')
+ self.api.information('The current conversation is not encrypted',
+ 'Error')
+ return
if action == 'ask':
ctx.in_smp = True
@@ -919,24 +963,23 @@ class Plugin(BasePlugin):
ctx.smpInit(secret, question)
else:
ctx.smpInit(secret)
- tab.add_message('%(info)sInitiated SMP request with %(jid_c)s'
- '%(jid)s%(info)s.' % {
- 'info': color_info,
- 'jid': name,
- 'jid_c': color_jid}, typ=0)
+ tab.add_message(SMP_INITIATED % format_dict, typ=0)
elif action == 'answer':
ctx.smpGotSecret(secret)
elif action == 'abort':
if ctx.in_smp:
ctx.smpAbort()
- tab.add_message('%sSMP aborted.' % color_info, typ=0)
+ tab.add_message(SMP_ABORTED % format_dict, typ=0)
self.core.refresh_window()
- def completion_smp(self, the_input):
+ @staticmethod
+ def completion_smp(the_input):
+ """Completion for /otrsmp"""
if the_input.get_argument_position() == 1:
return the_input.new_completion(['ask', 'answer', 'abort'], 1, quotify=False)
def get_tlv(tlvs, cls):
+ """Find the instance of a class in a list"""
for tlv in tlvs:
if isinstance(tlv, cls):
return tlv