From 5ab77c745270d7d5c016c1dc7ef2a82533a4b16e Mon Sep 17 00:00:00 2001 From: Florent Le Coz Date: Thu, 17 Jul 2014 14:19:04 +0200 Subject: Rename to slixmpp --- slixmpp/xmlstream/cert.py | 184 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 184 insertions(+) create mode 100644 slixmpp/xmlstream/cert.py (limited to 'slixmpp/xmlstream/cert.py') diff --git a/slixmpp/xmlstream/cert.py b/slixmpp/xmlstream/cert.py new file mode 100644 index 00000000..71146f36 --- /dev/null +++ b/slixmpp/xmlstream/cert.py @@ -0,0 +1,184 @@ +import logging +from datetime import datetime, timedelta + +# Make a call to strptime before starting threads to +# prevent thread safety issues. +datetime.strptime('1970-01-01 12:00:00', "%Y-%m-%d %H:%M:%S") + + +try: + from pyasn1.codec.der import decoder, encoder + from pyasn1.type.univ import Any, ObjectIdentifier, OctetString + from pyasn1.type.char import BMPString, IA5String, UTF8String + from pyasn1.type.useful import GeneralizedTime + from pyasn1_modules.rfc2459 import (Certificate, DirectoryString, + SubjectAltName, GeneralNames, + GeneralName) + from pyasn1_modules.rfc2459 import id_ce_subjectAltName as SUBJECT_ALT_NAME + from pyasn1_modules.rfc2459 import id_at_commonName as COMMON_NAME + + XMPP_ADDR = ObjectIdentifier('1.3.6.1.5.5.7.8.5') + SRV_NAME = ObjectIdentifier('1.3.6.1.5.5.7.8.7') + + HAVE_PYASN1 = True +except ImportError: + HAVE_PYASN1 = False + + +log = logging.getLogger(__name__) + + +class CertificateError(Exception): + pass + + +def decode_str(data): + encoding = 'utf-16-be' if isinstance(data, BMPString) else 'utf-8' + return bytes(data).decode(encoding) + + +def extract_names(raw_cert): + results = {'CN': set(), + 'DNS': set(), + 'SRV': set(), + 'URI': set(), + 'XMPPAddr': set()} + + cert = decoder.decode(raw_cert, asn1Spec=Certificate())[0] + tbs = cert.getComponentByName('tbsCertificate') + subject = tbs.getComponentByName('subject') + extensions = tbs.getComponentByName('extensions') or [] + + # Extract the CommonName(s) from the cert. + for rdnss in subject: + for rdns in rdnss: + for name in rdns: + oid = name.getComponentByName('type') + value = name.getComponentByName('value') + + if oid != COMMON_NAME: + continue + + value = decoder.decode(value, asn1Spec=DirectoryString())[0] + value = decode_str(value.getComponent()) + results['CN'].add(value) + + # Extract the Subject Alternate Names (DNS, SRV, URI, XMPPAddr) + for extension in extensions: + oid = extension.getComponentByName('extnID') + if oid != SUBJECT_ALT_NAME: + continue + + value = decoder.decode(extension.getComponentByName('extnValue'), + asn1Spec=OctetString())[0] + sa_names = decoder.decode(value, asn1Spec=SubjectAltName())[0] + for name in sa_names: + name_type = name.getName() + if name_type == 'dNSName': + results['DNS'].add(decode_str(name.getComponent())) + if name_type == 'uniformResourceIdentifier': + value = decode_str(name.getComponent()) + if value.startswith('xmpp:'): + results['URI'].add(value[5:]) + elif name_type == 'otherName': + name = name.getComponent() + + oid = name.getComponentByName('type-id') + value = name.getComponentByName('value') + + if oid == XMPP_ADDR: + value = decoder.decode(value, asn1Spec=UTF8String())[0] + results['XMPPAddr'].add(decode_str(value)) + elif oid == SRV_NAME: + value = decoder.decode(value, asn1Spec=IA5String())[0] + results['SRV'].add(decode_str(value)) + + return results + + +def extract_dates(raw_cert): + if not HAVE_PYASN1: + log.warning("Could not find pyasn1 and pyasn1_modules. " + \ + "SSL certificate expiration COULD NOT BE VERIFIED.") + return None, None + + cert = decoder.decode(raw_cert, asn1Spec=Certificate())[0] + tbs = cert.getComponentByName('tbsCertificate') + validity = tbs.getComponentByName('validity') + + not_before = validity.getComponentByName('notBefore') + not_before = str(not_before.getComponent()) + + not_after = validity.getComponentByName('notAfter') + not_after = str(not_after.getComponent()) + + if isinstance(not_before, GeneralizedTime): + not_before = datetime.strptime(not_before, '%Y%m%d%H%M%SZ') + else: + not_before = datetime.strptime(not_before, '%y%m%d%H%M%SZ') + + if isinstance(not_after, GeneralizedTime): + not_after = datetime.strptime(not_after, '%Y%m%d%H%M%SZ') + else: + not_after = datetime.strptime(not_after, '%y%m%d%H%M%SZ') + + return not_before, not_after + + +def get_ttl(raw_cert): + not_before, not_after = extract_dates(raw_cert) + if not_after is None: + return None + return not_after - datetime.utcnow() + + +def verify(expected, raw_cert): + if not HAVE_PYASN1: + log.warning("Could not find pyasn1 and pyasn1_modules. " + \ + "SSL certificate COULD NOT BE VERIFIED.") + return + + not_before, not_after = extract_dates(raw_cert) + cert_names = extract_names(raw_cert) + + now = datetime.utcnow() + + if not_before > now: + raise CertificateError( + 'Certificate has not entered its valid date range.') + + if not_after <= now: + raise CertificateError( + 'Certificate has expired.') + + if '.' in expected: + expected_wild = expected[expected.index('.'):] + else: + expected_wild = expected + expected_srv = '_xmpp-client.%s' % expected + + for name in cert_names['XMPPAddr']: + if name == expected: + return True + for name in cert_names['SRV']: + if name == expected_srv or name == expected: + return True + for name in cert_names['DNS']: + if name == expected: + return True + if name.startswith('*'): + if '.' in name: + name_wild = name[name.index('.'):] + else: + name_wild = name + if expected_wild == name_wild: + return True + for name in cert_names['URI']: + if name == expected: + return True + for name in cert_names['CN']: + if name == expected: + return True + + raise CertificateError( + 'Could not match certficate against hostname: %s' % expected) -- cgit v1.2.3