-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: propagate null-valued records in repartition #6647
Conversation
@@ -58,7 +58,7 @@ public static KeyBuilder keyBuilder(final ColumnName name, final SqlType type) { | |||
} | |||
|
|||
@SuppressWarnings("unchecked") | |||
public static List<?> asList(final Object key) { | |||
public static List<Object> asList(final Object key) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason this was previously List<?>
rather than List<Object>
? I don't fully understand the meaning of this change, but it lets us avoid an unchecked cast downstream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://stackoverflow.com/a/35273095/2258040 - for our use case, I don't think it really makes sense to return List<?>
@@ -13,7 +13,7 @@ | |||
* specific language governing permissions and limitations under the License. | |||
*/ | |||
|
|||
package io.confluent.ksql.analyzer; | |||
package io.confluent.ksql.execution.util; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't love the package this was moved to but couldn't think of anywhere better. Suggestions welcome!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤷 it's not a public API so it doesn't really matter to me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Superb sleuthing 🕵️ some minor comments inline but I think we can sneak this by without worrying about backwards compatibility too much (famous last words)
@@ -13,7 +13,7 @@ | |||
* specific language governing permissions and limitations under the License. | |||
*/ | |||
|
|||
package io.confluent.ksql.analyzer; | |||
package io.confluent.ksql.execution.util; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤷 it's not a public API so it doesn't really matter to me
@@ -58,7 +58,7 @@ public static KeyBuilder keyBuilder(final ColumnName name, final SqlType type) { | |||
} | |||
|
|||
@SuppressWarnings("unchecked") | |||
public static List<?> asList(final Object key) { | |||
public static List<Object> asList(final Object key) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://stackoverflow.com/a/35273095/2258040 - for our use case, I don't think it really makes sense to return List<?>
ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/PartitionByParamsFactory.java
Show resolved
Hide resolved
private static class PartitionByExpressionEvaluator { | ||
|
||
private final Function<GenericRow, Object> evaluator; | ||
private final boolean acceptsKey; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private final boolean acceptsKey; | |
private final boolean operatesOnKeyOnly; |
A little hard for me to understand it inline without context on what this PR does:
final Object newKey = evaluator.evaluator.apply(
evaluator.acceptsKey
? GenericRow.fromList(StructKeyUtil.asList(k))
: v
);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is addressed by your suggestion below? I went with evaluateOnKeyOnly
though I don't feel strongly.
@@ -202,6 +217,23 @@ private static LogicalSchema buildSchema( | |||
final String errorMsg = "Error computing new key from expression " | |||
+ expressionMetadata.getExpression(); | |||
|
|||
return row -> expressionMetadata.evaluate(row, null, logger, () -> errorMsg); | |||
return new PartitionByExpressionEvaluator( | |||
row -> expressionMetadata.evaluate(row, null, logger, () -> errorMsg), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suspect this isn't the only place where this bug happens (a null value doesn't get properly passed into an ExpressionEvaluator
🤔). I had a PR at one point going to try to have ExpressionMetadata#evaluate
take in both a key and a value instead of just a GenericRow
which would have prevented this issue. Unfortunately, I couldn't get joins to work because the joiner doesn't actually have access to the key 😭
tldr, I think it would be good if we encapsulate as much of this "put the keys and only the keys into the generic row if the row was null" into the PartitionByExpressionEvaluator
and have that implement BiFunction<Struct, GenericRow, Object>
so at least the part that creates the key is all in the same place that calls the evaluator:
final Object newKey = evaluator.evaluator.apply(
evaluator.acceptsKey
? GenericRow.fromList(StructKeyUtil.asList(k))
: v
);
I would also just pass in expressionMetadata
into the PartitionByExpressionEvaluator
instead of passing it in as a lambda so that it's centralized and we don't have to track down where it's being called and what's getting passed into it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, thanks for the suggestion -- much cleaner!
What's the benefit of implementing BiFunction<Object, GenericRow, Object>
rather than simply have a method? I've currently got
Object evaluate(final Object key, final GenericRow value) {
final GenericRow row = evaluateOnKeyOnly
? GenericRow.fromList(StructKeyUtil.asList(key))
: value;
return expressionMetadata.evaluate(row, null, logger, errorMsg);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is none, I think that's great (especially if nowhere else expects the BiFunction)
Description
Fixes #6646
The current mapper used in repartitions builds the new key by evaluating the generated expression on the value. If the value is null, the mapper returns an empty key rather than the original key. More intuitive behavior is to detect when the partition by expression depends only on key column(s) and evaluate the new key on the key columns in these cases, allowing null values to be propagated. This is needed to enable the propagation of tombstones in #6635
Testing done
Unit + QTT.
Reviewer checklist