summaryrefslogtreecommitdiff
path: root/plugins/otr.py
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/otr.py')
-rw-r--r--plugins/otr.py781
1 files changed, 524 insertions, 257 deletions
diff --git a/plugins/otr.py b/plugins/otr.py
index 44fdb323..cceadb99 100644
--- a/plugins/otr.py
+++ b/plugins/otr.py
@@ -71,36 +71,28 @@ Command added to Conversation Tabs and Private Tabs:
*NOT* with multiple rewrites in a secure manner, you should do that
yourself if you want to be sure.
+ /otrsmp
+ **Usage:** ``/otrsmp <ask|answer|abort> [question] [secret]``
-To use OTR, make sure the plugin is loaded (if not, then do ``/load otr``).
+ Verify the identify of your contact by using a pre-defined secret.
-A simple workflow looks like this:
+ - The ``abort`` command aborts an ongoing verification
+ - The ``ask`` command start a verification, with a question or not
+ - The ``answer`` command sends back the answer and finishes the verification
-.. code-block:: none
+Managing trust
+--------------
- /otr start
+An OTR conversation can be started with a simple ``/otr start`` and the
+conversation will be encrypted. However it is very often useful to check
+that your are talking to the right person.
-The status of the OTR encryption should appear in the bar between the chat and
-the input as ``OTR: encrypted``.
+To this end, two actions are available, and a message explaining both
+will be prompted each time an **untrusted** conversation is started:
-Then you use ``fpr``/``ourfpr`` to check the fingerprints, and confirm your respective
-identities out-of-band.
-
-You can then use
-
-.. code-block:: none
-
- /otr trust
-
-To set the key as trusted, which will be shown when you start or refresh a conversation
-(the trust status will be in a bold font and if the key is untrusted, the remote fingerprint
-will be shown).
-
-Once you’re done, end the OTR session with
-
-.. code-block:: none
-
- /otr end
+- Checking the knowledge of a shared secret through the use of :term:`/otrsmp`
+- Exchanging fingerprints (``/otr fpr`` and ``/otr ourfpr``) out of band (in a secure channel) to check that both match,
+ then use ``/otr trust`` to add then to the list of trusted fingerprints for this JID.
Files
-----
@@ -128,20 +120,31 @@ Configuration
Decode embedded XHTML.
- keys_dir
- **Default:** ``$XDG_DATA_HOME/poezio/otr``
+ decode_entities
+ **Default:** ``true``
- The directory in which you want keys and fpr to be stored.
+ Decode XML and HTML entities (like ``&amp;``) even when the
+ document isn't valid (if it is valid, it will be decoded even
+ without this option).
- allow_v2
+ decode_newlines
**Default:** ``true``
- Allow OTRv2
+ Decode ``<br/>`` and ``<br>`` tags even when the document
+ isn't valid (if it is valid, it will be decoded even
+ without this option for ``<br/>``, and ``<br>`` will make
+ the document invalid anyway).
- allow_v1
+ keys_dir
+ **Default:** ``$XDG_DATA_HOME/poezio/otr``
+
+ The directory in which you want keys and fpr to be stored.
+
+ require_encryption
**Default:** ``false``
- Allow OTRv1
+ If ``true``, prevents you from sending unencrypted messages, and tries
+ to establish OTR sessions when receiving unencrypted messages.
timeout
**Default:** ``3``
@@ -151,11 +154,11 @@ Configuration
value will disable this notification.
log
- **Default:** false
+ **Default:** ``false``
Log conversations (OTR start/end marker, and messages).
-The :term:`allow_v1`, :term:`allow_v2`, :term:`decode_xhtml`
+The :term:`require_encryption`, :term:`decode_xhtml`, :term:`decode_entities`
and :term:`log` configuration parameters are tab-specific.
Important details
@@ -177,34 +180,134 @@ import logging
log = logging.getLogger(__name__)
import os
+import html
import curses
from potr.context import NotEncryptedError, UnencryptedMessage, ErrorReceived, NotOTRMessage,\
STATE_ENCRYPTED, STATE_PLAINTEXT, STATE_FINISHED, Context, Account, crypt
+import common
import xhtml
from common import safeJID
from config import config
from plugin import BasePlugin
from tabs import ConversationTab, DynamicConversationTab, PrivateTab
from theming import get_theme, dump_tuple
+from decorators import command_args_parser
OTR_DIR = os.path.join(os.getenv('XDG_DATA_HOME') or
- '~/.local/share', 'poezio', 'otr')
+ '~/.local/share', 'poezio', 'otr')
POLICY_FLAGS = {
- 'ALLOW_V1':False,
- 'ALLOW_V2':True,
- 'REQUIRE_ENCRYPTION': False,
- 'SEND_TAG': True,
- 'WHITESPACE_START_AKE': True,
- 'ERROR_START_AKE': True
+ 'ALLOW_V1':False,
+ 'ALLOW_V2':True,
+ 'REQUIRE_ENCRYPTION': False,
+ 'SEND_TAG': True,
+ 'WHITESPACE_START_AKE': True,
+ 'ERROR_START_AKE': True
}
log = logging.getLogger(__name__)
+OTR_TUTORIAL = _(
+"""%(info)sThis contact has not yet been verified.
+You have several methods of authentication available:
+
+1) Verify each other's fingerprints using a secure (and different) channel:
+Your fingerprint: %(normal)s%(our_fpr)s%(info)s
+%(jid_c)s%(jid)s%(info)s's fingerprint: %(normal)s%(remote_fpr)s%(info)s
+Then use the command: /otr trust
+
+2) SMP pre-shared secret you both know:
+/otrsmp ask <secret>
+
+3) SMP pre-shared secret you both know with a question:
+/otrsmp ask <question> <secret>
+""")
+
+OTR_NOT_ENABLED = _('%(jid_c)s%(jid)s%(info)s did not enable '
+ 'OTR after %(secs)s seconds.')
+
+MESSAGE_NOT_SENT = _('%(info)sYour message to %(jid_c)s%(jid)s%(info)s was'
+ ' not sent because your configuration requires an '
+ 'encrypted session.\nWait until it is established or '
+ 'change your configuration.')
+
+OTR_REQUEST = _('%(info)sOTR request to %(jid_c)s%(jid)s%(info)s sent.')
+
+OTR_OWN_FPR = _('%(info)sYour OTR key fingerprint is '
+ '%(normal)s%(fpr)s%(info)s.')
+
+OTR_REMOTE_FPR = _('%(info)sThe key fingerprint for %(jid_c)s'
+ '%(jid)s%(info)s is %(normal)s%(fpr)s%(info)s.')
+
+OTR_NO_FPR = _('%(jid_c)s%(jid)s%(info)s has no'
+ ' key currently in use.')
+
+OTR_START_TRUSTED = _('%(info)sStarted a \x19btrusted\x19o%(info)s '
+ 'OTR conversation with %(jid_c)s%(jid)s')
+
+OTR_REFRESH_TRUSTED = _('%(info)sRefreshed \x19btrusted\x19o%(info)s'
+ ' OTR conversation with %(jid_c)s%(jid)s')
+
+OTR_START_UNTRUSTED = _('%(info)sStarted an \x19buntrusted\x19o%(info)s'
+ ' OTR conversation with %(jid_c)s%(jid)s')
+
+OTR_REFRESH_UNTRUSTED = _('%(info)sRefreshed \x19buntrusted\x19o%(info)s'
+ ' OTR conversation with %(jid_c)s%(jid)s')
+
+OTR_END = _('%(info)sEnded OTR conversation with %(jid_c)s%(jid)s')
+
+SMP_REQUESTED = _('%(jid_c)s%(jid)s%(info)s has requested SMP verification'
+ '%(q)s%(info)s.\nAnswer with: /otrsmp answer <secret>')
+
+SMP_INITIATED = _('%(info)sInitiated SMP request with '
+ '%(jid_c)s%(jid)s%(info)s.')
+
+SMP_PROGRESS = _('%(info)sSMP progressing.')
+
+SMP_RECIPROCATE = _('%(info)sYou may want to authenticate your peer by asking'
+ ' your own question: /otrsmp ask [question] <secret>')
+
+SMP_SUCCESS = _('%(info)sSMP Verification \x19bsucceeded\x19o%(info)s.')
+
+SMP_FAIL = _('%(info)sSMP Verification \x19bfailed\x19o%(info)s.')
+
+SMP_ABORTED_PEER = _('%(info)sSMP aborted by peer.')
+
+SMP_ABORTED = _('%(info)sSMP aborted.')
+
+MESSAGE_UNENCRYPTED = _('%(info)sThe following message from %(jid_c)s%(jid)s'
+ '%(info)s was \x19bnot\x19o%(info)s encrypted:\x19o\n'
+ '%(msg)s')
+
+MESSAGE_UNREADABLE = _('%(info)sAn encrypted message from %(jid_c)s%(jid)s'
+ '%(info)s was received but is unreadable, as you are'
+ ' not currently communicating privately.')
+
+MESSAGE_INVALID = _('%(info)sThe message from %(jid_c)s%(jid)s%(info)s'
+ ' could not be decrypted.')
+
+OTR_ERROR = _('%(info)sReceived the following error from '
+ '%(jid_c)s%(jid)s%(info)s:\x19o %(err)s')
+
+POTR_ERROR = _('%(info)sAn unspecified error in the OTR plugin occured:\n'
+ '%(exc)s')
+
+TRUST_ADDED = _('%(info)sYou added %(jid_c)s%(bare_jid)s%(info)s with key '
+ '\x19o%(key)s%(info)s to your trusted list.')
+
+
+TRUST_REMOVED = _('%(info)sYou removed %(jid_c)s%(bare_jid)s%(info)s with '
+ 'key \x19o%(key)s%(info)s from your trusted list.')
+
+KEY_DROPPED = _('%(info)sPrivate key dropped.')
+
def hl(tab):
+ """
+ Make a tab beep and change its status.
+ """
if tab.state != 'current':
tab.state = 'private'
@@ -214,12 +317,20 @@ def hl(tab):
curses.beep()
class PoezioContext(Context):
+ """
+ OTR context, specific to a conversation with a contact
+
+ Overrides methods from potr.context.Context
+ """
def __init__(self, account, peer, xmpp, core):
super(PoezioContext, self).__init__(account, peer)
self.xmpp = xmpp
self.core = core
self.flags = {}
self.trustName = safeJID(peer).bare
+ self.in_smp = False
+ self.smp_own = False
+ self.log = 0
def getPolicy(self, key):
if key in self.flags:
@@ -227,6 +338,10 @@ class PoezioContext(Context):
else:
return False
+ def reset_smp(self):
+ self.in_smp = False
+ self.smp_own = False
+
def inject(self, msg, appdata=None):
message = self.xmpp.make_message(mto=self.peer,
mbody=msg.decode('ascii'),
@@ -235,8 +350,13 @@ class PoezioContext(Context):
message.send()
def setState(self, newstate):
- color_jid = '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID)
- color_info = '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT)
+ format_dict = {
+ 'jid_c': '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID),
+ 'info': '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT),
+ 'normal': '\x19%s}' % dump_tuple(get_theme().COLOR_NORMAL_TEXT),
+ 'jid': self.peer,
+ 'bare_jid': safeJID(self.peer).bare
+ }
tab = self.core.get_tab_by_name(self.peer)
if not tab:
@@ -245,55 +365,29 @@ class PoezioContext(Context):
if tab and not tab.locked_resource == safeJID(self.peer).resource:
tab = None
if self.state == STATE_ENCRYPTED:
- if newstate == STATE_ENCRYPTED:
+ if newstate == STATE_ENCRYPTED and tab:
log.debug('OTR conversation with %s refreshed', self.peer)
- if tab:
- if self.getCurrentTrust():
- msg = _('%(info)sRefreshed \x19btrusted\x19o%(info)s'
- ' OTR conversation with %(jid_c)s%(jid)s') % {
- 'info': color_info,
- 'jid_c': color_jid,
- 'jid': self.peer
- }
- tab.add_message(msg, typ=self.log)
- else:
- msg = _('%(info)sRefreshed \x19buntrusted\x19o%(info)s'
- ' OTR conversation with %(jid_c)s%(jid)s'
- '%(info)s, key: \x19o%(key)s') % {
- 'jid': self.peer,
- 'key': self.getCurrentKey(),
- 'info': color_info,
- 'jid_c': color_jid}
-
- tab.add_message(msg, typ=self.log)
- hl(tab)
+ if self.getCurrentTrust():
+ msg = OTR_REFRESH_TRUSTED % format_dict
+ tab.add_message(msg, typ=self.log)
+ else:
+ msg = OTR_REFRESH_UNTRUSTED % format_dict
+ tab.add_message(msg, typ=self.log)
+ hl(tab)
elif newstate == STATE_FINISHED or newstate == STATE_PLAINTEXT:
log.debug('OTR conversation with %s finished', self.peer)
if tab:
- tab.add_message('%sEnded OTR conversation with %s%s' % (
- color_info, color_jid, self.peer),
- typ=self.log)
- hl(tab)
- else:
- if newstate == STATE_ENCRYPTED:
- if tab:
- if self.getCurrentTrust():
- msg = _('%(info)sStarted a \x19btrusted\x19o%(info)s '
- 'OTR conversation with %(jid_c)s%(jid)s') % {
- 'jid': self.peer,
- 'info': color_info,
- 'jid_c': color_jid}
- tab.add_message(msg, typ=self.log)
- else:
- msg = _('%(info)sStarted an \x19buntrusted\x19o%(info)s'
- ' OTR conversation with %(jid_c)s%(jid)s'
- '%(info)s, key: \x19o%(key)s') % {
- 'jid': self.peer,
- 'key': self.getCurrentKey(),
- 'info': color_info,
- 'jid_c': color_jid}
- tab.add_message(msg, typ=self.log)
+ tab.add_message(OTR_END % format_dict, typ=self.log)
hl(tab)
+ elif newstate == STATE_ENCRYPTED and tab:
+ if self.getCurrentTrust():
+ tab.add_message(OTR_START_TRUSTED % format_dict, typ=self.log)
+ else:
+ format_dict['our_fpr'] = self.user.getPrivkey()
+ format_dict['remote_fpr'] = self.getCurrentKey()
+ tab.add_message(OTR_TUTORIAL % format_dict, typ=0)
+ tab.add_message(OTR_START_UNTRUSTED % format_dict, typ=self.log)
+ hl(tab)
log.debug('Set encryption state of %s to %s', self.peer, states[newstate])
super(PoezioContext, self).setState(newstate)
@@ -302,9 +396,14 @@ class PoezioContext(Context):
self.core.doupdate()
class PoezioAccount(Account):
+ """
+ OTR Account, keeps track of a specific account (ours)
+
+ Redefines the load/save methods from potr.context.Account
+ """
def __init__(self, jid, key_dir):
- super(PoezioAccount, self).__init__(jid, 'xmpp', 1024)
+ super(PoezioAccount, self).__init__(jid, 'xmpp', 0)
self.key_dir = os.path.join(key_dir, jid)
def load_privkey(self):
@@ -348,8 +447,7 @@ class PoezioAccount(Account):
with open(self.key_dir + '.fpr', 'w') as fpr_fd:
for uid, trusts in self.trusts.items():
for fpr, trustVal in trusts.items():
- fpr_fd.write('\t'.join(
- (uid, self.name, 'xmpp', fpr, trustVal)))
+ fpr_fd.write('\t'.join((uid, self.name, 'xmpp', fpr, trustVal)))
fpr_fd.write('\n')
except:
log.exception('Error in save_trusts', exc_info=True)
@@ -360,32 +458,29 @@ class PoezioAccount(Account):
savePrivkey = save_privkey
states = {
- STATE_PLAINTEXT: 'plaintext',
- STATE_ENCRYPTED: 'encrypted',
- STATE_FINISHED: 'finished',
+ STATE_PLAINTEXT: 'plaintext',
+ STATE_ENCRYPTED: 'encrypted',
+ STATE_FINISHED: 'finished',
}
class Plugin(BasePlugin):
def init(self):
# set the default values from the config
- allow_v2 = self.config.get('allow_v2', True)
- POLICY_FLAGS['ALLOW_V2'] = allow_v2
- allow_v1 = self.config.get('allow_v1', False)
- POLICY_FLAGS['ALLOW_v1'] = allow_v1
-
global OTR_DIR
OTR_DIR = os.path.expanduser(self.config.get('keys_dir', '') or OTR_DIR)
try:
os.makedirs(OTR_DIR)
except OSError as e:
if e.errno != 17:
- self.api.information('The OTR-specific folder could not be created'
- ' poezio will be unable to save keys and trusts', 'OTR')
+ self.api.information('The OTR-specific folder could not '
+ 'be created. Poezio will be unable '
+ 'to save keys and trusts', 'OTR')
except:
- self.api.information('The OTR-specific folder could not be created'
- ' poezio will be unable to save keys and trusts', 'OTR')
+ self.api.information('The OTR-specific folder could not '
+ 'be created. Poezio will be unable '
+ 'to save keys and trusts', 'OTR')
self.api.add_event_handler('conversation_msg', self.on_conversation_msg)
self.api.add_event_handler('private_msg', self.on_conversation_msg)
@@ -398,7 +493,7 @@ class Plugin(BasePlugin):
self.account = PoezioAccount(self.core.xmpp.boundjid.bare, OTR_DIR)
self.account.load_trusts()
self.contexts = {}
- usage = '[start|refresh|end|fpr|ourfpr|drop|trust|untrust]'
+ usage = '<start|refresh|end|fpr|ourfpr|drop|trust|untrust>'
shortdesc = 'Manage an OTR conversation'
desc = ('Manage an OTR conversation.\n'
'start/refresh: Start or refresh a conversation\n'
@@ -408,16 +503,26 @@ class Plugin(BasePlugin):
'drop: Remove the current key (FOREVER)\n'
'trust: Set this key for this contact as trusted\n'
'untrust: Remove the trust for the key of this contact\n')
+ smp_usage = '<abort|ask|answer> [question] [answer]'
+ smp_short = 'Identify a contact'
+ smp_desc = ('Verify the identify of your contact by using a pre-defined secret.\n'
+ 'abort: Abort an ongoing verification\n'
+ 'ask: Start a verification, with a question or not\n'
+ 'answer: Finish a verification\n')
+
+ self.api.add_tab_command(ConversationTab, 'otrsmp', self.command_smp,
+ help=smp_desc, usage=smp_usage, short=smp_short,
+ completion=self.completion_smp)
+ self.api.add_tab_command(PrivateTab, 'otrsmp', self.command_smp,
+ help=smp_desc, usage=smp_usage, short=smp_short,
+ completion=self.completion_smp)
+
self.api.add_tab_command(ConversationTab, 'otr', self.command_otr,
- help=desc,
- usage=usage,
- short=shortdesc,
- completion=self.completion_otr)
+ help=desc, usage=usage, short=shortdesc,
+ completion=self.completion_otr)
self.api.add_tab_command(PrivateTab, 'otr', self.command_otr,
- help=desc,
- usage=usage,
- short=shortdesc,
- completion=self.completion_otr)
+ help=desc, usage=usage, short=shortdesc,
+ completion=self.completion_otr)
def cleanup(self):
for context in self.contexts.values():
@@ -427,109 +532,159 @@ class Plugin(BasePlugin):
PrivateTab.remove_information_element('otr')
def get_context(self, jid):
+ """
+ Retrieve or create an OTR context
+ """
jid = safeJID(jid).full
if not jid in self.contexts:
flags = POLICY_FLAGS.copy()
- policy = self.config.get_by_tabname('encryption_policy', jid, default='ondemand').lower()
+ require = self.config.get_by_tabname('require_encryption',
+ jid, default=False)
+ flags['REQUIRE_ENCRYPTION'] = require
logging_policy = self.config.get_by_tabname('log', jid, default='false').lower()
- allow_v2 = self.config.get_by_tabname('allow_v2', jid, default='true').lower()
- flags['ALLOW_V2'] = (allow_v2 != 'false')
- allow_v1 = self.config.get_by_tabname('allow_v1', jid, default='false').lower()
- flags['ALLOW_V1'] = (allow_v1 == 'true')
self.contexts[jid] = PoezioContext(self.account, jid, self.core.xmpp, self.core)
self.contexts[jid].log = 1 if logging_policy != 'false' else 0
self.contexts[jid].flags = flags
return self.contexts[jid]
def on_conversation_msg(self, msg, tab):
- color_jid = '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID)
- color_info = '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT)
+ """
+ Message received
+ """
+ format_dict = {
+ 'jid_c': '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID),
+ 'info': '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT),
+ 'jid': msg['from']
+ }
try:
ctx = self.get_context(msg['from'])
txt, tlvs = ctx.receiveMessage(msg["body"].encode('utf-8'))
+
+ # SMP
+ if tlvs:
+ self.handle_tlvs(tlvs, ctx, tab, format_dict)
except UnencryptedMessage as err:
# received an unencrypted message inside an OTR session
- text = _('%(info)sThe following message from %(jid_c)s%(jid)s'
- '%(info)s was \x19bnot\x19o%(info)s encrypted:'
- '\x19o\n%(msg)s') % {
- 'info': color_info,
- 'jid_c': color_jid,
- 'jid': msg['from'],
- 'msg': err.args[0].decode('utf-8')}
- tab.add_message(text, jid=msg['from'],
- typ=0)
- del msg['body']
- del msg['html']
- hl(tab)
- self.core.refresh_window()
+ self.unencrypted_message_received(err, ctx, msg, tab, format_dict)
+ self.otr_start(tab, tab.name, format_dict)
+ return
+ except NotOTRMessage as err:
+ # ignore non-otr messages
+ # if we expected an OTR message, we would have
+ # got an UnencryptedMesssage
+ # but do an additional check because of a bug with potr and py3k
+ if ctx.state != STATE_PLAINTEXT or ctx.getPolicy('REQUIRE_ENCRYPTION'):
+ self.unencrypted_message_received(err, ctx, msg, tab, format_dict)
+ self.otr_start(tab, tab.name, format_dict)
return
except ErrorReceived as err:
# Received an OTR error
- text = _('%(info)sReceived the following error from '
- '%(jid_c)s%(jid)s%(info)s:\x19o %(err)s') % {
- 'jid': msg['from'],
- 'err': err.args[0],
- 'info': color_info,
- 'jid_c': color_jid}
-
- tab.add_message(text, typ=0)
+ format_dict['err'] = err.args[0].error.decode('utf-8', errors='replace')
+ tab.add_message(OTR_ERROR % format_dict, typ=0)
del msg['body']
del msg['html']
hl(tab)
self.core.refresh_window()
return
- except NotOTRMessage as err:
- # ignore non-otr messages
- # if we expected an OTR message, we would have
- # got an UnencryptedMesssage
- # but do an additional check because of a bug with py3k
- if ctx.state != STATE_PLAINTEXT or ctx.getPolicy('REQUIRE_ENCRYPTION'):
-
- text = _('%(info)sThe following message from '
- '%(jid_c)s%(jid)s%(info)s was \x19b'
- 'not\x19o%(info)s encrypted:\x19o\n%(msg)s') % {
- 'jid': msg['from'],
- 'msg': err.args[0].decode('utf-8'),
- 'info': color_info,
- 'jid_c': color_jid}
- tab.add_message(text, jid=msg['from'],
- typ=ctx.log)
- del msg['body']
- del msg['html']
- hl(tab)
- self.core.refresh_window()
- return
- return
except NotEncryptedError as err:
- text = _('%(info)sAn encrypted message from %(jid_c)s%(jid)s'
- '%(info)s was received but is unreadable, as you are'
- ' not currently communicating privately.') % {
- 'info': color_info,
- 'jid_c': color_jid,
- 'jid': msg['from']}
- tab.add_message(text, jid=msg['from'],
- typ=0)
+ # Encrypted message received, but unreadable as we do not have
+ # an OTR session in place.
+ text = MESSAGE_UNREADABLE % format_dict
+ tab.add_message(text, jid=msg['from'], typ=0)
hl(tab)
del msg['body']
del msg['html']
self.core.refresh_window()
return
except crypt.InvalidParameterError:
- tab.add_message('%sThe message from %s%s%s could not be decrypted.'
- % (color_info, color_jid, msg['from'], color_info),
- jid=msg['from'], typ=0)
+ # Malformed OTR payload and stuff
+ text = MESSAGE_INVALID % format_dict
+ tab.add_message(text, jid=msg['from'], typ=0)
hl(tab)
del msg['body']
del msg['html']
self.core.refresh_window()
return
- except:
- tab.add_message('%sAn unspecified error in the OTR plugin occured'
- % color_info,
- typ=0)
+ except Exception:
+ # Unexpected error
+ import traceback
+ exc = traceback.format_exc()
+ format_dict['exc'] = exc
+ tab.add_message(POTR_ERROR % format_dict, typ=0)
log.error('Unspecified error in the OTR plugin', exc_info=True)
return
+ # No error, proceed with the message
+ self.encrypted_message_received(msg, ctx, tab, txt)
+ def handle_tlvs(self, tlvs, ctx, tab, format_dict):
+ """
+ If the message had a TLV, it means we received part of an SMP
+ exchange.
+ """
+ smp1q = get_tlv(tlvs, potr.proto.SMP1QTLV)
+ smp1 = get_tlv(tlvs, potr.proto.SMP1TLV)
+ smp2 = get_tlv(tlvs, potr.proto.SMP2TLV)
+ smp3 = get_tlv(tlvs, potr.proto.SMP3TLV)
+ smp4 = get_tlv(tlvs, potr.proto.SMP4TLV)
+ abort = get_tlv(tlvs, potr.proto.SMPABORTTLV)
+ if abort:
+ ctx.reset_smp()
+ tab.add_message(SMP_ABORTED_PEER % format_dict, typ=0)
+ elif ctx.in_smp and not ctx.smpIsValid():
+ ctx.reset_smp()
+ tab.add_message(SMP_ABORTED % format_dict, typ=0)
+ elif smp1 or smp1q:
+ # Received an SMP request (with a question or not)
+ if smp1q:
+ try:
+ question = ' with question: \x19o' + smp1q.msg.decode('utf-8')
+ except UnicodeDecodeError:
+ self.api.information('The peer sent a question but it had a wrong encoding', 'Error')
+ question = ''
+ else:
+ question = ''
+ ctx.in_smp = True
+ # we did not initiate it
+ ctx.smp_own = False
+ format_dict['q'] = question
+ tab.add_message(SMP_REQUESTED % format_dict, typ=0)
+ elif smp2:
+ # SMP reply received
+ if not ctx.in_smp:
+ ctx.reset_smp()
+ else:
+ tab.add_message(SMP_PROGRESS % format_dict, typ=0)
+ elif smp3 or smp4:
+ # Type 4 (SMP message 3) or 5 (SMP message 4) TLVs received
+ # in both cases it is the final message of the SMP exchange
+ if ctx.smpIsSuccess():
+ tab.add_message(SMP_SUCCESS % format_dict, typ=0)
+ if not ctx.getCurrentTrust():
+ tab.add_message(SMP_RECIPROCATE % format_dict, typ=0)
+ else:
+ tab.add_message(SMP_FAIL % format_dict, typ=0)
+ ctx.reset_smp()
+ hl(tab)
+ self.core.refresh_window()
+
+ def unencrypted_message_received(self, err, ctx, msg, tab, format_dict):
+ """
+ An unencrypted message was received while we expected it to be
+ encrypted. Display it with a warning.
+ """
+ format_dict['msg'] = err.args[0].decode('utf-8')
+ text = MESSAGE_UNENCRYPTED % format_dict
+ tab.add_message(text, jid=msg['from'], typ=ctx.log)
+ del msg['body']
+ del msg['html']
+ hl(tab)
+ self.core.refresh_window()
+
+ def encrypted_message_received(self, msg, ctx, tab, txt):
+ """
+ A properly encrypted message was received, so we add it to the
+ buffer, and try to format it according to the configuration.
+ """
# remove xhtml
del msg['html']
del msg['body']
@@ -544,11 +699,25 @@ class Plugin(BasePlugin):
nick_color = get_theme().COLOR_REMOTE_USER
body = txt.decode()
+ decode_entities = self.config.get_by_tabname('decode_entities',
+ msg['from'].bare,
+ default=True)
+ decode_newlines = self.config.get_by_tabname('decode_newlines',
+ msg['from'].bare,
+ default=True)
if self.config.get_by_tabname('decode_xhtml', msg['from'].bare, default=True):
try:
body = xhtml.xhtml_to_poezio_colors(body, force=True)
- except:
- pass
+ except Exception:
+ if decode_entities:
+ body = html.unescape(body)
+ if decode_newlines:
+ body = body.replace('<br/>', '\n').replace('<br>', '\n')
+ else:
+ if decode_entities:
+ body = html.unescape(body)
+ if decode_newlines:
+ body = body.replace('<br/>', '\n').replace('<br>', '\n')
tab.add_message(body, nickname=tab.nick, jid=msg['from'],
forced_user=user, typ=ctx.log,
nick_color=nick_color)
@@ -556,113 +725,127 @@ class Plugin(BasePlugin):
self.core.refresh_window()
del msg['body']
+ def find_encrypted_context_with_matching(self, bare_jid):
+ """
+ Find an OTR session from a bare JID.
+
+ Useful when a dynamic tab unlocks, which would lead to sending
+ unencrypted messages until it locks again, if we didn’t fallback
+ with this.
+ """
+ for ctx in self.contexts:
+ if safeJID(ctx).bare == bare_jid and self.contexts[ctx].state == STATE_ENCRYPTED:
+ return self.contexts[ctx]
+ return None
+
def on_conversation_say(self, msg, tab):
"""
On message sent
"""
if isinstance(tab, DynamicConversationTab) and tab.locked_resource:
- name = safeJID(tab.name)
- name.resource = tab.locked_resource
- name = name.full
+ jid = safeJID(tab.name)
+ jid.resource = tab.locked_resource
+ name = jid.full
else:
name = tab.name
+ jid = safeJID(tab.name)
+
+ format_dict = {
+ 'jid_c': '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID),
+ 'info': '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT),
+ 'jid': name,
+ }
+
ctx = self.contexts.get(name)
+ if isinstance(tab, DynamicConversationTab) and not tab.locked_resource:
+ log.debug('Unlocked tab %s found, falling back to the first encrypted chat we find.', name)
+ ctx = self.find_encrypted_context_with_matching(jid.bare)
+
if ctx and ctx.state == STATE_ENCRYPTED:
ctx.sendMessage(0, msg['body'].encode('utf-8'))
if not tab.send_chat_state('active'):
tab.send_chat_state('inactive', always_send=True)
tab.add_message(msg['body'],
- nickname=self.core.own_nick or tab.own_nick,
- nick_color=get_theme().COLOR_OWN_NICK,
- identifier=msg['id'],
- jid=self.core.xmpp.boundjid,
- typ=ctx.log)
+ nickname=self.core.own_nick or tab.own_nick,
+ nick_color=get_theme().COLOR_OWN_NICK,
+ identifier=msg['id'],
+ jid=self.core.xmpp.boundjid,
+ typ=ctx.log)
# remove everything from the message so that it doesn’t get sent
del msg['body']
del msg['replace']
del msg['html']
+ elif ctx and ctx.getPolicy('REQUIRE_ENCRYPTION'):
+ tab.add_message(MESSAGE_NOT_SENT % format_dict, typ=0)
+ del msg['body']
+ del msg['replace']
+ del msg['html']
+ self.otr_start(tab, name, format_dict)
def display_encryption_status(self, jid):
+ """
+ Returns the text to display in the infobar (the OTR status)
+ """
context = self.get_context(jid)
+ if safeJID(jid).bare == jid and context.state != STATE_ENCRYPTED:
+ ctx = self.find_encrypted_context_with_matching(jid)
+ if ctx:
+ context = ctx
state = states[context.state]
- return ' OTR: %s' % state
+ trust = 'trusted' if context.getCurrentTrust() else 'untrusted'
+
+ return ' OTR: %s (%s)' % (state, trust)
def command_otr(self, arg):
"""
/otr [start|refresh|end|fpr|ourfpr]
"""
- arg = arg.strip()
+ args = common.shell_split(arg)
+ if not args:
+ return self.core.command_help('otr')
+ action = args.pop(0)
tab = self.api.current_tab()
name = tab.name
- color_jid = '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID)
- color_info = '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT)
- color_normal = '\x19%s}' % dump_tuple(get_theme().COLOR_NORMAL_TEXT)
if isinstance(tab, DynamicConversationTab) and tab.locked_resource:
name = safeJID(tab.name)
name.resource = tab.locked_resource
name = name.full
- if arg == 'end': # close the session
+ format_dict = {
+ 'jid_c': '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID),
+ 'info': '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT),
+ 'normal': '\x19%s}' % dump_tuple(get_theme().COLOR_NORMAL_TEXT),
+ 'jid': name,
+ 'bare_jid': safeJID(name).bare
+ }
+
+ if action == 'end': # close the session
context = self.get_context(name)
context.disconnect()
- elif arg == 'start' or arg == 'refresh':
- otr = self.get_context(name)
- secs = self.config.get('timeout', 3)
- def notify_otr_timeout():
- if otr.state != STATE_ENCRYPTED:
- text = _('%(jid_c)s%(jid)s%(info)s did not enable'
- ' OTR after %(sec)s seconds.') % {
- 'jid': tab.name,
- 'info': color_info,
- 'jid_c': color_jid,
- 'sec': secs}
- tab.add_message(text, typ=0)
- self.core.refresh_window()
- if secs > 0:
- event = self.api.create_delayed_event(secs, notify_otr_timeout)
- self.api.add_timed_event(event)
- self.core.xmpp.send_message(mto=name, mtype='chat',
- mbody=self.contexts[name].sendMessage(0, b'?OTRv?').decode())
- text = _('%(info)sOTR request to %(jid_c)s%(jid)s%(info)s sent.') % {
- 'jid': tab.name,
- 'info': color_info,
- 'jid_c': color_jid}
- tab.add_message(text, typ=0)
- elif arg == 'ourfpr':
- fpr = self.account.getPrivkey()
- text = _('%(info)sYour OTR key fingerprint is %(norm)s%(fpr)s.') % {
- 'jid': tab.name,
- 'info': color_info,
- 'norm': color_normal,
- 'fpr': fpr}
- tab.add_message(text, typ=0)
- elif arg == 'fpr':
+ if isinstance(tab, DynamicConversationTab) and not tab.locked_resource:
+ ctx = self.find_encrypted_context_with_matching(safeJID(name).bare)
+ ctx.disconnect()
+ elif action == 'start' or action == 'refresh':
+ self.otr_start(tab, name, format_dict)
+ elif action == 'ourfpr':
+ format_dict['fpr'] = self.account.getPrivkey()
+ tab.add_message(OTR_OWN_FPR % format_dict, typ=0)
+ elif action == 'fpr':
if name in self.contexts:
ctx = self.contexts[name]
if ctx.getCurrentKey() is not None:
- text = _('%(info)sThe key fingerprint for %(jid_c)s'
- '%(jid)s%(info)s is %(norm)s%(fpr)s%(info)s.') % {
- 'jid': tab.name,
- 'info': color_info,
- 'norm': color_normal,
- 'jid_c': color_jid,
- 'fpr': ctx.getCurrentKey()}
- tab.add_message(text, typ=0)
+ format_dict['fpr'] = ctx.getCurrentKey()
+ tab.add_message(OTR_REMOTE_FPR % format_dict, typ=0)
else:
- text = _('%(jid_c)s%(jid)s%(info)s has no'
- ' key currently in use.') % {
- 'jid': tab.name,
- 'info': color_info,
- 'jid_c': color_jid}
- tab.add_message(text, typ=0)
- elif arg == 'drop':
+ tab.add_message(OTR_NO_FPR % format_dict, typ=0)
+ elif action == 'drop':
# drop the privkey (and obviously, end the current conversations before that)
for context in self.contexts.values():
if context.state not in (STATE_FINISHED, STATE_PLAINTEXT):
context.disconnect()
self.account.drop_privkey()
- tab.add_message('%sPrivate key dropped.' % color_info, typ=0)
- elif arg == 'trust':
+ tab.add_message(KEY_DROPPED % format_dict, typ=0)
+ elif action == 'trust':
ctx = self.get_context(name)
key = ctx.getCurrentKey()
if key:
@@ -670,16 +853,11 @@ class Plugin(BasePlugin):
else:
return
if not ctx.getCurrentTrust():
+ format_dict['key'] = key
ctx.setTrust(fpr, 'verified')
self.account.saveTrusts()
- text = _('%(info)sYou added %(jid_c)s%(jid)s%(info)s with key '
- '\x19o%(key)s%(info)s to your trusted list.') % {
- 'jid': ctx.trustName,
- 'key': key,
- 'info': color_info,
- 'jid_c': color_jid}
- tab.add_message(text, typ=0)
- elif arg == 'untrust':
+ tab.add_message(TRUST_ADDED % format_dict, typ=0)
+ elif action == 'untrust':
ctx = self.get_context(name)
key = ctx.getCurrentKey()
if key:
@@ -687,19 +865,108 @@ class Plugin(BasePlugin):
else:
return
if ctx.getCurrentTrust():
+ format_dict['key'] = key
ctx.setTrust(fpr, '')
self.account.saveTrusts()
- text = _('%(info)sYou removed %(jid_c)s%(jid)s%(info)s with '
- 'key \x19o%(key)s%(info)s from your trusted list.') % {
- 'jid': ctx.trustName,
- 'key': key,
- 'info': color_info,
- 'jid_c': color_jid}
-
- tab.add_message(text, typ=0)
+ tab.add_message(TRUST_REMOVED % format_dict, typ=0)
self.core.refresh_window()
- def completion_otr(self, the_input):
+ def otr_start(self, tab, name, format_dict):
+ """
+ Start an otr conversation with a contact
+ """
+ secs = self.config.get('timeout', 3)
+ def notify_otr_timeout():
+ tab_name = tab.name
+ otr = self.get_context(tab_name)
+ if isinstance(tab, DynamicConversationTab):
+ if tab.locked_resource:
+ tab_name = safeJID(tab.name)
+ tab_name.resource = tab.locked_resource
+ tab_name = tab_name.full
+ otr = self.get_context(tab_name)
+ if otr.state != STATE_ENCRYPTED:
+ format_dict['secs'] = secs
+ text = OTR_NOT_ENABLED % format_dict
+ tab.add_message(text, typ=0)
+ self.core.refresh_window()
+ if secs > 0:
+ event = self.api.create_delayed_event(secs, notify_otr_timeout)
+ self.api.add_timed_event(event)
+ body = self.get_context(name).sendMessage(0, b'?OTRv?').decode()
+ self.core.xmpp.send_message(mto=name, mtype='chat', mbody=body)
+ tab.add_message(OTR_REQUEST % format_dict, typ=0)
+
+ @staticmethod
+ def completion_otr(the_input):
+ """
+ Completion for /otr
+ """
comp = ['start', 'fpr', 'ourfpr', 'refresh', 'end', 'trust', 'untrust']
return the_input.new_completion(comp, 1, quotify=False)
+ @command_args_parser.quoted(1, 2)
+ def command_smp(self, args):
+ """
+ /otrsmp <ask|answer|abort> [question] [secret]
+ """
+ if args is None or not args:
+ return self.core.command_help('otrsmp')
+ length = len(args)
+ action = args.pop(0)
+ if length == 2:
+ question = None
+ secret = args.pop(0).encode('utf-8')
+ elif length == 3:
+ question = args.pop(0).encode('utf-8')
+ secret = args.pop(0).encode('utf-8')
+ else:
+ question = secret = None
+
+ tab = self.api.current_tab()
+ name = tab.name
+ if isinstance(tab, DynamicConversationTab) and tab.locked_resource:
+ name = safeJID(tab.name)
+ name.resource = tab.locked_resource
+ name = name.full
+
+ format_dict = {
+ 'jid_c': '\x19%s}' % dump_tuple(get_theme().COLOR_MUC_JID),
+ 'info': '\x19%s}' % dump_tuple(get_theme().COLOR_INFORMATION_TEXT),
+ 'jid': name,
+ 'bare_jid': safeJID(name).bare
+ }
+
+ ctx = self.get_context(name)
+ if ctx.state != STATE_ENCRYPTED:
+ self.api.information('The current conversation is not encrypted',
+ 'Error')
+ return
+
+ if action == 'ask':
+ ctx.in_smp = True
+ ctx.smp_own = True
+ if question:
+ ctx.smpInit(secret, question)
+ else:
+ ctx.smpInit(secret)
+ tab.add_message(SMP_INITIATED % format_dict, typ=0)
+ elif action == 'answer':
+ ctx.smpGotSecret(secret)
+ elif action == 'abort':
+ if ctx.in_smp:
+ ctx.smpAbort()
+ tab.add_message(SMP_ABORTED % format_dict, typ=0)
+ self.core.refresh_window()
+
+ @staticmethod
+ def completion_smp(the_input):
+ """Completion for /otrsmp"""
+ if the_input.get_argument_position() == 1:
+ return the_input.new_completion(['ask', 'answer', 'abort'], 1, quotify=False)
+
+def get_tlv(tlvs, cls):
+ """Find the instance of a class in a list"""
+ for tlv in tlvs:
+ if isinstance(tlv, cls):
+ return tlv