Skip to content

Commit

Permalink
feat: add lock cleanup for the event of lock leaks
Browse files Browse the repository at this point in the history
  • Loading branch information
flyingImer committed Mar 2, 2022
1 parent 64af8b4 commit 63e00d6
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 4 deletions.
80 changes: 76 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ export class DistributedSemaphore extends Construct {

// TODO: maybe expose via StateMachineFragment?
new StateMachine(this, 'Semaphore', {
definition: this.buildSemaphoreDefinition(locks, 'MySemaphore', 5),
definition: this.buildSemaphoreDefinition(locks, 'MySemaphore', 'currentlockcount', 5),
});
}

private buildSemaphoreDefinition(locks: Table, lockName: string, concurrentAccessLimit: number): IChainable {
const lockCountAttrName = 'currentlockcount';
new StateMachine(this, 'SemaphoreCleanup', {
definition: this.buildCleanup(locks, 'MySemaphore', 'currentlockcount'),
});
}

private buildSemaphoreDefinition(locks: Table, lockName: string, lockCountAttrName: string, concurrentAccessLimit: number): IChainable {
// get lock
const acquireLock = new DynamoUpdateItem(this, 'AcquireLock', {
comment: 'acquire a lock using a conditional update to DynamoDB. This update will do two things: 1) increment a counter for the number of held locks and 2) add an attribute to the DynamoDB Item with a unique key for this execution and with a value of the time when the lock was Acquired. The Update includes a conditional expression that will fail under two circumstances: 1) if the maximum number of locks have already been distributed or 2) if the current execution already owns a lock. The latter check is important to ensure the same execution doesn\'t increase the counter more than once. If either of these conditions are not met, then the task will fail with a DynamoDB.ConditionalCheckFailedException error, retry a few times, then if it is still not successful, it will move off to another branch of the workflow. If this is the first time that a given lockname has been used, there will not be a row in DynamoDB, so the update will fail with DynamoDB.AmazonDynamoDBException. In that case, this state sends the workflow to state that will create that row to initialize.',
Expand Down Expand Up @@ -164,4 +166,74 @@ export class DistributedSemaphore extends Construct {
}))
.next(successState);
}

private buildCleanup(locks: Table, lockName: string, lockCountAttrName: string): IChainable {
const getCurrentLockItem = new DynamoGetItem(this, 'GetCurrentLockItem', {
comment: 'Get info from DDB for the lock item to look and see if this specific owner is still holding a lock',
table: locks,
key: {
// TODO: dynamic lock name from state machine running context, e.g., input
[locks.schema().partitionKey.name]: DynamoAttributeValue.fromString(lockName),
},
expressionAttributeNames: {
'#lockownerid.$': '$.detail.executionArn',
},
projectionExpression: [
new DynamoProjectionExpression().withAttribute('#lockownerid'),
],
resultSelector: {
'Item.$': '$.Item',
'ItemString.$': 'States.JsonToString($.Item)',
},
resultPath: '$.lockinfo.currentlockitem',
consistentRead: true,
});

const successState = new Succeed(this, 'SuccessStateCleanup'); // FIXME: correct the name
const cleanUpLock = new DynamoUpdateItem(this, 'CleanUpLock', {
comment: 'If this lockowerid is still there, then clean it up and release the lock',
table: locks,
key: {
// TODO: dynamic lock name from state machine running context, e.g., input
[locks.schema().partitionKey.name]: DynamoAttributeValue.fromString(lockName),
},
expressionAttributeNames: {
'#currentlockcount': lockCountAttrName,
'#lockownerid.$': '$.detail.executionArn',
},
expressionAttributeValues: {
':decrease': DynamoAttributeValue.fromNumber(1),
},
updateExpression: 'SET #currentlockcount = #currentlockcount - :decrease REMOVE #lockownerid',
conditionExpression: 'attribute_exists(#lockownerid)',
returnValues: DynamoReturnValues.UPDATED_NEW,
});

return getCurrentLockItem.addRetry({
errors: [Errors.ALL],
maxAttempts: 20,
interval: Duration.seconds(5),
backoffRate: 1.4,
}).next(
new Choice(this, 'CheckIfLockIsHeld', {
comment: 'This state checks to see if the execution in question holds a lock. It can tell that by looking for Z, which will be indicative of the timestamp value. That will only be there in the stringified version of the data returned from DDB if this execution holds a lock',
}).when(
Condition.and(
Condition.isPresent('$.lockinfo.currentlockitem.ItemString'),
Condition.stringMatches('$.lockinfo.currentlockitem.ItemString', '*Z*'),
),
cleanUpLock.addRetry({
errors: ['DynamoDB.ConditionalCheckFailedException'],
maxAttempts: 0,
}).addRetry({
errors: [Errors.ALL],
maxAttempts: 20,
interval: Duration.seconds(5),
backoffRate: 1.4,
}).addCatch(successState, {
errors: ['DynamoDB.ConditionalCheckFailedException'],
}).next(successState),
).otherwise(successState),
);
}
}
134 changes: 134 additions & 0 deletions test/__snapshots__/construct.test.ts.snap

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 63e00d6

Please sign in to comment.