Skip to content

Commit

Permalink
Fix KV create race after delete/purge
Browse files Browse the repository at this point in the history
This change fixes an issue when using a JetStream K/V store where a user
is creating, deleting, and re-creating keys. If the last entry for a key
is a `Operation::Delete` or `Operation::Purge`, the initial
`self.update()` returns an error, causing the second part of the method
to be exercised.

Prior to this change, if the entry was deleted or purged a `kv.put()`
call is used which ignores the revision of that last entry. A single
writer to the K/V store would succeed (as no other writers would write
first) so no problem. However, if 2 writers attempt to create a key,
then a second writer *could* call the `kv.put()` before the first writer
calls `kv.put()`. This means that *both* writers get an `Ok(revision)`
and can assume that they won the creation of the key.

When using a "distributed lock" pattern (that is many writers race to
create a key and the first successful writer wins), this above scenario
results in potentially more than one writer who believes they have
uniquely acquired the distributed lock.

This change replaces the `kv.put()` call to a `kv.update()` call and
provides the `revision` from the deleted/purged entry to ensure that no
other writer has beaten the caller to this update. This change closes
the race period between concurrent writers to between the first update
and the second update call with some optimistic write concurrency to
detect another writer.

It appears as though this strategy is in effect in the Go client code
[kv.Create] implementation.

[kv.Create]: https://github.com/nats-io/nats.go/blob/278f9f188bca4d7bdee283a0e98ab66b82530c60/jetstream/kv.go#L944-L963

Co-authored-by: John Keiser <[email protected]>
Signed-off-by: Fletcher Nichol <[email protected]>
Signed-off-by: Fletcher Nichol <[email protected]>
  • Loading branch information
fnichol and John Keiser committed Aug 14, 2024
1 parent e9919a0 commit 4e8c526
Showing 1 changed file with 16 additions and 2 deletions.
18 changes: 16 additions & 2 deletions async-nats/src/jetstream/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,10 @@ impl Store {
// Deleted or Purged key, we can create it again.
Some(Entry {
operation: Operation::Delete | Operation::Purge,
revision,
..
}) => {
let revision = self.put(key, value).await?;
let revision = self.update(key, value, revision).await?;
Ok(revision)
}

Expand Down Expand Up @@ -1250,6 +1251,7 @@ impl From<UpdateError> for CreateError {
match error.kind() {
UpdateErrorKind::InvalidKey => Error::from(CreateErrorKind::InvalidKey),
UpdateErrorKind::TimedOut => Error::from(CreateErrorKind::Publish),
UpdateErrorKind::WrongLastRevision => Error::from(CreateErrorKind::AlreadyExists),
UpdateErrorKind::Other => Error::from(CreateErrorKind::Other),
}
}
Expand Down Expand Up @@ -1362,6 +1364,7 @@ crate::from_with_timeout!(WatchError, WatchErrorKind, StreamError, StreamErrorKi
pub enum UpdateErrorKind {
InvalidKey,
TimedOut,
WrongLastRevision,
Other,
}

Expand All @@ -1370,14 +1373,25 @@ impl Display for UpdateErrorKind {
match self {
Self::InvalidKey => write!(f, "key cannot be empty or start/end with `.`"),
Self::TimedOut => write!(f, "timed out"),
Self::WrongLastRevision => write!(f, "wrong last revision"),
Self::Other => write!(f, "failed getting entry"),
}
}
}

pub type UpdateError = Error<UpdateErrorKind>;

crate::from_with_timeout!(UpdateError, UpdateErrorKind, PublishError, PublishErrorKind);
impl From<PublishError> for UpdateError {
fn from(err: PublishError) -> Self {
match err.kind() {
PublishErrorKind::TimedOut => Self::new(UpdateErrorKind::TimedOut),
PublishErrorKind::WrongLastSequence => {
Self::with_source(UpdateErrorKind::WrongLastRevision, err)
}
_ => Self::with_source(UpdateErrorKind::Other, err),
}
}
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum WatcherErrorKind {
Expand Down

0 comments on commit 4e8c526

Please sign in to comment.