
Integration support for glueSchemaRegistry #206

Closed

jaychapani opened this issue May 25, 2022 · 19 comments

@jaychapani

Hi there,

I'm trying to integrate glueSchemaRegistry with Kinesis using KPL/KCL.

While debugging I came across these lines of code and found that there is no way to pass the schema along with the UserRecord when calling KinesisProducer.addUserRecord(). The following piece of code is never executed because the schema object is null.

if (schema != null && data != null) {
  if (schema.getSchemaDefinition() == null || schema.getDataFormat() == null) {
      throw new IllegalArgumentException(
          String.format(
              "Schema specification is not valid. SchemaDefinition or DataFormat cannot be null. SchemaDefinition: %s, DataFormat: %s",
              schema.getSchemaDefinition(),
              schema.getDataFormat()
          )
      );
  }
  GlueSchemaRegistrySerializer serializer = glueSchemaRegistrySerializerInstance.get(config);
  byte[] encodedBytes = serializer.encode(stream, schema, data.array());
  data = ByteBuffer.wrap(encodedBytes);
}
@artembilan
Member

There is the logic like this in the component:

else if (message.getPayload() instanceof UserRecord) {
	return handleUserRecord(message, buildPutRecordRequest(message), (UserRecord) message.getPayload());
}

So, you can build that UserRecord together with the provided Schema upstream and pass it as a payload in the message to produce.
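For example, roughly like this (just a sketch, not tested: it assumes a KPL version whose UserRecord exposes a Schema property for the Glue integration, and the stream, partition key, and schema values are placeholders):

import java.nio.ByteBuffer;

import com.amazonaws.services.kinesis.producer.UserRecord;
import com.amazonaws.services.schemaregistry.common.Schema;
import software.amazon.awssdk.services.glue.model.DataFormat;

public class UserRecordSketch {

    // Placeholder Avro schema definition - use your real one.
    private static final String SCHEMA_DEFINITION = "{\"type\":\"record\", ...}";

    // Build a UserRecord that already carries the Glue Schema,
    // so the KplMessageHandler can hand it to the KPL as-is.
    static UserRecord buildUserRecord(byte[] avroSerializedBytes) {
        UserRecord userRecord = new UserRecord();
        userRecord.setStreamName("my-stream");
        userRecord.setPartitionKey("my-partition-key");
        userRecord.setData(ByteBuffer.wrap(avroSerializedBytes));
        userRecord.setSchema(new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema"));
        return userRecord;
    }

}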

Does it make sense?

@jaychapani
Author

The message that we receive is of type GenericMessage

@artembilan
Member

That's not related.
I suggest you have a simple POJO (or lambda) transformer upfront which would create a UserRecord with the desired Schema for you.
It's not clear why you bring up Message.

Please elaborate.

@jaychapani
Author

Sorry - the Message payload is of type byte[]. I'm not following your suggestion to transform the payload into a UserRecord. It would be nice if you could help me with pointers or an example.

@artembilan
Member

Since you point to the code in the KplMessageHandler like this:

				UserRecord userRecord = new UserRecord();
				userRecord.setExplicitHashKey(putRecordRequest.getExplicitHashKey());
				userRecord.setData(putRecordRequest.getData());
				userRecord.setPartitionKey(putRecordRequest.getPartitionKey());
				userRecord.setStreamName(putRecordRequest.getStreamName());
				return handleUserRecord(message, putRecordRequest, userRecord);

The point is that you need to do something similar: create the UserRecord and populate a Schema into it in some service (or transformer) code of yours upfront, before sending to this KplMessageHandler.

See docs for more info: https://docs.spring.io/spring-integration/docs/current/reference/html/message-transformation.html#transformer-annotation.
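For instance, a hypothetical transformer along those lines (channel names are placeholders; it assumes the inbound payload is the already Avro-serialized byte[], and buildUserRecord() is the kind of helper sketched in the earlier comment):

import org.springframework.integration.annotation.Transformer;
import org.springframework.stereotype.Component;

import com.amazonaws.services.kinesis.producer.UserRecord;

@Component
public class UserRecordTransformer {

    // Convert byte[] -> UserRecord before the message reaches the KplMessageHandler.
    @Transformer(inputChannel = "rawBytes", outputChannel = "toKinesis")
    public UserRecord toUserRecord(byte[] payload) {
        // populate stream name, partition key, data and Glue Schema here
        return UserRecordSketch.buildUserRecord(payload);
    }

}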

Your story sounds more like a StackOverflow question.
Please consider starting there in the future: we use GitHub for bugs and feature requests.

Anyway: what does your code look like? How do you send that byte[] to this KplMessageHandler, and why can't you do the same but with a UserRecord instead, already supplied with the requested Schema?

@jaychapani
Author

jaychapani commented May 26, 2022

I am using Spring Cloud Stream with spring-cloud-stream-binder-aws-kinesis and trying to integrate the AWS Glue Schema Registry. This piece of code converts the object to byte[], which is then passed to KplMessageHandler in the code flow, resulting in the issue above. Going forward I will post such things on StackOverflow.

@artembilan
Member

OK, that's much better; thank you for the extra info.

Does it convert your data correctly using its Avro approach?
I wonder if you could modify your function to return a manually converted UserRecord instead...

Is there a chance you could share a simple project with us revealing the problem?

@jaychapani
Author

@artembilan - Thanks for your help with this.
Yes, the data conversion is working fine. I'm having trouble integrating with the Glue Schema Registry so that the payload has the schema value injected, which helps with serialization and deserialization. Here is the test repo that I'm using.

@artembilan
Member

I see this in your code: this.customerQueue.offer(customer);.
How about calling the MessageConverter before that: create a UserRecord and populate the desired Schema into it before posting to the queue?

Side question: is it really correct to convert via Apache Avro first and then re-convert with that Glue Schema?
Although it is not clear, then, where we would take the byte[] for the Glue Schema from in the end, given that UserRecord contract...

@artembilan
Member

How about using an AWSKafkaAvro(De)Serializer instead: https://stackoverflow.com/q/72420787/2756547 ?

@artembilan
Member

Forget my previous question: it is not related since it talks about Apache Kafka.

So, here is a doc: https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds.

And it indeed shows that we have to use org.apache.avro.Schema.Parser to serialize data into byte[]. I guess our AvroSchemaMessageConverter does that trick.
Then their sample goes on to producer.addUserRecord(config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema);, indicating that the data is already serialized by Avro, but the schema has to be propagated anyway for the proper Glue logic.
Therefore my suggestion to have your addCustomer() produce a UserRecord, manually serialized and supplied with a schema, is the way to go for now as a workaround.

I guess as the fix we can add an option to inject a schema via a SpEL expression, as we do for many other UserRecord options.

Let me double-check the code to see how the proposed workaround can work!
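Roughly, the workaround could look like this (a sketch with assumptions: Customer is an Avro-generated SpecificRecord, so Customer.getClassSchema() is available; the queue of UserRecords and the stream/key/schema values are hypothetical stand-ins for the sample repo's code):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;

import com.amazonaws.services.kinesis.producer.UserRecord;
import com.amazonaws.services.schemaregistry.common.Schema;
import software.amazon.awssdk.services.glue.model.DataFormat;

public class CustomerService {

    // Hypothetical: the queue now holds ready-to-produce UserRecords.
    private final BlockingQueue<UserRecord> userRecordQueue;

    public CustomerService(BlockingQueue<UserRecord> userRecordQueue) {
        this.userRecordQueue = userRecordQueue;
    }

    public void addCustomer(Customer customer) throws IOException {
        byte[] avroBytes = serializeWithAvro(customer);
        UserRecord userRecord = new UserRecord();
        userRecord.setStreamName("my-stream");
        userRecord.setPartitionKey("my-partition-key");
        userRecord.setData(ByteBuffer.wrap(avroBytes));
        // Derive the Glue Schema from the Avro class itself.
        userRecord.setSchema(
                new Schema(Customer.getClassSchema().toString(), DataFormat.AVRO.toString(), "demoSchema"));
        this.userRecordQueue.offer(userRecord);
    }

    // Plain Avro serialization for an Avro-generated Customer.
    private byte[] serializeWithAvro(Customer customer) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<Customer> writer = new SpecificDatumWriter<>(Customer.class);
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(customer, encoder);
        encoder.flush();
        return out.toByteArray();
    }

}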

@artembilan
Member

So, if you produce a UserRecord (and of course with that schema option) from your Supplier, then you need to set spring.cloud.stream.default.producer.useNativeEncoding.
This way Spring Cloud Stream won't perform the conversion itself: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_producer_properties.
The OutboundContentTypeConvertingInterceptor cannot be used in this case:

	/**
	 * Unlike INBOUND where the target type is known and conversion is typically done by
	 * argument resolvers of {@link InvocableHandlerMethod} for the OUTBOUND case it is
	 * not known so we simply rely on provided MessageConverters that will use the
	 * provided 'contentType' and convert messages to a type dictated by the Binders
	 * (i.e., byte[]).
	 */
	private final class OutboundContentTypeConvertingInterceptor

It always expects the conversion to be done into a byte[].
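For completeness, that property in application.properties form (a sketch; the default producer scope applies it to all bindings, and it can also be set per binding):

# Let the UserRecord payload pass through to the KplMessageHandler unconverted.
spring.cloud.stream.default.producer.useNativeEncoding=true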

@jaychapani
Author

I'm able to supply a UserRecord to KplMessageHandler.handleMessageToAws. But now the problem is with the following code, where it tries to generate a PutRecordRequest using buildPutRecordRequest(). How can I serialize the UserRecord?

else if (message.getPayload() instanceof UserRecord) {
      return handleUserRecord(message, buildPutRecordRequest(message), (UserRecord) message.getPayload());
}

@artembilan
Member

I see your point.
That buildPutRecordRequest() must not do any serialization since we already have the data in the UserRecord.
All your feedback is great and I guess we have to fix this accordingly.

Meanwhile, as another workaround, in addition to what we have so far about UserRecord propagation, here is what you can do for that serialization which happens there anyway:

		@Bean
		ProducerMessageHandlerCustomizer<KplMessageHandler> kplMessageHandlerCustomizer() {
			return (handler, destinationName) -> handler.setMessageConverter(new MessageConverter() {

				@Override // outbound: the UserRecord already carries the serialized data, so just unwrap it
				public Object fromMessage(Message<?> message, Class<?> targetClass) {
					return ((UserRecord) message.getPayload()).getData().array();
				}

				@Override // inbound conversion is not used by this handler
				public Message<?> toMessage(Object payload, MessageHeaders headers) {
					return null;
				}

			});
		}
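With this customizer in place, the handler just unwraps the bytes already present in the UserRecord instead of running its own conversion on the payload.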

@jaychapani
Author

Thanks, @artembilan - the suggestion you provided above worked. Is there a way to get the schema and partition key the way we get them in buildPutRecordRequest()?

@artembilan
Member

Sorry, I'm not sure about your question. I assumed that since you have managed to create a UserRecord in your own code, you know how to provide a partitionKey and schema for that instance from your code.

Please share what you have so far and how you provide the partitionKey for a Spring Cloud Stream producer binding.
By default it is like this:

		FunctionExpression<Message<?>> partitionKeyExpression =
				new FunctionExpression<>((m) ->
						m.getHeaders().containsKey(BinderHeaders.PARTITION_HEADER)
								? m.getHeaders().get(BinderHeaders.PARTITION_HEADER)
								: m.getPayload().hashCode());

I'm not sure where to take that schema for Glue from, though.

The official AWS doc shows only this for the KPL sample:

com.amazonaws.services.schemaregistry.common.Schema gsrSchema = 
    new Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema");

They also have this sample:

Schema gsrSchema =
        new Schema(dataFormatSerializer.getSchemaDefinition(record), dataFormat.name(), "MySchema");

The doc is still the one I shared before: https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds

@jaychapani
Author

I have pushed my latest code to the sample repo here. Currently, to validate the changes, I have hardcoded the partitionKey and stream. Sorry, I mentioned schema in the question above by mistake. I need to get both values dynamically, similar to buildPutRecordRequest().

@artembilan
Member

Not sure what you want to hear from me, since there is nothing the framework can help with here, and indeed there cannot be.
You call the REST controller and you end up here:

 public void addCustomer(Customer customer) throws IOException {

So, we have only the Customer info at this point.

Perhaps you can inject a BindingServiceProperties and take the specific binding properties from there for this producer logic of yours:

ConsumerProperties consumerProperties = this.bindingServiceProperties.getConsumerProperties(inputName);

And that may give you access to the stream and partitionKey (if configured): see the sketch below.
You could probably also have an extra configuration property for the schema as well, if that can be configured some way.
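Something along these lines (a sketch; the binding name is a placeholder):

import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.expression.Expression;

public class BindingInfoResolver {

    private final BindingServiceProperties bindingServiceProperties;

    public BindingInfoResolver(BindingServiceProperties bindingServiceProperties) {
        this.bindingServiceProperties = bindingServiceProperties;
    }

    // The destination of the producer binding is the Kinesis stream name.
    public String streamName(String bindingName) {
        return this.bindingServiceProperties.getBindingProperties(bindingName).getDestination();
    }

    // The partition key expression, if one is configured on the binding.
    public Expression partitionKeyExpression(String bindingName) {
        ProducerProperties producerProperties =
                this.bindingServiceProperties.getProducerProperties(bindingName);
        return producerProperties.getPartitionKeyExpression();
    }

}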

As the fix for this issue, I am definitely going to expose a schema evaluation property on the KplMessageHandler and extract it for configuration from the Kinesis binder perspective.

@jaychapani
Author

Thank you @artembilan

Griffin1989106 added a commit to Griffin1989106/SpringWithAWS that referenced this issue Jul 22, 2024
Fixes spring-projects/spring-integration-aws#206

* Provide Glue Schema support for `KplMessageHandler`.
When a `UserRecord` is built from the request message, a
`glueSchemaExpression` is evaluated to supply a `Schema` into the `UserRecord`
to produce.
The `GlueSchemaRegistryConfiguration` must be supplied into a `KinesisProducerConfiguration`.
* Fix the `buildPutRecordRequest()` algorithm to copy the data
from the `UserRecord` in the request message payload instead of
misleading expression evaluations and conversions from the request message.

**Cherry-pick to `2.5.x`**