From 63e00d66b40d4ad4e4bcd76dbb4928572bb9f5e3 Mon Sep 17 00:00:00 2001 From: flyingImer Date: Wed, 2 Mar 2022 18:41:53 +0000 Subject: [PATCH] feat: add lock cleanup for the event of lock leaks --- src/index.ts | 80 ++++++++++++- test/__snapshots__/construct.test.ts.snap | 134 ++++++++++++++++++++++ 2 files changed, 210 insertions(+), 4 deletions(-) diff --git a/src/index.ts b/src/index.ts index 8ac6b9fc..e7547174 100644 --- a/src/index.ts +++ b/src/index.ts @@ -17,13 +17,15 @@ export class DistributedSemaphore extends Construct { // TODO: maybe expose via StateMachineFragment? new StateMachine(this, 'Semaphore', { - definition: this.buildSemaphoreDefinition(locks, 'MySemaphore', 5), + definition: this.buildSemaphoreDefinition(locks, 'MySemaphore', 'currentlockcount', 5), }); - } - private buildSemaphoreDefinition(locks: Table, lockName: string, concurrentAccessLimit: number): IChainable { - const lockCountAttrName = 'currentlockcount'; + new StateMachine(this, 'SemaphoreCleanup', { + definition: this.buildCleanup(locks, 'MySemaphore', 'currentlockcount'), + }); + } + private buildSemaphoreDefinition(locks: Table, lockName: string, lockCountAttrName: string, concurrentAccessLimit: number): IChainable { // get lock const acquireLock = new DynamoUpdateItem(this, 'AcquireLock', { comment: 'acquire a lock using a conditional update to DynamoDB. This update will do two things: 1) increment a counter for the number of held locks and 2) add an attribute to the DynamoDB Item with a unique key for this execution and with a value of the time when the lock was Acquired. The Update includes a conditional expression that will fail under two circumstances: 1) if the maximum number of locks have already been distributed or 2) if the current execution already owns a lock. The latter check is important to ensure the same execution doesn\'t increase the counter more than once. If either of these conditions are not met, then the task will fail with a DynamoDB.ConditionalCheckFailedException error, retry a few times, then if it is still not successful, it will move off to another branch of the workflow. If this is the first time that a given lockname has been used, there will not be a row in DynamoDB, so the update will fail with DynamoDB.AmazonDynamoDBException. In that case, this state sends the workflow to state that will create that row to initialize.', @@ -164,4 +166,74 @@ export class DistributedSemaphore extends Construct { })) .next(successState); } + + private buildCleanup(locks: Table, lockName: string, lockCountAttrName: string): IChainable { + const getCurrentLockItem = new DynamoGetItem(this, 'GetCurrentLockItem', { + comment: 'Get info from DDB for the lock item to look and see if this specific owner is still holding a lock', + table: locks, + key: { + // TODO: dynamic lock name from state machine running context, e.g., input + [locks.schema().partitionKey.name]: DynamoAttributeValue.fromString(lockName), + }, + expressionAttributeNames: { + '#lockownerid.$': '$.detail.executionArn', + }, + projectionExpression: [ + new DynamoProjectionExpression().withAttribute('#lockownerid'), + ], + resultSelector: { + 'Item.$': '$.Item', + 'ItemString.$': 'States.JsonToString($.Item)', + }, + resultPath: '$.lockinfo.currentlockitem', + consistentRead: true, + }); + + const successState = new Succeed(this, 'SuccessStateCleanup'); // FIXME: correct the name + const cleanUpLock = new DynamoUpdateItem(this, 'CleanUpLock', { + comment: 'If this lockowerid is still there, then clean it up and release the lock', + table: locks, + key: { + // TODO: dynamic lock name from state machine running context, e.g., input + [locks.schema().partitionKey.name]: DynamoAttributeValue.fromString(lockName), + }, + expressionAttributeNames: { + '#currentlockcount': lockCountAttrName, + '#lockownerid.$': '$.detail.executionArn', + }, + expressionAttributeValues: { + ':decrease': DynamoAttributeValue.fromNumber(1), + }, + updateExpression: 'SET #currentlockcount = #currentlockcount - :decrease REMOVE #lockownerid', + conditionExpression: 'attribute_exists(#lockownerid)', + returnValues: DynamoReturnValues.UPDATED_NEW, + }); + + return getCurrentLockItem.addRetry({ + errors: [Errors.ALL], + maxAttempts: 20, + interval: Duration.seconds(5), + backoffRate: 1.4, + }).next( + new Choice(this, 'CheckIfLockIsHeld', { + comment: 'This state checks to see if the execution in question holds a lock. It can tell that by looking for Z, which will be indicative of the timestamp value. That will only be there in the stringified version of the data returned from DDB if this execution holds a lock', + }).when( + Condition.and( + Condition.isPresent('$.lockinfo.currentlockitem.ItemString'), + Condition.stringMatches('$.lockinfo.currentlockitem.ItemString', '*Z*'), + ), + cleanUpLock.addRetry({ + errors: ['DynamoDB.ConditionalCheckFailedException'], + maxAttempts: 0, + }).addRetry({ + errors: [Errors.ALL], + maxAttempts: 20, + interval: Duration.seconds(5), + backoffRate: 1.4, + }).addCatch(successState, { + errors: ['DynamoDB.ConditionalCheckFailedException'], + }).next(successState), + ).otherwise(successState), + ); + } } \ No newline at end of file diff --git a/test/__snapshots__/construct.test.ts.snap b/test/__snapshots__/construct.test.ts.snap index 09f67c9f..25c88dea 100644 --- a/test/__snapshots__/construct.test.ts.snap +++ b/test/__snapshots__/construct.test.ts.snap @@ -294,6 +294,140 @@ Object { }, "Type": "AWS::IAM::Policy", }, + "DistributedSemaphoreSemaphoreCleanup4070C6CC": Object { + "DependsOn": Array [ + "DistributedSemaphoreSemaphoreCleanupRoleDefaultPolicy0ADCE922", + "DistributedSemaphoreSemaphoreCleanupRoleCA3B1346", + ], + "Properties": Object { + "DefinitionString": Object { + "Fn::Join": Array [ + "", + Array [ + "{\\"StartAt\\":\\"GetCurrentLockItem\\",\\"States\\":{\\"GetCurrentLockItem\\":{\\"Next\\":\\"CheckIfLockIsHeld\\",\\"Retry\\":[{\\"ErrorEquals\\":[\\"States.ALL\\"],\\"IntervalSeconds\\":5,\\"MaxAttempts\\":20,\\"BackoffRate\\":1.4}],\\"Type\\":\\"Task\\",\\"Comment\\":\\"Get info from DDB for the lock item to look and see if this specific owner is still holding a lock\\",\\"ResultPath\\":\\"$.lockinfo.currentlockitem\\",\\"ResultSelector\\":{\\"Item.$\\":\\"$.Item\\",\\"ItemString.$\\":\\"States.JsonToString($.Item)\\"},\\"Resource\\":\\"arn:", + Object { + "Ref": "AWS::Partition", + }, + ":states:::dynamodb:getItem\\",\\"Parameters\\":{\\"Key\\":{\\"LockName\\":{\\"S\\":\\"MySemaphore\\"}},\\"TableName\\":\\"", + Object { + "Ref": "DistributedSemaphoreLockTable45D69AC1", + }, + "\\",\\"ConsistentRead\\":true,\\"ExpressionAttributeNames\\":{\\"#lockownerid.$\\":\\"$.detail.executionArn\\"},\\"ProjectionExpression\\":\\"#lockownerid\\"}},\\"CheckIfLockIsHeld\\":{\\"Type\\":\\"Choice\\",\\"Comment\\":\\"This state checks to see if the execution in question holds a lock. It can tell that by looking for Z, which will be indicative of the timestamp value. That will only be there in the stringified version of the data returned from DDB if this execution holds a lock\\",\\"Choices\\":[{\\"And\\":[{\\"Variable\\":\\"$.lockinfo.currentlockitem.ItemString\\",\\"IsPresent\\":true},{\\"Variable\\":\\"$.lockinfo.currentlockitem.ItemString\\",\\"StringMatches\\":\\"*Z*\\"}],\\"Next\\":\\"CleanUpLock\\"}],\\"Default\\":\\"SuccessStateCleanup\\"},\\"SuccessStateCleanup\\":{\\"Type\\":\\"Succeed\\"},\\"CleanUpLock\\":{\\"Next\\":\\"SuccessStateCleanup\\",\\"Retry\\":[{\\"ErrorEquals\\":[\\"DynamoDB.ConditionalCheckFailedException\\"],\\"MaxAttempts\\":0},{\\"ErrorEquals\\":[\\"States.ALL\\"],\\"IntervalSeconds\\":5,\\"MaxAttempts\\":20,\\"BackoffRate\\":1.4}],\\"Catch\\":[{\\"ErrorEquals\\":[\\"DynamoDB.ConditionalCheckFailedException\\"],\\"Next\\":\\"SuccessStateCleanup\\"}],\\"Type\\":\\"Task\\",\\"Comment\\":\\"If this lockowerid is still there, then clean it up and release the lock\\",\\"Resource\\":\\"arn:", + Object { + "Ref": "AWS::Partition", + }, + ":states:::dynamodb:updateItem\\",\\"Parameters\\":{\\"Key\\":{\\"LockName\\":{\\"S\\":\\"MySemaphore\\"}},\\"TableName\\":\\"", + Object { + "Ref": "DistributedSemaphoreLockTable45D69AC1", + }, + "\\",\\"ConditionExpression\\":\\"attribute_exists(#lockownerid)\\",\\"ExpressionAttributeNames\\":{\\"#currentlockcount\\":\\"currentlockcount\\",\\"#lockownerid.$\\":\\"$.detail.executionArn\\"},\\"ExpressionAttributeValues\\":{\\":decrease\\":{\\"N\\":\\"1\\"}},\\"ReturnValues\\":\\"UPDATED_NEW\\",\\"UpdateExpression\\":\\"SET #currentlockcount = #currentlockcount - :decrease REMOVE #lockownerid\\"}}}}", + ], + ], + }, + "RoleArn": Object { + "Fn::GetAtt": Array [ + "DistributedSemaphoreSemaphoreCleanupRoleCA3B1346", + "Arn", + ], + }, + }, + "Type": "AWS::StepFunctions::StateMachine", + }, + "DistributedSemaphoreSemaphoreCleanupRoleCA3B1346": Object { + "Properties": Object { + "AssumeRolePolicyDocument": Object { + "Statement": Array [ + Object { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": Object { + "Service": Object { + "Fn::FindInMap": Array [ + "ServiceprincipalMap", + Object { + "Ref": "AWS::Region", + }, + "states", + ], + }, + }, + }, + ], + "Version": "2012-10-17", + }, + }, + "Type": "AWS::IAM::Role", + }, + "DistributedSemaphoreSemaphoreCleanupRoleDefaultPolicy0ADCE922": Object { + "Properties": Object { + "PolicyDocument": Object { + "Statement": Array [ + Object { + "Action": "dynamodb:GetItem", + "Effect": "Allow", + "Resource": Object { + "Fn::Join": Array [ + "", + Array [ + "arn:", + Object { + "Ref": "AWS::Partition", + }, + ":dynamodb:", + Object { + "Ref": "AWS::Region", + }, + ":", + Object { + "Ref": "AWS::AccountId", + }, + ":table/", + Object { + "Ref": "DistributedSemaphoreLockTable45D69AC1", + }, + ], + ], + }, + }, + Object { + "Action": "dynamodb:UpdateItem", + "Effect": "Allow", + "Resource": Object { + "Fn::Join": Array [ + "", + Array [ + "arn:", + Object { + "Ref": "AWS::Partition", + }, + ":dynamodb:", + Object { + "Ref": "AWS::Region", + }, + ":", + Object { + "Ref": "AWS::AccountId", + }, + ":table/", + Object { + "Ref": "DistributedSemaphoreLockTable45D69AC1", + }, + ], + ], + }, + }, + ], + "Version": "2012-10-17", + }, + "PolicyName": "DistributedSemaphoreSemaphoreCleanupRoleDefaultPolicy0ADCE922", + "Roles": Array [ + Object { + "Ref": "DistributedSemaphoreSemaphoreCleanupRoleCA3B1346", + }, + ], + }, + "Type": "AWS::IAM::Policy", + }, }, } `;