# SPDX-FileCopyrightText: 2023 VMware Inc
#
# SPDX-License-Identifier: MIT
import base64
import io
import json
import os
from urllib.parse import urlparse
import requests
from click import Context
from dynaconf.loaders.yaml_loader import write # type: ignore
from tuf.api.exceptions import UnsignedMetadataError
from tuf.api.metadata import Metadata, Root
from repository_service_tuf.cli import click, console
from repository_service_tuf.cli.artifact import artifact
[docs]
def write_config(settings_path, settings_data, merge=False):
"""
Writes config data to the rstuf config file. The expected data
is a dict of type:\n
{\n
"CURRENT_REPOSITORY": "<repository_name>",\n
"REPOSITORIES": {\n
"<repository_name>": {\n
"artifact_base_url": "<url>",\n
"hash_prefix": "<True/False>",\n
"metadata_url": "<url>",\n
"trusted_root": "<base64_path_string",\n
},\n
},\n
"SERVER": "<server_url>",\n
}
"""
write(settings_path, settings_data, merge) # pragma: no cover
def _check_root(root: str) -> None: # pragma: no cover
try:
root_md: Metadata[Root] = Metadata.from_dict(json.loads(root))
root_md.verify_delegate(Root.type, root_md)
except (UnsignedMetadataError, ValueError) as e:
raise click.ClickException(f"Error reading root file {root}: {e}")
def _load_root_from_file(root: str) -> bytes: # pragma: no cover
try:
with open(root, "rb") as file:
content = file.read()
encoded_root = base64.b64encode(content)
except IOError as e:
raise click.ClickException(f"Error reading file: {e}")
_check_root(content.decode("utf-8"))
return encoded_root
def _load_root_from_url(root: str) -> bytes: # pragma: no cover
# Check if root is a URL
parsed_url = urlparse(root)
# Validate URL scheme
if not parsed_url.scheme:
raise click.ClickException(
"Please use http:// or https:// for artifact URL"
)
# Fetch the file from the local server
response = requests.get(root, timeout=10)
# Raise an error if the request failed
response.raise_for_status()
# Open a file using StringIO
with io.StringIO(response.text) as file:
content = file.read()
_check_root(content)
encoded_root = base64.b64encode(bytes(content, "utf-8"))
return encoded_root
@artifact.group()
@click.pass_context
def repository(context) -> None:
"""
Repository management.
"""
@repository.command()
@click.pass_context
@click.argument("repository", required=False)
def show(
context: Context,
repository: str,
) -> None:
"""
List configured repositories.
"""
rstuf_config = context.obj.get("settings")
current_repository = rstuf_config.get("CURRENT_REPOSITORY")
if repository:
if rstuf_config.get("REPOSITORIES") and rstuf_config[
"REPOSITORIES"
].get(repository):
if current_repository and current_repository == repository:
console.print("CURRENT REPOSITORY:")
try:
console.print_json(
data=dict(
{
repository: rstuf_config["REPOSITORIES"].get(
repository
)
}
)
)
except TypeError:
raise click.ClickException(
f"Repository {repository} has incorrect configuration. "
"Please verify you're using proper types:\n"
"artifact_base_url: <string>\n"
"hash_prefix: <bool>\n"
"metadata_url: <string>\n"
"trusted_root: <base64 string>\n"
)
else:
raise click.ClickException(
f"Repository {repository} is missing in your configuration"
)
else:
if rstuf_config.get("REPOSITORIES"):
is_default = ""
for repo in rstuf_config.get("REPOSITORIES").keys():
if current_repository and current_repository == repo:
style = "b green"
is_default = " (default)"
else:
style = "b white"
is_default = ""
console.print(f"{repo}{is_default}", style=style)
else:
raise click.ClickException("There are no configured repositories")
@repository.command()
@click.pass_context
@click.argument("repository")
def set(context: Context, repository: str) -> None:
"""
Switch current repository.
"""
rstuf_config = context.obj.get("settings").as_dict()
rstuf_config["CURRENT_REPOSITORY"] = repository
if context.obj.get("config"):
write_config(context.obj.get("config"), rstuf_config)
console.print(f"Current repository changed to {repository}")
@repository.command()
@click.option(
"-n",
"--name",
help="The repository name.",
type=str,
required=True,
default=None,
)
@click.option(
"-r",
"--root",
help="The metadata URL to the initial trusted root or a local file.",
type=str,
required=True,
default=None,
)
@click.option(
"-m",
"--metadata-url",
help="TUF Metadata repository URL.",
type=str,
required=True,
default=None,
)
@click.option(
"-a",
"--artifacts-url",
help="The artifacts base URL to fetch from.",
type=str,
required=True,
default=None,
)
@click.option(
"-p",
"--hash-prefix",
help="Whether to add a hash prefix to artifact names.",
is_flag=True,
)
@click.pass_context
def add(
context: Context,
name: str,
root: str,
metadata_url: str,
artifacts_url: str,
hash_prefix: bool,
) -> None:
"""
Add a new repository.
"""
rstuf_config = context.obj.get("settings").as_dict()
success_msg: str = f"Successfully added {name} repository to config"
if rstuf_config.get("REPOSITORIES") and rstuf_config["REPOSITORIES"].get(
name
):
console.print(f"Repository {name} already configured")
success_msg = f"Successfully updated repository {name}"
# Check if root is a local file path
if os.path.isfile(root):
encoded_root = _load_root_from_file(root)
else:
encoded_root = _load_root_from_url(root)
repo_data = {
"artifact_base_url": artifacts_url,
"metadata_url": metadata_url,
"trusted_root": encoded_root.decode("UTF-8"),
"hash_prefix": hash_prefix,
}
if rstuf_config.get("REPOSITORIES"):
rstuf_config["REPOSITORIES"][name] = repo_data
else:
rstuf_config["REPOSITORIES"] = {name: repo_data}
if context.obj.get("config"):
write_config(context.obj.get("config"), rstuf_config)
console.print(success_msg)
@repository.command()
@click.option(
"-r",
"--root",
help="The metadata URL to the initial trusted root or a local file.",
type=str,
default=None,
)
@click.option(
"-m",
"--metadata-url",
help="TUF Metadata repository URL.",
type=str,
default=None,
)
@click.option(
"-a",
"--artifacts-url",
help="The artifacts base URL to fetch from.",
type=str,
default=None,
)
@click.option(
"-p",
"--hash-prefix",
help="Whether to add a hash prefix to artifact names.",
is_flag=True,
)
@click.pass_context
@click.argument("repository")
def update(
context: Context,
root: str,
metadata_url: str,
artifacts_url: str,
hash_prefix: bool,
repository: str,
) -> None:
"""
Update repository.
"""
rstuf_config = context.obj.get("settings").as_dict()
if not rstuf_config.get("REPOSITORIES"):
raise click.ClickException(
"There are no configured repositories to update"
)
if not rstuf_config["REPOSITORIES"].get(repository):
raise click.ClickException(
f"Repository {repository} not available in config. "
"You can create it instead"
)
if root:
rstuf_config["REPOSITORIES"][repository]["trusted_root"] = (
base64.b64encode(bytes(root, "utf-8"))
)
if metadata_url:
rstuf_config["REPOSITORIES"][repository]["metadata_url"] = metadata_url
if artifacts_url:
rstuf_config["REPOSITORIES"][repository][
"artifact_base_url"
] = artifacts_url
rstuf_config["REPOSITORIES"][repository]["hash_prefix"] = hash_prefix
if context.obj.get("config"):
write_config(context.obj.get("config"), rstuf_config)
console.print(f"Successfully updated repository {repository}")
@repository.command()
@click.pass_context
@click.argument("repository")
def delete(context: Context, repository: str) -> None:
"""
Delete a repository.
"""
rstuf_config = context.obj.get("settings").as_dict()
if not rstuf_config.get("REPOSITORIES"):
raise click.ClickException(
"There are no configured repositories. Nothing to delete"
)
if not rstuf_config["REPOSITORIES"].get(repository):
raise click.ClickException(
f"Repository {repository} not available. Nothing to delete"
)
repo = rstuf_config["REPOSITORIES"].pop(repository)
if context.obj.get("config"):
write_config(context.obj.get("config"), rstuf_config)
repo["trusted_root"] = "From file"
console.print(f"Succesfully deleted repository {repo}")