summaryrefslogtreecommitdiff
path: root/sleekxmpp/thirdparty/suelta/mechanisms/digest_md5.py
diff options
context:
space:
mode:
Diffstat (limited to 'sleekxmpp/thirdparty/suelta/mechanisms/digest_md5.py')
-rw-r--r--sleekxmpp/thirdparty/suelta/mechanisms/digest_md5.py273
1 files changed, 273 insertions, 0 deletions
diff --git a/sleekxmpp/thirdparty/suelta/mechanisms/digest_md5.py b/sleekxmpp/thirdparty/suelta/mechanisms/digest_md5.py
new file mode 100644
index 00000000..5492c553
--- /dev/null
+++ b/sleekxmpp/thirdparty/suelta/mechanisms/digest_md5.py
@@ -0,0 +1,273 @@
+import sys
+
+import random
+
+from sleekxmpp.thirdparty.suelta.util import hash, bytes, quote
+from sleekxmpp.thirdparty.suelta.sasl import Mechanism, register_mechanism
+from sleekxmpp.thirdparty.suelta.exceptions import SASLError, SASLCancelled
+
+
+
+def parse_challenge(stuff):
+ """
+ """
+ ret = {}
+ var = b''
+ val = b''
+ in_var = True
+ in_quotes = False
+ new = False
+ escaped = False
+ for c in stuff:
+ if sys.version_info >= (3, 0):
+ c = bytes([c])
+ if in_var:
+ if c.isspace():
+ continue
+ if c == b'=':
+ in_var = False
+ new = True
+ else:
+ var += c
+ else:
+ if new:
+ if c == b'"':
+ in_quotes = True
+ else:
+ val += c
+ new = False
+ elif in_quotes:
+ if escaped:
+ escaped = False
+ val += c
+ else:
+ if c == b'\\':
+ escaped = True
+ elif c == b'"':
+ in_quotes = False
+ else:
+ val += c
+ else:
+ if c == b',':
+ if var:
+ ret[var] = val
+ var = b''
+ val = b''
+ in_var = True
+ else:
+ val += c
+ if var:
+ ret[var] = val
+ return ret
+
+
+class DIGEST_MD5(Mechanism):
+
+ """
+ """
+
+ enc_magic = 'Digest session key to client-to-server signing key magic'
+ dec_magic = 'Digest session key to server-to-client signing key magic'
+
+ def __init__(self, sasl, name):
+ """
+ """
+ super(DIGEST_MD5, self).__init__(sasl, name, 3)
+
+ self.hash = hash(name[7:])
+ if self.hash is None:
+ raise SASLCancelled(self.sasl, self)
+
+ if not self.sasl.tls_active():
+ if not self.sasl.sec_query(self, '-ENCRYPTION, DIGEST-MD5'):
+ raise SASLCancelled(self.sasl, self)
+
+ self._rspauth_okay = False
+ self._digest_uri = None
+ self._a1 = None
+ self._enc_buf = b''
+ self._enc_key = None
+ self._enc_seq = 0
+ self._max_buffer = 65536
+ self._dec_buf = b''
+ self._dec_key = None
+ self._dec_seq = 0
+ self._qops = [b'auth']
+ self._qop = b'auth'
+
+ def MAC(self, seq, msg, key):
+ """
+ """
+ mac = hmac.HMAC(key=key, digestmod=self.hash)
+ seqnum = num_to_bytes(seq)
+ mac.update(seqnum)
+ mac.update(msg)
+ return mac.digest()[:10] + b'\x00\x01' + seqnum
+
+
+ def encode(self, text):
+ """
+ """
+ self._enc_buf += text
+
+ def flush(self):
+ """
+ """
+ result = b''
+ # Leave buffer space for the MAC
+ mbuf = self._max_buffer - 10 - 2 - 4
+
+ while self._enc_buf:
+ msg = self._encbuf[:mbuf]
+ mac = self.MAC(self._enc_seq, msg, self._enc_key, self.hash)
+ self._enc_seq += 1
+ msg += mac
+ result += num_to_bytes(len(msg)) + msg
+ self._enc_buf = self._enc_buf[mbuf:]
+
+ return result
+
+ def decode(self, text):
+ """
+ """
+ self._dec_buf += text
+ result = b''
+
+ while len(self._dec_buf) > 4:
+ num = bytes_to_num(self._dec_buf)
+ if len(self._dec_buf) < (num + 4):
+ return result
+
+ mac = self._dec_buf[4:4 + num]
+ self._dec_buf = self._dec_buf[4 + num:]
+ msg = mac[:-16]
+
+ mac_conf = self.MAC(self._dec_mac, msg, self._dec_key)
+ if mac[-16:] != mac_conf:
+ self._desc_sec = None
+ return result
+
+ self._dec_seq += 1
+ result += msg
+
+ return result
+
+ def response(self):
+ """
+ """
+ vitals = ['username']
+ if not self.has_values(['key_hash']):
+ vitals.append('password')
+ self.check_values(vitals)
+
+ resp = {}
+ if 'auth-int' in self._qops:
+ self._qop = b'auth-int'
+ resp['qop'] = self._qop
+ if 'realm' in self.values:
+ resp['realm'] = quote(self.values['realm'])
+
+ resp['username'] = quote(bytes(self.values['username']))
+ resp['nonce'] = quote(self.values['nonce'])
+ if self.values['nc']:
+ self._cnonce = self.values['cnonce']
+ else:
+ self._cnonce = bytes('%s' % random.random())[2:]
+ resp['cnonce'] = quote(self._cnonce)
+ self.values['nc'] += 1
+ resp['nc'] = bytes('%08x' % self.values['nc'])
+
+ service = bytes(self.sasl.service)
+ host = bytes(self.sasl.host)
+ self._digest_uri = service + b'/' + host
+ resp['digest-uri'] = quote(self._digest_uri)
+
+ a2 = b'AUTHENTICATE:' + self._digest_uri
+ if self._qop != b'auth':
+ a2 += b':00000000000000000000000000000000'
+ resp['maxbuf'] = b'16777215' # 2**24-1
+ resp['response'] = self.gen_hash(a2)
+ return b','.join([bytes(k) + b'=' + bytes(v) for k, v in resp.items()])
+
+ def gen_hash(self, a2):
+ """
+ """
+ if not self.has_values(['key_hash']):
+ key_hash = self.hash()
+ user = bytes(self.values['username'])
+ password = bytes(self.values['password'])
+ realm = bytes(self.values['realm'])
+ kh = user + b':' + realm + b':' + password
+ key_hash.update(kh)
+ self.values['key_hash'] = key_hash.digest()
+
+ a1 = self.hash(self.values['key_hash'])
+ a1h = b':' + self.values['nonce'] + b':' + self._cnonce
+ a1.update(a1h)
+ response = self.hash()
+ self._a1 = a1.digest()
+ rv = bytes(a1.hexdigest().lower())
+ rv += b':' + self.values['nonce']
+ rv += b':' + bytes('%08x' % self.values['nc'])
+ rv += b':' + self._cnonce
+ rv += b':' + self._qop
+ rv += b':' + bytes(self.hash(a2).hexdigest().lower())
+ response.update(rv)
+ return bytes(response.hexdigest().lower())
+
+ def mutual_auth(self, cmp_hash):
+ """
+ """
+ a2 = b':' + self._digest_uri
+ if self._qop != b'auth':
+ a2 += b':00000000000000000000000000000000'
+ if self.gen_hash(a2) == cmp_hash:
+ self._rspauth_okay = True
+
+ def prep(self):
+ """
+ """
+ if 'password' in self.values:
+ del self.values['password']
+ self.values['cnonce'] = self._cnonce
+
+ def process(self, challenge=None):
+ """
+ """
+ if challenge is None:
+ if self.has_values(['username', 'realm', 'nonce', 'key_hash',
+ 'nc', 'cnonce', 'qops']):
+ self._qops = self.values['qops']
+ return self.response()
+ else:
+ return None
+
+ d = parse_challenge(challenge)
+ if b'rspauth' in d:
+ self.mutual_auth(d[b'rspauth'])
+ else:
+ if b'realm' not in d:
+ d[b'realm'] = self.sasl.def_realm
+ for key in ['nonce', 'realm']:
+ if bytes(key) in d:
+ self.values[key] = d[bytes(key)]
+ self.values['nc'] = 0
+ self._qops = [b'auth']
+ if b'qop' in d:
+ self._qops = [x.strip() for x in d[b'qop'].split(b',')]
+ self.values['qops'] = self._qops
+ if b'maxbuf' in d:
+ self._max_buffer = int(d[b'maxbuf'])
+ return self.response()
+
+ def okay(self):
+ """
+ """
+ if self._rspauth_okay and self._qop == b'auth-int':
+ self._enc_key = self.hash(self._a1 + self.enc_magic).digest()
+ self._dec_key = self.hash(self._a1 + self.dec_magic).digest()
+ self.encoding = True
+ return self._rspauth_okay
+
+
+register_mechanism('DIGEST-', 30, DIGEST_MD5)