-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.py
175 lines (150 loc) · 6.68 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
from functools import partial
try:
from urllib import urlencode
except ImportError:
from urllib.parse import urlencode
from soundcloud.resource import wrapped_resource
from soundcloud.request import make_request
class Client(object):
"""A client for interacting with Soundcloud resources."""
use_ssl = True
host = 'api.soundcloud.com'
auth_host = 'secure.soundcloud.com'
def __init__(self, **kwargs):
"""Create a client instance with the provided options. Options should
be passed in as kwargs.
"""
self.use_ssl = kwargs.get('use_ssl', self.use_ssl)
self.host = kwargs.get('host', self.host)
self.auth_host = kwargs.get('auth_host', self.auth_host)
self.scheme = self.use_ssl and 'https://' or 'http://'
self.options = kwargs
self._authorize_url = None
self.client_id = kwargs.get('client_id')
if 'access_token' in kwargs:
self.access_token = kwargs.get('access_token')
return
if 'client_id' not in kwargs:
raise TypeError("At least a client_id must be provided.")
if 'scope' in kwargs:
self.scope = kwargs.get('scope')
# decide which protocol flow to follow based on the arguments
# provided by the caller.
if self._options_for_authorization_code_flow_present():
self._authorization_code_flow()
elif self._options_for_credentials_flow_present():
self._credentials_flow()
elif self._options_for_token_refresh_present():
self._refresh_token_flow()
def exchange_token(self, code):
"""Given the value of the code parameter, request an access token."""
url = '%s%s/oauth/token' % (self.scheme, self.auth_host)
options = {
'grant_type': 'authorization_code',
'redirect_uri': self._redirect_uri(),
'client_id': self.options.get('client_id'),
'client_secret': self.options.get('client_secret'),
'code': code,
}
options.update({
'verify_ssl': self.options.get('verify_ssl', True),
'proxies': self.options.get('proxies', None)
})
self.token = wrapped_resource(
make_request('post', url, options))
self.access_token = self.token.access_token
return self.token
def authorize_url(self):
"""Return the authorization URL for OAuth2 authorization code flow."""
return self._authorize_url
def _authorization_code_flow(self):
"""Build the the auth URL so the user can authorize the app."""
options = {
'scope': getattr(self, 'scope', 'non-expiring'),
'client_id': self.options.get('client_id'),
'response_type': 'code',
'redirect_uri': self._redirect_uri()
}
url = '%s%s/authorize' % (self.scheme, self.auth_host)
self._authorize_url = '%s?%s' % (url, urlencode(options))
def _refresh_token_flow(self):
"""Given a refresh token, obtain a new access token."""
url = '%s%s/oauth/token' % (self.scheme, self.auth_host)
options = {
'grant_type': 'refresh_token',
'client_id': self.options.get('client_id'),
'client_secret': self.options.get('client_secret'),
'refresh_token': self.options.get('refresh_token')
}
options.update({
'verify_ssl': self.options.get('verify_ssl', True),
'proxies': self.options.get('proxies', None)
})
self.token = wrapped_resource(
make_request('post', url, options))
self.access_token = self.token.access_token
def _credentials_flow(self):
"""Given a username and password, obtain an access token."""
url = '%s%s/oauth/token' % (self.scheme, self.auth_host)
options = {
'client_id': self.options.get('client_id'),
'client_secret': self.options.get('client_secret'),
'username': self.options.get('username'),
'password': self.options.get('password'),
'scope': getattr(self, 'scope', ''),
'grant_type': 'password'
}
options.update({
'verify_ssl': self.options.get('verify_ssl', True),
'proxies': self.options.get('proxies', None)
})
self.token = wrapped_resource(
make_request('post', url, options))
self.access_token = self.token.access_token
def _request(self, method, resource, **kwargs):
"""Given an HTTP method, a resource name and kwargs, construct a
request and return the response.
"""
url = self._resolve_resource_name(resource)
if hasattr(self, 'access_token'):
kwargs.update(dict(oauth_token=self.access_token))
if hasattr(self, 'client_id'):
kwargs.update(dict(client_id=self.client_id))
kwargs.update({
'verify_ssl': self.options.get('verify_ssl', True),
'proxies': self.options.get('proxies', None)
})
return wrapped_resource(make_request(method, url, kwargs))
def __getattr__(self, name, **kwargs):
"""Translate an HTTP verb into a request method."""
if name not in ('get', 'post', 'put', 'head', 'delete'):
raise AttributeError
return partial(self._request, name, **kwargs)
def _resolve_resource_name(self, name):
"""Convert a resource name (e.g. tracks) into a URI."""
if name[:4] == 'http': # already a url
return name
name = name.rstrip('/').lstrip('/')
return '%s%s/%s' % (self.scheme, self.host, name)
def _redirect_uri(self):
"""
Return the redirect uri. Checks for ``redirect_uri`` or common typo,
``redirect_url``
"""
return self.options.get(
'redirect_uri',
self.options.get('redirect_url', None))
# Helper functions for testing arguments provided to the constructor.
def _options_present(self, options, kwargs):
return all(map(lambda k: k in kwargs, options))
def _options_for_credentials_flow_present(self):
required = ('client_id', 'client_secret', 'username', 'password')
return self._options_present(required, self.options)
def _options_for_authorization_code_flow_present(self):
required = ('client_id', 'redirect_uri')
or_required = ('client_id', 'redirect_url')
return (self._options_present(required, self.options) or
self._options_present(or_required, self.options))
def _options_for_token_refresh_present(self):
required = ('client_id', 'client_secret', 'refresh_token')
return self._options_present(required, self.options)