summaryrefslogtreecommitdiff
path: root/slixmpp/plugins/xep_0235/stanza.py
blob: 5205b07ce2bac7f3d379c6840e7704e306f399e5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
"""
    Slixmpp: The Slick XMPP Library
    Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
    This file is part of Slixmpp.

    See the file LICENSE for copying permission.
"""

import base64
import hashlib
import hmac
import urllib
import urllib.parse

from slixmpp.xmlstream import ET, ElementBase, JID


class OAuth(ElementBase):

    """
    Stanza element ``<oauth xmlns='urn:xmpp:oauth:0'/>`` carrying OAuth
    credentials, with helpers to compute and check its signature.

    The element exposes the ``oauth_*`` fields listed in ``interfaces``;
    the same set is used for ``sub_interfaces``.
    """

    name = 'oauth'
    namespace = 'urn:xmpp:oauth:0'
    plugin_attrib = 'oauth'
    interfaces = {'oauth_consumer_key', 'oauth_nonce', 'oauth_signature',
                  'oauth_signature_method', 'oauth_timestamp', 'oauth_token',
                  'oauth_version'}
    sub_interfaces = interfaces

    def _compute_signature(self, stanza, sfrom, sto, consumer_secret,
                                 token_secret):
        """
        Compute the signature for the fields currently set on this element.

        The signature method is read from ``self['oauth_signature_method']``.

        Arguments:
            stanza          -- First component of the signature base string.
            sfrom           -- Sender address; formatted with ``%s``.
            sto             -- Recipient address; formatted with ``%s``.
            consumer_secret -- Consumer secret, percent-encoded into the key.
            token_secret    -- Token secret, percent-encoded into the key.

        Returns the signature as a ``str``, or ``None`` when the signature
        method is neither ``HMAC-SHA1`` nor ``PLAINTEXT``.
        """
        quote = urllib.parse.quote
        method = self['oauth_signature_method']

        # Fields that take part in the signature, in the fixed order the
        # signature base string uses (everything except the signature).
        # NOTE(review): values are joined without being percent-encoded
        # individually; only the joined string is encoded. Confirm this
        # matches what peers expect before changing it.
        signed_fields = ('oauth_consumer_key', 'oauth_nonce',
                         'oauth_signature_method', 'oauth_timestamp',
                         'oauth_token', 'oauth_version')

        # An empty ``safe`` argument makes quote() also encode '/'.
        request = quote('%s&%s' % (sfrom, sto), '')
        parameters = quote('&'.join(
            '%s=%s' % (field, self[field]) for field in signed_fields
        ), '')
        sigbase = '%s&%s&%s' % (stanza, request, parameters)

        key = '%s&%s' % (quote(consumer_secret, ''), quote(token_secret, ''))

        if method == 'HMAC-SHA1':
            # hmac needs bytes for both key and message, and b64encode
            # returns bytes; convert at the edges so the result is text.
            digest = hmac.new(key.encode('utf-8'),
                              sigbase.encode('utf-8'),
                              hashlib.sha1).digest()
            return base64.b64encode(digest).decode('ascii')
        if method == 'PLAINTEXT':
            return key
        return None

    def generate_signature(self, stanza, sfrom, sto, consumer_secret,
                                 token_secret, method='HMAC-SHA1'):
        """
        Sign this element and store the result in ``oauth_signature``.

        Sets ``oauth_signature_method`` to ``method``, computes the
        signature over the element's other ``oauth_*`` fields, stores it
        in ``oauth_signature`` and returns it.

        Arguments:
            stanza          -- First component of the signature base string.
            sfrom           -- Sender address.
            sto             -- Recipient address.
            consumer_secret -- Consumer secret.
            token_secret    -- Token secret.
            method          -- ``'HMAC-SHA1'`` (default) or ``'PLAINTEXT'``.

        Raises ``ValueError`` for any other ``method``; the element is
        left unmodified in that case.
        """
        # Reject unknown methods before touching the element.
        if method not in ('HMAC-SHA1', 'PLAINTEXT'):
            raise ValueError('Unsupported OAuth signature method: %r'
                             % (method,))

        self['oauth_signature_method'] = method
        sig = self._compute_signature(stanza, sfrom, sto,
                                      consumer_secret, token_secret)
        self['oauth_signature'] = sig
        return sig

    def verify_signature(self, stanza, sfrom, sto, consumer_secret,
                               token_secret):
        """
        Check ``oauth_signature`` against a freshly computed signature.

        The signature method is taken from the element itself.

        Arguments:
            stanza          -- First component of the signature base string.
            sfrom           -- Sender address.
            sto             -- Recipient address.
            consumer_secret -- Consumer secret.
            token_secret    -- Token secret.

        Returns ``True`` when the stored signature matches, ``False``
        otherwise, including when the element names a signature method
        other than ``HMAC-SHA1`` or ``PLAINTEXT``.
        """
        expected = self._compute_signature(stanza, sfrom, sto,
                                           consumer_secret, token_secret)
        if expected is None:
            # Unsupported method on the element: cannot match.
            return False

        claimed = self['oauth_signature']
        # Presumably the stored signature is always text; anything else
        # cannot equal the computed value, so treat it as a mismatch.
        if not isinstance(claimed, str):
            return False

        # Constant-time comparison; operands are encoded to bytes because
        # compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(claimed.encode('utf-8'),
                                   expected.encode('utf-8'))