Skip to content

Commit

Permalink
Refine docstrings in tuf.services
Browse files Browse the repository at this point in the history
Signed-off-by: Lukas Puehringer <[email protected]>
  • Loading branch information
lukpueh committed Mar 2, 2022
1 parent 84937d6 commit 7de24c1
Showing 1 changed file with 94 additions and 38 deletions.
132 changes: 94 additions & 38 deletions warehouse/tuf/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ class InsecureKeyWarning(UserWarning):

@implementer(IKeyService)
class LocalKeyService:
"""
A service to read public and private TUF role keys as local files for development.
NOTE: Do not use in production!
"""

def __init__(self, key_path, request):
warnings.warn(
"LocalKeyService is intended only for use in development, you "
Expand All @@ -60,12 +66,8 @@ def create_service(cls, context, request):

def get(self, rolename, key_type):
"""
Gets the Key based on role name and key type
Args:
rolename: role name
key_type: 'private' or 'public'
Returns Key objects for passed TUF role name from configured TUF key path. Key
type is one of 'public' or 'private'.
"""
if key_type == "private":
privkey_path = os.path.join(self._key_path, f"{rolename}*")
Expand All @@ -89,6 +91,9 @@ def get(self, rolename, key_type):

@implementer(IStorageService)
class LocalStorageService:
"""
A storage service with methods to read and write TUF role metadata as local files.
"""
def __init__(self, repo_path):
self._repo_path = repo_path

Expand All @@ -100,6 +105,11 @@ def create_service(cls, context, request):

@contextmanager
def get(self, role, version=None):
"""
Yields TUF role metadata file object for the passed role name, from the
configured TUF repo path, optionally at the passed version (latest if None).
"""

if role == Role.TIMESTAMP.value:
filename = os.path.join(self._repo_path, f"{role}.json")
else:
Expand Down Expand Up @@ -128,6 +138,9 @@ def get(self, role, version=None):
file_object.close()

def put(self, file_object, filename):
"""
Writes passed file object to configured TUF repo path using the passed filename.
"""
file_path = os.path.join(self._repo_path, filename)
if not file_object.closed:
file_object.seek(0)
Expand All @@ -146,19 +159,29 @@ def store(self, file_object, filename):

@implementer(IRepositoryService)
class LocalRepositoryService:
"""
A repository service to create and maintain TUF role metadata.
"""
def __init__(self, storage_service, key_service, request):
self._storage_backend = storage_service
self._key_storage_backend = key_service
self._request = request

@classmethod
def create_service(cls, context, request):
"""
Creates a new repository service object configuring services to read and write
TUF role metadata (``IStorageService``) and to read public and private keys
(``IKeyService``).
"""
storage_service = request.find_service(IStorageService)
key_service = request.find_service(IKeyService)
return cls(storage_service, key_service, request)

def _get_hash_bins(self):
"""Get the hash bins object"""
"""
Returns a 'hash bin delegation' management object.
"""
if self._request.registry.settings["warehouse.env"] == Environment.development:
number_of_bins = 32
else:
Expand All @@ -168,11 +191,10 @@ def _get_hash_bins(self):

def _make_fileinfo(self, file, custom=None):
"""
Create a TUF-compliant "fileinfo" dictionary suitable for addition to a
delegated bin.
Returns a TUF-compliant 'fileinfo' dictionary suitable for targets metadata.
The optional "custom" kwarg can be used to supply additional custom
metadata (e.g., metadata for indicating backsigning).
The optional 'custom' kwarg can be used for additional metadata about target
files (e.g., to indicate backsigning).
"""
hashes = {"blake2b-256": file.blake2_256_digest}
fileinfo = dict()
Expand All @@ -184,9 +206,11 @@ def _make_fileinfo(self, file, custom=None):
return fileinfo

def _set_expiration_for_role(self, role_name):
# If we're initializing TUF for development purposes, give
# every role a long expiration time so that developers don't have to
# continually re-initialize it.
"""
Returns a metadata expiration date (now + role-specific interval).
"""
# In a development environment metadata expires less frequently so that
# developers don't have to continually re-initialize it.
if self._request.registry.settings["warehouse.env"] == Environment.development:
return datetime.datetime.now().replace(microsecond=0) + datetime.timedelta(
seconds=self._request.registry.settings[
Expand All @@ -199,6 +223,16 @@ def _set_expiration_for_role(self, role_name):
)

def init_repository(self):
"""
Creates TUF top-level role metadata (root, targets, snapshot, timestamp).
The metadata is populated with configured expiration times, signature thresholds
and verification keys, and signed and persisted using the configured key and
storage services.
FIXME: In production 'root' and 'targets' roles require offline singing keys,
which may not be available at the time of initializing this metadata.
"""
metadata_repository = MetadataRepository(
self._storage_backend, self._key_storage_backend
)
Expand All @@ -217,12 +251,23 @@ def init_repository(self):
metadata_repository.initialize(top_roles_payload, True)

def init_targets_delegation(self):
"""
Creates TUF metadata for hash bin delegated targets roles (bins, bin-n).
Metadata is created for one 'bins' role and a configured number of 'bin-n'
roles. It is populated with configured expiration times, signature thresholds
and verification keys, and signed and persisted using the configured key and
storage services.
FIXME: In production the 'bins' role requires an offline singing key, which may
not be available at the time of initializing this metadata.
"""
hash_bins = self._get_hash_bins()
metadata_repository = MetadataRepository(
self._storage_backend, self._key_storage_backend
)

# Delegate first targets -> BINS
# Top-level 'targets' role delegates trust for all target files to 'bins' role.
delegate_roles_payload = dict()
delegate_roles_payload["targets"] = list()
delegate_roles_payload["targets"].append(
Expand All @@ -243,7 +288,8 @@ def init_targets_delegation(self):
self._set_expiration_for_role(Role.SNAPSHOT.value),
)

# Delegates all BINS -> BIN_N (Hash bin prefixes)
# The 'bins' role delegates trust for target files to 'bin-n' roles based on
# target file path hash prefixes.
delegate_roles_payload = dict()
delegate_roles_payload[Role.BINS.value] = list()
for bin_n_name, bin_n_hash_prefixes in hash_bins.generate():
Expand All @@ -266,29 +312,28 @@ def init_targets_delegation(self):
)

def bump_snapshot(self):
# Bumping the Snapshot role involves the following steps:
# 1. Initiate Metadata Repository.
# 2. Load the Snapshot Role.
# 3. Bump Snapshot role (and write to the Storage).
# 4. Bump Timestamp role using the new Snapshot Metadata.
"""
Bumps version and expiration date of TUF 'snapshot' role metadata.
# 1. Metadata Repository.
The version number is incremented by one, the expiration date renewed using a
configured expiration interval, and the metadata is signed and persisted using
the configured key and storage services.
Bumping 'snapshot' transitively bumps the 'timestamp' role.
"""
metadata_repository = MetadataRepository(
self._storage_backend, self._key_storage_backend
)

# 2. Snapshot role metadata.
snapshot_metadata = metadata_repository.load_role(Role.SNAPSHOT.value)

# 3. Bump Snapshot role metadata.
snapshot_metadata = metadata_repository.snapshot_bump_version(
snapshot_expires=self._set_expiration_for_role(
self._request, Role.SNAPSHOT.value
),
store=True,
)

# 4. Bump Snapshot role metadata using Timestamp metadata.
metadata_repository.timestamp_bump_version(
snapshot_version=snapshot_metadata.signed.version,
timestamp_expires=self._set_expiration_for_role(
Expand All @@ -298,18 +343,17 @@ def bump_snapshot(self):
)

def bump_bin_n_roles(self):
# Bumping all of the delegated bin roles in the TUF repository involves
# the following steps:
# 1. Metadata Repository.
# 2. Load Snapshot role.
# 3. Load BINS role.
# 4. For each BIN-N role (hash bin) in BINS, fetch the
# role, bump, write back to the repo and update Snapshot role MetaFile.
# 5. Bump Snapshot and write back to repository.
# 6. Bump Timestamp role (using updated Snapshot) and write back to
# repository.

# 1. Metadata Repository
"""
Bumps version and expiration date of 'bin-n' role metadata (multiple).
The version numbers are incremented by one, the expiration dates are renewed
using a configured expiration interval, and the metadata is signed and persisted
using the configured key and storage services.
Bumping 'bin-n' transitively bumps 'snapshot' and 'timestamp' roles.
"""

# 1. Grab metadata Repository
metadata_repository = MetadataRepository(
self._storage_backend, self._key_storage_backend
)
Expand All @@ -335,7 +379,7 @@ def bump_bin_n_roles(self):
role, role_metadata.signed.version, snapshot_metadata
)

# 5. Bump Snapshot with updated metadata
# 5. Bump Snapshot with updated targets (bin-n) metadata
snapshot_metadata = metadata_repository.snapshot_bump_version(
snapshot_expires=self._set_expiration_for_role(
self._request, Role.SNAPSHOT.value
Expand All @@ -354,7 +398,16 @@ def bump_bin_n_roles(self):
)

def add_hashed_targets(self, targets):
"""
Update 'bin-n' roles metadata, assigning each passed target to the correct bin.
Assignment is based on the hash prefix of the target file path. All metadata is
signed and persisted using the configured key and storage services.
Updating 'bin-n' transitively bumps 'snapshot' and 'timestamp'.
"""
hash_bins = self._get_hash_bins()

targets_payload = dict()
for target in targets:
fileinfo = target.get("info")
Expand All @@ -370,6 +423,9 @@ def add_hashed_targets(self, targets):
self._storage_backend, self._key_storage_backend
)

# TODO: Should we renew expiration date of 'timestamp' *and* 'snapshot' here?
# PEP 458 'Producing Consistent Snapshots' only mentions 'timestamp'.
# https://www.python.org/dev/peps/pep-0458/#producing-consistent-snapshots
metadata_repository.add_targets(
targets_payload,
self._set_expiration_for_role(Role.TIMESTAMP.value),
Expand Down

0 comments on commit 7de24c1

Please sign in to comment.