Source code for meteofrance_publicapi.core
"""Core module for the meteofranceapi package."""
from pathlib import Path
import time
import requests
import logging
from .const import EXPIRED_TOKEN_CODE, SUCCESS_CODE, PARAMETER_ERROR_CODE, MISSING_DATA_CODE
from .errors import MissingParameterError, MissingDataError
logger = logging.getLogger(__name__)
[docs]
class MeteoFranceAPI:
def __init__(self,
api_key: str | None = None,
token: str | None = None,
application_id: str | None = None,
):
"""Init the MeteoFranceAPI object.
"""
self.api_key = api_key
self.token = token
self.application_id = application_id
self.session = requests.Session()
self.connect()
[docs]
def connect(self):
"""Connect to the meteo-France API.
If the API key is provided, it is used to authenticate the user.
If the token is provided, it is used to authenticate the user.
If the application ID is provided, a token is requested from the API.
"""
if self.api_key is None and self.token is None:
if self.application_id is None:
raise ValueError("api_key or token or application_id must be provided")
self.token = self.get_token()
if self.api_key is not None:
logger.debug("using api key")
self.session.headers.update({"apikey": self.api_key})
else:
logger.debug("using token")
self.session.headers.update({"Authorization": "Bearer " + self.token})
[docs]
def get_token(self):
"""request a token from the meteo-France API.
The token lasts 1 hour, and is used to authenticate the user.
If a new token is requested before the previous one expires, the previous one is invalidated.
A local cache is used to avoid requesting a new token at each run of the script.
"""
# cache the token for 1 hour
TOKEN_DURATION_S = 3600
local_tmp_cache = Path("/tmp/meteofrance/arome/")
cache_filename = local_tmp_cache / "token.txt"
cache_time_filename = local_tmp_cache / "token_time.txt"
if cache_filename.exists() and cache_time_filename.exists():
logger.debug("reading token from cache")
with open(cache_time_filename) as f:
cache_time = float(f.read())
if cache_time >= (time.time() - TOKEN_DURATION_S ):
with open(cache_filename) as f:
token = f.read()
return token
token_entrypoint = " https://portail-api.meteofrance.fr/token"
params = {"grant_type": "client_credentials"}
header = {"Authorization": "Basic " + self.application_id}
res = requests.post(token_entrypoint,
params=params,
headers=header
)
self.token = res.json()["access_token"]
# save token to file
local_tmp_cache.mkdir(parents=True, exist_ok=True)
with open(cache_filename, "w") as f:
f.write(self.token)
with open(cache_time_filename, "w") as f:
f.write(str(time.time()))
return self.token
[docs]
def _get_request(self, url, params=None):
"""Make a get request to the API.
Parameters
----------
url : str
the url to request
params : dict
the parameters to pass to the request
Returns
-------
requests.Response
the response of the request
"""
logger.debug(f"GET {url}")
res = self.session.get(url, params=params)
if self._token_expired(res):
logger.info("token expired, requesting a new one")
self.get_token()
self.connect()
res = self.session.get(url, params=params)
if self._token_expired(res):
raise ValueError("token expired but could not get a new one")
error_code = res.status_code
if error_code == SUCCESS_CODE:
logger.debug("request successful")
if error_code == PARAMETER_ERROR_CODE:
logger.error("parameter error")
raise MissingParameterError(res.text)
if error_code == MISSING_DATA_CODE:
logger.error("missing data")
raise MissingDataError(res.text)
if error_code != SUCCESS_CODE:
# Quick and dirty error handling
# TODO: implement a proper error handling
# for each error code of the API
logger.error(f"error code: {error_code}")
raise ValueError(f"error code: {error_code}")
return res
[docs]
def _token_expired(self, res):
"""Check if the token is expired.
Returns
-------
bool
True if the token is expired, False otherwise.
"""
status = res.status_code
if status == EXPIRED_TOKEN_CODE:
if "application/json" in res.headers["Content-Type"]:
data = res.text()
if "Invalid JWT token" in data["description"]:
return True
return False