summaryrefslogtreecommitdiff
path: root/sleekxmpp/plugins/xep_0235/stanza.py
blob: 0050d583657e24566a46ddde6aecd5e1f3f030fd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
"""
    SleekXMPP: The Sleek XMPP Library
    Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
    This file is part of SleekXMPP.

    See the file LICENSE for copying permission.
"""

import hmac
import hashlib
import urllib
import base64

from sleekxmpp.xmlstream import ET, ElementBase, JID


class OAuth(ElementBase):

    """
    Stanza element <oauth xmlns="urn:xmpp:oauth:0" /> carrying OAuth
    request parameters, with helpers to sign and verify them.

    Two signature methods are supported:

        HMAC-SHA1 -- base64 encoded HMAC-SHA1 digest of the signature
                     base string, keyed with the percent-encoded
                     consumer and token secrets joined by '&'.
        PLAINTEXT -- the signature is that key itself.

    The signature base string is '<stanza>&<request>&<parameters>' where
    <request> is the percent-encoded 'sfrom&sto' pair and <parameters>
    is the percent-encoded, '&'-joined list of the oauth_* values
    currently stored in this element (everything except the signature).
    """

    name = 'oauth'
    namespace = 'urn:xmpp:oauth:0'
    plugin_attrib = 'oauth'
    interfaces = set(['oauth_consumer_key', 'oauth_nonce', 'oauth_signature',
                      'oauth_signature_method', 'oauth_timestamp',
                      'oauth_token', 'oauth_version'])
    # Deliberately the very same set object as ``interfaces``.
    sub_interfaces = interfaces

    # Signature methods that _compute_signature knows how to produce.
    _SIGNATURE_METHODS = ('HMAC-SHA1', 'PLAINTEXT')

    # Fields folded into the signature base string, in signing order.
    _SIGNED_FIELDS = ('oauth_consumer_key', 'oauth_nonce',
                      'oauth_signature_method', 'oauth_timestamp',
                      'oauth_token', 'oauth_version')

    def generate_signature(self, stanza, sfrom, sto, consumer_secret,
                           token_secret, method='HMAC-SHA1'):
        """
        Sign the element and store the result in it.

        Sets 'oauth_signature_method' to the given method, computes the
        signature from the element's current oauth_* values, stores it
        in 'oauth_signature' and returns it.

        Arguments:
            stanza          -- First component of the signature base
                               string (used as given, not encoded).
            sfrom           -- Sender of the request.
            sto             -- Recipient of the request.
            consumer_secret -- The OAuth consumer secret.
            token_secret    -- The OAuth token secret.
            method          -- 'HMAC-SHA1' (default) or 'PLAINTEXT'.

        Raises ValueError for any other signature method; the element
        is left unmodified in that case.
        """
        # Reject unknown methods before touching the element, instead of
        # failing later on an unassigned local variable.
        if method not in self._SIGNATURE_METHODS:
            raise ValueError(
                'Unsupported OAuth signature method: %r' % (method,))

        # Must be stored first: the method is part of the signed data.
        self['oauth_signature_method'] = method

        sig = self._compute_signature(method, stanza, sfrom, sto,
                                      consumer_secret, token_secret)
        self['oauth_signature'] = sig
        return sig

    def verify_signature(self, stanza, sfrom, sto, consumer_secret,
                         token_secret):
        """
        Check the signature stored in the element.

        Recomputes the signature using the method named by the element's
        'oauth_signature_method' value and compares it with the stored
        'oauth_signature' value.

        Arguments:
            stanza          -- First component of the signature base
                               string (used as given, not encoded).
            sfrom           -- Sender of the request.
            sto             -- Recipient of the request.
            consumer_secret -- The OAuth consumer secret.
            token_secret    -- The OAuth token secret.

        Returns True if the signatures match. Returns False if they do
        not match, if no signature is present, or if the signature
        method is not supported.
        """
        method = self['oauth_signature_method']
        if method not in self._SIGNATURE_METHODS:
            # The method is taken from the element being verified; a
            # signature that cannot be recomputed cannot be trusted.
            return False

        received = self['oauth_signature']
        if not received:
            # A computed signature is never empty (the key always
            # contains '&'), so a missing one can never match.
            return False

        expected = self._compute_signature(method, stanza, sfrom, sto,
                                           consumer_secret, token_secret)
        return self._signatures_match(received, expected)

    def _compute_signature(self, method, stanza, sfrom, sto,
                           consumer_secret, token_secret):
        """
        Return the signature for the element's current oauth_* values.

        Does not modify the element. Raises ValueError if the method is
        neither 'HMAC-SHA1' nor 'PLAINTEXT'.
        """
        # Imported here so that the helper works on both Python 2
        # (urllib.quote) and Python 3 (urllib.parse.quote).
        try:
            from urllib.parse import quote
        except ImportError:
            from urllib import quote

        # An empty ``safe`` set means '/' is percent-encoded as well.
        request = quote('%s&%s' % (sfrom, sto), '')
        parameters = quote(
            '&'.join(['%s=%s' % (field, self[field])
                      for field in self._SIGNED_FIELDS]),
            '')
        sigbase = '%s&%s&%s' % (stanza, request, parameters)

        key = '%s&%s' % (quote(consumer_secret, ''),
                         quote(token_secret, ''))

        if method == 'PLAINTEXT':
            return key

        if method == 'HMAC-SHA1':
            # hmac requires bytes for both key and message on Python 3.
            digest = hmac.new(self._to_bytes(key),
                              self._to_bytes(sigbase),
                              hashlib.sha1).digest()
            sig = base64.b64encode(digest)
            # b64encode yields bytes on Python 3; return the native
            # string type so the value can be stored in the element.
            if not isinstance(sig, str):
                sig = sig.decode('ascii')
            return sig

        raise ValueError(
            'Unsupported OAuth signature method: %r' % (method,))

    def _signatures_match(self, received, expected):
        """
        Compare two signatures, in constant time where possible.
        """
        received = self._to_bytes(received)
        expected = self._to_bytes(expected)

        # hmac.compare_digest avoids leaking, through timing, how many
        # leading characters of a forged signature were correct. It is
        # not available on older interpreters, hence the fallback.
        compare = getattr(hmac, 'compare_digest', None)
        if compare is None:
            return received == expected
        return compare(received, expected)

    @staticmethod
    def _to_bytes(value):
        """
        Return value unchanged if it is already a byte string,
        otherwise its UTF-8 encoding.
        """
        if isinstance(value, bytes):
            return value
        return value.encode('utf-8')