From 00396c158aa032585db88cfd4b622281ba3cbd7f Mon Sep 17 00:00:00 2001 From: mathieui Date: Thu, 11 Dec 2014 22:28:44 +0100 Subject: Fix #2847 (SASL External support) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add two new options, keyfile and certfile, which must be both set for the auth to work. - if both are set, then poezio doesn’t force-prompt a password if there is none specified - add /cert_add, /cert_fetch, /cert_disable, /cert_revoke and /certs commands. - add a page of documentation on the process --- src/tabs/rostertab.py | 166 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 166 insertions(+) (limited to 'src/tabs/rostertab.py') diff --git a/src/tabs/rostertab.py b/src/tabs/rostertab.py index 52a3a88c..846ba327 100644 --- a/src/tabs/rostertab.py +++ b/src/tabs/rostertab.py @@ -10,9 +10,11 @@ from gettext import gettext as _ import logging log = logging.getLogger(__name__) +import base64 import curses import difflib import os +import ssl from os import getenv, path from . import Tab @@ -146,6 +148,170 @@ class RosterInfoTab(Tab): self.core.xmpp.del_event_handler('session_start', self.check_blocking) self.core.xmpp.add_event_handler('blocked_message', self.on_blocked_message) + def check_saslexternal(self, features): + if 'urn:xmpp:saslcert:1' in features: + self.register_command('certs', self.command_certs, + desc=_('List the fingerprints of certificates' + ' which can connect to your account.'), + shortdesc=_('List allowed client certs.')) + self.register_command('cert_add', self.command_cert_add, + desc=_('Add a client certificate to the authorized ones. ' + 'It must have an unique name and be contained in ' + 'a PEM file. [management] is a boolean indicating' + ' if a client connected using this certificate can' + ' manage the certificates itself.'), + shortdesc=_('Add a client certificate.'), + usage=' [management]') + self.register_command('cert_disable', self.command_cert_disable, + desc=_('Remove a certificate from the list ' + 'of allowed ones. Clients currently ' + 'using this certificate will not be ' + 'forcefully disconnected.'), + shortdesc=_('Disable a certificate'), + usage='') + self.register_command('cert_revoke', self.command_cert_revoke, + desc=_('Remove a certificate from the list ' + 'of allowed ones. Clients currently ' + 'using this certificate will be ' + 'forcefully disconnected.'), + shortdesc=_('Revoke a certificate'), + usage='') + self.register_command('cert_fetch', self.command_cert_fetch, + desc=_('Retrieve a certificate with its ' + 'name. It will be stored in .'), + shortdesc=_('Fetch a certificate'), + usage=' ') + + @command_args_parser.ignored + def command_certs(self): + """ + /certs + """ + def cb(iq): + if iq['type'] == 'error': + self.core.information(_('Unable to retrieve the certificate list.'), + _('Error')) + return + certs = [] + for item in iq['sasl_certs']['items']: + users = '\n'.join(item['users']) + certs.append((item['name'], users)) + + if not certs: + return self.core.information(_('No certificates found'), _('Info')) + msg = _('Certificates:\n') + msg += '\n'.join(((' %s%s' % (item[0] + (': ' if item[1] else ''), item[1])) for item in certs)) + self.core.information(msg, 'Info') + + self.core.xmpp.plugin['xep_0257'].get_certs(callback=cb, timeout=3) + + @command_args_parser.quoted(2, 1) + def command_cert_add(self, args): + """ + /cert_add [cert-management] + """ + if not args or len(args) < 2: + return self.core.command_help('cert_add') + def cb(iq): + if iq['type'] == 'error': + self.core.information(_('Unable to add the certificate.'), _('Error')) + else: + self.core.information(_('Certificate added.'), _('Info')) + + name = args[0] + + try: + with open(args[1]) as fd: + crt = fd.read() + crt = crt.replace(ssl.PEM_FOOTER, '').replace(ssl.PEM_HEADER, '').replace(' ', '').replace('\n', '') + except Exception as e: + self.core.information('Unable to read the certificate: %s' % e, 'Error') + return + + if len(args) > 2: + management = args[2] + if management: + management = management.lower() + if management not in ('false', '0'): + management = True + else: + management = False + else: + management = False + else: + management = True + + self.core.xmpp.plugin['xep_0257'].add_cert(name, crt, callback=cb, + allow_management=management) + + @command_args_parser.quoted(1) + def command_cert_disable(self, args): + """ + /cert_disable + """ + if not args: + return self.core.command_help('cert_disable') + def cb(iq): + if iq['type'] == 'error': + self.core.information(_('Unable to disable the certificate.'), _('Error')) + else: + self.core.information(_('Certificate disabled.'), _('Info')) + + name = args[0] + + self.core.xmpp.plugin['xep_0257'].disable_cert(name, callback=cb) + + @command_args_parser.quoted(1) + def command_cert_revoke(self, args): + """ + /cert_revoke + """ + if not args: + return self.core.command_help('cert_revoke') + def cb(iq): + if iq['type'] == 'error': + self.core.information(_('Unable to revoke the certificate.'), _('Error')) + else: + self.core.information(_('Certificate revoked.'), _('Info')) + + name = args[0] + + self.core.xmpp.plugin['xep_0257'].revoke_cert(name, callback=cb) + + + @command_args_parser.quoted(2) + def command_cert_fetch(self, args): + """ + /cert_fetch + """ + if not args or len(args) < 2: + return self.core.command_help('cert_fetch') + def cb(iq): + if iq['type'] == 'error': + self.core.information(_('Unable to fetch the certificate.'), + _('Error')) + return + + cert = None + for item in iq['sasl_certs']['items']: + if item['name'] == name: + cert = base64.b64decode(item['x509cert']) + break + + if not cert: + return self.core.information(_('Certificate not found.'), _('Info')) + + cert = ssl.DER_cert_to_PEM_cert(cert) + with open(path, 'w') as fd: + fd.write(cert) + + self.core.information(_('File stored at %s') % path, 'Info') + + name = args[0] + path = args[1] + + self.core.xmpp.plugin['xep_0257'].get_certs(callback=cb) + def on_blocked_message(self, message): """ When we try to send a message to a blocked contact -- cgit v1.2.3