summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--plugins/otr.py280
1 files changed, 228 insertions, 52 deletions
diff --git a/plugins/otr.py b/plugins/otr.py
index 7488ed46..fcb1640f 100644
--- a/plugins/otr.py
+++ b/plugins/otr.py
@@ -71,6 +71,14 @@ Command added to Conversation Tabs and Private Tabs:
*NOT* with multiple rewrites in a secure manner, you should do that
yourself if you want to be sure.
+ /otrsmp
+ **Usage:** ``/otrsmp <ask|answer|abort> [question] [secret]``
+
+ Verify the identify of your contact by using a pre-defined secret.
+
+ - The ``abort`` command aborts an ongoing verification
+ - The ``ask`` command start a verification, with a question or not
+ - The ``answer`` command answers a verification and ends the smp session
To use OTR, make sure the plugin is loaded (if not, then do ``/load otr``).
@@ -198,12 +206,14 @@ import curses
from potr.context import NotEncryptedError, UnencryptedMessage, ErrorReceived, NotOTRMessage,\
STATE_ENCRYPTED, STATE_PLAINTEXT, STATE_FINISHED, Context, Account, crypt
+import common
import xhtml
from common import safeJID
from config import config
from plugin import BasePlugin
from tabs import ConversationTab, DynamicConversationTab, PrivateTab
from theming import get_theme, dump_tuple
+from decorators import command_args_parser
OTR_DIR = os.path.join(os.getenv('XDG_DATA_HOME') or
'~/.local/share', 'poezio', 'otr')
@@ -220,6 +230,22 @@ POLICY_FLAGS = {
log = logging.getLogger(__name__)
+OTR_TUTORIAL = _(
+"""%(info)sThis contact has not yet been verified.
+You have several methods of authentication available:
+
+1) Verify each other's fingerprints using a secure (and different) channel:
+Your fingerprint: %(normal)s%(our_fpr)s%(info)s
+%(jid_c)s%(jid)s%(info)s's fingerprint: %(normal)s%(remote_fpr)s%(info)s
+Then use the command: /otr trust
+
+2) SMP pre-shared secret you both know:
+/otrsmp ask <secret>
+
+3) SMP pre-shared secret you both know with a question:
+/otrsmp ask <question> <secret>
+""")
+
def hl(tab):
if tab.state != 'current':
tab.state = 'private'
@@ -236,6 +262,8 @@ class PoezioContext(Context):
self.core = core
self.flags = {}
self.trustName = safeJID(peer).bare
+ self.in_smp = False
+ self.smp_own = False
def getPolicy(self, key):
if key in self.flags:
@@ -243,6 +271,10 @@ class PoezioContext(Context):
else:
return False
+ def reset_smp(self):
+ self.in_smp = False
+ self.smp_own = False
+
def inject(self, msg, appdata=None):
message = self.xmpp.make_message(mto=self.peer,
mbody=msg.decode('ascii'),
@@ -253,6 +285,7 @@ class PoezioContext(Context):
def setState(self, newstate):
color_jid = '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID)
color_info = '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT)
+ color_normal = '\x19%s}' % dump_tuple(get_theme().COLOR_NORMAL_TEXT)
tab = self.core.get_tab_by_name(self.peer)
if not tab:
@@ -261,28 +294,27 @@ class PoezioContext(Context):
if tab and not tab.locked_resource == safeJID(self.peer).resource:
tab = None
if self.state == STATE_ENCRYPTED:
- if newstate == STATE_ENCRYPTED:
+ if newstate == STATE_ENCRYPTED and tab:
log.debug('OTR conversation with %s refreshed', self.peer)
- if tab:
- if self.getCurrentTrust():
- msg = _('%(info)sRefreshed \x19btrusted\x19o%(info)s'
- ' OTR conversation with %(jid_c)s%(jid)s') % {
- 'info': color_info,
- 'jid_c': color_jid,
- 'jid': self.peer
- }
- tab.add_message(msg, typ=self.log)
- else:
- msg = _('%(info)sRefreshed \x19buntrusted\x19o%(info)s'
- ' OTR conversation with %(jid_c)s%(jid)s'
- '%(info)s, key: \x19o%(key)s') % {
- 'jid': self.peer,
- 'key': self.getCurrentKey(),
- 'info': color_info,
- 'jid_c': color_jid}
-
- tab.add_message(msg, typ=self.log)
- hl(tab)
+ if self.getCurrentTrust():
+ msg = _('%(info)sRefreshed \x19btrusted\x19o%(info)s'
+ ' OTR conversation with %(jid_c)s%(jid)s') % {
+ 'info': color_info,
+ 'jid_c': color_jid,
+ 'jid': self.peer
+ }
+ tab.add_message(msg, typ=self.log)
+ else:
+ msg = _('%(info)sRefreshed \x19buntrusted\x19o%(info)s'
+ ' OTR conversation with %(jid_c)s%(jid)s'
+ '%(info)s, key: \x19o%(key)s') % {
+ 'jid': self.peer,
+ 'key': self.getCurrentKey(),
+ 'info': color_info,
+ 'jid_c': color_jid}
+
+ tab.add_message(msg, typ=self.log)
+ hl(tab)
elif newstate == STATE_FINISHED or newstate == STATE_PLAINTEXT:
log.debug('OTR conversation with %s finished', self.peer)
if tab:
@@ -290,26 +322,32 @@ class PoezioContext(Context):
color_info, color_jid, self.peer),
typ=self.log)
hl(tab)
- else:
- if newstate == STATE_ENCRYPTED:
- if tab:
- if self.getCurrentTrust():
- msg = _('%(info)sStarted a \x19btrusted\x19o%(info)s '
- 'OTR conversation with %(jid_c)s%(jid)s') % {
- 'jid': self.peer,
- 'info': color_info,
- 'jid_c': color_jid}
- tab.add_message(msg, typ=self.log)
- else:
- msg = _('%(info)sStarted an \x19buntrusted\x19o%(info)s'
- ' OTR conversation with %(jid_c)s%(jid)s'
- '%(info)s, key: \x19o%(key)s') % {
- 'jid': self.peer,
- 'key': self.getCurrentKey(),
- 'info': color_info,
- 'jid_c': color_jid}
- tab.add_message(msg, typ=self.log)
- hl(tab)
+ elif newstate == STATE_ENCRYPTED and tab:
+ if self.getCurrentTrust():
+ msg = _('%(info)sStarted a \x19btrusted\x19o%(info)s '
+ 'OTR conversation with %(jid_c)s%(jid)s') % {
+ 'jid': self.peer,
+ 'info': color_info,
+ 'jid_c': color_jid}
+ tab.add_message(msg, typ=self.log)
+ else:
+ tab.add_message(OTR_TUTORIAL % {
+ 'jid': safeJID(self.peer).bare,
+ 'remote_fpr': self.getCurrentKey(),
+ 'our_fpr': self.user.getPrivkey(),
+ 'info': color_info,
+ 'normal': color_normal,
+ 'jid_c': color_jid},
+ typ=0)
+ msg = _('%(info)sStarted an \x19buntrusted\x19o%(info)s'
+ ' OTR conversation with %(jid_c)s%(jid)s'
+ '%(info)s, key: \x19o%(key)s') % {
+ 'jid': self.peer,
+ 'key': self.getCurrentKey(),
+ 'info': color_info,
+ 'jid_c': color_jid}
+ tab.add_message(msg, typ=self.log)
+ hl(tab)
log.debug('Set encryption state of %s to %s', self.peer, states[newstate])
super(PoezioContext, self).setState(newstate)
@@ -414,7 +452,7 @@ class Plugin(BasePlugin):
self.account = PoezioAccount(self.core.xmpp.boundjid.bare, OTR_DIR)
self.account.load_trusts()
self.contexts = {}
- usage = '[start|refresh|end|fpr|ourfpr|drop|trust|untrust]'
+ usage = '<start|refresh|end|fpr|ourfpr|drop|trust|untrust>'
shortdesc = 'Manage an OTR conversation'
desc = ('Manage an OTR conversation.\n'
'start/refresh: Start or refresh a conversation\n'
@@ -424,6 +462,24 @@ class Plugin(BasePlugin):
'drop: Remove the current key (FOREVER)\n'
'trust: Set this key for this contact as trusted\n'
'untrust: Remove the trust for the key of this contact\n')
+ smp_usage = '<abort|ask|answer> [question] [answer]'
+ smp_short = 'Identify a contact'
+ smp_desc = ('Verify the identify of your contact by using a pre-defined secret.\n'
+ 'abort: Abort an ongoing verification\n'
+ 'ask: Start a verification, with a question or not\n'
+ 'answer: Finish a verification\n')
+
+ self.api.add_tab_command(ConversationTab, 'otrsmp', self.command_smp,
+ help=smp_desc,
+ usage=smp_usage,
+ short=smp_short,
+ completion=self.completion_smp)
+ self.api.add_tab_command(PrivateTab, 'otrsmp', self.command_smp,
+ help=smp_desc,
+ usage=smp_usage,
+ short=smp_short,
+ completion=self.completion_smp)
+
self.api.add_tab_command(ConversationTab, 'otr', self.command_otr,
help=desc,
usage=usage,
@@ -463,6 +519,63 @@ class Plugin(BasePlugin):
try:
ctx = self.get_context(msg['from'])
txt, tlvs = ctx.receiveMessage(msg["body"].encode('utf-8'))
+
+ if tlvs:
+ smp1q = get_tlv(tlvs, potr.proto.SMP1QTLV)
+ smp1 = get_tlv(tlvs, potr.proto.SMP1TLV)
+ smp2 = get_tlv(tlvs, potr.proto.SMP2TLV)
+ smp3 = get_tlv(tlvs, potr.proto.SMP3TLV)
+ smp4 = get_tlv(tlvs, potr.proto.SMP4TLV)
+ abort = get_tlv(tlvs, potr.proto.SMPABORTTLV)
+ if abort:
+ ctx.reset_smp()
+ tab.add_message('%sSMP aborted by peer.' % color_info, typ=0)
+ elif ctx.in_smp and not ctx.smpIsValid():
+ ctx.reset_smp()
+ tab.add_message('%sSMP aborted.' % color_info, typ=0)
+ elif smp1 or smp1q:
+ if smp1q:
+ try:
+ question = ' with question: \x19o' + smp1q.msg.decode('utf-8')
+ except UnicodeDecodeError:
+ self.api.information('The peer sent a question but it had a wrong encoding', 'Error')
+ question = ''
+ else:
+ question = ''
+ ctx.in_smp = True
+ ctx.smp_own = False
+ tab.add_message('%(info)sPeer %(jid_c)s%(jid)s%(info)s has '
+ 'requested SMP verification%(q)s%(info)s.\n'
+ 'Answer with: /otrsmp answer <secret>' % {
+ 'q': question,
+ 'info': color_info,
+ 'jid': tab.name,
+ 'jid_c': color_jid}, typ=0)
+ elif smp2:
+ if not ctx.in_smp:
+ ctx.reset_smp()
+ else:
+ tab.add_message('%sSMP progressing.' % color_info, typ=0)
+ elif smp3 or smp4:
+ if ctx.smpIsSuccess():
+ if not ctx.getCurrentTrust():
+ tab.add_message('%sYou may want to authenticate '
+ 'your peer by asking your own '
+ 'question: /otrsmp ask [question]'
+ ' <secret>' % color_info,
+ typ=0)
+ ctx.reset_smp()
+ tab.add_message('%sSMP Verification \x19bsucceeded' % color_info,
+ typ=0)
+ #self.smp_finish('SMP verification succeeded.', 'success')
+ else:
+ ctx.reset_smp()
+ tab.add_message('%sSMP Verification \x19bfailed' % color_info,
+ typ=0)
+ #self.smp_finish('SMP verification failed.', 'error')
+ self.core.refresh_window()
+
+
except UnencryptedMessage as err:
# received an unencrypted message inside an OTR session
text = _('%(info)sThe following message from %(jid_c)s%(jid)s'
@@ -632,13 +745,18 @@ class Plugin(BasePlugin):
if ctx:
context = ctx
state = states[context.state]
- return ' OTR: %s' % state
+ trust = 'trusted' if context.getCurrentTrust() else 'untrusted'
+
+ return ' OTR: %s (%s)' % (state, trust)
def command_otr(self, arg):
"""
/otr [start|refresh|end|fpr|ourfpr]
"""
- arg = arg.strip()
+ args = common.shell_split(arg)
+ if not args:
+ return self.api.core.command_help('otr')
+ action = args.pop(0)
tab = self.api.current_tab()
name = tab.name
color_jid = '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID)
@@ -648,14 +766,13 @@ class Plugin(BasePlugin):
name = safeJID(tab.name)
name.resource = tab.locked_resource
name = name.full
- if arg == 'end': # close the session
+ if action == 'end': # close the session
context = self.get_context(name)
context.disconnect()
if isinstance(tab, DynamicConversationTab) and not tab.locked_resource:
ctx = self.find_encrypted_context_with_matching(safeJID(name).bare)
ctx.disconnect()
-
- elif arg == 'start' or arg == 'refresh':
+ elif action == 'start' or action == 'refresh':
otr = self.get_context(name)
secs = self.config.get('timeout', 3)
if isinstance(tab, DynamicConversationTab) and tab.locked_resource:
@@ -689,7 +806,7 @@ class Plugin(BasePlugin):
'info': color_info,
'jid_c': color_jid}
tab.add_message(text, typ=0)
- elif arg == 'ourfpr':
+ elif action == 'ourfpr':
fpr = self.account.getPrivkey()
text = _('%(info)sYour OTR key fingerprint is %(norm)s%(fpr)s.') % {
'jid': tab.name,
@@ -697,7 +814,7 @@ class Plugin(BasePlugin):
'norm': color_normal,
'fpr': fpr}
tab.add_message(text, typ=0)
- elif arg == 'fpr':
+ elif action == 'fpr':
if name in self.contexts:
ctx = self.contexts[name]
if ctx.getCurrentKey() is not None:
@@ -716,14 +833,14 @@ class Plugin(BasePlugin):
'info': color_info,
'jid_c': color_jid}
tab.add_message(text, typ=0)
- elif arg == 'drop':
+ elif action == 'drop':
# drop the privkey (and obviously, end the current conversations before that)
for context in self.contexts.values():
if context.state not in (STATE_FINISHED, STATE_PLAINTEXT):
context.disconnect()
self.account.drop_privkey()
tab.add_message('%sPrivate key dropped.' % color_info, typ=0)
- elif arg == 'trust':
+ elif action == 'trust':
ctx = self.get_context(name)
key = ctx.getCurrentKey()
if key:
@@ -740,7 +857,7 @@ class Plugin(BasePlugin):
'info': color_info,
'jid_c': color_jid}
tab.add_message(text, typ=0)
- elif arg == 'untrust':
+ elif action == 'untrust':
ctx = self.get_context(name)
key = ctx.getCurrentKey()
if key:
@@ -764,3 +881,62 @@ class Plugin(BasePlugin):
comp = ['start', 'fpr', 'ourfpr', 'refresh', 'end', 'trust', 'untrust']
return the_input.new_completion(comp, 1, quotify=False)
+ @command_args_parser.quoted(1, 2)
+ def command_smp(self, args):
+ """
+ /otrsmp <ask|answer|abort> [question] [secret]
+ """
+ if args is None or not args:
+ return self.core.command_help('otrsmp')
+ color_jid = '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID)
+ color_info = '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT)
+ length = len(args)
+ action = args.pop(0)
+ if length == 2:
+ question = None
+ secret = args.pop(0).encode('utf-8')
+ elif length == 3:
+ question = args.pop(0).encode('utf-8')
+ secret = args.pop(0).encode('utf-8')
+ else:
+ question = secret = None
+
+ tab = self.api.current_tab()
+ name = tab.name
+ if isinstance(tab, DynamicConversationTab) and tab.locked_resource:
+ name = safeJID(tab.name)
+ name.resource = tab.locked_resource
+ name = name.full
+
+ ctx = self.get_context(name)
+ if ctx.state != STATE_ENCRYPTED:
+ return self.api.information('The current conversation is not encrypted', 'Error')
+
+ if action == 'ask':
+ ctx.in_smp = True
+ ctx.smp_own = True
+ if question:
+ ctx.smpInit(secret, question)
+ else:
+ ctx.smpInit(secret)
+ tab.add_message('%(info)sInitiated SMP request with %(jid_c)s'
+ '%(jid)s%(info)s.' % {
+ 'info': color_info,
+ 'jid': name,
+ 'jid_c': color_jid}, typ=0)
+ elif action == 'answer':
+ ctx.smpGotSecret(secret)
+ elif action == 'abort':
+ if ctx.in_smp:
+ ctx.smpAbort()
+ tab.add_message('%sSMP aborted.' % color_info, typ=0)
+ self.core.refresh_window()
+
+ def completion_smp(self, the_input):
+ if the_input.get_argument_position() == 1:
+ return the_input.new_completion(['ask', 'answer', 'abort'], 1, quotify=False)
+
+def get_tlv(tlvs, cls):
+ for tlv in tlvs:
+ if isinstance(tlv, cls):
+ return tlv