import configparser
import json
import logging
import os
import time
from os.path import exists, expanduser
from pathlib import Path
from typing import List, Optional, Tuple
import requests
from requests.auth import HTTPBasicAuth
# Placeholder (access_token, expiration) pair meaning "no token available".
NULL_TOKEN = ('', 0)
class MissingCredentials(Exception):
    """Raised when the credentials needed to authenticate cannot be found or are incomplete."""
class Credentials:
    """Used to fetch and store credentials and to generate/cache an access token.

    :param client_id: The client id
    :type client_id: str
    :param client_secret: The client secret
    :type client_secret: str
    :param auth_endpoint: The auth endpoint (host name; ``https://`` is prepended when requesting a token)
    :type auth_endpoint: str
    :param api_endpoint: The api endpoint
    :type api_endpoint: str
    :param cached_profile: Name under which the token is stored in the cache file. When empty or None,
        the token is only kept in memory.
    :type cached_profile: Optional[str]
    :param cache_path: Location of the JSON token cache file.
    :type cache_path: Path
    :raises MissingCredentials: If any of client_id, client_secret, auth_endpoint or api_endpoint is
        empty or None."""

    # Seconds to wait for the auth server before giving up. Without a timeout
    # ``requests`` waits forever and ``access_token`` would hang with it.
    _TOKEN_REQUEST_TIMEOUT = 30

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        auth_endpoint: str,
        api_endpoint: str,
        cached_profile: Optional[str] = None,
        cache_path: Path = Path(expanduser('~/.lucidtech/token-cache.json')),
    ):
        if not all([client_id, client_secret, auth_endpoint, api_endpoint]):
            raise MissingCredentials

        # Start from the cached token when a profile is given; otherwise start without a token.
        self._token = read_token_from_cache(cached_profile, cache_path) if cached_profile else NULL_TOKEN
        self.client_id = client_id
        self.client_secret = client_secret
        self.auth_endpoint = auth_endpoint
        self.api_endpoint = api_endpoint
        self.cached_profile = cached_profile
        self.cache_path = cache_path

    @property
    def access_token(self) -> str:
        """Return a valid access token, requesting a new one when none is held or it has expired.

        A freshly requested token is also written to the cache file when ``cached_profile`` is set."""
        access_token, expiration = self._token

        if not access_token or time.time() > expiration:
            access_token, expiration = self._get_client_credentials()
            self._token = (access_token, expiration)

            if self.cached_profile:
                write_token_to_cache(self.cached_profile, self._token, self.cache_path)

        return access_token

    def _get_client_credentials(self) -> Tuple[str, float]:
        """Request a new token from the auth endpoint using the client-credentials grant.

        :return: The access token and the absolute time (seconds since the epoch) at which it expires.
        :raises requests.HTTPError: If the auth endpoint answers with an error status."""
        url = f'https://{self.auth_endpoint}/token?grant_type=client_credentials'
        headers = {'Content-Type': 'application/x-www-form-urlencoded'}
        auth = HTTPBasicAuth(self.client_id, self.client_secret)

        # Take the timestamp before the round-trip so the computed expiry can
        # only be earlier than the real one, never later.
        requested_at = time.time()
        response = requests.post(url, headers=headers, auth=auth, timeout=self._TOKEN_REQUEST_TIMEOUT)
        response.raise_for_status()

        response_data = response.json()
        return response_data['access_token'], requested_at + response_data['expires_in']
def read_token_from_cache(cached_profile: str, cache_path: Path) -> Tuple[str, float]:
    """Look up the cached token for a profile in the JSON cache file.

    Reading is best-effort: any problem with the cache yields ``NULL_TOKEN`` instead of an exception.

    :param cached_profile: Name of the profile whose token should be read.
    :type cached_profile: str
    :param cache_path: Path of the JSON cache file.
    :type cache_path: Path
    :return: The stored (access_token, expiration) pair, or ``NULL_TOKEN`` when the file or the profile
        entry is missing, unreadable or malformed.
    :rtype: Tuple[str, float]"""
    logger = logging.getLogger(__name__)

    try:
        cache = json.loads(cache_path.read_text())
    except FileNotFoundError:
        # No cache file yet: an ordinary miss, nothing to report.
        return NULL_TOKEN
    except (OSError, ValueError) as e:
        # Unreadable file or invalid JSON/encoding (JSONDecodeError and
        # UnicodeDecodeError are both ValueError subclasses).
        logger.warning('Ignoring unreadable token cache %s: %s', cache_path, e)
        return NULL_TOKEN

    if not isinstance(cache, dict):
        logger.warning('Ignoring token cache %s: expected a JSON object', cache_path)
        return NULL_TOKEN

    if cached_profile not in cache:
        # The profile simply has not been cached yet; not worth a warning.
        return NULL_TOKEN

    try:
        entry = cache[cached_profile]
        access_token, expiration = entry['access_token'], entry['expires_in']
    except (KeyError, TypeError) as e:
        logger.warning('Ignoring malformed token cache entry for %r: %r', cached_profile, e)
        return NULL_TOKEN

    # Reject wrongly-typed values here; otherwise they would only fail later
    # when the expiration is compared against the current time.
    has_valid_expiration = isinstance(expiration, (int, float)) and not isinstance(expiration, bool)
    if not isinstance(access_token, str) or not has_valid_expiration:
        logger.warning('Ignoring malformed token cache entry for %r: unexpected value types', cached_profile)
        return NULL_TOKEN

    return access_token, expiration
def write_token_to_cache(cached_profile: str, token: Tuple[str, float], cache_path: Path) -> None:
    """Store a token for a profile in the JSON cache file, keeping the entries of other profiles.

    The parent directory is created when the cache file does not exist yet. A cache file that does not
    contain a JSON object is discarded and rewritten, so a corrupt cache cannot permanently block
    token refreshes.

    :param cached_profile: Name of the profile the token belongs to.
    :type cached_profile: str
    :param token: The (access_token, expiration) pair to store.
    :type token: Tuple[str, float]
    :param cache_path: Path of the JSON cache file.
    :type cache_path: Path"""
    cache = {}

    try:
        loaded = json.loads(cache_path.read_text())
    except FileNotFoundError:
        # First write: make sure the directory for the cache file exists.
        cache_path.parent.mkdir(parents=True, exist_ok=True)
    except ValueError as e:
        # Invalid JSON or encoding; start over instead of failing every write.
        logging.getLogger(__name__).warning('Discarding corrupt token cache %s: %s', cache_path, e)
    else:
        if isinstance(loaded, dict):
            cache = loaded
        else:
            logging.getLogger(__name__).warning(
                'Discarding token cache %s: expected a JSON object', cache_path,
            )

    access_token, expires_in = token
    cache[cached_profile] = {
        'access_token': access_token,
        'expires_in': expires_in,
    }
    cache_path.write_text(json.dumps(cache, indent=2))
def read_from_environ() -> List[Optional[str]]:
    """Collect the credentials from the process environment.

    The variables are looked up in this order:

    - LAS_CLIENT_ID
    - LAS_CLIENT_SECRET
    - LAS_AUTH_ENDPOINT
    - LAS_API_ENDPOINT

    :return: List of client_id, client_secret, auth_endpoint, api_endpoint; a variable that is not set
        yields None in its position.
    :rtype: List[Optional[str]]"""
    variable_names = (
        'LAS_CLIENT_ID',
        'LAS_CLIENT_SECRET',
        'LAS_AUTH_ENDPOINT',
        'LAS_API_ENDPOINT',
    )
    return list(map(os.environ.get, variable_names))
def read_from_file(credentials_path: str = expanduser('~/.lucidtech/credentials.cfg'),
                   section: str = 'default') -> List[Optional[str]]:
    """Read a config file and return credentials from it. Defaults to '~/.lucidtech/credentials.cfg'.

    :param credentials_path: Path to read credentials from.
    :type credentials_path: str
    :param section: Section to read credentials from.
    :type section: str
    :return: List of client_id, client_secret, auth_endpoint, api_endpoint, cached_profile. The last
        item is the section name when the section enables ``use_cache``, otherwise None.
    :rtype: List[Optional[str]]
    :raises MissingCredentials: If ``credentials_path`` does not exist.
    :raises configparser.Error: If the section or one of the four required options is missing."""
    if not exists(credentials_path):
        raise MissingCredentials

    config = configparser.ConfigParser()
    config.read(credentials_path)

    client_id = config.get(section, 'client_id')
    client_secret = config.get(section, 'client_secret')
    auth_endpoint = config.get(section, 'auth_endpoint')
    api_endpoint = config.get(section, 'api_endpoint')

    # Accept every boolean spelling configparser understands (true/yes/on/1 in
    # any case) instead of only the literal strings 'true' and 'True'. A value
    # that is not a recognizable boolean keeps meaning "no caching".
    try:
        use_cache = config.getboolean(section, 'use_cache', fallback=False)
    except ValueError:
        use_cache = False
    cached_profile = section if use_cache else None

    return [client_id, client_secret, auth_endpoint, api_endpoint, cached_profile]
def guess_credentials() -> Credentials:
    """Build Credentials from the first source that supplies all required values.

    The environment variables are consulted first, then the default credentials file
    ~/.lucidtech/credentials.cfg. A source counts only if client_id, client_secret, auth_endpoint and
    api_endpoint are all present; if any environment variable is missing, _all_ of them are disregarded
    and the credentials file is used instead.

    :return: Credentials built from the first complete source
    :rtype: :py:class:`~las.Credentials`
    :raises: :py:class:`~las.MissingCredentials`"""
    for source in (read_from_environ, read_from_file):
        values = source()  # type: ignore
        required = values[:4]
        if not all(required):
            # Incomplete source: fall through to the next one.
            continue
        return Credentials(*values)

    raise MissingCredentials