Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sign and verify #2

Merged
merged 6 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion securesystemslib/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,13 @@ class InvalidConfigurationError(Error):
"""If a configuration object does not match the expected format."""
pass


class StorageError(Error):
"""Indicate an error occured during interaction with an abstracted storage
"""Indicate an error occurred during interaction with an abstracted storage
backend."""
pass


class SignatureVerificationError(Error):
"""Indicate an error occurred during signature verification of DSSE."""
pass
44 changes: 43 additions & 1 deletion securesystemslib/key.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@ class Key:
"""Key interface created to support multiple verify implementations."""

__metaclass__ = abc.ABCMeta
keyid: str

@abc.abstractmethod
def match_keyid(self, keyid: str) -> bool:
"""Matches a keyid with the keyid present in the Key instance.

Arguments:
keyid: Key identifier

Returns:
Boolean. True if keyid matches for the Key, False otherwise.
"""

raise NotImplementedError

@abc.abstractmethod
def verify(self, signature: Signature, payload: bytes) -> bool:
Expand Down Expand Up @@ -54,7 +68,7 @@ class SSlibKey(Key):

keytype: str
scheme: str
keyval: Dict[str, str]
keyval: Dict[str, Dict]
keyid: str
unrecognized_fields: Dict[str, Any] = field(default_factory=dict)

Expand Down Expand Up @@ -122,6 +136,18 @@ def to_securesystemslib_key(self) -> Dict[str, Any]:
"keyval": self.keyval,
}

def match_keyid(self, keyid: str) -> bool:
"""Matches a keyid with the keyid present in the Key instance.

Arguments:
keyid: Key identifier

Returns:
Boolean. True if keyid matches for the Key, False otherwise.
"""

return self.keyid == keyid

def verify(self, signature: Signature, payload: bytes) -> bool:
"""Verifies a given payload by the key assigned to the SSlibKey instance.

Expand Down Expand Up @@ -227,6 +253,22 @@ def from_keyring(cls, keyid, homedir=None):
pubkey_dict = gpg.export_pubkey(keyid, homedir)
return cls.from_dict(pubkey_dict)

def match_keyid(self, keyid: str) -> bool:
"""Matches a keyid with the keyids present in the Key instance.

Arguments:
keyid: Key identifier

Returns:
Boolean. True if keyid matches for the Key, False otherwise.
"""

if self.keyid == keyid:
return True
if self.subkeys and keyid in self.subkeys:
return True
return False

def verify(self, signature: GPGSignature, payload: bytes) -> bool:
"""Verifies a given payload by the key assigned to the GPGKey instance.

Expand Down
107 changes: 87 additions & 20 deletions securesystemslib/metadata.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,33 @@
"""Dead Simple Signing Envelope
"""

from typing import Any, List
import logging
from typing import Any, Dict, List

from securesystemslib import exceptions, formats
from securesystemslib.signer import GPGSignature, Signature
from securesystemslib.key import Key
from securesystemslib.signer import GPGSignature, Signature, Signer
from securesystemslib.util import b64dec, b64enc

logger = logging.getLogger(__name__)


class Envelope:
"""
DSSE Envelope to provide interface for signing arbitrary data.
"""DSSE Envelope to provide interface for signing arbitrary data.

Attributes:
payload: Arbitrary byte sequence of serialized body
payload_type: string that identifies how to interpret payload
signatures: List of Signature and GPG Signature

Methods:
from_dict(cls, data):
Creates a Signature object from its JSON/dict representation.

to_dict(self):
Returns the JSON-serializable dictionary representation of self.
payload: Arbitrary byte sequence of serialized body.
payload_type: string that identifies how to interpret payload.
signatures: list of Signature and GPGSignature.

"""

payload: bytes
payload_type: str
signatures: List[Signature]

def __init__(self, payload, payload_type, signatures):
def __init__(
self,
payload: bytes,
payload_type: str,
signatures: List[Signature]
):
self.payload = payload
self.payload_type = payload_type
self.signatures = signatures
Expand Down Expand Up @@ -84,7 +81,6 @@ def to_dict(self) -> dict:
"signatures": [signature.to_dict() for signature in self.signatures],
}

@property
def pae(self) -> bytes:
"""Pre-Auth-Encoding byte sequence of self."""

Expand All @@ -94,3 +90,74 @@ def pae(self) -> bytes:
len(self.payload),
self.payload,
)

def sign(self, signer: Signer) -> Signature:
PradyumnaKrishna marked this conversation as resolved.
Show resolved Hide resolved
PradyumnaKrishna marked this conversation as resolved.
Show resolved Hide resolved
"""Sign the payload and create the signature.

Arguments:
signer: A "Signer" class instance.

Returns:
A "Signature" instance.
"""

signature = signer.sign(self.pae())
self.signatures.append(signature)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before appending maybe you will want to check if this signature doesn't exist?
Does a set makes sense for Envelope.signatures?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about it, according to current signature wrappers:
in-toto: signatures is a list and appends the signatures.
python-tuf: signatures is a dict and appends with keyid.

Also, in DSSE keyid is optional. But what I can do is:

Suggested change
self.signatures.append(signature)
if signature.keyid:
for s in self.signatures:
if s.keyid == signature.keyid:
self.signatures.remove(s)
break
self.signatures.append(signature)

Copy link

@MVrachev MVrachev Jun 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in-toto: signatures is a list and appends the signatures.
Doesn't in-toto make a check if a signature already exists?

In python-tuf to add a new signature you should use sign: https://github.com/theupdateframework/python-tuf/blob/8a03abfdeb5a6b6bc892859f65425be0834893a8/tuf/api/metadata.py#L342.
In sign there is an append option which if used will add an additional signature.
If append is True and a key is already used to sign a file, then the signature will exist in the dictionary will be replaced by a new Signature instance with the same content: https://github.com/theupdateframework/python-tuf/blob/8a03abfdeb5a6b6bc892859f65425be0834893a8/tuf/api/metadata.py#L389.
This removes the possibility of two existing Signature instances with the same content.

The question is do we want to allow duplicating signatures?
I think duplicating signatures could be a problem because they artificially increase the numbers of signatures used and thus threshold loses its importance.
I don't think an attacker can utilize that as for that he/she will need the private key used for signing (and if he has that he can break the system on multiple levels), so maybe I shouldn't worry.

@lukpueh any thoughts?

PS: If you want to remove an already existing signature you can directly use == and not rely on keyid existing in a signature as a __eq__ implementation exist for the Signature class:

def __eq__(self, other: Any) -> bool:

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't in-toto make a check if a signature already exists?

I don't think in-toto's metadata wrapper do that.

@adityasaky whats your opinion about this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in-toto does allow multiple signatures from the same key on a particular document but that's because multiple signatures only really apply to the layout file. For a threshold k out of n keys for a particular step, you're not going to have one link with multiple signatures, but rather k different link files. We verify here to see we have k distinct link metadata files: https://github.com/in-toto/in-toto/blob/f6a4f6239ef28fc482adf147af79f030086f63f0/in_toto/verifylib.py#L506-L507. As for the layout, verification is dependent on each submitted public key.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think that in order to escape possible problems regarding threshold computation it's better if we don't allow duplicating signatures. They just don't make sense to exist.
@adityasaky maybe before merging, we can wait for @lukpueh input here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, as long as we don't block @PradyumnaKrishna. I maintain that we shouldn't be getting too involved with duplicate signatures in the DSSE implementation. Perhaps this is even a hint that we shouldn't get too involved in the threshold aspects in the envelope either as each consumer may have different takes on how to handle thresholds. Looking for duplicate signatures gets messy here, IMO, for a number of reasons:

  1. DSSE doesn't specify how keyids are supposed to be calculated so a key could theoretically be presented with multiple keyids, or two keys could have the same keyid if someone chooses to assign "human readable" identifiers ("key-alice"). So far, we're just trusting the keyid value presented in the signature object to make the call.
  2. Since keyids are optional, we can't even use the current "trust the keyid in the signature object" if we have two signatures from the same key, one with keyid and one without.

I think, then, trying to calculate thresholds and handling duplicate signatures gets very opinionated, and it's best to leave it to the consumers of the library, especially since each one may have their own take on threshold values. If we do want to, we'd have to map every key to every signature (which we more or less do when there are no keyids), calculate a consistent keyid value for every key, and then use that to track duplicates, all of which I think may be overkill. What do you think?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind getting input from @lukpueh. I am working parallel on payload parser on my local.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adityasaky makes some good points. Let's not check for duplicates here, but leave it to the consumer/verifier.

Regarding...

I don't think an attacker can utilize that as for that he/she will need the private key used for signing (and if he has that he can break the system on multiple levels), so maybe I shouldn't worry.

I might be misunderstanding your thought, @MVrachev, but the premise of signature threshold verification is that the attacker can indeed control some private keys, but as long that number of keys is below the threshold the attacker can't compromise the data. So verifying the threshold does serve a purpose, but it has to happen at verification time (also see my comment below).


return signature

def verify(self, keys: List[Key], threshold: int) -> Dict[str, Key]:
"""Verify the payload with the provided Keys.

Arguments:
keys: A list of public keys to verify the signatures.
threshold: Number of signatures needed to pass the verification.

Raises:
ValueError: If "threshold" is not valid.
SignatureVerificationError: If the enclosed signatures do not pass
the verification.

Note:
Mandating keyid in signatures and matching them with keyid of Key
in order to consider them for verification, is not DSSE spec
compliant (Issue #416).

Returns:
accepted_keys: A dict of unique public keys.
"""

accepted_keys = {}
pae = self.pae()

# checks for threshold value.
if threshold <= 0:
raise ValueError("Threshold must be greater than 0")

if len(keys) < threshold:
raise ValueError("Number of keys can't be less than threshold")

for signature in self.signatures:
for key in keys:
# If Signature keyid doesn't match with Key, skip.
if not key.match_keyid(signature.keyid):
continue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth adding a TODO comment either here or in the docstring above that what we do here, i.e. mandating keyids in signatures in order to consider them for verification, is not dsse spec compliant?

see secure-systems-lab#416 (comment)


# If a key verifies the signature, we exit and use the result.
# TODO: Exception handling in `Key.verify`.
# See https://github.com/in-toto/securesystemslib/issues/8.
if key.verify(signature, pae):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI and because you asked about how to deal with exceptions that might be raised in Key.verify:

theupdateframework/python-tuf@414dfc8 discusses returning boolean vs. raising exception in Key.verify and opts for the latter.

I suggest we change our Key implementations to do the same thing, i.e. return None if signature verification passes and raise an exceptions (or rather different exceptions with a common base) in all other cases. Then the caller does not have to case handle True/False/Error but only error type.

I also suggest to do it in a separate PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we change our Key implementations to do the same thing, i.e. return None if signature verification passes and raise an exceptions (or rather different exceptions with a common base) in all other cases.

@PradyumnaKrishna, would you mind creating an issue for this?

accepted_keys[key.keyid] = key
break

PradyumnaKrishna marked this conversation as resolved.
Show resolved Hide resolved
# Break, if amount of recognized_signer are more than threshold.
if len(accepted_keys) >= threshold:
break

if threshold > len(accepted_keys):
raise exceptions.SignatureVerificationError(
"Accepted signatures do not match threshold,"
f" Found: {len(accepted_keys)}, Expected {threshold}"
)

return accepted_keys
71 changes: 71 additions & 0 deletions tests/test_gpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import tempfile
import unittest

from typing import List

if sys.version_info >= (3, 3):
from unittest.mock import patch # pylint: disable=no-name-in-module,import-error
else:
Expand Down Expand Up @@ -55,8 +57,12 @@
from securesystemslib.gpg.exceptions import (PacketParsingError,
PacketVersionNotSupportedError, SignatureAlgorithmNotSupportedError,
KeyNotFoundError, CommandError, KeyExpirationError)
from securesystemslib.exceptions import SignatureVerificationError
from securesystemslib.formats import (GPG_PUBKEY_SCHEMA,
ANY_PUBKEY_DICT_SCHEMA)
from securesystemslib.key import GPGKey, Key
from securesystemslib.metadata import Envelope
from securesystemslib.signer import GPGSignature, GPGSigner


class GPGTestUtils:
Expand Down Expand Up @@ -649,6 +655,31 @@ def test_verify_signature_with_expired_key(self):
"\nexpected: {}"
"\ngot: {}".format(expected, ctx.exception))

def test_dsse_envelope(self):
"""Test signing and verifying DSSE signatures."""

# Create the DSSE Envelope.
envelope = Envelope(
payload=b"hello world",
payload_type="http://example.com/HelloWorld",
signatures=[],
)

# Create a GPGSigner and create a DSSE signature.
gpg_signer = GPGSigner(homedir=self.gnupg_home)
gpg_signature = envelope.sign(gpg_signer)
self.assertIsInstance(gpg_signature, GPGSignature)

# Create a GPGKey and verify the DSSE signature.
gpgkey = GPGKey.from_keyring(keyid=self.default_keyid, homedir=self.gnupg_home)
key_list: List[Key] = [gpgkey]
envelope.verify(key_list, 1)

# Duplicate GPGKey.
new_key_list = key_list + key_list
with self.assertRaises(SignatureVerificationError):
envelope.verify(new_key_list, 2)


@unittest.skipIf(not HAVE_GPG, "gpg not found")
class TestGPGDSA(unittest.TestCase):
Expand Down Expand Up @@ -733,6 +764,26 @@ def test_gpg_sign_and_verify_object(self):
self.assertTrue(verify_signature(signature, key_data, test_data))
self.assertFalse(verify_signature(signature, key_data, wrong_data))

def test_dsse_envelope(self):
"""Test signing and verifying DSSE signatures."""

# Create the DSSE Envelope.
envelope = Envelope(
payload=b"hello world",
payload_type="http://example.com/HelloWorld",
signatures=[],
)

# Create a GPGSigner and create a DSSE signature.
gpg_signer = GPGSigner(homedir=self.gnupg_home)
gpg_signature = envelope.sign(gpg_signer)
self.assertIsInstance(gpg_signature, GPGSignature)

# Create a GPGKey and verify the DSSE signature.
gpgkey = GPGKey.from_keyring(keyid=self.default_keyid, homedir=self.gnupg_home)
key_list: List[Key] = [gpgkey]
envelope.verify(key_list, 1)



@unittest.skipIf(not HAVE_GPG, "gpg not found")
Expand Down Expand Up @@ -808,6 +859,26 @@ def test_verify_short_signature(self):
key = export_pubkey(self.default_keyid, homedir=self.gnupg_home)
self.assertTrue(verify_signature(signature, key, test_data))

def test_dsse_envelope(self):
"""Test signing and verifying DSSE signatures."""

# Create the DSSE Envelope.
envelope = Envelope(
payload=b"hello world",
payload_type="http://example.com/HelloWorld",
signatures=[],
)

# Create a GPGSigner and create a DSSE signature.
gpg_signer = GPGSigner(homedir=self.gnupg_home)
gpg_signature = envelope.sign(gpg_signer)
self.assertIsInstance(gpg_signature, GPGSignature)

# Create a GPGKey and verify the DSSE signature.
gpgkey = GPGKey.from_keyring(keyid=self.default_keyid, homedir=self.gnupg_home)
key_list: List[Key] = [gpgkey]
envelope.verify(key_list, 1)


if __name__ == "__main__":
unittest.main()
Loading