chore: force repartition on joins with SR-enabled key formats #6635
@@ -92,6 +101,11 @@ public LogicalSchema getSchema() {

  @Override
  public void setKeyFormat(final FormatInfo format) {
    // Force repartition in case of schema inference, to avoid misses due to key schema ID mismatch
    if (FormatFactory.of(format).supportsFeature(SerdeFeature.SCHEMA_INFERENCE)) {
If we want to fix the first inefficiency mentioned in the PR description (that we currently force repartitions even if not strictly necessary), this is the bit of logic that would be updated.
might be good to link this to a GH issue and place the issue inline
final String errorMsg = "Implicit repartitioning of windowed sources is not supported. "
    + "See https://github.com/confluentinc/ksql/issues/4385.";
final String additionalMsg = forceRepartition
    ? " As a result, ksqlDB does not support joins on Schema-Registry-enabled key formats "
This PR does not add support for joining on windowed sources with SR-enabled key formats. This will be a follow-up PR.
    // Then:
    assertThat(result, is(initialSchemaKStream));
  }

  @Test
  public void shouldNotRepartitionIfRowkey() {
This test is identical to shouldNotRepartitionIfSameKeyField above, so the duplicate has been removed.
  }
},
{
  "name": "stream stream left join",
The test immediately above this one has the same name. I've renamed this one to indicate the difference and avoid confusion.
  "valueFormat" : {"format" : "JSON"}
},
{
  "name" : "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-KafkaTopic_Right-Reduce-changelog",
This is an example of the fact that we have two changelog topics here when we really only want to have one :(
"topics" : {
  "topics" : [
    {
      "name" : "_confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-Join-repartition",
Ideally this topic would be named _confluent-ksql-some.ksql.service.idquery_CSAS_OUTPUT_0-Join-left-repartition (or similar) to indicate that it's the repartition topic on the left side of the join, but this would be a breaking change and I'm not sure it's worth the confusion it would save. (The name was unambiguous prior to this PR because, for stream-table joins, only the stream side (left side) would ever be repartitioned, but that's no longer true with this PR.)
    final MaterializedFactory materializedFactory,
    final PartitionByParamsBuilder paramsBuilder
) {
  final LogicalSchema sourceSchema = table.getSchema();
I've tried to implement this new plan step in a generic way so that it can also support user table repartitions in the future, rather than baking in assumptions such as the fact that today the logical schema (and key format) never change when this step is called, as the step is only used to enable a specific join use case. However, there's no testing for the more general table rekey functionality, so maybe it's more misleading to have the step appear general, and we should be failing if any of these assumptions are violated instead?
I think it's fine to implement it generically. We should have tests that fail if a user tries to repartition a table, so I think that covers that this isn't being used for evil.
LGTM! Some comments inline about test coverage and a few clarifying points, otherwise I think the approach makes sense
@@ -2298,6 +2342,159 @@
      "type": "io.confluent.ksql.util.KsqlStatementException",
      "message": "Incompatible key formats. `T1` has KAFKA while `T2` has DELIMITED.\nCorrect the key format by creating a copy of the table with the correct key format. For example:\n\tCREATE TABLE T_COPY\n\t WITH (KEY_FORMAT = <required format>, <other key format config>)\n\t AS SELECT * FROM T;"
    }
  },
  {
    "name": "stream-steam key-to-key - SR-enabled key format",
we might want to introduce a pre-topics node here so that we have control over the actual schema in schema registry. I think we also have the ability to ensure the schema at the end. It would be good to have both of these in the test to make sure we don't accidentally create a backwards incompatible change if/when we optimize away unnecessary repartitions
I think it also makes sense to add tests for:
- schemas are identical, should repartition anyway
- schemas are logically identical but physically different
- schemas are logically different, should fail (in table-table joins anyway)
Also we should make sure what happens when we have an SR-enabled value format as well (I don't imagine anything should go wrong, but it makes sure that when we piped through the value format in the table select key we don't mess things up)
🚂
- "name": "stream-steam key-to-key - SR-enabled key format",
+ "name": "stream-stream key-to-key - SR-enabled key format",
we might want to introduce a pre-topics node here so that we have control over the actual schema in schema registry. I think we also have the ability to ensure the schema at the end.
Yes, I've been thinking this as well. Wanted to open the PR first in order to sanity-check the approach -- since we're on the same page I'll pursue beefing up the tests 👍
I think it also makes sense to add tests for:
- schemas are identical, should repartition anyway
- schemas are logically identical but physically different
- schemas are logically different, should fail (in table-table joins anyway)
The new tests I added are already case (1). Tests for case (3) exist for non-SR-enabled formats, but I'll add one for an SR-enabled format. I'll add (2) and also tests with SR-enabled value formats as a sanity check.
🚂
Haha! I fell victim to copy-paste from existing tests. I made the update, which required renaming existing historic plans so this PR diff just became a lot larger in terms of number of lines.
Added the different tests discussed above. Good call on testing table-table join with nulls -- turns out we've got a bug. The newly added test won't pass until #6647 is merged.
@@ -54,6 +59,10 @@ public PreJoinRepartitionNode(
    this.joiningNode = source instanceof JoiningNode
        ? Optional.of((JoiningNode) source)
        : Optional.empty();
    this.valueFormat = getLeftmostSourceNode()
we might want to move this into JoiningNode so that we can leverage it both here and in JoinNode#getValueFormatForSource (that way, we can ensure that they line up - if for some reason we decide to switch JoinNode to use the rightmost value format, we won't have to make that change here too)
Done -- good call!
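For readers following the thread, a minimal sketch of the idea: put the "which source's value format wins" rule in one place on the shared interface so that both callers stay in sync. The types below are simplified, hypothetical stand-ins (formats are plain strings, and `preferredValueFormat` is an invented name); this is not the actual ksqlDB code.

```java
public class SharedValueFormatRule {

  // Hypothetical, simplified stand-in for the planner's JoiningNode interface.
  interface JoiningNode {
    String leftmostSourceValueFormat();

    // Single place that decides which source's value format is used.
    // Switching to the rightmost source would only require a change here.
    default String preferredValueFormat() {
      return leftmostSourceValueFormat();
    }
  }

  // Simplified stand-in for a join node with a left and a right source.
  static final class JoinNode implements JoiningNode {
    private final String leftFormat;
    private final String rightFormat;

    JoinNode(final String leftFormat, final String rightFormat) {
      this.leftFormat = leftFormat;
      this.rightFormat = rightFormat;
    }

    @Override
    public String leftmostSourceValueFormat() {
      return leftFormat;
    }

    String valueFormatForSource() {
      return preferredValueFormat();
    }
  }

  // Simplified stand-in for the repartition node that wraps a joining source.
  static final class PreJoinRepartitionNode {
    private final JoiningNode source;

    PreJoinRepartitionNode(final JoiningNode source) {
      this.source = source;
    }

    String valueFormat() {
      return source.preferredValueFormat();
    }
  }

  public static void main(final String[] args) {
    final JoinNode join = new JoinNode("AVRO", "JSON");
    final PreJoinRepartitionNode repartition = new PreJoinRepartitionNode(join);
    System.out.println(join.valueFormatForSource().equals(repartition.valueFormat()));
  }
}
```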
if (FormatFactory.of(format).supportsFeature(SerdeFeature.SCHEMA_INFERENCE)) {
  forceRepartition = true;
}

if (requiresRepartition()) {
I'm not sure, but should we make this requiresRepartition() || forceRepartition? If not, we should add a comment about why forced repartitions don't move in here (instead of // Node is repartitioning already)
Addressed this as part of revamping the logic (see #6635 (comment)).
@@ -92,6 +101,11 @@ public LogicalSchema getSchema() {

  @Override
  public void setKeyFormat(final FormatInfo format) {
    // Force repartition in case of schema inference, to avoid misses due to key schema ID mismatch
not your code: it bothers me a little that there's nothing "requiring" that this is called. can we have buildStream() throw an exception if setKeyFormat was never called? this will make sure that we don't refactor the code and lose this call somewhere in the refactor
That's because it wasn't required that this method be called previously :) I've revamped the logic here so it's now required, as that was the major point of confusion when reading this code as well. Also added the check you suggested.
Good call on challenging this, BTW. There was a bug where we weren't always forcing repartitions for upstream joins with SR-enabled key formats in multi-joins. I've fixed the bug and also added multi-join QTTs to cover this.
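A minimal sketch of the kind of guard discussed in this thread: the build step fails fast if the key format was never set, so a later refactor cannot silently drop the call. `KeyFormatGuard`, its string-based format, and the boolean flag are hypothetical simplifications, not the actual ksqlDB classes or signatures.

```java
import java.util.Optional;

public class KeyFormatGuard {

  private Optional<String> keyFormat = Optional.empty();
  private boolean forceRepartition = false;

  // Hypothetical stand-in for setKeyFormat(FormatInfo): a format name plus a flag
  // saying whether the format supports schema inference (i.e. is SR-enabled).
  public void setKeyFormat(final String formatName, final boolean supportsSchemaInference) {
    keyFormat = Optional.of(formatName);
    forceRepartition = supportsSchemaInference;
  }

  // Fails fast if setKeyFormat() was never called.
  public String buildStream() {
    final String format = keyFormat.orElseThrow(
        () -> new IllegalStateException("setKeyFormat() must be called before buildStream()"));
    return forceRepartition ? "repartitioned[" + format + "]" : "as-is[" + format + "]";
  }

  public static void main(final String[] args) {
    final KeyFormatGuard node = new KeyFormatGuard();
    try {
      node.buildStream();
    } catch (final IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
    node.setKeyFormat("AVRO", true);
    System.out.println(node.buildStream());
  }
}
```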
  }
},
{
  "name": "table-table - SR-enabled key format - with nulls",
This test will not succeed until #6647 is merged.
💯 excellent test coverage. LGTM!
@@ -140,15 +144,18 @@ public SchemaKTable(
    );
  }

  @SuppressFBWarnings("UC_USELESS_CONDITION")
😂 pah. what a useless condition.
) {
if (repartitionNotNeeded(ImmutableList.of(keyExpression))) {
return (SchemaKStream<Struct>) this;
if (repartitionNotNeeded(ImmutableList.of(keyExpression)) && !forceRepartition) {
- if (repartitionNotNeeded(ImmutableList.of(keyExpression)) && !forceRepartition) {
+ if (!forceRepartition && repartitionNotNeeded(ImmutableList.of(keyExpression))) {
nit: pet peeve of mine 😈
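One plausible motivation for the suggested ordering (an assumption; the reviewer does not state it): `&&` short-circuits, so checking the cheap boolean flag first skips the `repartitionNotNeeded(...)` call whenever a repartition is forced. Both orderings return the same result as long as the call has no side effects. The sketch below uses a hypothetical call counter to make the difference visible; the class and method names are illustrative only.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ShortCircuitOrder {

  // Counts how often the (potentially more expensive) check is evaluated.
  static final AtomicInteger CALLS = new AtomicInteger();

  // Hypothetical stand-in for repartitionNotNeeded(...).
  static boolean repartitionNotNeeded() {
    CALLS.incrementAndGet();
    return true;
  }

  // Original ordering: the method call is always evaluated.
  static boolean skipOriginal(final boolean forceRepartition) {
    return repartitionNotNeeded() && !forceRepartition;
  }

  // Suggested ordering: the method call is skipped when forceRepartition is true.
  static boolean skipSuggested(final boolean forceRepartition) {
    return !forceRepartition && repartitionNotNeeded();
  }

  public static void main(final String[] args) {
    skipOriginal(true);
    System.out.println("calls after original ordering: " + CALLS.get());
    CALLS.set(0);
    skipSuggested(true);
    System.out.println("calls after suggested ordering: " + CALLS.get());
  }
}
```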
Description
Fixes #6332
This PR forces repartitions on both sources of joins with SR-enabled key formats. This is in order to ensure that the same Schema Registry schema is used on both sides of the join, so the user will not experience unexpected join misses due to logically equivalent data being sent to different topic partitions because the serialized bytes differ (due to differences in schema or schema ID). By forcing repartitions, the schema generated by ksqlDB is used for data on both sides of the join, which is consistent given logical schema, key format, key format properties, and key serde features, all of which are verified to be the same on both sides of the join, else the join is rejected.
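To illustrate the failure mode described above, here is a minimal Java sketch. It assumes the Confluent wire format (a zero magic byte, then a 4-byte big-endian schema ID, then the payload) and uses `Arrays.hashCode` as a stand-in for Kafka's murmur2-based default partitioner. The class name, the helper names, the UTF-8 payload (a stand-in for real Avro or Protobuf encoding), and the schema IDs are all hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SchemaIdKeyMismatch {

  // Assumed wire format: magic byte 0, 4-byte big-endian schema ID, then the payload.
  public static byte[] serializeKey(final int schemaId, final String payload) {
    final byte[] body = payload.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(1 + 4 + body.length)
        .put((byte) 0)
        .putInt(schemaId)
        .put(body)
        .array();
  }

  // Stand-in partitioner (assumption): any hash over the serialized key bytes
  // shows the same dependence on the schema ID prefix as the real partitioner.
  public static int partitionFor(final byte[] keyBytes, final int numPartitions) {
    return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
  }

  public static void main(final String[] args) {
    // Same logical key, but each side of the join was written with a different schema ID.
    final byte[] left = serializeKey(1, "user-42");
    final byte[] right = serializeKey(2, "user-42");
    System.out.println("same bytes before repartition: " + Arrays.equals(left, right));

    // After a forced repartition, both sides are re-serialized with one schema
    // (ID 7 is hypothetical), so the bytes and therefore the partitions agree.
    final byte[] leftRekeyed = serializeKey(7, "user-42");
    final byte[] rightRekeyed = serializeKey(7, "user-42");
    System.out.println("same partition after repartition: "
        + (partitionFor(leftRekeyed, 6) == partitionFor(rightRekeyed, 6)));
  }
}
```

Because the serialized bytes differ before the repartition, nothing guarantees that the two sides land on the same partition, which is where the join misses come from.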
In order to support stream-table and table-table joins, this PR adds support for repartitioning tables. This functionality is hidden from the user (users still see the same error message indicating that ksqlDB does not support repartitioning tables) and is purely used for this internal use case of enabling joins with SR-enabled key formats.
This approach of forcing repartitions on both sides of the join is not resource-efficient for two main reasons:
toTable() call, which requires passing in a Materialized object, which results in an extra state store. I don't know of a way around this at this time.
Testing done
QTT + manual.