summaryrefslogtreecommitdiff
path: root/slixmpp/plugins/xep_0235/stanza.py
blob: 3dec7b734504f6b6c764d0573d8683c4f7cfb17b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77

# Slixmpp: The Slick XMPP Library
# Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
import base64
import hashlib
import hmac
import urllib
import urllib.parse

from slixmpp.xmlstream import ET, ElementBase, JID


class OAuth(ElementBase):

    """
    Stanza element ``<oauth xmlns="urn:xmpp:oauth:0"/>`` holding OAuth
    parameters.

    Every ``oauth_*`` interface listed in ``interfaces`` is also declared
    in ``sub_interfaces``. Besides carrying the parameters, the element
    can compute the ``oauth_signature`` value for them and check a
    received signature against locally known secrets.

    Supported signature methods are ``HMAC-SHA1`` and ``PLAINTEXT``.
    """

    name = 'oauth'
    namespace = 'urn:xmpp:oauth:0'
    plugin_attrib = 'oauth'
    interfaces = {'oauth_consumer_key', 'oauth_nonce', 'oauth_signature',
                  'oauth_signature_method', 'oauth_timestamp', 'oauth_token',
                  'oauth_version'}
    sub_interfaces = interfaces

    def _compute_signature(self, stanza, sfrom, sto, consumer_secret,
                           token_secret, method):
        """
        Compute the signature for the parameters currently stored.

        Shared by generate_signature() and verify_signature() so both
        sides always build the signature base string the same way.

        Arguments:
            stanza          -- First component of the signature base
                               string (formatted with ``%s``).
            sfrom           -- Sender address; stringified with ``%s``.
            sto             -- Recipient address; stringified with ``%s``.
            consumer_secret -- Consumer secret (text).
            token_secret    -- Token secret (text).
            method          -- Signature method name.

        Returns the signature as text, or None when ``method`` is not
        one of the supported methods.
        """
        quote = urllib.parse.quote

        # safe='' so that '/', which quote() leaves alone by default,
        # is percent-encoded too.
        request = quote('%s&%s' % (sfrom, sto), '')
        parameters = quote('&'.join(
            '%s=%s' % (field, self[field]) for field in (
                'oauth_consumer_key',
                'oauth_nonce',
                'oauth_signature_method',
                'oauth_timestamp',
                'oauth_token',
                'oauth_version',
            )
        ), '')
        sigbase = '%s&%s&%s' % (stanza, request, parameters)

        key = '%s&%s' % (quote(consumer_secret, ''),
                         quote(token_secret, ''))

        if method == 'PLAINTEXT':
            return key
        if method == 'HMAC-SHA1':
            # hmac only accepts bytes; b64encode returns bytes, while the
            # stanza interfaces work with text, hence the decode.
            digest = hmac.new(key.encode('utf-8'),
                              sigbase.encode('utf-8'),
                              hashlib.sha1).digest()
            return base64.b64encode(digest).decode('ascii')
        return None

    def generate_signature(self, stanza, sfrom, sto, consumer_secret,
                           token_secret, method='HMAC-SHA1'):
        """
        Sign the stored parameters and record the result in the element.

        Sets ``oauth_signature_method`` to ``method`` and
        ``oauth_signature`` to the computed value.

        Arguments:
            stanza          -- First component of the signature base
                               string.
            sfrom           -- Sender address.
            sto             -- Recipient address.
            consumer_secret -- Consumer secret (text).
            token_secret    -- Token secret (text).
            method          -- 'HMAC-SHA1' (default) or 'PLAINTEXT'.

        Returns the signature as text.

        Raises ValueError if ``method`` is not supported; the element is
        left unmodified in that case.
        """
        if method not in ('HMAC-SHA1', 'PLAINTEXT'):
            raise ValueError(
                'Unsupported OAuth signature method: %r' % (method,))

        # Must be stored before computing: the method name is itself one
        # of the signed parameters.
        self['oauth_signature_method'] = method

        sig = self._compute_signature(stanza, sfrom, sto, consumer_secret,
                                      token_secret, method)
        self['oauth_signature'] = sig
        return sig

    def verify_signature(self, stanza, sfrom, sto, consumer_secret,
                         token_secret):
        """
        Check the element's ``oauth_signature`` against the given secrets.

        The signature is recomputed using the method named in the
        element's own ``oauth_signature_method`` field.

        Arguments:
            stanza          -- First component of the signature base
                               string.
            sfrom           -- Sender address.
            sto             -- Recipient address.
            consumer_secret -- Consumer secret (text).
            token_secret    -- Token secret (text).

        Returns True if the stored signature matches, and False if it
        does not or if the stored signature method is unsupported.
        """
        expected = self._compute_signature(stanza, sfrom, sto,
                                           consumer_secret, token_secret,
                                           self['oauth_signature_method'])
        if expected is None:
            # The method name comes from the stanza being verified, so an
            # unknown value is a failed verification, not a crash.
            return False

        received = self['oauth_signature']
        # Constant-time comparison; compared as bytes because
        # compare_digest() rejects non-ASCII str arguments.
        return hmac.compare_digest(received.encode('utf-8'),
                                   expected.encode('utf-8'))