Skip to content

Commit

Permalink
Add a withClockSkewUpperBound option when acquiring a lock
Browse files Browse the repository at this point in the history
This commit addresses issue awslabs#44 by providing a new option to utilize
wall clock time along with a provided error bound to determine if a
lock is expired.

Previously it was impossible to determine if a lease was expired without
blocking for at least lease duration and seeing if the version had changed.
Thus, if you never block and the lease wasn't explicitly marked as released
then the lock was unable to ever be acquired again. By providing a correct
upper clock skew error bound clients can correctly take over locks which
have expired but not been explicitly released without blocking.

In most distributed systems relying on wall clock time is generally not
correct but in this case we can provide an upper clock skew error bound
on the scale of minutes without facing any negative consequences for most
clients. This tradeoff of having a lease be unacquireable for several
minutes is possibly better than being forced to block in many use cases.
  • Loading branch information
Tristan Ohlson committed Mar 10, 2023
1 parent 0cd3fe8 commit 6f68fb9
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class AcquireLockOptions {
private final Boolean acquireReleasedLocksConsistently;
private final Optional<SessionMonitor> sessionMonitor;
private final Boolean reentrant;
private final Optional<Long> clockSkewUpperBound;

/**
* Setting this flag to true will prevent the thread from being blocked (put to sleep) for the lease duration and
Expand Down Expand Up @@ -72,6 +73,7 @@ public static class AcquireLockOptionsBuilder {
private Boolean updateExistingLockRecord;
private Boolean acquireReleasedLocksConsistently;
private Boolean reentrant;
private Optional<Long> clockSkewUpperBound;

private long safeTimeWithoutHeartbeat;
private Optional<Runnable> sessionMonitorCallback;
Expand All @@ -90,6 +92,7 @@ public static class AcquireLockOptionsBuilder {
this.shouldSkipBlockingWait = false;
this.acquireReleasedLocksConsistently = false;
this.reentrant = false;
this.clockSkewUpperBound = Optional.empty();;
}

/**
Expand Down Expand Up @@ -256,6 +259,22 @@ public AcquireLockOptionsBuilder withReentrant(final boolean reentrant) {
return this;
}

/**
* In combination with withShouldSkipBlockingWait(true) this allows a node to rely on a lastTouchedAt value on the DynamoDb
* entry to take over locks which have expired but have not been deleted or marked as released within DynamoDb (which can occur
* due to an ungraceful shutdown of the owning node).
*
* It's critically important that this error bound is accurate to the nodes that are relying on the lock client. If not,
* correctness problems can occur.
*
* @param clockSkewUpperBound the upper error bound of clock skew across the nodes running this client
* @return a reference to this builder for fluent method chaining
*/
public AcquireLockOptionsBuilder withClockSkewUpperBound(final Long clockSkewUpperBound) {
this.clockSkewUpperBound = Optional.ofNullable(clockSkewUpperBound);
return this;
}

/**
* <p>
* Registers a "SessionMonitor."
Expand Down Expand Up @@ -333,7 +352,7 @@ public AcquireLockOptions build() {
}
return new AcquireLockOptions(this.partitionKey, this.sortKey, this.data, this.replaceData, this.deleteLockOnRelease, this.acquireOnlyIfLockAlreadyExists,
this.refreshPeriod, this.additionalTimeToWaitForLock, this.timeUnit, this.additionalAttributes, sessionMonitor,
this.updateExistingLockRecord, this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant);
this.updateExistingLockRecord, this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant, this.clockSkewUpperBound);
}

@Override
Expand All @@ -342,7 +361,8 @@ public String toString() {
+ this.replaceData + ", deleteLockOnRelease=" + this.deleteLockOnRelease + ", refreshPeriod=" + this.refreshPeriod + ", additionalTimeToWaitForLock="
+ this.additionalTimeToWaitForLock + ", timeUnit=" + this.timeUnit + ", additionalAttributes=" + this.additionalAttributes + ", safeTimeWithoutHeartbeat="
+ this.safeTimeWithoutHeartbeat + ", sessionMonitorCallback=" + this.sessionMonitorCallback + ", acquireReleasedLocksConsistently="
+ this.acquireReleasedLocksConsistently + ", reentrant=" + this.reentrant+ ")";
+ this.acquireReleasedLocksConsistently + ", reentrant=" + this.reentrant+ ", this.clockSkewUpperBound=" + this.clockSkewUpperBound
+ ")";
}
}

Expand All @@ -360,7 +380,8 @@ public static AcquireLockOptionsBuilder builder(final String partitionKey) {
private AcquireLockOptions(final String partitionKey, final Optional<String> sortKey, final Optional<ByteBuffer> data, final Boolean replaceData,
final Boolean deleteLockOnRelease, final Boolean acquireOnlyIfLockAlreadyExists, final Long refreshPeriod, final Long additionalTimeToWaitForLock,
final TimeUnit timeUnit, final Map<String, AttributeValue> additionalAttributes, final Optional<SessionMonitor> sessionMonitor,
final Boolean updateExistingLockRecord, final Boolean shouldSkipBlockingWait, final Boolean acquireReleasedLocksConsistently, Boolean reentrant) {
final Boolean updateExistingLockRecord, final Boolean shouldSkipBlockingWait, final Boolean acquireReleasedLocksConsistently, Boolean reentrant,
final Optional<Long> clockSkewUpperBound) {
this.partitionKey = partitionKey;
this.sortKey = sortKey;
this.data = data;
Expand All @@ -376,6 +397,7 @@ private AcquireLockOptions(final String partitionKey, final Optional<String> sor
this.shouldSkipBlockingWait = shouldSkipBlockingWait;
this.acquireReleasedLocksConsistently = acquireReleasedLocksConsistently;
this.reentrant = reentrant;
this.clockSkewUpperBound = clockSkewUpperBound;
}

String getPartitionKey() {
Expand Down Expand Up @@ -424,6 +446,8 @@ Boolean getReentrant() {
return this.reentrant;
}

Optional<Long> getClockSkewUpperBound() { return this.clockSkewUpperBound; }

Map<String, AttributeValue> getAdditionalAttributes() {
return this.additionalAttributes;
}
Expand Down Expand Up @@ -460,15 +484,16 @@ public boolean equals(final Object other) {
&& Objects.equals(this.updateExistingLockRecord, otherOptions.updateExistingLockRecord)
&& Objects.equals(this.shouldSkipBlockingWait, otherOptions.shouldSkipBlockingWait)
&& Objects.equals(this.acquireReleasedLocksConsistently, otherOptions.acquireReleasedLocksConsistently)
&& Objects.equals(this.reentrant, otherOptions.reentrant);
&& Objects.equals(this.reentrant, otherOptions.reentrant)
&& Objects.equals(this.clockSkewUpperBound, otherOptions.clockSkewUpperBound);
}

@Override
public int hashCode() {
return Objects.hash(this.partitionKey, this.sortKey, this.data, this.replaceData, this.deleteLockOnRelease,
this.acquireOnlyIfLockAlreadyExists, this.refreshPeriod, this.additionalTimeToWaitForLock, this.timeUnit,
this.additionalAttributes, this.sessionMonitor, this.updateExistingLockRecord,
this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant);
this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant, this.clockSkewUpperBound);

}

Expand Down
Loading

0 comments on commit 6f68fb9

Please sign in to comment.