-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: register correct unwrapped schema #6188
fix: register correct unwrapped schema #6188
Conversation
This commit fixes several issues and refactors a lot of the serde code around wrapping and unwrapping single values. The main issues being fixed are: 1. allow each format to define if it supported wrapping and/or unwrapping. (Not possible with current design) 2. pass the correct wrapping / unwrapping flags are passed to key vs value formats when creating serde. (bug in code passes same SerdeOptions to key and value). 3. register the correct wrapped / unwrapped schema with the SR. (bug in existing code meant registered format is always wrapped). At the same time, the way wrapping / unwrapping was handled in the code wasn't great. Formats like `JSON` needed to be able to handle both wrapped and unwrapped schemas and values, depending on whether the user _explicitly_ set wrapping or unwrapping, vs the default behaviour of the format. This commit refactors the code such that the format will always be passed the a consistent schema and the set of serde features the format should use when creating the serde. This simplifies things and paves the way to user-define-serde.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM I reviewed this change with a somewhat relaxed lens (it's hard to dig into these refactors at the code level) but what I checked looks good to me and your description is an overall positive change! Would have been nice to split it up into a few smaller PRs though 😅
ksqldb-common/src/main/java/io/confluent/ksql/schema/ksql/SchemaConverters.java
Outdated
Show resolved
Hide resolved
ksqldb-common/src/main/java/io/confluent/ksql/serde/EnabledSerdeFeatures.java
Show resolved
Hide resolved
ksqldb-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java
Show resolved
Hide resolved
ksqldb-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSchemas.java
Outdated
Show resolved
Hide resolved
final boolean unwrapSingle = serdeOptions.valueWrapping() | ||
.map(option -> option == SerdeOption.UNWRAP_SINGLE_VALUES) | ||
.orElse(false); | ||
if (schema.features().enabled(SerdeFeature.UNWRAP_SINGLES)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems a little weird to encapsulate this above (applySinglesUnwrapping
) but then check for it explicitly here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why? applySinglesUnwrapping(outerSchema)
only extracts the inner schema. That's only half the storey. The code still needs to build a serde that can handle the unwrapping, i.e. extracting the value of the single column from the Struct
passed to serialize
and the reverse for deserialize
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like if I'm already checking the UNWRAP_SINGLES
variable here, i might as well just extract the inner schema there as well. Not a biggie, but now we're checking it in two places and doing a no-op in one of them if it's disabled.
ksqldb-functional-tests/src/test/resources/query-validation-tests/elements.json
Show resolved
Hide resolved
Thanks for the review @agavra. Yeah, it was a big one. Would of been a lot of work to break it into separate PRs and would have take a long time to chain the reviews together. It was definitely a one of those times when you pull on a thread and the thing just keeps unraveling... |
Following on from the fixes and refactors done in confluentinc#6188, this commit pushes down the use of the Connect schema type to lower levels of the code. Higher levels of the code now deal with `LogicalSchema`, `PersistentSchema` or just a `List<SimpleColumn>`. This moves us closer to removing the Connect schema from the code base, except in the Serde code that deals with connect formats. `LogicalSchema` and `PersistentSchema` no longer know about the Connect schema type. Calls to retrieve the Connect schema from these types have been replaced with a util function that can convert a list of columns into a Struct Connect schema. As more code moves away from the Connect schema these util function calls will slowly be removed.
* refactor: push ConnectSchema down Following on from the fixes and refactors done in #6188, this commit pushes down the use of the Connect schema type to lower levels of the code. Higher levels of the code now deal with `LogicalSchema`, `PersistentSchema` or just a `List<SimpleColumn>`. This moves us closer to removing the Connect schema from the code base, except in the Serde code that deals with connect formats. `LogicalSchema` and `PersistentSchema` no longer know about the Connect schema type. Calls to retrieve the Connect schema from these types have been replaced with a util function that can convert a list of columns into a Struct Connect schema. As more code moves away from the Connect schema these util function calls will slowly be removed. Co-authored-by: Andy Coates <[email protected]>
Description
This commit fixes several issues and refactors a lot of the serde code around wrapping and unwrapping single values. This needs to be done before we can support key formats that use wrapping / unwrapping.
The main issues being fixed are:
At the same time, the way wrapping / unwrapping was handled in the code wasn't great. Formats like
JSON
needed to be able to handle both wrapped and unwrapped schemas and values, depending on whether the user explicitly set wrapping or unwrapping, vs the default behaviour of the format. This commit refactors the code such that the format will always be passed the a consistent schema and the set of serde features the format should use when creating the serde. This simplifies things and paves the way to user-define-serde.Reviewing notes:
There's some refactoring of how physical and persistence schemas are created and used:
SerdeOptions
can now split the global options into key and value specificEnabledSerdeFeatures
where,EnabledSerdeFeatures
is a set ofSerdeFeature
s that have been validated to ensure there are no clashing features (currently checking wrap and unwrap are not both set).PhysicalSchema
is now really just a combinationLogicalSchema
andSerdeOptions
.PersistenceSchema
is still using the Connect schema internally for now, but that schema will always be aSTRUCT
containing the key/value columns. The instance also tracks the key/value serde features.How serde are created has been refactored:
KsqlSerdeFactory
is no more. Instead,Format
interface, which now has acreateSerde
method, rather than one that returns aKSqlSerdeFactory
. This is much cleaner and simple.KsqlSerdeFactory
are now just implementation details of specific formats, and are free to be changed as needed.Format
deals in terms ofSerdeFeature
s, notSerdeOptions
.GenericRowSerde
andGenericKeySerde
are no longer responsible for handling wrapping and unwrapping, they just pass down and expectStruct
. (Common code has moved toGenericSerdeFactory
).Format.createSerde
can return correctly typedSerde<Struct>
, rather than some unknown maybeStruct
maybe some primitive.KafkaFormat
andDelimitedFormat
can now support unwrapping and don't need to do anything to support it.ConnectFormat
handles this by extracting the single field's schema and wrapping the serde created by the sub-class with serde that handle the unwrapping.Testing done
usual
Reviewer checklist