diff options
author | mathieui <mathieui@mathieui.net> | 2014-12-11 22:28:44 +0100 |
---|---|---|
committer | mathieui <mathieui@mathieui.net> | 2014-12-11 22:28:44 +0100 |
commit | 00396c158aa032585db88cfd4b622281ba3cbd7f (patch) | |
tree | dfe9711c6ccc3b908c1de8a06bc5080139113bf4 /src/tabs | |
parent | 21d8a3e7e19dc639262ac7fa7d7817351ff8b4c1 (diff) | |
download | poezio-00396c158aa032585db88cfd4b622281ba3cbd7f.tar.gz poezio-00396c158aa032585db88cfd4b622281ba3cbd7f.tar.bz2 poezio-00396c158aa032585db88cfd4b622281ba3cbd7f.tar.xz poezio-00396c158aa032585db88cfd4b622281ba3cbd7f.zip |
Fix #2847 (SASL External support)
- Add two new options, keyfile and certfile, which must be both set for
the auth to work.
- if both are set, then poezio doesn’t force-prompt a password if there
is none specified
- add /cert_add, /cert_fetch, /cert_disable, /cert_revoke and /certs
commands.
- add a page of documentation on the process
Diffstat (limited to 'src/tabs')
-rw-r--r-- | src/tabs/rostertab.py | 166 |
1 files changed, 166 insertions, 0 deletions
diff --git a/src/tabs/rostertab.py b/src/tabs/rostertab.py index 52a3a88c..846ba327 100644 --- a/src/tabs/rostertab.py +++ b/src/tabs/rostertab.py @@ -10,9 +10,11 @@ from gettext import gettext as _ import logging log = logging.getLogger(__name__) +import base64 import curses import difflib import os +import ssl from os import getenv, path from . import Tab @@ -146,6 +148,170 @@ class RosterInfoTab(Tab): self.core.xmpp.del_event_handler('session_start', self.check_blocking) self.core.xmpp.add_event_handler('blocked_message', self.on_blocked_message) + def check_saslexternal(self, features): + if 'urn:xmpp:saslcert:1' in features: + self.register_command('certs', self.command_certs, + desc=_('List the fingerprints of certificates' + ' which can connect to your account.'), + shortdesc=_('List allowed client certs.')) + self.register_command('cert_add', self.command_cert_add, + desc=_('Add a client certificate to the authorized ones. ' + 'It must have an unique name and be contained in ' + 'a PEM file. [management] is a boolean indicating' + ' if a client connected using this certificate can' + ' manage the certificates itself.'), + shortdesc=_('Add a client certificate.'), + usage='<name> <certificate path> [management]') + self.register_command('cert_disable', self.command_cert_disable, + desc=_('Remove a certificate from the list ' + 'of allowed ones. Clients currently ' + 'using this certificate will not be ' + 'forcefully disconnected.'), + shortdesc=_('Disable a certificate'), + usage='<name>') + self.register_command('cert_revoke', self.command_cert_revoke, + desc=_('Remove a certificate from the list ' + 'of allowed ones. Clients currently ' + 'using this certificate will be ' + 'forcefully disconnected.'), + shortdesc=_('Revoke a certificate'), + usage='<name>') + self.register_command('cert_fetch', self.command_cert_fetch, + desc=_('Retrieve a certificate with its ' + 'name. It will be stored in <path>.'), + shortdesc=_('Fetch a certificate'), + usage='<name> <path>') + + @command_args_parser.ignored + def command_certs(self): + """ + /certs + """ + def cb(iq): + if iq['type'] == 'error': + self.core.information(_('Unable to retrieve the certificate list.'), + _('Error')) + return + certs = [] + for item in iq['sasl_certs']['items']: + users = '\n'.join(item['users']) + certs.append((item['name'], users)) + + if not certs: + return self.core.information(_('No certificates found'), _('Info')) + msg = _('Certificates:\n') + msg += '\n'.join(((' %s%s' % (item[0] + (': ' if item[1] else ''), item[1])) for item in certs)) + self.core.information(msg, 'Info') + + self.core.xmpp.plugin['xep_0257'].get_certs(callback=cb, timeout=3) + + @command_args_parser.quoted(2, 1) + def command_cert_add(self, args): + """ + /cert_add <name> <certfile> [cert-management] + """ + if not args or len(args) < 2: + return self.core.command_help('cert_add') + def cb(iq): + if iq['type'] == 'error': + self.core.information(_('Unable to add the certificate.'), _('Error')) + else: + self.core.information(_('Certificate added.'), _('Info')) + + name = args[0] + + try: + with open(args[1]) as fd: + crt = fd.read() + crt = crt.replace(ssl.PEM_FOOTER, '').replace(ssl.PEM_HEADER, '').replace(' ', '').replace('\n', '') + except Exception as e: + self.core.information('Unable to read the certificate: %s' % e, 'Error') + return + + if len(args) > 2: + management = args[2] + if management: + management = management.lower() + if management not in ('false', '0'): + management = True + else: + management = False + else: + management = False + else: + management = True + + self.core.xmpp.plugin['xep_0257'].add_cert(name, crt, callback=cb, + allow_management=management) + + @command_args_parser.quoted(1) + def command_cert_disable(self, args): + """ + /cert_disable <name> + """ + if not args: + return self.core.command_help('cert_disable') + def cb(iq): + if iq['type'] == 'error': + self.core.information(_('Unable to disable the certificate.'), _('Error')) + else: + self.core.information(_('Certificate disabled.'), _('Info')) + + name = args[0] + + self.core.xmpp.plugin['xep_0257'].disable_cert(name, callback=cb) + + @command_args_parser.quoted(1) + def command_cert_revoke(self, args): + """ + /cert_revoke <name> + """ + if not args: + return self.core.command_help('cert_revoke') + def cb(iq): + if iq['type'] == 'error': + self.core.information(_('Unable to revoke the certificate.'), _('Error')) + else: + self.core.information(_('Certificate revoked.'), _('Info')) + + name = args[0] + + self.core.xmpp.plugin['xep_0257'].revoke_cert(name, callback=cb) + + + @command_args_parser.quoted(2) + def command_cert_fetch(self, args): + """ + /cert_fetch <name> <path> + """ + if not args or len(args) < 2: + return self.core.command_help('cert_fetch') + def cb(iq): + if iq['type'] == 'error': + self.core.information(_('Unable to fetch the certificate.'), + _('Error')) + return + + cert = None + for item in iq['sasl_certs']['items']: + if item['name'] == name: + cert = base64.b64decode(item['x509cert']) + break + + if not cert: + return self.core.information(_('Certificate not found.'), _('Info')) + + cert = ssl.DER_cert_to_PEM_cert(cert) + with open(path, 'w') as fd: + fd.write(cert) + + self.core.information(_('File stored at %s') % path, 'Info') + + name = args[0] + path = args[1] + + self.core.xmpp.plugin['xep_0257'].get_certs(callback=cb) + def on_blocked_message(self, message): """ When we try to send a message to a blocked contact |