V3 binder messages not compatible with v4 version binder #192

Closed
RomanAbakumovMH opened this issue May 17, 2023 · 20 comments

@RomanAbakumovMH

Between 3.0.0 and 4.0.0

Describe the bug

Messages sent with the v3 binder are not compatible with the v4 binder.

To Reproduce

  1. Clone the repository https://github.com/RomanAbakumov/kinesis-lock-issues-demo
  2. Switch to the v3-with-headers branch
  3. Run LocalStack using startLocalstack.cmd/startLocalstack.sh
  4. Start the application using mvnw spring-boot:run
  5. Switch to the master branch
  6. Start a second application instance using mvnw spring-boot:run, keeping the first one running

Expected behavior

Both running instances should receive messages from each other.

Actual behavior
Messages are not deserialized properly.
In the log of the app with the v4 binder I see:

Received message: ���contentType �"application/json" spring.cloud.function.definition �"asd"Message: 46
Received message: Message: 25

The first message comes from the v3 binder and the second from the v4 binder.
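The junk printed before the first payload is the v3 embedded-headers framing. A minimal sketch of that layout, assuming it matches what Spring Cloud Stream's EmbeddedHeaderUtils writes: a 0xff marker byte, a one-byte header count, then for each header a one-byte name length, the name, a four-byte value length and the JSON-encoded value, followed by the raw payload. The class LegacyFraming is hypothetical, and only simple string values (quoted, not escaped) are handled:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the assumed v3 (legacy) embedded-headers layout:
// 0xff | count(1) | [ nameLen(1) | name | valueLen(4) | JSON value ]* | payload
class LegacyFraming {

    static byte[] encode(Map<String, String> headers, byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0xff);            // marker byte signalling embedded headers
        out.write(headers.size());  // header count in a single byte
        for (Map.Entry<String, String> e : headers.entrySet()) {
            byte[] name = e.getKey().getBytes(StandardCharsets.UTF_8);
            // Simplification: values are written as JSON strings without escaping.
            byte[] value = ("\"" + e.getValue() + "\"").getBytes(StandardCharsets.UTF_8);
            out.write(name.length); // one-byte name length
            out.write(name, 0, name.length);
            // four-byte big-endian value length
            out.write(ByteBuffer.allocate(4).putInt(value.length).array(), 0, 4);
            out.write(value, 0, value.length);
        }
        out.write(payload, 0, payload.length); // payload follows with no length prefix
        return out.toByteArray();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("contentType", "application/json");
        headers.put("spring.cloud.function.definition", "asd");
        byte[] bytes = encode(headers, "Message: 46".getBytes(StandardCharsets.UTF_8));
        // Printed as text, the marker and length bytes appear as junk characters.
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

A consumer that does not recognize this framing keeps all of it as the payload, which is likely why the marker and length bytes show up as garbage around contentType in the log line above.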

@RomanAbakumovMH
Author

This may be related: I see this error in the reportBadDefinition method of the DatabindContext class, but it doesn't appear in the logs:
No serializer found for class software.amazon.awssdk.services.kinesis.model.Record and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)

@artembilan
Member

OK. I cannot reproduce it in my own test; it works as designed:

2023-05-17T15:29:13.438-04:00 TRACE [distributed-tracing-pattern,64652b08bfc98bb5b73d0547af49468c,b1a6ce1743b41545] 3216 --- [esis-consumer-1] o.s.i.microservices.distributedtracing   : Received message from Kinesis: GenericMessage [payload=foo, headers={replyChannel=nullChannel, errorChannel=, aws_shard=shardId-000000000000, traceparent=00-64652b08bfc98bb5b73d0547af49468c-b73d0547af49468c-01, id=d7fa9c14-2b29-45de-e3bf-f3ac7cbf1e63, sourceData=Record(SequenceNumber=49640854094218006061386703884442635850037981994148691970, ApproximateArrivalTimestamp=2023-05-17T19:29:12.656Z, Data=SdkBytes(bytes=0x000000b07b2240636c617373223a226a6176612e7574696c2e486173684d6170222c227472616365706172656e74223a2230302d36343635326230386266633938626235623733643035343761663439343638632d623733643035343761663439343638632d3031222c22737072696e672e636c6f75642e66756e6374696f6e2e646566696e6974696f6e223a22617364222c22636f6e74656e7454797065223a226170706c69636174696f6e2f6a736f6e227d00000003666f6f), PartitionKey=2058211984, EncryptionType=NONE), spring.cloud.function.definition=asd, contentType=application/json, aws_receivedPartitionKey=2058211984, aws_receivedStream=SOME_STREAM, aws_receivedSequenceNumber=49640854094218006061386703884442635850037981994148691970, timestamp=1684351753438}]

As you can see, the spring.cloud.function.definition=asd header is present in the consumed data.
My JMS producer then fails:

jakarta.jms.JMSRuntimeException: AMQ139012: The property name 'spring.cloud.function.definition' is not a valid java identifier.
	at org.apache.activemq.artemis.jms.client.ActiveMQMessage.checkProperty(ActiveMQMessage.java:911) ~[artemis-jakarta-client-2.28.0.jar:2.28.0]

Will look into your application now...

@artembilan
Member

artembilan commented May 17, 2023

Well, I'm not sure what we are talking about here.
I placed a breakpoint in your consumer:

	@Bean
	public Consumer<Message<String>> messages() {
		return message -> System.out.println("Received message: " + message.getPayload());
	}

and this is what I see in the debugger:

payload = "Message: 0"
headers = {MessageHeaders@10547}  size = 9
 "aws_shard" -> "shardId-000000000005"
 "id" -> {UUID@10570} "b31159d2-e2f4-5de7-f0d7-f26941af2a8f"
 "sourceData" -> {Record@10572} "Record(SequenceNumber=49640854856479777692361933485273813201612158648253939794, ApproximateArrivalTimestamp=2023-05-17T20:04:46.218Z, Data=SdkBytes(bytes=0x000000687b2240636c617373223a226a6176612e7574696c2e486173684d6170222c22737072696e672e636c6f75642e66756e6374696f6e2e646566696e6974696f6e223a22617364222c22636f6e74656e7454797065223a226170706c69636174696f6e2f6a736f6e227d0000000a4d6573736167653a2030), PartitionKey=508538808, EncryptionType=NONE)"
 "spring.cloud.function.definition" -> "asd"
 "contentType" -> {MimeType@10576} "application/json"
 "aws_receivedPartitionKey" -> "508538808"
 "aws_receivedStream" -> "stream-v0"
 "aws_receivedSequenceNumber" -> "49640854856479777692361933485273813201612158648253939794"
 "timestamp" -> {Long@10584} 1684353900771

So, what am I missing about that header propagation?
Perhaps you got confused because previously the message was not deserialized properly for that Consumer<Message<String>>; now we extract the headers in the Kinesis channel adapter and let Spring Cloud Stream deal with the remaining byte[] payload.
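The sourceData hex dump above already shows the new v4 layout: 0x00000068 (104) is the length of the JSON headers, followed by the 104-byte JSON map, then 0x0000000a (10) is the length of the payload "Message: 0". A minimal sketch that splits such a record, assuming exactly this length-prefixed, big-endian layout as read from the dump; the class and method names are hypothetical, the JSON is returned as a string rather than parsed, and Java 17 is assumed for HexFormat and records:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HexFormat;

// Hypothetical sketch: split a v4-style record into JSON headers and payload.
// Layout as read from the dump: headersLen(4) | JSON headers | payloadLen(4) | payload
class JsonHeadersFraming {

    record Decoded(String headersJson, byte[] payload) { }

    // The Data bytes of the sourceData record from the debugger output above.
    static final String SAMPLE_HEX =
            "00000068" // headers length = 104
            + "7b2240636c617373223a226a6176612e7574696c2e486173684d6170222c" // {"@class":"java.util.HashMap",
            + "22737072696e672e636c6f75642e66756e6374696f6e2e646566696e6974696f6e223a22617364222c" // "spring.cloud.function.definition":"asd",
            + "22636f6e74656e7454797065223a226170706c69636174696f6e2f6a736f6e227d" // "contentType":"application/json"}
            + "0000000a" // payload length = 10
            + "4d6573736167653a2030"; // Message: 0

    static Decoded decode(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data); // ByteBuffer is big-endian by default
        byte[] headers = new byte[buffer.getInt()];
        buffer.get(headers);
        byte[] payload = new byte[buffer.getInt()];
        buffer.get(payload);
        return new Decoded(new String(headers, StandardCharsets.UTF_8), payload);
    }

    public static void main(String[] args) {
        Decoded decoded = decode(HexFormat.of().parseHex(SAMPLE_HEX));
        System.out.println(decoded.headersJson());
        System.out.println(new String(decoded.payload(), StandardCharsets.UTF_8)); // prints: Message: 0
    }
}
```

The "@class":"java.util.HashMap" entry suggests the header map is written with Jackson default typing, which is why it appears inside the JSON.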

@RomanAbakumovMH
Author

Are you running the applications from the two different branches at the same time?

The application running the v4 binder should receive the messages from the application running the v3 binder.

@artembilan
Member

Oh! No. That is not going to work.
Since this is a completely new major version, such a breaking change is expected: the binary serialization formats are entirely different.
We really cannot accept v3 data in v4.

We could probably come up with some re-mapping processor, but it is not clear where in your architecture you could place it...
Sorry for the inconvenience, but making such a breaking change was a deliberate decision, since we already had plenty of them after moving to AWS SDK v2 and so on.

@RomanAbakumovMH
Author

That was unexpected; I remember that migrating from v2 to v3 was easy and fully compatible.
I'm not sure it is feasible to move fully to the v4 binder, and it has not been released yet.

Do you know whether it is possible to fix the locking issue (#190) in the v3 binder?
I believe you can reproduce the issue on the v3-with-headers branch.

@artembilan
Member

It was easy because there were no crucial dependency upgrades, mostly just a Java version migration.
The header embedding logic remained the same.
I probably need to think about this more and come up with an idea of how to detect the format and remap from old to new.
Or just bring the old format back...

Fortunately, the cat is not out of the bag yet since, as you said, there is no GA version released.

Unfortunately, we cannot fix that problem in the previous version, since the real problem is outside our control, in the DynamoDB Lock Client library.

You can, though, use some other LockRegistry implementation instead:
https://docs.spring.io/spring-integration/docs/current/reference/html/redis.html#redis-lock-registry
https://docs.spring.io/spring-integration/docs/current/reference/html/jdbc.html#jdbc-lock-registry
https://docs.spring.io/spring-integration/docs/current/reference/html/zookeeper.html#zk-lock-registry
https://docs.spring.io/spring-integration/docs/current/reference/html/hazelcast.html#hazelcast-lock-registry

@RomanAbakumovMH
Author

That is interesting. Do you have an example of how to use a third-party lock registry with Kinesis?

@artembilan
Member

Just declare the respective bean in your configuration.
I usually do that in unit tests:

		@Bean
		public LockRegistry lockRegistry() {
			return new DefaultLockRegistry();
		}
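Following the same pattern, the JDBC-backed registry from the links above could be declared as a bean so that the binder's default DynamoDB-based registry is not created. A sketch, assuming spring-integration-jdbc is on the classpath, a DataSource bean is available, and the INT_LOCK table from Spring Integration's schema scripts has been created; the class name LockRegistryConfig is hypothetical:

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.jdbc.lock.DefaultLockRepository;
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
import org.springframework.integration.jdbc.lock.LockRepository;
import org.springframework.integration.support.locks.LockRegistry;

@Configuration
public class LockRegistryConfig {

    // Stores lock rows in the INT_LOCK table of the given DataSource.
    @Bean
    public DefaultLockRepository lockRepository(DataSource dataSource) {
        return new DefaultLockRepository(dataSource);
    }

    // Any LockRegistry bean makes the binder's @ConditionalOnMissingBean default back off.
    @Bean
    public LockRegistry lockRegistry(LockRepository lockRepository) {
        return new JdbcLockRegistry(lockRepository);
    }
}
```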

The KinesisBinderConfiguration has a conditional bean for its default DynamoDbLockRegistry:

	@Bean
	@ConditionalOnMissingBean
	@ConditionalOnBean(DynamoDbAsyncClient.class)
	@ConditionalOnProperty(name = "spring.cloud.stream.kinesis.binder.kpl-kcl-enabled", havingValue = "false",
			matchIfMissing = true)
	public LockRegistry dynamoDBLockRegistry(

@artembilan
Member

So, as promised, I came up with a LegacyEmbeddedHeadersSupportBytesMessageMapper, which tries to deserialize the old embedded headers format via the EmbeddedHeaderUtils used in the previous binder version and then falls back to the new format in the EmbeddedJsonHeadersMessageMapper.
Thanks to your awesome sample and instructions, I got this in the master branch logs after running both applications:

Message sent: Message: 4
Received message: Message: 4
Received message: Message: 74
Message sent: Message: 5
Received message: Message: 5
Message sent: Message: 6
Received message: Message: 6
Received message: Message: 76
Received message: Message: 75

This confirms that messages produced by v3 are properly deserialized in v4, while the new format is still supported.
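The "try old, then fall back to new" order can be illustrated without any Spring types. This is not the binder's LegacyEmbeddedHeadersSupportBytesMessageMapper; it is a hypothetical stdlib-only sketch (Java 17 assumed for records), assuming that a legacy record starts with a 0xff marker byte followed by one-byte name lengths and four-byte value lengths, while a v4 record starts with a four-byte big-endian header length whose first byte is 0x00 for anything that fits in a Kinesis record (at most 1 MiB). Header values are kept as raw strings rather than parsed JSON:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.StringJoiner;

// Hypothetical illustration of format detection with a fallback.
class FormatSniffer {

    record Decoded(String format, String headers, byte[] payload) { }

    static Decoded decode(byte[] data) {
        // Only a legacy record can start with 0xff; a v4 length prefix starts with 0x00.
        if (data.length > 1 && (data[0] & 0xff) == 0xff) {
            try {
                return decodeLegacy(data);
            } catch (RuntimeException ex) {
                // Not a well-formed legacy record: fall back to the new layout.
            }
        }
        return decodeJsonLayout(data);
    }

    // 0xff | count(1) | [ nameLen(1) | name | valueLen(4) | value ]* | payload
    private static Decoded decodeLegacy(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        buf.get(); // skip the 0xff marker
        int count = buf.get() & 0xff;
        StringJoiner headers = new StringJoiner(", ");
        for (int i = 0; i < count; i++) {
            byte[] name = new byte[buf.get() & 0xff];
            buf.get(name);
            byte[] value = new byte[buf.getInt()];
            buf.get(value);
            headers.add(new String(name, StandardCharsets.UTF_8) + "="
                    + new String(value, StandardCharsets.UTF_8));
        }
        byte[] payload = new byte[buf.remaining()]; // the rest is the payload
        buf.get(payload);
        return new Decoded("legacy", headers.toString(), payload);
    }

    // headersLen(4) | JSON headers | payloadLen(4) | payload
    private static Decoded decodeJsonLayout(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        byte[] headers = new byte[buf.getInt()];
        buf.get(headers);
        byte[] payload = new byte[buf.getInt()];
        buf.get(payload);
        return new Decoded("json", new String(headers, StandardCharsets.UTF_8), payload);
    }
}
```

Because the decision is made from the bytes alone, a consumer needs no extra configuration, which is what the last answer in this thread confirms.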

The v3 branch logs this:

Received message: h{"@class":"java.util.HashMap","spring.cloud.function.definition":"asd","contentType":"application/json"}
                                                                                                                           Message: 22
Message sent: Message: 93
Received message: h{"@class":"java.util.HashMap","spring.cloud.function.definition":"asd","contentType":"application/json"}
                                                                                                                           Message: 23
Message sent: Message: 94
Received message: Message: 94

This shows that the new format is received but cannot be properly deserialized.
And that is really expected.
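The leading h is not part of the headers: it is most likely the low byte 0x68 (104) of the v4 header-length prefix printed as a character, with the three 0x00 bytes before it invisible. Likewise, "Message: 22" is 11 bytes, so its length prefix ends in 0x0b, a vertical-tab control character; many terminals render it by moving down one line at the same column, which would explain the indentation before the payload. A small stdlib illustration, using the 104-byte header JSON from this thread; the class name is hypothetical:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration: what an old consumer prints for a v4-framed record.
class NewFormatSeenByV3 {

    // The 104-byte header JSON seen in this thread.
    static final String HEADERS_JSON = "{\"@class\":\"java.util.HashMap\","
            + "\"spring.cloud.function.definition\":\"asd\","
            + "\"contentType\":\"application/json\"}";

    // v4-style framing: headersLen(4) | headers | payloadLen(4) | payload
    static byte[] encode(String headersJson, String payload) {
        byte[] h = headersJson.getBytes(StandardCharsets.UTF_8);
        byte[] p = payload.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(8 + h.length + p.length)
                .putInt(h.length).put(h)
                .putInt(p.length).put(p)
                .array();
    }

    public static void main(String[] args) {
        byte[] record = encode(HEADERS_JSON, "Message: 22");
        // There is no leading 0xff marker, so (as the log suggests) the old consumer keeps
        // the whole record as the payload; printing it shows the "h{...}" prefix.
        System.out.println("Received message: " + new String(record, StandardCharsets.UTF_8));
    }
}
```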

I'm not sure what else we can do, so I'm going to push this fix to let you test it yourself.

A side note regarding your app: I had to modify your application.yaml to this:

    aws:
      region:
        static: us-east-1
      endpoint: http://localhost:4566

This goes in the bottom section; I also removed cloud.aws at the top, since it is no longer supported by the Spring Cloud AWS auto-configuration.

@artembilan
Member

Fixed via: 56d6212

@artembilan
Member

The 4.0.0-SNAPSHOT is available now.

Thank you for all your hard testing of this new version!

Let us know if you run into more issues!

We are going to release GA next week.

@RomanAbakumovMH
Author

Thanks @artembilan! I'm trying JdbcLockRegistry for now and will see how it goes. Anyway, I think it is really nice that you can support the previous format.

@RomanAbakumovMH
Author

This is just a thought: maybe it would be nice to be able to configure, at the stream/binder level, use of the legacy format so that produced messages can be read by older clients.

@artembilan
Member

Yes. That is also possible, since we already have the API to support it.
My only concern is that we will be stuck with it forever: no one will move to the new format because we keep producing the old one, and we won't know when we are really done with old data and can migrate to the new format 😄

@RomanAbakumovMH
Author

This depends on the project, but in a microservice architecture it is very hard to update the binder without a synchronized redeployment of all microservices, which is not always possible.

I'm not sure why a new format is needed, but I think it is better to stick to the old format if possible, or to add support for the new format to the old binders too, so that both formats are supported transparently for the client. The same stream may be read by multiple services...

@artembilan
Member

Sure!
Let me play with that, and I'll introduce something like legacyEmbeddedHeadersFormat = false|true into the binder configuration properties.
It seems like Postel's Law still chases me 😄

artembilan added a commit that referenced this issue May 18, 2023
Fixes: #192

* Expose `spring.cloud.stream.kinesis.binder.legacy-embedded-headers-format` configuration property
(`false` by default)
* Add logic into a `LegacyEmbeddedHeadersSupportBytesMessageMapper` to serialize headers via `EmbeddedHeaderUtils`
when `legacyEmbeddedHeadersFormat == true`
* Use `LegacyEmbeddedHeadersSupportBytesMessageMapper` on producer endpoints as well
* Change `KinesisBinderObservationTests` configuration to verify legacy headers support
@artembilan
Member

Now all good.
Give it a few minutes to build a SNAPSHOT; there is now a new property to set:

spring.cloud.stream.kinesis.binder.legacy-embedded-headers-format=true

This way, messages produced by v4 are serialized in the old format and v3 can receive them.

@RomanAbakumovMH
Author

Sounds good, thank you!
As I understand it, incoming messages will not require any additional configuration and the format will be detected automatically?

@artembilan
Member

That's correct, since the byte[] contains all the information required to parse it properly on the consumer side.
