# SPDX-FileCopyrightText: 2023-2025 Repository Service for TUF Contributors
#
# SPDX-License-Identifier: MIT
import enum
import os.path
import platform
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from tempfile import TemporaryDirectory
from typing import Any, Callable, Dict, List, Optional, Tuple
import beaupy # type: ignore
import click
# Magic import to unbreak `load_pem_private_key` - pyca/cryptography#10315
import cryptography.hazmat.backends.openssl.backend # noqa: F401
import prompt_toolkit
import prompt_toolkit.completion
import requests
from cryptography.hazmat.primitives.serialization import (
load_pem_private_key,
load_pem_public_key,
)
from cryptography.x509 import load_pem_x509_certificate, oid
from email_validator import validate_email
from prompt_toolkit.formatted_text.base import AnyFormattedText
from PyKCS11 import CKR_USER_NOT_LOGGED_IN, PyKCS11Error # type: ignore
from rich.json import JSON
from rich.markdown import Markdown
from rich.prompt import Confirm, IntPrompt, InvalidResponse, Prompt
from rich.table import Table
from securesystemslib.exceptions import UnverifiedSignatureError
from securesystemslib.formats import encode_canonical
from securesystemslib.hash import digest
from securesystemslib.signer import (
KEY_FOR_TYPE_AND_SCHEME,
AWSSigner,
AzureSigner,
CryptoSigner,
GCPSigner,
HSMSigner,
Key,
Signature,
Signer,
SigstoreKey,
SigstoreSigner,
SSlibKey,
VaultSigner,
)
from tuf.api.exceptions import DownloadError, RepositoryError
from tuf.api.metadata import (
DelegatedRole,
Delegations,
Metadata,
Root,
RootVerificationResult,
Snapshot,
Targets,
Timestamp,
UnsignedMetadataError,
VerificationResult,
)
from tuf.ngclient.updater import Updater
# TODO: Should we use the global rstuf console exclusively? We do use it for
# `console.print`, but not with `Confirm/Prompt.ask`. The latter uses a default
# console from `rich`. Using a single console everywhere would makes custom
# configuration or, more importantly, patching in tests easier:
# https://rich.readthedocs.io/en/stable/console.html#console-api
# https://rich.readthedocs.io/en/stable/console.html#capturing-output
from repository_service_tuf.cli import console
from repository_service_tuf.helpers.api_client import (
URL,
Methods,
request_server,
)
ONLINE_ROLE_NAMES = {Timestamp.type, Snapshot.type, Targets.type}
KEY_URI_FIELD = "x-rstuf-online-key-uri"
KEY_NAME_FIELD = "x-rstuf-key-name"
# Use locale's appropriate date representation to display the expiry date.
EXPIRY_FORMAT = "%x"
DEFAULT_EXPIRY = {
"root": 365,
"timestamp": 1,
"snapshot": 1,
"targets": 365,
"bins": 1,
}
DEFAULT_BINS_NUMBER = 256
# some known locations where we might find libykcs11.
# These should all be _system_ locations (not user writable)
LIBYKCS11_LOCATIONS = {
"Linux": [
"/usr/lib/x86_64-linux-gnu/libykcs11.so",
"/usr/lib64/libykcs11.so",
"/usr/local/lib/libykcs11.so",
],
"Darwin": [
"/opt/homebrew/lib/libykcs11.dylib",
"/usr/local/lib/libykcs11.dylib",
],
}
# SecureSystemsLib doesn't support SigstoreKey by default.
KEY_FOR_TYPE_AND_SCHEME.update(
{
("sigstore-oidc", "Fulcio"): SigstoreKey,
}
)
HSM_INSTRUCTIONS = """
#### Hardware signing requirements
(Source: https://github.com/theupdateframework/tuf-on-ci/tree/main/docs -- Thanks)
A hardware signing key must contain a _PIV Digital Signature private key_ to be used.
CLI also needs access to a PKCS#11 module.
1. Generate a PIV signing key on your hardware key if you don't have one yet.
For YubiKey owners, follow the [YubiKey setup instructions](https://github.com/theupdateframework/tuf-on-ci/blob/main/docs/YUBIKEY-PIV-SETUP.md).
2. Install a PKCS#11 module. It has been tested with the Yubico ykcs11. Debian users can install it with
```shell
$ apt install ykcs11
```
macOS users can install with
```shell
$ brew install yubico-piv-tool
```
> **_NOTE:_** Windows WSL users may need to attach a USB hardware device using [usbipd-win](https://learn.microsoft.com/en-us/windows/wsl/connect-usb)
""" # noqa
[docs]
class SIGNERS(str, enum.Enum):
[docs]
@classmethod
def values(self) -> List[str]:
return [e.value for e in self]
[docs]
@classmethod
def names(self) -> List[str]:
return [e.name for e in self]
# Root signers supported by RSTUF
[docs]
class ROOT_SIGNERS(SIGNERS):
HSM = "HSM"
KEY_PEM = "Key PEM File"
SIGSTORE = "Sigstore"
[docs]
class ONLINE_SIGNERS(SIGNERS):
AWSKMS = "AWS KMS"
GCPKMS = "Google Cloud KMS"
AZKMS = "Azure KMS"
HV = "HashiCorp Vault"
KEY_PEM = "Key PEM File"
# Sigstore issuers supported by RSTUF
[docs]
class SIGSTORE_ISSUERS(SIGNERS):
GitHub = "https://github.com/login/oauth"
Google = "https://accounts.google.com"
Microsoft = "https://login.microsoft.com"
[docs]
class DELEGATIONS_TYPE(str, enum.Enum):
BINS = "Bins (online key only)"
CUSTOM_DELEGATIONS = "Custom Delegations (online/offline key)"
[docs]
@classmethod
def values(cls) -> List[str]:
return [e.value for e in cls]
@dataclass
class _Settings:
"""Internal data container to gather online role settings from prompt."""
timestamp_expiry: int
snapshot_expiry: int
targets_expiry: int
bins_expiry: Optional[int] = None
bins_number: Optional[int] = None
delegations: Optional[Delegations] = None
[docs]
@dataclass
class Role:
expiration: int
[docs]
@dataclass
class BinsRole(Role):
number_of_delegated_bins: int
[docs]
@dataclass
class Roles:
root: Role
timestamp: Role
snapshot: Role
targets: Role
bins: Optional[BinsRole] = None
delegations: Optional[Dict[str, Any]] = None
[docs]
@dataclass
class Settings:
roles: Roles
[docs]
@dataclass
class CeremonyPayload:
settings: "Settings"
metadata: "Metadatas"
timeout: int = 300
[docs]
@dataclass
class UpdatePayload:
metadata: "Metadatas"
[docs]
@dataclass
class SignPayload:
signature: dict[str, str]
role: str = "root"
##############################################################################
# Prompt and dialog helpers
class _PositiveIntPrompt(IntPrompt):
validate_error_message = (
"[prompt.invalid]Please enter a valid positive integer number"
)
def process_response(self, value: str) -> int:
return_value: int = super().process_response(value)
if return_value < 1:
raise InvalidResponse(self.validate_error_message)
return return_value
class _MoreThan1Prompt(IntPrompt):
validate_error_message = "[prompt.invalid]Please enter threshold above 1"
def process_response(self, value: str) -> int:
return_value: int = super().process_response(value)
if return_value < 2:
raise InvalidResponse(self.validate_error_message)
return return_value
def _get_secret(secret: str) -> str:
msg = (
f"\nEnter {secret} to sign (provide touch/bio authentication "
"if needed)"
)
# # special case for tests -- prompt() will lockup trying to hide STDIN:
# if not sys.stdin.isatty():
# return sys.stdin.readline().rstrip()
return click.prompt(msg, hide_input=True)
def _prompt_key(
message: AnyFormattedText,
file_filter: Callable[[str], bool] = lambda f: True,
) -> str:
"""Prompt for a private or public key using prompt_toolkit."""
completer = prompt_toolkit.completion.PathCompleter(
file_filter=file_filter
)
return prompt_toolkit.prompt(message, completer=completer)
def _prompt_private_key(name_value: str) -> str:
"""Prompt for a private key."""
message = prompt_toolkit.HTML(
f"\nPlease enter the <b>path</b> to encrypted private key "
f"<b>{name_value}</b>: "
)
return _prompt_key(message)
def _load_signer_from_file_prompt(public_key: SSlibKey) -> CryptoSigner:
"""Prompt for path to private key and password, return Signer."""
while True:
name_value = public_key.unrecognized_fields.get(
KEY_NAME_FIELD, public_key.keyid[:7]
)
path = _prompt_private_key(name_value)
with open(path, "rb") as f:
private_pem = f.read()
# Because of click.prompt we are required to use click.style()
name_str = click.style(name_value, fg="green")
password_str = click.style("password", fg="yellow")
password = click.prompt(
f"\nPlease enter {password_str} to encrypted private key '{name_str}'", # noqa
hide_input=True,
)
try:
private_key = load_pem_private_key(private_pem, password.encode())
break
except TypeError as e:
console.print(f"Cannot load key: {e}")
return CryptoSigner(private_key, public_key)
def _load_signer_from_hsm_prompt(public_key: SSlibKey):
console.print(Markdown(HSM_INSTRUCTIONS))
for loc in LIBYKCS11_LOCATIONS.get(platform.system(), []):
if os.path.exists(loc):
break
else:
raise click.ClickException("Failed to find libykcs11")
os.environ["PYKCS11LIB"] = loc
uri, _ = HSMSigner.import_()
signer = Signer.from_priv_key_uri(uri, public_key, _get_secret)
return signer
def _prompt_public_key() -> str:
"""Prompt for a public key."""
# Show only directories, or files ending with ".pub"
def file_filter(f): # pragma: no cover
return os.path.isdir(f) or os.path.isfile(f) and f.endswith(".pub")
message = prompt_toolkit.HTML("Enter file path to a <b>PUBLIC</b> key: ")
return _prompt_key(message, file_filter)
def _load_key_from_file_prompt() -> SSlibKey:
"""Prompt for path to public key, return Key."""
path = _prompt_public_key()
with open(path, "rb") as f:
public_pem = f.read()
crypto = load_pem_public_key(public_pem)
key = SSlibKey.from_crypto(crypto)
return key
def _load_key_from_hsm_prompt() -> SSlibKey:
"""Prompt for a public key."""
def file_filter(f): # pragma: no cover
return os.path.isdir(f) or os.path.isfile(f)
message = prompt_toolkit.HTML(
"Enter file path to a <b>CERTIFICATE</b> PEM key: "
)
path = _prompt_key(message, file_filter)
with open(path, "rb") as f:
cert_pem = f.read()
cert = load_pem_x509_certificate(cert_pem)
key = SSlibKey.from_crypto(cert.public_key())
common_name = cert.subject.get_attributes_for_oid(oid.NameOID.COMMON_NAME)
key.unrecognized_fields[KEY_NAME_FIELD] = (
common_name[0].value if bool(common_name) else None
)
return key
def _new_keyid(key: Key) -> str:
data: bytes = encode_canonical(key.to_dict()).encode() # type: ignore
hasher = digest("sha256")
hasher.update(data)
return hasher.hexdigest()
def _load_key_from_sigstore_prompt() -> SigstoreKey:
console.print(
"\n:warning: Sigstore is not supported by all TUF Clients.\n",
justify="left",
style="italic",
)
identity = Prompt.ask("Please enter Sigstore identity")
# Validate if the identity is email
validate_email(identity)
console.print(
"\n:warning: RSTUF only support Sigstore public issuers.\n",
justify="left",
style="italic",
)
issuer_name = _select(SIGSTORE_ISSUERS.names())
issuer = SIGSTORE_ISSUERS[issuer_name].value
key = SigstoreKey(
keyid="temp",
keytype="sigstore-oidc",
scheme="Fulcio",
keyval={"issuer": issuer, "identity": identity},
unrecognized_fields={KEY_NAME_FIELD: identity},
)
key.keyid = _new_keyid(key)
return key
def _load_key_prompt(
keys: Dict[str, Key],
signer_type: Optional[str] = None,
duplicate: Optional[bool] = True,
) -> Optional[Key]:
"""Prompt and return Key, or None on error or if key is already loaded."""
try:
if not signer_type:
console.print("\nSelect a key type:")
signer_type = _select(ROOT_SIGNERS.values())
key: SSlibKey | SigstoreKey
match signer_type:
case ROOT_SIGNERS.KEY_PEM:
key = _load_key_from_file_prompt()
case ROOT_SIGNERS.SIGSTORE:
key = _load_key_from_sigstore_prompt()
case ROOT_SIGNERS.HSM:
key = _load_key_from_hsm_prompt()
except (OSError, ValueError) as e:
console.print(f"Cannot load key: {e}")
return None
# Disallow re-adding a key even if it is for a different role.
# TODO: disallow only within the same role
if duplicate is False and key.keyid in keys:
console.print("\nKey already in use.", style="bold red")
return None
return key
def _load_online_key_prompt(
root: Root, signer_type: str
) -> Tuple[Optional[str], Optional[Key]]:
"""Prompt and return Key, or None on error or if key is already loaded."""
try:
match signer_type:
case ONLINE_SIGNERS.KEY_PEM:
key = _load_key_from_file_prompt()
uri = f"fn:{key.keyid}"
case ONLINE_SIGNERS.AWSKMS:
uri, key = AWSSigner.import_(Prompt.ask("AWS KMS KeyID"))
case ONLINE_SIGNERS.GCPKMS:
uri, key = GCPSigner.import_(Prompt.ask("GCP KeyID"))
case ONLINE_SIGNERS.HV:
uri, key = VaultSigner.import_(
Prompt.ask("HashiCorp Key Name")
)
case ONLINE_SIGNERS.AZKMS:
azure_vault_name = Prompt.ask("Azure Vault Name")
azure_key_name = Prompt.ask("Azure Key Name")
uri, key = AzureSigner.import_(
az_vault_name=azure_vault_name,
az_key_name=azure_key_name,
)
except (OSError, ValueError) as e:
console.print(f"Cannot load key: {e}")
return None, None
# Disallow re-adding a key even if it is for a different role.
if key.keyid in root.keys:
console.print("\nKey already in use.", style="bold red")
return None, None
return uri, key
def _delegated_target_role_name_prompt() -> str:
"""Prompt for delegated target role name until success."""
while True:
name = Prompt.ask("Please enter delegated target role name")
if not name:
console.print("Role name cannot be empty.")
continue
break
return name
def _key_name_prompt(
keys: Dict[str, Key],
name: Optional[str] = None,
duplicate: Optional[bool] = False,
) -> str:
"""Prompt for key name until success."""
while True:
name = Prompt.ask("Please enter key name", default=name)
if not name:
console.print("Key name cannot be empty.")
continue
if duplicate is False and name in [
k.unrecognized_fields.get(KEY_NAME_FIELD) for k in keys.values()
]:
console.print("\nKey name already in use.", style="bold red")
continue
break
return name
def _expiry_prompt(role: str) -> Tuple[int, datetime]:
"""Prompt for days until expiry for role, returns days and expiry date.
Use per-role defaults from ExpirationSettings.
"""
days = _PositiveIntPrompt.ask(
f"Please enter days until expiry for '{role}' role",
default=DEFAULT_EXPIRY.get(role, 1),
)
today = datetime.now(timezone.utc).replace(microsecond=0)
date = today + timedelta(days=days)
console.print(f"New expiry date is: {date:{EXPIRY_FORMAT}}")
return days, date
def _settings_prompt() -> _Settings:
"""Prompt for expiry days of online roles and number of delegated bins."""
timestamp_expiry, _ = _expiry_prompt("timestamp")
snapshot_expiry, _ = _expiry_prompt("snapshot")
targets_expiry, _ = _expiry_prompt("targets")
return _Settings(
timestamp_expiry,
snapshot_expiry,
targets_expiry,
)
def _threshold_prompt(role: str) -> int:
return _MoreThan1Prompt.ask(f"Please enter '{role}' threshold")
def _select(options: List[str]) -> str:
return beaupy.select(options=options, cursor=">", cursor_style="cyan")
def _select_key(keys: List[Key]) -> Key:
key_choices = {
key.unrecognized_fields.get(KEY_NAME_FIELD, key.keyid): key
for key in keys
}
sign_options = list(key_choices.keys())
sign_options = [f"[green]{option}[/]" for option in sign_options]
choice = _select(sign_options)
# Remove beautification to get the actual key.
choice = choice.removeprefix("[green]").removesuffix("[/]")
return key_choices[choice]
def _select_role(roles: Dict[str, Dict[str, Any]]) -> str:
roles_options = [x for x in roles if not x.startswith("trusted_")]
choice = _select(roles_options)
return choice
def _configure_root_keys_prompt(root: Root) -> None:
"""Prompt dialog to add or remove root key in passed root, until user exit.
- Print if and how many root keys are missing to meet the threshold
- Print current root keys
- Prompt for user choice to add or remove key, or to skip (exit)
- "continue" choice is only available, if threshold is met
- "remove" choice is only available, if keys exist
- "add" choice is only shown, if "remove" or "continue" is available,
otherwise, we branch right into "add" dialog
"""
while True:
keys = _print_root_keys(root)
threshold = root.roles[Root.type].threshold
missing = max(0, threshold - len(keys))
_print_missing_key_info(threshold, missing)
# build the action choices
action_options = ["add", "remove"]
if not missing:
action_options.insert(0, "continue")
# prompt for user choice
if not keys:
action = "add"
else:
action = _select(action_options)
# apply choice
match action:
case "continue":
break
case "add":
new_key = _load_key_prompt(root.keys)
if not new_key:
continue
# The default name of the key has either been explicitly set
# by the user inside the key object itself, or it is the keyid.
name = new_key.unrecognized_fields.get(
KEY_NAME_FIELD, new_key.keyid
)
name = _key_name_prompt(root.keys, name)
new_key.unrecognized_fields[KEY_NAME_FIELD] = name
root.add_key(new_key, Root.type)
console.print(f"Added key '{name}'")
case "remove":
console.print("\nSelect a key to remove:")
key = _select_key(keys)
name = key.unrecognized_fields.get(KEY_NAME_FIELD, key.keyid)
root.revoke_key(key.keyid, Root.type)
console.print(f"Removed key '{name}'")
def _configure_online_key_prompt(root: Root) -> None:
"""Prompt dialog to set or optionally update the online key."""
current_key = _get_online_key(root)
if current_key:
# TODO: Is the info even helpful?
name = current_key.unrecognized_fields.get(
KEY_NAME_FIELD, current_key.keyid
)
console.print(f"Current online key is: '{name}'")
if not Confirm.ask(
"Do you want to change the online key?", default=True
):
return
console.print("\nSelect Online Key type:")
while True:
online_key_signer = _select(ONLINE_SIGNERS.values())
uri, new_key = _load_online_key_prompt(root, online_key_signer)
if new_key:
break
name = _key_name_prompt(root.keys)
new_key.unrecognized_fields[KEY_NAME_FIELD] = name
new_key.unrecognized_fields[KEY_URI_FIELD] = uri
for role_name in ONLINE_ROLE_NAMES:
if current_key:
root.revoke_key(current_key.keyid, role_name)
root.add_key(new_key, role_name)
console.print(f"Configured file-based online key: '{name}'")
console.print(f"Expected private key file name is: '{new_key.keyid}'")
def _add_signature_prompt(metadata: Metadata, key: Key) -> Signature:
"""Prompt for signing key and add signature to metadata until success."""
while True:
name = key.unrecognized_fields.get(KEY_NAME_FIELD, key.keyid)
signer: Signer | SigstoreSigner | CryptoSigner
try:
if key.keytype == "sigstore-oidc":
signer = SigstoreSigner.from_priv_key_uri(
"sigstore:?ambient=false", key
)
# Using HSM or Key PEM file
else:
signer_type = _select(
[ROOT_SIGNERS.HSM.value, ROOT_SIGNERS.KEY_PEM.value]
)
if signer_type == ROOT_SIGNERS.KEY_PEM.value:
signer = _load_signer_from_file_prompt(key) # type: ignore
else:
signer = _load_signer_from_hsm_prompt(key) # type: ignore
signature = metadata.sign(signer, append=True)
signer.public_key.verify_signature(
signature, metadata.signed_bytes
)
break
except UnverifiedSignatureError as e:
console.print(
f"\nFailed to verify {name} signature "
f"(is this the correct key?)\n {e}"
)
except (ValueError, OSError, UnsignedMetadataError) as e:
# Very light error handling for specific PKCS11 errors
msg = str(e)
if isinstance(e.__context__, PyKCS11Error):
pkcs_err = e.__context__
if pkcs_err.value == CKR_USER_NOT_LOGGED_IN:
msg = (
"Required authentication (e.g. touch) did not happpen"
)
console.print(
f"\nCannot sign metadata with key '{name}': {e}\n {msg}"
)
console.print(f"Signed metadata with key '{name}'")
return signature
def _add_root_signatures_prompt(
root_md: Metadata[Root], prev_root: Optional[Root]
) -> None:
# TODO: Add docstring
while True:
root_result = root_md.signed.get_root_verification_result(
prev_root,
root_md.signed_bytes,
root_md.signatures,
)
if root_result.verified:
console.print("Metadata is fully signed.")
break
results = _filter_root_verification_results(root_result)
keys = _print_keys_for_signing(results)
key_choices = {
key.unrecognized_fields.get(KEY_NAME_FIELD, key.keyid): key
for key in keys
}
sign_options = list(key_choices.keys())
or_continue = ":"
if bool(root_result.signed):
or_continue = " or continue:"
sign_options.insert(0, "continue")
console.print(f"\nSelect a key for signing{or_continue}")
choice = _select(sign_options)
if choice == "continue":
break
_add_signature_prompt(root_md, key_choices[choice])
# Delegations
##############################################################################
def _path_prompt() -> str:
"""Prompt for path name until success."""
console.print(
"The 'path' delegates targets matching any path pattern.",
style="italic",
)
while True:
name = Prompt.ask("Please enter path")
if not name:
console.print("Path cannot be empty.")
continue
break
return name
def _configure_delegations_paths(
delegated_role: DelegatedRole,
) -> DelegatedRole:
if delegated_role.paths is None:
delegated_role.paths = []
while True:
console.print(f"\nCurrent paths for '{delegated_role.name}'")
for path in delegated_role.paths:
console.print(f"- '{path}'")
if bool(delegated_role.paths) is False:
delegated_role.paths.append(_path_prompt())
continue
# build the action choices
action_options = ["continue", "add new path", "remove path"]
console.print()
action = _select(action_options)
# apply choice
match action:
case "continue":
break
case "add new path":
delegated_role.paths.append(_path_prompt())
case "remove path":
console.print()
path = _select(delegated_role.paths)
delegated_role.paths.remove(path)
console.print(f"path '{path}' removed\n")
return delegated_role
def _configure_delegations_keys(
delegated_role: DelegatedRole, delegations: Delegations
) -> None:
while True:
for keyid, key in delegations.keys.items():
if keyid in delegated_role.keyids:
name = key.unrecognized_fields.get(KEY_NAME_FIELD, key.keyid)
console.print(f"- '{name}'")
missing = max(
0,
delegated_role.threshold
- len(
[
key
for key in delegations.keys
if key in delegated_role.keyids
]
),
)
_print_missing_key_info(delegated_role.threshold, missing)
# build the action choices
action_options = ["add", "remove"]
if not missing:
action_options.insert(0, "continue")
# prompt for user choice
if not delegations.keys:
action = "add"
else:
action = _select(action_options)
# apply choice
match action:
case "continue":
break
case "add":
new_key = _load_key_prompt(delegations.keys, duplicate=True)
if not new_key:
continue
name = _key_name_prompt(
delegations.keys,
new_key.unrecognized_fields.get(KEY_NAME_FIELD),
duplicate=True,
)
new_key.unrecognized_fields[KEY_NAME_FIELD] = name
delegations.keys[new_key.keyid] = new_key
delegated_role.keyids.append(new_key.keyid)
console.print(f"Added key '{name}'")
case "remove":
# TODO:
# 1. List the key (by key name) for the role
# 2. Remove the KeyID from delegated role
# 3. Remove from delegation roles keys IF not
# used by another role
raise NotImplementedError("TODO")
def _configure_delegations() -> Delegations:
delegations = Delegations(keys={}, roles={})
while True:
if delegations.roles is None or len(delegations.roles) == 0:
action = "add new delegation"
else:
_print_delegation(delegations)
action = _select(
["continue", "add new delegation", "remove delegation"]
)
match action:
case "continue":
break
case "add new delegation":
if delegations.roles is None:
delegations.roles = {}
name = _delegated_target_role_name_prompt()
if delegations.roles.get(name):
if not Confirm.ask(
f"\nDelegation '{name}' exists. Do you want overwrite"
f" '{name}'"
):
continue
expire_days, _ = _expiry_prompt(name)
# ##########################################################
# Load the Public Keys used to sign the metadata
delegated_role = DelegatedRole(
name=name,
threshold=1,
keyids=[],
terminating=True,
paths=[],
unrecognized_fields={"x-rstuf-expire-policy": expire_days},
)
_configure_delegations_paths(delegated_role)
console.print("Select signing:")
signing_method = _select(
["Online Key (use the existing)", "Add Keys"]
)
if signing_method == "Add Keys":
delegated_role.threshold = _threshold_prompt(
delegated_role.name
)
_configure_delegations_keys(delegated_role, delegations)
delegations.roles[delegated_role.name] = delegated_role
case "remove delegation":
if delegations.roles is None:
console.print("Delegations is empty")
continue
role_name = _select(list(delegations.roles.keys()))
removed_role = delegations.roles[role_name]
delegations.roles.pop(role_name)
in_use_keyids: List[str] = []
for role in delegations.roles.values():
in_use_keyids += role.keyids
for keyid in removed_role.keyids:
if keyid not in in_use_keyids:
delegations.keys.pop(keyid)
console.print(f"Delegation '{role_name}' removed.")
return delegations
def _configure_delegations_prompt(settings: _Settings) -> None:
while True:
console.print(
Markdown(
"### Delegations\n"
"RSTUF supports two types of delegations:\n"
"- **Bins**:\n"
"Generates hash bin delegations and uses an online key for\n"
"signing.\n"
"- **Custom Delegations**:\n"
"Allows the creation of delegated roles for specified paths,\n"
" utilizing both offline and online keys."
)
)
console.print()
delegations_type = _select(DELEGATIONS_TYPE.values())
if delegations_type is None:
continue
if delegations_type == DELEGATIONS_TYPE.BINS:
bins_expiry, _ = _expiry_prompt("bins")
bins_number = IntPrompt.ask(
"Please enter number of delegated hash bins",
default=DEFAULT_BINS_NUMBER,
choices=[str(2**i) for i in range(1, 15)],
show_default=True,
show_choices=True,
)
settings.bins_expiry = bins_expiry
settings.bins_number = bins_number
break
else:
settings.delegations = _configure_delegations()
break
##############################################################################
# Other helpers
def _get_root_keys(root: Root) -> Dict[str, Key]:
return {
keyid: root.get_key(keyid) for keyid in root.roles[Root.type].keyids
}
def _get_online_key(root: Root) -> Optional[Key]:
# TODO: assert all online roles have the same and only one keyid, or none
key = None
if root.roles[Timestamp.type].keyids:
key = root.get_key(root.roles[Timestamp.type].keyids[0])
return key
def _parse_pending_data(pending_roles_resp: Dict[str, Any]) -> Dict[str, Any]:
data = pending_roles_resp.get("data", {})
all_roles: Dict[str, Dict[str, Any]] = data.get("metadata", {})
pending_roles = {
k: v for k, v in all_roles.items() if not k.startswith("trusted_")
}
if len(pending_roles) == 0:
raise click.ClickException("No metadata available for signing")
if any(
role["signed"]["_type"] not in [Root.type, Targets.type]
for role in pending_roles.values()
):
raise click.ClickException(
"Supporting only root and targets pending role types"
)
return all_roles
def _get_pending_roles(settings: Any) -> Dict[str, Dict[str, Any]]:
"""Get dictionary of pending roles for signing."""
response = request_server(
settings.SERVER,
URL.METADATA_SIGN.value,
Methods.GET,
headers=settings.HEADERS,
)
if response.status_code != 200:
raise click.ClickException(
f"Failed to fetch metadata for signing. Error: {response.text}"
)
return _parse_pending_data(response.json())
def _print_root(root: Root):
"""Pretty print root metadata."""
key_table = Table("Role", "Name", "Signing Scheme", "Public Value")
for key in _get_root_keys(root).values():
if isinstance(key, SigstoreKey):
public_value = f"{key.keyval['identity']}@{key.keyval['issuer']}"
else:
public_value = key.keyval["public"] # SSlibKey-specific
name = key.unrecognized_fields.get(KEY_NAME_FIELD, key.keyid)
key_table.add_row(
"Root", f"[green]{name}[/]", key.scheme, public_value
)
root_key = _get_online_key(root)
if root_key is None:
console.print("No online key configured")
return
name = root_key.unrecognized_fields.get(KEY_NAME_FIELD, root_key.keyid)
key_table.add_row(
"Online",
f"[green]{name}[/]",
root_key.scheme,
root_key.keyval["public"],
)
root_table = Table("Infos", "Keys", title="Root Metadata")
root_table.add_row(
(
f"Expiration: {root.expires:%x}\n"
f"Threshold: {root.roles[Root.type].threshold}\n"
f"Version: {root.version}"
),
key_table,
)
console.print(root_table)
def _print_targets(targets: Metadata[Targets]):
"""Pretty print targets metadata."""
targets_table = Table("Version", "Artifacts")
artifact_table = Table("Path", "Info", show_lines=True)
for path, info in targets.signed.targets.items():
artifact_table.add_row(
path, JSON.from_data(info.to_dict()), style="bold"
)
targets_table.add_row(str(targets.signed.version), artifact_table)
console.print(targets_table)
def _print_delegation(delegations: Delegations):
"""Pretty print target delegation metadata."""
if delegations.roles is None:
console.print("No delegations")
return None
delegations_table = Table(
"Role Name",
"Infos",
"Keys",
title="Delegation Metadata",
show_lines=True,
)
for rolename, delegation in delegations.roles.items():
key_table: Optional[Table] = None
key_table = Table("ID", "Name", "Signing Scheme")
for key in delegations.keys.values():
if key.keyid in delegations.roles[rolename].keyids:
name = key.unrecognized_fields.get(KEY_NAME_FIELD)
key_table.add_row(key.keyid, name, key.scheme)
if len(key_table.rows) == 0:
key_table = None
if delegation.paths is None:
delegation.paths = []
delegations_table.add_row(
delegation.name,
(
f"Expiration: {delegation.unrecognized_fields['x-rstuf-expire-policy']}\n" # noqa
f"Threshold: {delegation.threshold}\n"
f"Paths: {', '.join(delegation.paths)}"
),
key_table or "Online Key",
)
console.print(delegations_table)
def _filter_root_verification_results(
root_result: RootVerificationResult,
) -> list[VerificationResult]:
"""Filter unverified results with distinct relevant fields."""
# 1. Filter unverified
results: list[VerificationResult] = [
r for r in (root_result.first, root_result.second) if not r.verified
]
# 2. Filter distinct by 'unsigned' and 'missing' properties
if len(results) == 2:
if (root_result.first.unsigned == root_result.second.unsigned) and (
root_result.first.missing == root_result.second.missing
):
results = results[:1]
return results
def _print_keys_for_signing(
results: list[VerificationResult],
) -> list[Key]:
"""Print public keys eligible for signing and return in printed order.
The indexed output can be used to choose a signing key (1-based).
"""
keys: list[Key] = []
for result in results:
m = result.missing
s = "s" if m > 1 else ""
console.print(f"Info: {m} signature{s} missing from any of:")
for key in result.unsigned.values():
name = key.unrecognized_fields.get(KEY_NAME_FIELD, key.keyid)
console.print(f"- [green]{name}[/]")
keys.append(key)
console.print()
return keys
def _print_root_keys(root: Root) -> list[Key]:
"""Print current root keys and return in printed order.
The indexed output can be used to choose a key (1-based).
"""
keys: list[Key] = []
keyids = root.roles[Root.type].keyids
if keyids:
console.print("\nCurrent signing keys:")
for keyid in keyids:
key = root.get_key(keyid)
name = key.unrecognized_fields.get(KEY_NAME_FIELD, keyid)
console.print(f"- '{name}'")
keys.append(key)
return keys
def _print_missing_key_info(threshold: int, missing: int) -> None:
if missing:
s = "s" if missing > 1 else ""
console.print(
f"\nInfo: {missing} key{s} missing for threshold {threshold}."
)
else:
console.print(
f"\nInfo: Threshold {threshold} is met, more keys can be added."
)
def _warn_no_save():
console.print(
":warning: ",
"metadata result not sent to rstuf worker, use `-s` to save locally ",
":warning:",
justify="center",
style="italic",
)
def _get_latest_md(metadata_url: str, role_name: str) -> Metadata:
try:
temp_dir = TemporaryDirectory()
initial_root_url = f"{metadata_url}/1.root.json"
response = requests.get(initial_root_url, timeout=300)
if response.status_code != 200:
raise click.ClickException(
f"Cannot fetch initial root {initial_root_url}"
)
with open(f"{temp_dir.name}/root.json", "w") as f:
f.write(response.text)
updater = Updater(
metadata_dir=temp_dir.name, metadata_base_url=metadata_url
)
updater.refresh()
md_bytes = updater._load_local_metadata(role_name)
metadata = Metadata.from_bytes(md_bytes)
return metadata
except (OSError, RepositoryError, DownloadError):
raise click.ClickException(f"Problem fetching latest {role_name}")