summaryrefslogtreecommitdiff
path: root/src/tabs
diff options
context:
space:
mode:
authormathieui <mathieui@mathieui.net>2014-12-11 22:28:44 +0100
committermathieui <mathieui@mathieui.net>2014-12-11 22:28:44 +0100
commit00396c158aa032585db88cfd4b622281ba3cbd7f (patch)
treedfe9711c6ccc3b908c1de8a06bc5080139113bf4 /src/tabs
parent21d8a3e7e19dc639262ac7fa7d7817351ff8b4c1 (diff)
downloadpoezio-00396c158aa032585db88cfd4b622281ba3cbd7f.tar.gz
poezio-00396c158aa032585db88cfd4b622281ba3cbd7f.tar.bz2
poezio-00396c158aa032585db88cfd4b622281ba3cbd7f.tar.xz
poezio-00396c158aa032585db88cfd4b622281ba3cbd7f.zip
Fix #2847 (SASL External support)
- Add two new options, keyfile and certfile, which must be both set for the auth to work. - if both are set, then poezio doesn’t force-prompt a password if there is none specified - add /cert_add, /cert_fetch, /cert_disable, /cert_revoke and /certs commands. - add a page of documentation on the process
Diffstat (limited to 'src/tabs')
-rw-r--r--src/tabs/rostertab.py166
1 files changed, 166 insertions, 0 deletions
diff --git a/src/tabs/rostertab.py b/src/tabs/rostertab.py
index 52a3a88c..846ba327 100644
--- a/src/tabs/rostertab.py
+++ b/src/tabs/rostertab.py
@@ -10,9 +10,11 @@ from gettext import gettext as _
import logging
log = logging.getLogger(__name__)
+import base64
import curses
import difflib
import os
+import ssl
from os import getenv, path
from . import Tab
@@ -146,6 +148,170 @@ class RosterInfoTab(Tab):
self.core.xmpp.del_event_handler('session_start', self.check_blocking)
self.core.xmpp.add_event_handler('blocked_message', self.on_blocked_message)
+ def check_saslexternal(self, features):
+ if 'urn:xmpp:saslcert:1' in features:
+ self.register_command('certs', self.command_certs,
+ desc=_('List the fingerprints of certificates'
+ ' which can connect to your account.'),
+ shortdesc=_('List allowed client certs.'))
+ self.register_command('cert_add', self.command_cert_add,
+ desc=_('Add a client certificate to the authorized ones. '
+ 'It must have an unique name and be contained in '
+ 'a PEM file. [management] is a boolean indicating'
+ ' if a client connected using this certificate can'
+ ' manage the certificates itself.'),
+ shortdesc=_('Add a client certificate.'),
+ usage='<name> <certificate path> [management]')
+ self.register_command('cert_disable', self.command_cert_disable,
+ desc=_('Remove a certificate from the list '
+ 'of allowed ones. Clients currently '
+ 'using this certificate will not be '
+ 'forcefully disconnected.'),
+ shortdesc=_('Disable a certificate'),
+ usage='<name>')
+ self.register_command('cert_revoke', self.command_cert_revoke,
+ desc=_('Remove a certificate from the list '
+ 'of allowed ones. Clients currently '
+ 'using this certificate will be '
+ 'forcefully disconnected.'),
+ shortdesc=_('Revoke a certificate'),
+ usage='<name>')
+ self.register_command('cert_fetch', self.command_cert_fetch,
+ desc=_('Retrieve a certificate with its '
+ 'name. It will be stored in <path>.'),
+ shortdesc=_('Fetch a certificate'),
+ usage='<name> <path>')
+
+ @command_args_parser.ignored
+ def command_certs(self):
+ """
+ /certs
+ """
+ def cb(iq):
+ if iq['type'] == 'error':
+ self.core.information(_('Unable to retrieve the certificate list.'),
+ _('Error'))
+ return
+ certs = []
+ for item in iq['sasl_certs']['items']:
+ users = '\n'.join(item['users'])
+ certs.append((item['name'], users))
+
+ if not certs:
+ return self.core.information(_('No certificates found'), _('Info'))
+ msg = _('Certificates:\n')
+ msg += '\n'.join(((' %s%s' % (item[0] + (': ' if item[1] else ''), item[1])) for item in certs))
+ self.core.information(msg, 'Info')
+
+ self.core.xmpp.plugin['xep_0257'].get_certs(callback=cb, timeout=3)
+
+ @command_args_parser.quoted(2, 1)
+ def command_cert_add(self, args):
+ """
+ /cert_add <name> <certfile> [cert-management]
+ """
+ if not args or len(args) < 2:
+ return self.core.command_help('cert_add')
+ def cb(iq):
+ if iq['type'] == 'error':
+ self.core.information(_('Unable to add the certificate.'), _('Error'))
+ else:
+ self.core.information(_('Certificate added.'), _('Info'))
+
+ name = args[0]
+
+ try:
+ with open(args[1]) as fd:
+ crt = fd.read()
+ crt = crt.replace(ssl.PEM_FOOTER, '').replace(ssl.PEM_HEADER, '').replace(' ', '').replace('\n', '')
+ except Exception as e:
+ self.core.information('Unable to read the certificate: %s' % e, 'Error')
+ return
+
+ if len(args) > 2:
+ management = args[2]
+ if management:
+ management = management.lower()
+ if management not in ('false', '0'):
+ management = True
+ else:
+ management = False
+ else:
+ management = False
+ else:
+ management = True
+
+ self.core.xmpp.plugin['xep_0257'].add_cert(name, crt, callback=cb,
+ allow_management=management)
+
+ @command_args_parser.quoted(1)
+ def command_cert_disable(self, args):
+ """
+ /cert_disable <name>
+ """
+ if not args:
+ return self.core.command_help('cert_disable')
+ def cb(iq):
+ if iq['type'] == 'error':
+ self.core.information(_('Unable to disable the certificate.'), _('Error'))
+ else:
+ self.core.information(_('Certificate disabled.'), _('Info'))
+
+ name = args[0]
+
+ self.core.xmpp.plugin['xep_0257'].disable_cert(name, callback=cb)
+
+ @command_args_parser.quoted(1)
+ def command_cert_revoke(self, args):
+ """
+ /cert_revoke <name>
+ """
+ if not args:
+ return self.core.command_help('cert_revoke')
+ def cb(iq):
+ if iq['type'] == 'error':
+ self.core.information(_('Unable to revoke the certificate.'), _('Error'))
+ else:
+ self.core.information(_('Certificate revoked.'), _('Info'))
+
+ name = args[0]
+
+ self.core.xmpp.plugin['xep_0257'].revoke_cert(name, callback=cb)
+
+
+ @command_args_parser.quoted(2)
+ def command_cert_fetch(self, args):
+ """
+ /cert_fetch <name> <path>
+ """
+ if not args or len(args) < 2:
+ return self.core.command_help('cert_fetch')
+ def cb(iq):
+ if iq['type'] == 'error':
+ self.core.information(_('Unable to fetch the certificate.'),
+ _('Error'))
+ return
+
+ cert = None
+ for item in iq['sasl_certs']['items']:
+ if item['name'] == name:
+ cert = base64.b64decode(item['x509cert'])
+ break
+
+ if not cert:
+ return self.core.information(_('Certificate not found.'), _('Info'))
+
+ cert = ssl.DER_cert_to_PEM_cert(cert)
+ with open(path, 'w') as fd:
+ fd.write(cert)
+
+ self.core.information(_('File stored at %s') % path, 'Info')
+
+ name = args[0]
+ path = args[1]
+
+ self.core.xmpp.plugin['xep_0257'].get_certs(callback=cb)
+
def on_blocked_message(self, message):
"""
When we try to send a message to a blocked contact