
consumer stops consuming events and throws KinesisMessageDrivenChannelAdapter : The lock for key 'xxxxxxxx:shardId-00000000000X' was not renewed in time #186

Closed
malvine opened this issue May 11, 2022 · 43 comments

Comments

@malvine

malvine commented May 11, 2022

Hi,

I've got an interesting use case and it's very simple to replicate.
I am using the latest binder version.

My setup:

  • One Kinesis stream with two shards.
  • One consumer app with 3 instances; let's call them instance1, instance2 and instance3.
  • Latest binder version.

To replicate:

  1. Start the consumers: usually one instance gets both shards as owner. Let's say instance1 gets all the shards. So far so good. The instance is consuming events.

  2. Restart the actively consuming instance, e.g. instance1. Instance2 will now get shard-0 and instance3 gets shard-1 as owner. (This step might take a couple of attempts, as sometimes instance2 or instance3 gets all the shards. Keep trying until you get one shard per instance.)

  3. Once you get instance1 doing nothing, instance2 owning shard-0, and instance3 owning shard-1, the instances stop consuming any events and both of them log this every 30 seconds:


.i.k.KinesisMessageDrivenChannelAdapter : The lock for key 'xxxxxxxx:shardId-000000000002' was not renewed in time

java.util.concurrent.TimeoutException: null
	at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[na:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.renewLockIfAny(KinesisMessageDrivenChannelAdapter.java:1030) ~[spring-integration-aws-2.4.0.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.execute(KinesisMessageDrivenChannelAdapter.java:946) ~[spring-integration-aws-2.4.0.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerDispatcher.run(KinesisMessageDrivenChannelAdapter.java:856) ~[spring-integration-aws-2.4.0.jar:na]

The only way to stop this is to restart one of the consuming instances so all the shards are back to one instance.

The binder version 2.0.1.RELEASE doesn't have this issue.
Pretty sure it's not even a binder issue - it's probably a spring-integration-aws issue.

Thanks.

@artembilan
Member

I think there must be some other logs related to the lock processing.
Something like:

logger.error(ex, () -> "Error during locking: " + lock);

Would you mind sharing that one with us as well?

Also: is there a chance to use the actual latest 2.2.0 Kinesis binder version: https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kinesis ?

The spring-integration-aws-2.4.0 is not the latest one...

@malvine
Author

malvine commented May 11, 2022

Yeah, sorry, I was trying with 2.1.0 to check whether that one has the issue.
OK, with the latest version 2.2.0 I get exactly the same.

No error before that or after, just this endlessly on both instances:

2022-05-11 17:39:28.046  INFO 99193 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49629266667711709536924126896771570517185546512247029778', timestamp=null, stream='xxxxxx', shard='shardId-000000000001', reset=false}, state=NEW}] has been started.
2022-05-11 17:39:39.020  INFO 99193 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : The lock for key 'xx:xxxxxx:shardId-000000000001' was not renewed in time

java.util.concurrent.TimeoutException: null
	at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1950) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2085) ~[na:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.renewLockIfAny(KinesisMessageDrivenChannelAdapter.java:1031) ~[spring-integration-aws-2.5.1.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.execute(KinesisMessageDrivenChannelAdapter.java:947) ~[spring-integration-aws-2.5.1.jar:na]
	at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerDispatcher.run(KinesisMessageDrivenChannelAdapter.java:857) ~[spring-integration-aws-2.5.1.jar:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

@artembilan artembilan transferred this issue from spring-cloud/spring-cloud-stream-binder-aws-kinesis May 11, 2022
@artembilan
Member

OK. Any chance you can share a simple project to reproduce this?
Is it possible for me to run it against LocalStack then?

@malvine
Author

malvine commented May 13, 2022

Yeah, I put a quick project together, and it seems like on LocalStack you can easily replicate the issue once you get the shards being consumed by two instances instead of one.
Here is the link

@artembilan
Member

Hi @malvine !

Thank you for the sample.
I could definitely reproduce it locally with my Docker.
It's impossible to debug, but at least I see where we are heading.
So, I guess we can explain the problem like this:
as long as a consumer doesn't take ownership of all the shards of the stream, it doesn't consume even from a shard it does own, nor does it update the lock for that shard in DynamoDB.
I mean, it looks like the lock manager is somehow broken for this partial-ownership situation...

Will look into code today.

@artembilan
Member

Well, I'm not sure, but it looks like the problem is here:


    /**
     * Setting this flag to true will prevent the thread from being blocked (put to sleep) for the lease duration and
     * instead will return the call with the lock not granted exception back to the caller. It is up to the caller to
     * optionally back-off and retry and to acquire the lock.
     */
    private final boolean shouldSkipBlockingWait;

We don't set this flag in our DynamoDbLockRegistry, and then in the KinesisMessageDrivenChannelAdapter.ShardConsumerManager we block on this line:

if (lock.tryLock()) {
	this.locks.put(key, lock);
}

So, if another instance is the holder of the lock, we don't return immediately as the tryLock() contract states, but rather step into a busy-wait loop in AmazonDynamoDBLockClient.acquireLock().

Does it make sense?

I'll try to fix it in my local Spring Integration AWS copy and re-run your solution to verify.
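For reference, here is a minimal sketch (illustrative names only, not the actual adapter code) of what the java.util.concurrent.locks.Lock contract promises for this shard-ownership check:

import java.util.Map;
import java.util.concurrent.locks.Lock;

// Illustration of the expected behavior: tryLock() must return immediately with false
// when another instance holds the shard lock, so the dispatcher can move on and retry
// on its next cycle instead of sleeping for a whole leaseDuration inside acquireLock().
class ShardOwnershipSketch {

	void tryTakeOwnership(String key, Lock lock, Map<String, Lock> locks) {
		if (lock.tryLock()) {
			locks.put(key, lock); // we own the shard now; keep consuming it
		}
		// else: fall through right away and check again on the next dispatcher pass
	}
}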

@artembilan
Member

So, yeah... my assumption and investigation are correct.
The tryLock() contract must be honored in the DynamoDbLockRegistry.
Otherwise the AmazonDynamoDBLockClient steps into a busy-wait loop and never fulfills the Future for the lock.

I'm going to push the fix today, but if you need a release for Spring Integration AWS 2.5.2, then I can do that tomorrow.

I'm moving this to Spring Integration AWS project though...

@malvine
Author

malvine commented May 18, 2022

@artembilan thank you very much!

@postalservice14

Did this get resolved? I'm running into this exact issue - even when upgrading Spring Integration AWS to 2.5.4.

I was even still able to reproduce it with the quick project that @malvine made, using Spring Integration AWS 2.5.4.

@artembilan
Member

@postalservice14 ,

Can you please share an updated sample project so we can see what is going on?
I mean, how are you sure that you have updated Spring Integration AWS to 2.5.4?
There is no explicit dependency for it in the mentioned project.
It may well be overridden transitively by the version the Kinesis Binder provides.

@postalservice14

postalservice14 commented Feb 1, 2023

@artembilan thanks for the quick reply! Here's a fork of his project upgraded to 2.5.4 (you can see the latest commit). And I can still get it to happen using his instructions in the readme.

https://github.com/postalservice14/kinesis-lock-was-not-renewed-in-time

@artembilan
Member

@postalservice14 ,

any chance you can upgrade your solution (not that sample app) to the latest Spring Boot and org.springframework.cloud:spring-cloud-stream-binder-kinesis:3.0.0?
There is no @EnableBinding and friends any more in Spring Cloud Stream: everything has to be mapped via java.util.function primitives.

I'll try to reproduce it over the weekend in a simple unit test with Testcontainers.
I also don't see a reason for a producer side in that sample: according to the issue description, the problem is just with shard assignments - no need for any data!

Well, I'd prefer to have a new GH issue with fresh, current and relevant info instead of trying to resurrect this issue against outdated versions.
Thanks for understanding!

@artembilan artembilan transferred this issue from spring-cloud/spring-cloud-stream Feb 3, 2023
@artembilan
Member

OK. I was able to reproduce it locally with a parallel unit test against Testcontainers and the LocalStack image.
Reopening.
Not sure what to do yet...

@artembilan artembilan reopened this Feb 9, 2023
@artembilan
Member

Found the problem.
When we have several instances competing for shards, we get stuck on the if (lock.tryLock()) { for exactly leaseDuration on a lock which does not belong to us.
This is not OK and must be fixed one way or another, since the lock.tryLock() contract requires returning immediately if the lock cannot be acquired at that moment.
I think there is something wrong in the AmazonDynamoDBLockClient itself, but I'll see how that can be mitigated on our side.

Thank you for your patience!

@artembilan
Member

So, the code in that Lock client looks like this:

    /*
     * Someone else has the lock, and they have the lock for LEASE_DURATION time. At this point, we need
     * to wait at least LEASE_DURATION milliseconds before we can try to acquire the lock.
     */
    lockTryingToBeAcquired = existingLock.get();
    if (!alreadySleptOnceForOneLeasePeriod) {
        alreadySleptOnceForOneLeasePeriod = true;
        millisecondsToWait += existingLock.get().getLeaseDuration();
    }

I really doubt that it is OK to block for that long a period.
But we just don't have a choice, since that's how that client works.

Some related discussion is here: spring-projects/spring-integration-aws#219, which leads us to the conclusion that we cannot use shouldSkipBlockingWait, since isExpired is not updated on the item.

I don't know yet what the proper fix must be (perhaps a combination of getLock() and then tryAcquireLock() to avoid the leaseDuration busy-spin loop), but for now the workaround is this:
the leaseDuration must be less than the lockRenewalTimeout on the KinesisMessageDrivenChannelAdapter, which is 10 seconds by default.

@artembilan
Member

Right. See here for a bug on AWS SDK side: awslabs/amazon-dynamodb-lock-client#44

artembilan added a commit to spring-projects/spring-integration-aws that referenced this issue Feb 14, 2023
Fixes spring-cloud/spring-cloud-stream-binder-aws-kinesis#186

The `DynamoDbLockClient` waits extra `leaseDuration` time in a loop breaking a `tryLock()` contract.

* Fix `DynamoDbLockRegistry.tryLock()` to decrease an actual `additionalTimeToWait` by `leaseDuration`,
so the target `DynamoDbLockClient` when it adds this `leaseDuration` will wait an actual timeout requested
by the `tryLock()` contract.
This way a `tryLock(0)` will definitely return immediately since we really are not interested in blocking

**cherry-pick to 2.5.x**

# Conflicts:
#	build.gradle
@artembilan
Member

So, I have just made a workaround fix in the DynamoDbLockRegistry to make the time to wait negative.
The Lock client then adds the leaseDuration back, realigning the timeout with our tryLock() expectations.
Give the latest 2.5.5-SNAPSHOT a try for your solution!
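To make the arithmetic concrete, here is a small self-contained illustration of that workaround (plain Java, not the actual DynamoDbLockRegistry code; the lease duration value is hypothetical):

public class TryLockTimeoutMath {

	public static void main(String[] args) {
		long requestedTimeoutMs = 0;    // tryLock() is effectively tryLock(0, MILLISECONDS)
		long leaseDurationMs = 20_000;  // hypothetical lease duration configured on the lock client

		// The registry now hands the client a wait time reduced by leaseDuration (it may go negative)...
		long additionalTimeToWaitMs = requestedTimeoutMs - leaseDurationMs;
		// ...because the client adds leaseDuration back when another holder owns the lock.
		long effectiveWaitMs = additionalTimeToWaitMs + leaseDurationMs;

		// Net effect: the client waits exactly what the caller requested, so tryLock(0) returns immediately.
		System.out.println(effectiveWaitMs == requestedTimeoutMs); // true
	}
}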

@RomanAbakumov

@artembilan I'm having the same issue, can you provide a URL of the repository that contains 2.5.5-SNAPSHOT?

@artembilan
Member

@RomanAbakumov ,

https://repo.spring.io/snapshot

Essentially here: https://repo.spring.io/snapshot/org/springframework/integration/spring-integration-aws/2.5.5-SNAPSHOT/

@RomanAbakumov

@artembilan version 2.5.5 doesn't have those messages about the locks, but I now have another issue.

If my app does not release the locks properly (like when the JVM dies or during a k8s rolling restart), then there are old locks that stay in the lock table, and those locks are not released on the next app startup.
This prevents the app from consuming messages after a restart; it just waits indefinitely.
If I manually remove the locks from the table, it starts to consume messages.

I've tested with the older version and the old locks are not an issue there; it just waits for some time and the locks get released automatically.

@artembilan
Member

Yeah... I think we are facing this one: awslabs/amazon-dynamodb-lock-client#79.
Essentially, there is no info in the lock's DynamoDB record indicating that it has not been used for a while.

It's probably worth trying with a much shorter leaseDuration.

@artembilan
Member

You know, I have just implemented our own DynamoDbLockRepository: spring-projects/spring-integration-aws@594ea58.
It properly sets a TTL attribute which calculates the time in the future when the item has to expire.
This should work nicely with the DynamoDB TTL feature: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html.

I still have some changes to make on the binder level to start a new 4.0.0 version based on that breaking change in Spring Integration AWS 3.0.0.
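For anyone trying this out, here is a hedged sketch of enabling the DynamoDB TTL feature on the lock table with the AWS SDK v1 API used elsewhere in this thread. Whether the framework enables TTL itself, plus the exact table and attribute names, are assumptions; check the DynamoDbLockRepository Javadocs for your version.

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.TimeToLiveSpecification;
import com.amazonaws.services.dynamodbv2.model.UpdateTimeToLiveRequest;

public class EnableLockTableTtl {

	public static void main(String[] args) {
		AmazonDynamoDB dynamoDb = AmazonDynamoDBClientBuilder.defaultClient();

		// 'SpringIntegrationLockRegistry' and 'expireAt' are the names mentioned in this thread;
		// verify them against the actual DynamoDbLockRepository constants before running this.
		dynamoDb.updateTimeToLive(new UpdateTimeToLiveRequest()
				.withTableName("SpringIntegrationLockRegistry")
				.withTimeToLiveSpecification(new TimeToLiveSpecification()
						.withEnabled(true)
						.withAttributeName("expireAt")));
	}
}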

@RomanAbakumov

That sounds awesome. I've tried to play with leaseDuration; however, I got inconsistent results.
Please keep us posted about the progress.
Thanks @artembilan

@artembilan
Member

So, here is a Kinesis Binder 4.0.0-SNAPSHOT for your consideration: https://repo.spring.io/snapshot/org/springframework/cloud/spring-cloud-stream-binder-kinesis/4.0.0-SNAPSHOT.
Note: if you use custom values for partitionKey, sortKeyName, sortKey and heartbeatPeriod, they are not valid any more.
See the DynamoDbLockRepository Javadocs for the new table structure.

@RomanAbakumov

What are the expected spring-cloud-stream and spring-boot versions for spring-cloud-stream-binder-kinesis/4.0.0-SNAPSHOT ?

@artembilan
Member

4.0.x and 3.0.x. Java 17.
Technically, everything should be pulled in transitively when you just use:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kinesis</artifactId>
            <version>4.0.0-SNAPSHOT</version>
        </dependency>

@RomanAbakumov

With 4.0.0-SNAPSHOT I'm getting this error:
Caused by: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: One or more parameter values were invalid: Missing the key sortKey in the item (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: 9KHOJDPV4HCGFD19NIDUV9PU7RVV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null)

It works fine with 3.0.0.

@artembilan
Member

artembilan commented Feb 27, 2023

I guess you have to remove your SpringIntegrationLockRegistry table from DynamoDB and let the framework create a fresh one for you.
As I said before: the sortKey is no longer involved in the new DynamoDbLockRepository model.
Spring Integration AWS must be 3.0.0-SNAPSHOT, or just managed transitively by the spring-cloud-stream-binder-kinesis dependency.

@RomanAbakumov

I see strange behavior with the new version: locks are disappearing almost instantly; the difference between created and TTL is 10 ms?
Is there any additional configuration required?
(screenshot of a lock table record's create time and TTL values)

@artembilan
Member

Well, that is in Epoch Seconds.
By default it is public static final Duration DEFAULT_LEASE_DURATION = Duration.ofSeconds(10);
So, what you show is correct - a 10 second difference between create time and TTL.
It's probably no surprise that the DynamoDB TTL feature cleans them up so quickly.

However the KinesisMessageDrivenChannelAdapter must renew them every second...

Oh! I see. There is a bug 😄

I do this over there:

new UpdateItemSpec()
		.withPrimaryKey(KEY_ATTR, lock)
		.withUpdateExpression("SET " + TTL_ATTR + " = :ttl")
		.withConditionExpression(LOCK_EXISTS_EXPRESSION)
		.withValueMap(ownerWithCurrentTimeValues());

	private ValueMap ownerWithCurrentTimeValues() {
		ValueMap valueMap =
				new ValueMap()
						.withNumber(":ttl", currentEpochSeconds());
		valueMap.putAll(this.ownerAttribute);
		return valueMap;
	}

where the new ttl must be the current time plus leaseDuration.
On it.
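For clarity, a sketch of the corrected value map described above (it mirrors the snippet, it is not the exact committed fix, and the Duration-typed leaseDuration field is an assumption):

	// The renewed TTL must be "now + leaseDuration", not just "now"; otherwise the
	// DynamoDB TTL sweeper is free to expire the item right after the renewal.
	private ValueMap ownerWithRenewedTtlValues() {
		ValueMap valueMap =
				new ValueMap()
						.withNumber(":ttl", currentEpochSeconds() + this.leaseDuration.getSeconds());
		valueMap.putAll(this.ownerAttribute);
		return valueMap;
	}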

@artembilan
Member

Please, try now with -U for Maven or --refresh-dependencies for Gradle: the fresh Spring Integration AWS SNAPSHOT is already there.

Thank you for assisting in testing!

@RomanAbakumov

Thanks for the quick feedback!

I still see that locks don't live for more than a second.
As with the implementation without TTL, I expect the current implementation to have numberOfStreams*numberOfShards records in the table, but I can catch only 1 or 2 at a time, and they disappear much faster than 10 seconds, almost instantly.

@artembilan
Member

Right. The number of records really must be equal to the number of active shard consumers.
In my case that is a single stream with two shards, and I saw a proper distribution on concurrent restart.

Any advice on what tool I can use locally on Windows to browse DynamoDB in a Docker container?
Or perhaps LocalStack comes with some CLI tool to sneak in?
I kind of don't know how to be sure about the DB content other than confirming it with my unit tests against LocalStack in Docker 😄

@RomanAbakumov

You may try to use the AWS CLI, but you will need to set it up somehow to look into your Docker DynamoDB.

aws dynamodb scan --table-name yourTableName

We could have a pairing session if it helps.

FYI, I'm running the Java service locally against real AWS Kinesis/DynamoDB.
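For a LocalStack container, the same scan can be pointed at the emulator with dummy credentials; the default edge port 4566 and the lock table name are assumptions that depend on your setup:

AWS_ACCESS_KEY_ID=test AWS_SECRET_ACCESS_KEY=test AWS_DEFAULT_REGION=us-east-1 \
aws dynamodb scan --table-name SpringIntegrationLockRegistry --endpoint-url http://localhost:4566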

@artembilan
Member

So, I have this test:

	@Test
	public void testLockRenew() {
		final Lock lock = this.dynamoDbLockRegistry.obtain("foo");

		assertThat(lock.tryLock()).isTrue();
		try {
			this.dynamoDbLockRepository.setLeaseDuration(Duration.ofSeconds(60));
			assertThatNoException().isThrownBy(() -> this.dynamoDbLockRegistry.renewLock("foo"));
			String ttl =
					DYNAMO_DB.getItem(DynamoDbLockRepository.DEFAULT_TABLE_NAME,
									Map.of(DynamoDbLockRepository.KEY_ATTR, new AttributeValue("foo")))
							.getItem()
							.get(DynamoDbLockRepository.TTL_ATTR).getN();
			assertThat(Long.parseLong(ttl))
					.isCloseTo(LocalDateTime.now().plusSeconds(60).toEpochSecond(ZoneOffset.UTC),
							Percentage.withPercentage(10));
		}
		finally {
			lock.unlock();
			this.dynamoDbLockRepository.setLeaseDuration(Duration.ofSeconds(2));
		}
	}

It confirms that the expireAt attribute is updated accordingly when we call renewLock().

And yes, the localstack/localstack:1.2.0 image includes the AWS CLI. I can get into it from my Docker Desktop terminal.
I still have some problems with AWS credentials, but that's a different story.
Playing with the concurrent test for 10 shards.

And no: I don't have any AWS account to test against the real thing. 😄

@artembilan
Member

OK. Something is still wrong with my renew algorithm.
With a regular tryLock() it looks good, and sometimes I see shard distribution between different instances.

Looking further into the renew problem...

@artembilan
Member

So, I made some tweaks and did some testing.
Please give the latest Spring Integration AWS SNAPSHOT a shot.

Note: I still could not figure out why the AWS CLI in the LocalStack container fails with that The security token included in the request is invalid error.
I did an aws configure against the values from the LocalStackContainer, but it still doesn't like it. 🤷

@RomanAbakumov

It looks like records are expiring right after creation, instantly.
I see now that the TTL was increased from the initial 10 seconds to 60, but it doesn't affect the record lifetime at all.

@artembilan
Member

Hm. So, it sounds like LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) from Java is not exactly what AWS means by epoch seconds.
It also feels like LocalStack does not support the DynamoDB TTL feature.

Would you mind double-checking in your AWS env how our expireAt compares to its current time?

You can probably disable TTL on the table for now though.
Or we can consider making it optional when the framework creates the table for us.

@RomanAbakumov

Here is the value that I see right now:
(screenshot of the expireAt value from the lock table)
and it translates to:
GMT: Tuesday, February 28, 2023 12:21:58 PM
Your time zone: Tuesday, February 28, 2023 7:21:58 AM GMT-05:00
Relative: 5 hours ago

So it is some issue with the timezone.
Locally, System.currentTimeMillis() gives me: 1677604766750

@RomanAbakumov

Another observation: if I set the JVM timezone to GMT, it starts working as expected,
e.g.: -Duser.timezone="GMT"

@artembilan
Member

OK. I have just pushed an Instant.now().getEpochSecond() fix.
Give it 5-6 minutes for a SNAPSHOT to be published.
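As background on why the timezone mattered, a small illustration of the difference (plain Java, framework-independent):

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class EpochSecondsDemo {

	public static void main(String[] args) {
		// Zone-independent: the real Unix epoch seconds.
		long fromInstant = Instant.now().getEpochSecond();

		// Takes the JVM-zone wall-clock time and re-interprets it as UTC, so on a GMT-5
		// machine this value is 5 hours (18000 seconds) behind the real epoch time.
		long fromLocalDateTime = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);

		// 0 only when the JVM default zone is UTC/GMT (e.g. with -Duser.timezone=GMT).
		System.out.println(fromInstant - fromLocalDateTime);
	}
}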

@RomanAbakumov

Looks like it works fine now, thank you!
