aboutsummaryrefslogtreecommitdiff
path: root/pyfunkwhale/client.py
blob: 2221a355132c8bc989a4ffbf306592f6d22ec687 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/bin/env python
# -*- conding: utf-8 -*-

import json
from time import time
from requests_oauthlib import OAuth2Session

from pyfunkwhale.utils import read_file, write_file


class Client(object):

    def __init__(self, client_name, redirect_uri, client_id,
                 client_secret, scopes, username, password, domain,
                 authorization_endpoint, token_endpoint,
                 token_filename):
        self.client_name = client_name
        self.redirect_uri = redirect_uri
        self.client_id = client_id
        self.client_secret = client_secret
        self.scopes = scopes
        self.token = None

        self.username = username
        self.password = password
        self.domain = domain

        self.authorization_endpoint = authorization_endpoint
        self.token_endpoint = token_endpoint
        self.token_filename = token_filename
        self.oauth_client = OAuth2Session(
                        self.client_id,
                        redirect_uri=self.redirect_uri,
                        scope=self.scopes)
        self.authorization_url, self.state = self.oauth_client. \
            authorization_url(
                            self.authorization_endpoint)

        try:
            self.token = json.loads(read_file(token_filename))
            self._refresh_token()
        except FileNotFoundError:
            self._get_token()
            write_file(token_filename, self.token)

    def _get_token(self):
        """
        Ask the user to go on the authorization page of the funkwhale instance
        and then wait the response code for fetch the token generated.
        """
        print("For authorizate this app go to:\n{}".format(
                self.authorization_url))
        self.authorization_code = input("Enter response code: ")

        self.token = self.oauth_client.fetch_token(
                                self.token_endpoint,
                                code=self.authorization_code,
                                client_secret=self.client_secret)

    def _refresh_token(self):
        """
        Check if the token is expired in 60 seconds and if True will ask a new
        token from the instance.
        """
        if time() - 60 > self.token["expires_at"]:
            self.token = self.oauth_client.refresh_token(
                self.token_endpoint,
                refresh_token=self.token["refresh_token"],
                client_id=self.client_id,
                client_secret=self.client_secret)
            write_file(self.token_filename, self.token)

    def call(self, endpoint, method, params=None, data=None):
        """
        Call the API

        Parameters
        ----------
        endpoint : str
            The endpoint to call on the API
        method : str
            The HTTP method to use for calling the endpoint
        params : dict, optional
            The uri params for a GET method
        data : dict, optional
            The uri data for a POST method

        Raises
        ------
        requests.exceptions.HTTPError
            If their is an error during requesting the API.
        """
        self._refresh_token()
        headers = {'Authorization': self.token['token_type'] + ' ' +
                   self.token['access_token']}

        call = getattr(self.oauth_client, method)

        r = call(self.domain + '/api/v1/' + endpoint, headers=headers,
                 params=params, data=data)

        r.raise_for_status()

        return r