diff options
Diffstat (limited to 'sleekxmpp/util/sasl/mechanisms.py')
-rw-r--r-- | sleekxmpp/util/sasl/mechanisms.py | 550 |
1 files changed, 0 insertions, 550 deletions
diff --git a/sleekxmpp/util/sasl/mechanisms.py b/sleekxmpp/util/sasl/mechanisms.py deleted file mode 100644 index 7a7ebf7b..00000000 --- a/sleekxmpp/util/sasl/mechanisms.py +++ /dev/null @@ -1,550 +0,0 @@ -# -*- coding: utf-8 -*- -""" - sleekxmpp.util.sasl.mechanisms - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - A collection of supported SASL mechanisms. - - This module was originally based on Dave Cridland's Suelta library. - - Part of SleekXMPP: The Sleek XMPP Library - - :copryight: (c) 2004-2013 David Alan Cridland - :copyright: (c) 2013 Nathanael C. Fritz, Lance J.T. Stout - - :license: MIT, see LICENSE for more details -""" - -import sys -import hmac -import random - -from base64 import b64encode, b64decode - -from sleekxmpp.util import bytes, hash, XOR, quote, num_to_bytes -from sleekxmpp.util.sasl.client import sasl_mech, Mech, \ - SASLCancelled, SASLFailed, \ - SASLMutualAuthFailed - - -@sasl_mech(0) -class ANONYMOUS(Mech): - - name = 'ANONYMOUS' - - def process(self, challenge=b''): - return b'Anonymous, Suelta' - - -@sasl_mech(1) -class LOGIN(Mech): - - name = 'LOGIN' - required_credentials = set(['username', 'password']) - - def setup(self, name): - self.step = 0 - - def process(self, challenge=b''): - if not challenge: - return b'' - - if self.step == 0: - self.step = 1 - return self.credentials['username'] - else: - return self.credentials['password'] - - -@sasl_mech(2) -class PLAIN(Mech): - - name = 'PLAIN' - required_credentials = set(['username', 'password']) - optional_credentials = set(['authzid']) - security = set(['encrypted', 'encrypted_plain', 'unencrypted_plain']) - - def setup(self, name): - if not self.security_settings['encrypted']: - if not self.security_settings['unencrypted_plain']: - raise SASLCancelled('PLAIN without encryption') - else: - if not self.security_settings['encrypted_plain']: - raise SASLCancelled('PLAIN with encryption') - - def process(self, challenge=b''): - authzid = self.credentials['authzid'] - authcid = self.credentials['username'] - password = self.credentials['password'] - return authzid + b'\x00' + authcid + b'\x00' + password - - -@sasl_mech(100) -class EXTERNAL(Mech): - - name = 'EXTERNAL' - optional_credentials = set(['authzid']) - - def process(self, challenge=b''): - return self.credentials['authzid'] - - -@sasl_mech(31) -class X_FACEBOOK_PLATFORM(Mech): - - name = 'X-FACEBOOK-PLATFORM' - required_credentials = set(['api_key', 'access_token']) - - def process(self, challenge=b''): - if challenge: - values = {} - for kv in challenge.split(b'&'): - key, value = kv.split(b'=') - values[key] = value - - resp_data = { - b'method': values[b'method'], - b'v': b'1.0', - b'call_id': b'1.0', - b'nonce': values[b'nonce'], - b'access_token': self.credentials['access_token'], - b'api_key': self.credentials['api_key'] - } - - resp = '&'.join(['%s=%s' % (k.decode("utf-8"), v.decode("utf-8")) for k, v in resp_data.items()]) - return bytes(resp) - return b'' - - -@sasl_mech(10) -class X_MESSENGER_OAUTH2(Mech): - - name = 'X-MESSENGER-OAUTH2' - required_credentials = set(['access_token']) - - def process(self, challenge=b''): - return self.credentials['access_token'] - - -@sasl_mech(10) -class X_OAUTH2(Mech): - - name = 'X-OAUTH2' - required_credentials = set(['username', 'access_token']) - - def process(self, challenge=b''): - return b'\x00' + self.credentials['username'] + \ - b'\x00' + self.credentials['access_token'] - - -@sasl_mech(3) -class X_GOOGLE_TOKEN(Mech): - - name = 'X-GOOGLE-TOKEN' - required_credentials = set(['email', 'access_token']) - - def process(self, challenge=b''): - email = self.credentials['email'] - token = self.credentials['access_token'] - return b'\x00' + email + b'\x00' + token - - -@sasl_mech(20) -class CRAM(Mech): - - name = 'CRAM' - use_hashes = True - required_credentials = set(['username', 'password']) - security = set(['encrypted', 'unencrypted_cram']) - - def setup(self, name): - self.hash_name = name[5:] - self.hash = hash(self.hash_name) - if self.hash is None: - raise SASLCancelled('Unknown hash: %s' % self.hash_name) - if not self.security_settings['encrypted']: - if not self.security_settings['unencrypted_cram']: - raise SASLCancelled('Unecrypted CRAM-%s' % self.hash_name) - - def process(self, challenge=b''): - if not challenge: - return None - - username = self.credentials['username'] - password = self.credentials['password'] - - mac = hmac.HMAC(key=password, digestmod=self.hash) - mac.update(challenge) - - return username + b' ' + bytes(mac.hexdigest()) - - -@sasl_mech(60) -class SCRAM(Mech): - - name = 'SCRAM' - use_hashes = True - channel_binding = True - required_credentials = set(['username', 'password']) - optional_credentials = set(['authzid', 'channel_binding']) - security = set(['encrypted', 'unencrypted_scram']) - - def setup(self, name): - self.use_channel_binding = False - if name[-5:] == '-PLUS': - name = name[:-5] - self.use_channel_binding = True - - self.hash_name = name[6:] - self.hash = hash(self.hash_name) - - if self.hash is None: - raise SASLCancelled('Unknown hash: %s' % self.hash_name) - if not self.security_settings['encrypted']: - if not self.security_settings['unencrypted_scram']: - raise SASLCancelled('Unencrypted SCRAM') - - self.step = 0 - self._mutual_auth = False - - def HMAC(self, key, msg): - return hmac.HMAC(key=key, msg=msg, digestmod=self.hash).digest() - - def Hi(self, text, salt, iterations): - text = bytes(text) - ui1 = self.HMAC(text, salt + b'\0\0\0\01') - ui = ui1 - for i in range(iterations - 1): - ui1 = self.HMAC(text, ui1) - ui = XOR(ui, ui1) - return ui - - def H(self, text): - return self.hash(text).digest() - - def saslname(self, value): - value = value.decode("utf-8") - escaped = [] - for char in value: - if char == ',': - escaped += '=2C' - elif char == '=': - escaped += '=3D' - else: - escaped += char - return "".join(escaped).encode("utf-8") - - def parse(self, challenge): - items = {} - for key, value in [item.split(b'=', 1) for item in challenge.split(b',')]: - items[key] = value - return items - - def process(self, challenge=b''): - steps = [self.process_1, self.process_2, self.process_3] - return steps[self.step](challenge) - - def process_1(self, challenge): - self.step = 1 - data = {} - - self.cnonce = bytes(('%s' % random.random())[2:]) - - gs2_cbind_flag = b'n' - if self.credentials['channel_binding']: - if self.use_channel_binding: - gs2_cbind_flag = b'p=tls-unique' - else: - gs2_cbind_flag = b'y' - - authzid = b'' - if self.credentials['authzid']: - authzid = b'a=' + self.saslname(self.credentials['authzid']) - - self.gs2_header = gs2_cbind_flag + b',' + authzid + b',' - - nonce = b'r=' + self.cnonce - username = b'n=' + self.saslname(self.credentials['username']) - - self.client_first_message_bare = username + b',' + nonce - self.client_first_message = self.gs2_header + \ - self.client_first_message_bare - - return self.client_first_message - - def process_2(self, challenge): - self.step = 2 - - data = self.parse(challenge) - if b'm' in data: - raise SASLCancelled('Received reserved attribute.') - - salt = b64decode(data[b's']) - iteration_count = int(data[b'i']) - nonce = data[b'r'] - - if nonce[:len(self.cnonce)] != self.cnonce: - raise SASLCancelled('Invalid nonce') - - cbind_data = b'' - if self.use_channel_binding: - cbind_data = self.credentials['channel_binding'] - cbind_input = self.gs2_header + cbind_data - channel_binding = b'c=' + b64encode(cbind_input).replace(b'\n', b'') - - client_final_message_without_proof = channel_binding + b',' + \ - b'r=' + nonce - - salted_password = self.Hi(self.credentials['password'], - salt, - iteration_count) - client_key = self.HMAC(salted_password, b'Client Key') - stored_key = self.H(client_key) - auth_message = self.client_first_message_bare + b',' + \ - challenge + b',' + \ - client_final_message_without_proof - client_signature = self.HMAC(stored_key, auth_message) - client_proof = XOR(client_key, client_signature) - server_key = self.HMAC(salted_password, b'Server Key') - - self.server_signature = self.HMAC(server_key, auth_message) - - client_final_message = client_final_message_without_proof + \ - b',p=' + b64encode(client_proof) - - return client_final_message - - def process_3(self, challenge): - data = self.parse(challenge) - verifier = data.get(b'v', None) - error = data.get(b'e', 'Unknown error') - - if not verifier: - raise SASLFailed(error) - - if b64decode(verifier) != self.server_signature: - raise SASLMutualAuthFailed() - - self._mutual_auth = True - - return b'' - - -@sasl_mech(30) -class DIGEST(Mech): - - name = 'DIGEST' - use_hashes = True - required_credentials = set(['username', 'password', 'realm', 'service', 'host']) - optional_credentials = set(['authzid', 'service-name']) - security = set(['encrypted', 'unencrypted_digest']) - - def setup(self, name): - self.hash_name = name[7:] - self.hash = hash(self.hash_name) - if self.hash is None: - raise SASLCancelled('Unknown hash: %s' % self.hash_name) - if not self.security_settings['encrypted']: - if not self.security_settings['unencrypted_digest']: - raise SASLCancelled('Unencrypted DIGEST') - - self.qops = [b'auth'] - self.qop = b'auth' - self.maxbuf = b'65536' - self.nonce = b'' - self.cnonce = b'' - self.nonce_count = 1 - - def parse(self, challenge=b''): - data = {} - var_name = b'' - var_value = b'' - - # States: var, new_var, end, quote, escaped_quote - state = 'var' - - - for char in challenge: - if sys.version_info >= (3, 0): - char = bytes([char]) - - if state == 'var': - if char.isspace(): - continue - if char == b'=': - state = 'value' - else: - var_name += char - elif state == 'value': - if char == b'"': - state = 'quote' - elif char == b',': - if var_name: - data[var_name.decode('utf-8')] = var_value - var_name = b'' - var_value = b'' - state = 'var' - else: - var_value += char - elif state == 'escaped': - var_value += char - elif state == 'quote': - if char == b'\\': - state = 'escaped' - elif char == b'"': - state = 'end' - else: - var_value += char - else: - if char == b',': - if var_name: - data[var_name.decode('utf-8')] = var_value - var_name = b'' - var_value = b'' - state = 'var' - else: - var_value += char - - if var_name: - data[var_name.decode('utf-8')] = var_value - var_name = b'' - var_value = b'' - state = 'var' - return data - - def MAC(self, key, seq, msg): - mac = hmac.HMAC(key=key, digestmod=self.hash) - seqnum = num_to_bytes(seq) - mac.update(seqnum) - mac.update(msg) - return mac.digest()[:10] + b'\x00\x01' + seqnum - - def A1(self): - username = self.credentials['username'] - password = self.credentials['password'] - authzid = self.credentials['authzid'] - realm = self.credentials['realm'] - - a1 = self.hash() - a1.update(username + b':' + realm + b':' + password) - a1 = a1.digest() - a1 += b':' + self.nonce + b':' + self.cnonce - if authzid: - a1 += b':' + authzid - - return bytes(a1) - - def A2(self, prefix=b''): - a2 = prefix + b':' + self.digest_uri() - if self.qop in (b'auth-int', b'auth-conf'): - a2 += b':00000000000000000000000000000000' - return bytes(a2) - - def response(self, prefix=b''): - nc = bytes('%08x' % self.nonce_count) - - a1 = bytes(self.hash(self.A1()).hexdigest().lower()) - a2 = bytes(self.hash(self.A2(prefix)).hexdigest().lower()) - s = self.nonce + b':' + nc + b':' + self.cnonce + \ - b':' + self.qop + b':' + a2 - - return bytes(self.hash(a1 + b':' + s).hexdigest().lower()) - - def digest_uri(self): - serv_type = self.credentials['service'] - serv_name = self.credentials['service-name'] - host = self.credentials['host'] - - uri = serv_type + b'/' + host - if serv_name and host != serv_name: - uri += b'/' + serv_name - return uri - - def respond(self): - data = { - 'username': quote(self.credentials['username']), - 'authzid': quote(self.credentials['authzid']), - 'realm': quote(self.credentials['realm']), - 'nonce': quote(self.nonce), - 'cnonce': quote(self.cnonce), - 'nc': bytes('%08x' % self.nonce_count), - 'qop': self.qop, - 'digest-uri': quote(self.digest_uri()), - 'response': self.response(b'AUTHENTICATE'), - 'maxbuf': self.maxbuf, - 'charset': 'utf-8' - } - resp = b'' - for key, value in data.items(): - if value and value != b'""': - resp += b',' + bytes(key) + b'=' + bytes(value) - return resp[1:] - - def process(self, challenge=b''): - if not challenge: - if self.cnonce and self.nonce and self.nonce_count and self.qop: - self.nonce_count += 1 - return self.respond() - return None - - data = self.parse(challenge) - if 'rspauth' in data: - if data['rspauth'] != self.response(): - raise SASLMutualAuthFailed() - else: - self.nonce_count = 1 - self.cnonce = bytes('%s' % random.random())[2:] - self.qops = data.get('qop', [b'auth']) - self.qop = b'auth' - if 'nonce' in data: - self.nonce = data['nonce'] - if 'realm' in data and not self.credentials['realm']: - self.credentials['realm'] = data['realm'] - - return self.respond() - - -try: - import kerberos -except ImportError: - pass -else: - @sasl_mech(75) - class GSSAPI(Mech): - - name = 'GSSAPI' - required_credentials = set(['username', 'service-name']) - optional_credentials = set(['authzid']) - - def setup(self, name): - authzid = self.credentials['authzid'] - if not authzid: - authzid = 'xmpp@%s' % self.credentials['service-name'] - - _, self.gss = kerberos.authGSSClientInit(authzid) - self.step = 0 - - def process(self, challenge=b''): - b64_challenge = b64encode(challenge) - try: - if self.step == 0: - result = kerberos.authGSSClientStep(self.gss, b64_challenge) - if result != kerberos.AUTH_GSS_CONTINUE: - self.step = 1 - elif not challenge: - kerberos.authGSSClientClean(self.gss) - return b'' - elif self.step == 1: - username = self.credentials['username'] - - kerberos.authGSSClientUnwrap(self.gss, b64_challenge) - resp = kerberos.authGSSClientResponse(self.gss) - kerberos.authGSSClientWrap(self.gss, resp, username) - - resp = kerberos.authGSSClientResponse(self.gss) - except kerberos.GSSError as e: - raise SASLCancelled('Kerberos error: %s' % e) - if not resp: - return b'' - else: - return b64decode(resp) |