1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import re
from time import time
from requests_oauthlib import OAuth2Session
from oauthlib.oauth2.rfc6749.errors import InvalidScopeError
from requests.models import Response
from pyfunkwhale.utils import read_file, write_file
class Client(object):
    """
    OAuth2 client used to call the ``/api/v1/`` endpoints of a Funkwhale
    instance.

    The OAuth2 token is kept in ``self.token`` and persisted through
    ``write_file`` into ``token_filename`` so that it can be loaded again
    (with ``json.loads``) the next time a client is created.
    """

    # Number of seconds before the recorded expiry at which the token is
    # considered stale and gets renewed.
    _EXPIRY_MARGIN = 60

    def __init__(self, client_name, redirect_uri, client_id,
                 client_secret, scopes, username, password, domain,
                 authorization_endpoint, token_endpoint,
                 token_filename):
        """
        Store the settings, create the OAuth2 session and load the token.

        If ``token_filename`` exists, the token is read from it and renewed
        when it is about to expire. Otherwise the user is prompted on the
        terminal for an authorization code and the fetched token is written
        to ``token_filename``.

        Parameters
        ----------
        client_name : str
            Name of the application (only stored on the instance here).
        redirect_uri : str
            Redirect URI handed to the OAuth2 session.
        client_id : str
            OAuth2 client identifier.
        client_secret : str
            OAuth2 client secret.
        scopes : list
            Scopes handed to the OAuth2 session.
        username : str
            Username sent by ``_get_JWT_token``.
        password : str
            Password sent by ``_get_JWT_token``.
        domain : str
            Base URL of the instance; ``/api/v1/`` is appended to it.
        authorization_endpoint : str
            URL used to build the authorization link shown to the user.
        token_endpoint : str
            URL used to fetch and refresh tokens.
        token_filename : str
            File in which the token is persisted.
        """
        self.client_name = client_name
        self.redirect_uri = redirect_uri
        self.client_id = client_id
        self.client_secret = client_secret
        self.scopes = scopes
        self.token = None
        self.username = username
        self.password = password
        self.domain = domain
        self.authorization_endpoint = authorization_endpoint
        self.token_endpoint = token_endpoint
        self.token_filename = token_filename

        self.oauth_client = OAuth2Session(
            self.client_id,
            redirect_uri=self.redirect_uri,
            scope=self.scopes)
        self.authorization_url, self.state = \
            self.oauth_client.authorization_url(self.authorization_endpoint)

        try:
            self.token = json.loads(read_file(token_filename))
            self._refresh_token()
        except FileNotFoundError:
            # No stored token yet: run the interactive authorization flow.
            self._get_token()
            write_file(token_filename, self.token)

    def _get_token(self):
        """
        Ask the user to open the authorization page of the instance, read
        the response code from standard input and fetch a token with it.
        """
        print("For authorizate this app go to:\n{}".format(
            self.authorization_url))
        self.authorization_code = input("Enter response code: ")
        self.token = self.oauth_client.fetch_token(
            self.token_endpoint,
            code=self.authorization_code,
            client_secret=self.client_secret)

    def _token_expires_soon(self, now=None) -> bool:
        """
        Tell whether the token is expired or expires within the margin.

        Parameters
        ----------
        now : float, optional
            Timestamp to compare against; defaults to ``time()``.

        Returns
        -------
        bool
            True when ``now + _EXPIRY_MARGIN`` has reached the token's
            ``expires_at`` value.
        """
        if now is None:
            now = time()
        return now + self._EXPIRY_MARGIN >= self.token["expires_at"]

    def _refresh_token(self):
        """
        Renew the token when it is expired or about to expire.

        The refresh token is used first; if the instance answers with an
        ``InvalidScopeError`` the interactive authorization flow is run
        instead. The resulting token is written to ``token_filename``.
        """
        if not self._token_expires_soon():
            return

        try:
            self.token = self.oauth_client.refresh_token(
                self.token_endpoint,
                refresh_token=self.token["refresh_token"],
                client_id=self.client_id,
                client_secret=self.client_secret)
        except InvalidScopeError:
            self._get_token()

        write_file(self.token_filename, self.token)

    def _force_refresh_token(self):
        """
        Get a brand new OAuth2 token through the interactive authorization
        flow and write it to ``token_filename``.
        """
        self._get_token()
        write_file(self.token_filename, self.token)

    def _get_JWT_token(self) -> dict:
        """
        Get a JWT token by posting the username and password to ``/token``.
        """
        data = {"username": self.username, "password": self.password}
        return self.call('/token', 'post', data=data).json()

    def _auth_headers(self) -> dict:
        """
        Build the ``Authorization`` header from the current token.
        """
        return {'Authorization': self.token['token_type'] + ' ' +
                self.token['access_token']}

    def call(self, endpoint: str, method: str, params: dict = None,
             data: dict = None, headers: dict = None) -> "Response":
        """
        Call the API

        Parameters
        ----------
        endpoint : str
            The endpoint to call on the API, with or without a leading
            slash.
        method : str
            The HTTP method to use for calling the endpoint. It is looked
            up, lower-cased, as a method of the OAuth2 session.
        params : dict, optional
            The uri params for a GET method
        data : dict, optional
            The data for a POST method
        headers : dict, optional
            Headers to send. When omitted, an ``Authorization`` header is
            built from the current token.

        Returns
        -------
        Response
            The response of the API.

        Raises
        ------
        requests.exceptions.HTTPError
            If there is an error during requesting the API.
        """
        self._refresh_token()

        own_headers = headers is None
        if own_headers:
            headers = self._auth_headers()

        request = getattr(self.oauth_client, method.lower())
        # Drop a single leading slash so that the URL has no double slash.
        url = self.domain + '/api/v1/' + re.sub(r'^\/', '', endpoint)

        r = request(url, headers=headers, params=params, data=data)
        if r.status_code == 401:
            self._force_refresh_token()
            if own_headers:
                # The header built above carries the rejected token; build
                # it again so the retry uses the token just obtained.
                headers = self._auth_headers()
            r = request(url, headers=headers, params=params, data=data)

        r.raise_for_status()
        return r
|