-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: avoid spurious tombstones in table output #6405
Changes from 1 commit
a10e5eb
cc7f1da
fcee2d4
8bc96c1
64480bf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -619,8 +619,9 @@ public void shouldQueryMaterializedTableWithMultipleAggregationColumns() { | |
} | ||
|
||
@Test | ||
public void shouldIgnoreHavingClause() { | ||
// Note: HAVING clause are handled centrally by KsqlMaterialization | ||
public void shouldHandleHavingClause() { | ||
// Note: HAVING clause are handled centrally by KsqlMaterialization. This logic will have been | ||
// installed as part of building the below statement: | ||
|
||
// Given: | ||
final PersistentQueryMetadata query = executeQuery( | ||
|
@@ -632,7 +633,11 @@ public void shouldIgnoreHavingClause() { | |
|
||
final LogicalSchema schema = schema("COUNT", SqlTypes.BIGINT); | ||
|
||
final Map<String, GenericRow> rows = waitForUniqueUserRows(STRING_DESERIALIZER, schema); | ||
final int matches = (int) USER_DATA_PROVIDER.data().values().stream() | ||
.filter(row -> ((Long) row.get(0)) > 2) | ||
.count(); | ||
|
||
final Map<String, GenericRow> rows = waitForUniqueUserRows(matches, STRING_DESERIALIZER, schema); | ||
Comment on lines
+636
to
+640
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The number of expected rows is now reduced as we no longer produce spurious tombstones. |
||
|
||
// When: | ||
final Materialization materialization = query.getMaterialization(queryId, contextStacker).get(); | ||
|
@@ -641,16 +646,22 @@ public void shouldIgnoreHavingClause() { | |
final MaterializedTable table = materialization.nonWindowed(); | ||
|
||
rows.forEach((rowKey, value) -> { | ||
// Rows passing the HAVING clause: | ||
final Struct key = asKeyStruct(rowKey, query.getPhysicalSchema()); | ||
|
||
final Optional<Row> expected = Optional.ofNullable(value) | ||
.map(v -> Row.of(schema, key, v, -1L)); | ||
|
||
final Optional<Row> row = withRetry(() -> table.get(key)); | ||
assertThat(row.map(Row::schema), is(expected.map(Row::schema))); | ||
assertThat(row.map(Row::key), is(expected.map(Row::key))); | ||
assertThat(row.map(Row::value), is(expected.map(Row::value))); | ||
assertThat(row.map(Row::schema), is(Optional.of(schema))); | ||
assertThat(row.map(Row::key), is(Optional.of(key))); | ||
assertThat(row.map(Row::value), is(Optional.of(value))); | ||
}); | ||
|
||
USER_DATA_PROVIDER.data().entries().stream() | ||
.filter(e -> !rows.containsKey(e.getKey().getString("USERID"))) | ||
.forEach(e -> { | ||
// Rows filtered by the HAVING clause: | ||
final Optional<Row> row = withRetry(() -> table.get(e.getKey())); | ||
assertThat(row, is(Optional.empty())); | ||
}); | ||
Comment on lines
+658
to
+664
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Get's against the table for filtered out rows should return nothing. |
||
} | ||
|
||
private static void verifyRetainedWindows( | ||
|
@@ -677,10 +688,22 @@ private static void verifyRetainedWindows( | |
private <T> Map<T, GenericRow> waitForUniqueUserRows( | ||
final Deserializer<T> keyDeserializer, | ||
final LogicalSchema aggregateSchema | ||
) { | ||
return waitForUniqueUserRows( | ||
USER_DATA_PROVIDER.data().size(), | ||
keyDeserializer, | ||
aggregateSchema | ||
); | ||
} | ||
|
||
private <T> Map<T, GenericRow> waitForUniqueUserRows( | ||
final int count, | ||
final Deserializer<T> keyDeserializer, | ||
final LogicalSchema aggregateSchema | ||
) { | ||
return TEST_HARNESS.verifyAvailableUniqueRows( | ||
output.toUpperCase(), | ||
USER_DATA_PROVIDER.data().size(), | ||
count, | ||
VALUE_FORMAT, | ||
PhysicalSchema.from(aggregateSchema, SerdeFeatures.of(), SerdeFeatures.of()), | ||
keyDeserializer | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: test name and comments were misleading as the extra steps
KsqlMaterialization
adds to handle the HAVING clause are installed as part of this test.