
Kinesis binder locks timed out and messages not delivered #190

Closed
RomanAbakumov opened this issue Apr 27, 2023 · 17 comments

RomanAbakumov commented Apr 27, 2023

We use Spring Cloud Stream with Kinesis, and sometimes we see that messages are not delivered.
After investigation, we found that the locks are not processed correctly, and we see messages about it in the logs.

To Reproduce
Steps to reproduce the behavior:

  1. Clone the repository https://github.com/RomanAbakumov/kinesis-lock-issues-demo
  2. run localstack using startLocalstack.cmd/startLocalstack.sh
  3. start application using mvnw spring-boot:run
  4. Wait for the app to create the streams and tables and start producing and receiving messages (about 5 minutes).
    You should see messages like:
    Message sent: Message: 3
    Received message: Message: 3
    Message sent: Message: 4
  5. Start 5 other instances using mvnw spring-boot:run
    At this point only the first instance (started in step 3) receives all the messages and holds all the locks.
  6. Stop the application started in step 3 using kill -TERM or by closing the window.

Version of the framework
org.springframework.cloud:spring-cloud-dependencies:2022.0.1
spring-cloud-stream-binder-kinesis: 3.0.0
Expected behavior
Locks are acquired by the running nodes and messages are received.
Observed behavior
Messages are lost or delayed.
Timeouts in the logs:
`2023-04-27T12:25:37.735-04:00 INFO 31356 --- [is-dispatcher-1] a.i.k.KinesisMessageDrivenChannelAdapter : The lock for key 'event-group:stream-v0:shardId-000000000009' was not renewed in time

java.util.concurrent.TimeoutException: null
at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095) ~[na:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.renewLockIfAny(KinesisMessageDrivenChannelAdapter.java:1035) ~[spring-integration-aws-2.5.4.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.execute(KinesisMessageDrivenChannelAdapter.java:947) ~[spring-integration-aws-2.5.4.jar:na]
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerDispatcher.run(KinesisMessageDrivenChannelAdapter.java:857) ~[spring-integration-aws-2.5.4.jar:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
`
Additional context
The issue seems to happen when the shard locks are distributed between multiple nodes; it is not observed when all locks are held by one instance.
We run 2 pods listening to a stream with 4 shards; sometimes messages are not delivered, and it can be remediated by clearing the locks table. When locks are allocated to different pods, timeout messages like the one above appear in the logs.

The behavior is not consistent: sometimes it works as it should, and sometimes message delivery gets stuck, but with the linked project it is reproducible almost every time.

You can check the lock distribution by configuring test credentials in .aws/config, for example:
aws_access_key_id=test
aws_secret_access_key=test
and then executing getLockTableContent.sh/getLockTableContent.cmd.

artembilan transferred this issue from spring-cloud/spring-cloud-stream Apr 27, 2023
@artembilan (Member)

Duplicate of #186.

Please consider upgrading to the latest Kinesis Binder: https://spring.io/blog/2023/03/27/spring-integration-for-aws-3-0-0-m2-and-spring-cloud-stream-kinesis-binder-4.

For the current version, I'd suggest using a short leaseDuration.
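To illustrate, a minimal application.yml sketch of shortening the lease, assuming the v3 binder exposes its DynamoDB lock settings under spring.cloud.stream.kinesis.binder.locks with a leaseDuration property (names and defaults may differ in your binder version; the value here is only an example):

```yaml
spring:
  cloud:
    stream:
      kinesis:
        binder:
          locks:
            # Assumed property: seconds a shard lock is held without renewal
            # before another instance may take it over.
            leaseDuration: 10
```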

artembilan closed this as not planned Apr 27, 2023
@RomanAbakumov (Author)

Thanks for the quick reply.
I've tried reducing the leaseDuration, but it didn't help.
I will try the new milestone versions of the libraries; do you have an estimate of when they will be released?

@artembilan (Member)

In May. Or, better said, when Spring Cloud AWS 3.0 is GA.

@RomanAbakumov (Author)

@artembilan
I've tried the mentioned versions, and while messages are sent and received, I'm getting exceptions that did not happen with the old versions.
Maybe I need to configure the LocalStack endpoint in a different place, but the error message is misleading in that case.

To Reproduce
Steps to reproduce the behavior:

  1. Clone the repository https://github.com/RomanAbakumov/kinesis-lock-issues-demo
  2. run localstack using startLocalstack.cmd/startLocalstack.sh
  3. start application using mvnw spring-boot:run

Exception
`2023-05-01T12:20:10.378-04:00 ERROR 25124 --- [-response-1-316] o.s.i.a.outbound.KinesisMessageHandler : Failed to send async reply: GenericMessage [payload=byte[11], headers={errorChannel=org.springframework.integration.channel.PublishSubscribeChannel@4b7587bb, aws_shard=shardId-000000000015, aws_sequenceNumber=49640352981821805482850094172893096755799942984020001010, aws_serviceResult=PutRecordResponse(ShardId=shardId-000000000015, SequenceNumber=49640352981821805482850094172893096755799942984020001010, EncryptionType=NONE), id=469a6c17-5f59-2b0b-2e04-eaa39c0e9594, contentType=application/json, target-protocol=kafka, timestamp=1682958010378}]

org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:479) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.handler.AbstractMessageProducingHandler$ReplyFutureCallback.accept(AbstractMessageProducingHandler.java:581) ~[spring-integration-core-6.0.3.jar:6.0.3]
at org.springframework.integration.handler.AbstractMessageProducingHandler$ReplyFutureCallback.accept(AbstractMessageProducingHandler.java:563) ~[spring-integration-core-6.0.3.jar:6.0.3]
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[na:na]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56) ~[sdk-core-2.20.30.jar:na]
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[na:na]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:69) ~[sdk-core-2.20.30.jar:na]
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[na:na]
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:177) ~[sdk-core-2.20.30.jar:na]
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[na:na]
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:105) ~[sdk-core-2.20.30.jar:na]
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[na:na]
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:163) ~[sdk-core-2.20.30.jar:na]
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

2023-05-01T12:20:10.379-04:00 ERROR 25124 --- [-response-1-316] o.s.integration.handler.LoggingHandler : org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:479)
at org.springframework.integration.handler.AbstractMessageProducingHandler$ReplyFutureCallback.accept(AbstractMessageProducingHandler.java:581)
at org.springframework.integration.handler.AbstractMessageProducingHandler$ReplyFutureCallback.accept(AbstractMessageProducingHandler.java:563)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:69)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:177)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:105)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:163)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
`

@RomanAbakumov (Author) commented May 1, 2023

I've tried to debug it to find the root cause, but I only see that some remote call fails because of a wrong endpoint; it is still unclear why it uses the wrong one.

@artembilan (Member)

Yeah... That one was fixed in the SNAPSHOT: 8223c8d.

It has not been released yet.

Can you try against spring-cloud-stream-binder-kinesis-4.0.0-SNAPSHOT, please?

@RomanAbakumov (Author)

Yeah, I confirm this error doesn't appear with the snapshot, thanks.

@RomanAbakumov (Author)

While it now works as it should, with no errors, and the shard consumers are able to acquire the unassigned locks, I ran into another issue that may need attention.
My laptop runs LocalStack and 3 app instances; I put it to sleep, and when I wake it up, one app instance reports errors and cannot recover from them for more than 30 minutes.

2023-05-01T14:54:10.459-04:00 INFO 16112 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception java.util.concurrent.CompletionException: software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException: The shard iterator has expired. Shard iterators are only valid for 300 seconds (Service: Kinesis, Status Code: 400, Request ID: AKSSPO8S0D2O3C4WWIF4QKMEL8VJSL2NANQN52N899PIFJFO7I09) during [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49640352981598798030864787941543021567332992185863766098', timestamp=null, stream='stream-v0', shard='shardId-000000000005', reset=false}, state=CONSUME}] task invocation.
Process will be retried on the next iteration.

@artembilan (Member)

Got it!
Please raise a new GH issue in the https://github.com/spring-projects/spring-integration-aws project with more info.

It looks like we don't catch this kind of error in the KinesisMessageDrivenChannelAdapter.

@RomanAbakumov (Author)

It looks like the locks were still held, but the iterator had expired: after restarting, I saw old messages starting to be consumed.

@RomanAbakumovMH

@artembilan
Kinesis binder v3 has dependencies on io.awspring.cloud:spring-cloud-starter-aws and io.awspring.cloud:spring-cloud-aws-autoconfigure that allow specifying the AWS configuration in application.yaml, such as:
cloud:
  aws:
    region:
      static: us-east-1

Binder v4 doesn't have those dependencies, so this configuration no longer works.
How can I get it working with the v4 binder? Should I add the dependencies manually?

@artembilan (Member)

> has dependencies on io.awspring.cloud:spring-cloud-starter-aws and io.awspring.cloud:spring-cloud-aws-autoconfigure

That's not correct. You can find its POM here: https://central.sonatype.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kinesis/3.0.0

As you can see, there is spring-cloud-starter-aws, but no spring-cloud-aws-autoconfigure.

So, what we did in v4 is simply change that dependency to spring-cloud-aws-starter.
And it looks like spring-cloud-aws-autoconfigure is still pulled in there: https://github.com/awspring/spring-cloud-aws/blob/main/spring-cloud-aws-starters/spring-cloud-aws-starter/pom.xml#L17-L20.

Those properties have probably changed.
In my tests I use:

spring.cloud.aws.region.static

See this Microservices Patterns implementation where I actually use the Kinesis Binder v4: https://github.com/artembilan/microservices-patterns-spring-integration/tree/main/distributed-tracing

@RomanAbakumovMH commented May 15, 2023

Looks like the property to configure it was changed from "cloud.aws.region.static" to "spring.cloud.aws.region.static".
Thanks for your help.
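For anyone else hitting the same migration, a minimal application.yml sketch of the rename; both prefixes are taken from this thread, and the region value is just an example:

```yaml
# v3 binder (Spring Cloud AWS 2.x) - old prefix
cloud:
  aws:
    region:
      static: us-east-1

# v4 binder (Spring Cloud AWS 3.x) - new prefix
spring:
  cloud:
    aws:
      region:
        static: us-east-1
```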

@RomanAbakumovMH

@artembilan
I'm trying to integrate the app with the new binder, but I'm having issues with message headers:
Caused by: java.lang.IllegalStateException: Failed to establish route, since neither were provided: 'spring.cloud.function.definition' as Message header or as application property or 'spring.cloud.function.routing-expression' as application property. Incoming message: GenericMessage [payload=byte[566], headers={aws_shard=shardId-000000000000, id=637da3b1-b3db-71ab-4fbb-4446b4a432a3, ................

It worked perfectly fine with the 3.0 binder, and the header is populated on the sending side.
Both the sending and receiving binders are configured with the needed header, like:
s.c.s.kinesis.binder.headers:
  - spring.cloud.function.definition

So it worked before, when all apps used the 3.0 binder, with no changes except the binder version.

Do you have any idea why it can happen?

PS: I've tried adding the headers to the test application referenced in this ticket and it worked fine, so is it some kind of compatibility issue between the 3.0 and 4.0 binders?
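For reference, a sketch of that header mapping written out in full, assuming "s.c.s." abbreviates spring.cloud.stream and that the binder's headers list is configured the same way in both versions (not verified against the 4.0 binder):

```yaml
spring:
  cloud:
    stream:
      kinesis:
        binder:
          # Header names to carry along with each Kinesis record so the
          # consumer side can restore them (here, the routing header).
          headers:
            - spring.cloud.function.definition
```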

@artembilan (Member)

@RomanAbakumov ,

Please raise a new issue with more details.
I'd like to see some code to determine what is wrong.

Are you really sure that you produce the message with that header?
The mapping is just one part of the puzzle: if you don't produce that header, there is simply nothing to map onto the Kinesis record.

@RomanAbakumovMH

Yes, we use a single event stream for different kinds of messages and route based on headers.
I'll try to reproduce it in the demo app.

@RomanAbakumovMH

New bug reported: #192
