Skip to content

Commit

Permalink
Fix transaction implementation in LogState::validate. (#243)
Browse files Browse the repository at this point in the history
This PR removes the incorrect transaction implementation for
`LogState::validate`.

Now `validate` takes ownership of `self` and returns `Result<Self,
ValidationError>`.

This means that callers that expect to keep the log state following an
invalid log entry must clone the state prior to validation.

As the in-memory data store is the only store that persists the log
state in memory, it now clones the state before validation and updates
the log state upon successful validation.

For the postgres data store, the log state was loaded from the database
and is discarded on error, so no clone is necessary.

Fixes #242.
  • Loading branch information
peterhuene authored Feb 7, 2024
1 parent c46ec79 commit cf62106
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 207 deletions.
15 changes: 8 additions & 7 deletions crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ impl<R: RegistryStorage, C: ContentStorage> Client<R, C> {
if operator.head_registry_index.is_none()
|| proto_envelope.registry_index > operator.head_registry_index.unwrap()
{
operator
operator.state = operator
.state
.validate(&proto_envelope.envelope)
.map_err(|inner| ClientError::OperatorValidationFailed { inner })?;
Expand All @@ -454,12 +454,13 @@ impl<R: RegistryStorage, C: ContentStorage> Client<R, C> {
if package.head_registry_index.is_none()
|| proto_envelope.registry_index > package.head_registry_index.unwrap()
{
package
.state
.validate(&proto_envelope.envelope)
.map_err(|inner| ClientError::PackageValidationFailed {
name: package.name.clone(),
inner,
let state = std::mem::take(&mut package.state);
package.state =
state.validate(&proto_envelope.envelope).map_err(|inner| {
ClientError::PackageValidationFailed {
name: package.name.clone(),
inner,
}
})?;
package.head_registry_index = Some(proto_envelope.registry_index);
package.head_fetch_token = Some(record.fetch_token);
Expand Down
4 changes: 2 additions & 2 deletions crates/protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub trait Record: Clone + Decode + Send + Sync {
fn contents(&self) -> HashSet<&AnyHash>;
}

/// Trait implemented by the validator types.
/// Trait implemented by the log state types.
pub trait Validator:
std::fmt::Debug + Serialize + DeserializeOwned + Default + Send + Sync
{
Expand All @@ -33,7 +33,7 @@ pub trait Validator:
type Error: Send;

/// Validates the given record.
fn validate(&mut self, record: &ProtoEnvelope<Self::Record>) -> Result<(), Self::Error>;
fn validate(self, record: &ProtoEnvelope<Self::Record>) -> Result<Self, Self::Error>;
}

/// Helpers for converting to and from protobuf
Expand Down
109 changes: 26 additions & 83 deletions crates/protocol/src/operator/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,16 @@ pub struct LogState {
/// This is `None` until the first (i.e. init) record is validated.
#[serde(skip_serializing_if = "Option::is_none")]
algorithm: Option<HashAlgorithm>,
/// The current head of the validator.
/// The current head of the state.
#[serde(skip_serializing_if = "Option::is_none")]
head: Option<Head>,
/// The permissions of each key.
#[serde(skip_serializing_if = "IndexMap::is_empty")]
permissions: IndexMap<signing::KeyID, IndexSet<model::Permission>>,
/// The keys known to the validator.
/// The keys known to the state.
#[serde(skip_serializing_if = "IndexMap::is_empty")]
keys: IndexMap<signing::KeyID, signing::PublicKey>,
/// The namespaces known to the validator. The key is the lowercased namespace.
/// The namespaces known to the state. The key is the lowercased namespace.
#[serde(skip_serializing_if = "IndexMap::is_empty")]
namespaces: IndexMap<String, NamespaceDefinition>,
}
Expand All @@ -146,20 +146,14 @@ impl LogState {
/// It is expected that `validate` is called in order of the
/// records in the log.
///
/// This operation is transactional: if any entry in the record
/// fails to validate, the validator state will remain unchanged.
/// Note that on failure, the log state is consumed to prevent
/// invalid state from being used in future validations.
pub fn validate(
&mut self,
mut self,
record: &ProtoEnvelope<model::OperatorRecord>,
) -> Result<(), ValidationError> {
let snapshot = self.snapshot();

let result = self.validate_record(record);
if result.is_err() {
self.rollback(snapshot);
}

result
) -> Result<Self, ValidationError> {
self.validate_record(record)?;
Ok(self)
}

/// Gets the public key of the given key id.
Expand Down Expand Up @@ -230,7 +224,7 @@ impl LogState {
// Validate the envelope signature
model::OperatorRecord::verify(key, envelope.content_bytes(), envelope.signature())?;

// Update the validator head
// Update the state head
self.head = Some(Head {
digest: RecordId::operator_record::<Sha256>(envelope),
timestamp: record.timestamp,
Expand Down Expand Up @@ -466,59 +460,17 @@ impl LogState {
}
Ok(())
}

fn snapshot(&self) -> Snapshot {
let Self {
algorithm,
head,
permissions,
keys,
namespaces,
} = self;

Snapshot {
algorithm: *algorithm,
head: head.clone(),
permissions: permissions.len(),
keys: keys.len(),
namespaces: namespaces.len(),
}
}

fn rollback(&mut self, snapshot: Snapshot) {
let Snapshot {
algorithm,
head,
permissions,
keys,
namespaces,
} = snapshot;

self.algorithm = algorithm;
self.head = head;
self.permissions.truncate(permissions);
self.keys.truncate(keys);
self.namespaces.truncate(namespaces);
}
}

impl crate::Validator for LogState {
type Record = model::OperatorRecord;
type Error = ValidationError;

fn validate(&mut self, record: &ProtoEnvelope<Self::Record>) -> Result<(), Self::Error> {
fn validate(self, record: &ProtoEnvelope<Self::Record>) -> Result<Self, Self::Error> {
self.validate(record)
}
}

struct Snapshot {
algorithm: Option<HashAlgorithm>,
head: Option<Head>,
permissions: usize,
keys: usize,
namespaces: usize,
}

#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
Expand Down Expand Up @@ -547,11 +499,11 @@ mod tests {

let envelope =
ProtoEnvelope::signed_contents(&alice_priv, record).expect("failed to sign envelope");
let mut validator = LogState::default();
validator.validate(&envelope).unwrap();
let state = LogState::default();
let state = state.validate(&envelope).unwrap();

assert_eq!(
validator,
state,
LogState {
head: Some(Head {
digest: RecordId::operator_record::<Sha256>(&envelope),
Expand Down Expand Up @@ -591,8 +543,8 @@ mod tests {

let envelope =
ProtoEnvelope::signed_contents(&alice_priv, record).expect("failed to sign envelope");
let mut validator = LogState::default();
validator.validate(&envelope).unwrap();
let state = LogState::default();
let state = state.validate(&envelope).unwrap();

let expected = LogState {
head: Some(Head {
Expand All @@ -612,7 +564,7 @@ mod tests {
namespaces: IndexMap::new(),
};

assert_eq!(validator, expected);
assert_eq!(state, expected);

let record = model::OperatorRecord {
prev: Some(RecordId::operator_record::<Sha256>(&envelope)),
Expand All @@ -639,14 +591,11 @@ mod tests {
let envelope =
ProtoEnvelope::signed_contents(&alice_priv, record).expect("failed to sign envelope");

// This validation should fail and the validator state should remain unchanged
match validator.validate(&envelope).unwrap_err() {
// This validation should fail
match state.validate(&envelope).unwrap_err() {
ValidationError::PermissionNotFoundToRevoke { .. } => {}
_ => panic!("expected a different error"),
}

// The validator should not have changed
assert_eq!(validator, expected);
}

#[test]
Expand Down Expand Up @@ -676,8 +625,8 @@ mod tests {

let envelope =
ProtoEnvelope::signed_contents(&alice_priv, record).expect("failed to sign envelope");
let mut validator = LogState::default();
validator.validate(&envelope).unwrap();
let state = LogState::default();
let state = state.validate(&envelope).unwrap();

let expected = LogState {
head: Some(Head {
Expand Down Expand Up @@ -714,7 +663,7 @@ mod tests {
]),
};

assert_eq!(validator, expected);
assert_eq!(state, expected);

{
let record = model::OperatorRecord {
Expand All @@ -737,14 +686,11 @@ mod tests {
let envelope = ProtoEnvelope::signed_contents(&alice_priv, record)
.expect("failed to sign envelope");

// This validation should fail and the validator state should remain unchanged
match validator.validate(&envelope).unwrap_err() {
// This validation should fail
match state.clone().validate(&envelope).unwrap_err() {
ValidationError::NamespaceAlreadyDefined { .. } => {}
_ => panic!("expected a different error"),
}

// The validator should not have changed
assert_eq!(validator, expected);
}

{
Expand All @@ -768,14 +714,11 @@ mod tests {
let envelope = ProtoEnvelope::signed_contents(&alice_priv, record)
.expect("failed to sign envelope");

// This validation should fail and the validator state should remain unchanged
match validator.validate(&envelope).unwrap_err() {
// This validation should fail
match state.validate(&envelope).unwrap_err() {
ValidationError::NamespaceConflict { .. } => {}
_ => panic!("expected a different error"),
}

// The validator should not have changed
assert_eq!(validator, expected);
}
}
}
Loading

0 comments on commit cf62106

Please sign in to comment.