Source code for repository_service_tuf.helpers.api_client

# SPDX-FileCopyrightText: 2022-2023 VMware Inc
#
# SPDX-License-Identifier: MIT
from enum import Enum
from typing import Any, Dict, Optional

import requests
import rich_click as click
from dynaconf import LazySettings
from requests.exceptions import ConnectionError
from rich.console import Console

console = Console()


[docs] class URL(Enum): BOOTSTRAP = "api/v1/bootstrap/" CONFIG = "api/v1/config/" METADATA = "api/v1/metadata/" TASK = "api/v1/task/?task_id=" PUBLISH_ARTIFACTS = "api/v1/artifacts/publish/" METADATA_SIGN = "api/v1/metadata/sign/" METADATA_SIGN_DELETE = "api/v1/metadata/sign/delete" ARTIFACTS = "api/v1/artifacts/" ARTIFACTS_DELETE = "api/v1/artifacts/delete" DELEGATIONS = "api/v1/delegations/" DELEGATIONS_DELETE = "api/v1/delegations/delete"
[docs] class Methods(Enum): GET = "get" POST = "post" DELETE = "delete"
[docs] def request_server( server: str, url: str, method: Methods, payload: Optional[Dict[str, Any]] = None, data: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, ) -> requests.Response: try: if method == Methods.GET: response = requests.get( f"{server}/{url}", json=payload, data=data, headers=headers, timeout=300, ) elif method == Methods.POST: response = requests.post( f"{server}/{url}", json=payload, data=data, headers=headers, timeout=300, ) elif method == Methods.DELETE: response = requests.delete( f"{server}/{url}", json=payload, data=data, headers=headers, timeout=300, ) else: raise ValueError("Internal Error. Invalid HTTP/S Method.") except ConnectionError: raise click.ClickException(f"Failed to connect to {server}") return response
[docs] def bootstrap_status(settings: LazySettings) -> Dict[str, Any]: response = request_server( settings.SERVER, URL.BOOTSTRAP.value, Methods.GET, headers=settings.HEADERS, ) if response.status_code == 404: raise click.ClickException( f"Server {settings.SERVER} does not allow bootstrap" ) if response.status_code != 200: raise click.ClickException( f"Error {response.status_code} {response.text}" ) bootstrap_json = response.json() if bootstrap_json is None: raise click.ClickException(f"Unexpected error {response.text}") return bootstrap_json
[docs] def get_task( task_id: str, settings: LazySettings, ) -> tuple[Dict[str, Any], requests.Response]: state_response = request_server( settings.SERVER, f"{URL.TASK.value}{task_id}", Methods.GET, headers=settings.HEADERS, ) if state_response.status_code != 200: raise click.ClickException( f"Unexpected response {state_response.text}" ) data = state_response.json().get("data") if data is None: raise click.ClickException(f"No data received {state_response.text}") return data, state_response
[docs] def task_status( task_id: str, settings: LazySettings, title: Optional[str], silent: Optional[bool] = False, ) -> Dict[str, Any]: received_states = [] while True: data, state_response = get_task(task_id, settings) if state := data.get("state"): if state not in received_states: if silent is False: console.print(f"{title} {state}") received_states.append(state) else: if silent is False: console.print(".", end="") if state == "SUCCESS": if result := data.get("result"): if result.get("status") is True: return data else: raise click.ClickException( "Task status is not successful: " f"{state_response.text}" ) else: raise click.ClickException( f"No result received in data {state_response.text}" ) elif state == "FAILURE": raise click.ClickException(f"Failed: {state_response.text}") elif state == "ERRORED": # If task.state is "ERRORED" it means there is an internal # RSTUF error and data contains error information. raise click.ClickException( f"Errored: {data['result']['error']}" ) else: raise click.ClickException( f"No state in data received {state_response.text}" )
[docs] def publish_artifacts(settings: LazySettings) -> str: publish_artifacts = request_server( settings.SERVER, URL.PUBLISH_ARTIFACTS.value, Methods.POST, headers=settings.HEADERS, ) if publish_artifacts.status_code != 202: raise click.ClickException( f"Failed to publish artifacts. {publish_artifacts.status_code} " f"{publish_artifacts.text}" ) task_id = publish_artifacts.json()["data"]["task_id"] return task_id
[docs] def send_payload( settings: LazySettings, url: str, payload: Dict[str, Any], expected_msg: str, command_name: str, expected_status_code: Optional[int] = 202, ) -> str: """ Send 'payload' to a given 'settings.SERVER'. Args: settings: the command context settings object url: one of the URLs to a given endpoint as defined in api_client.py payload: dictionary containing the payload to send expected_msg: expected message to receive as a response to the request command_name: name of the command sending the payload, used for logging expected_status_code: [Optional] expected status code. Default: 202 Returns: Task id of the job sending the payload. """ response = request_server( settings.SERVER, url, Methods.POST, payload, headers=settings.HEADERS, ) if response.status_code != expected_status_code: raise click.ClickException( f"Error {response.status_code} {response.text}" ) response_json = response.json() if ( response_json.get("message") is None or response_json.get("message") != expected_msg ): raise click.ClickException(response.text) if data := response_json.get("data"): task_id = data.get("task_id") if task_id is None: raise click.ClickException( f"Failed to get `task id` {response.text}" ) console.print(f"{command_name} status: ACCEPTED ({task_id})") return task_id else: raise click.ClickException( f"Failed to get task response data {response.text}" )