Skip to content

Commit

Permalink
fix: allow joins in windowed aggregations
Browse files Browse the repository at this point in the history
fixes: #5898

This change fixes a regression introduced in v5.5.0 that meant any windowed aggregation with a join would fail with an `IllegalArgumentException`.

This change fixes the regression; however, follow-on work is required to allow access to the window bounds columns `WINDOWSTART` and `WINDOWEND` in such queries. Access to these columns was not possible in v5.4, i.e. this is not a regression. The follow-on work will be tracked under #5931.
  • Loading branch information
big-andy-coates committed Aug 3, 2020
1 parent 6636f71 commit 9452ab6
Show file tree
Hide file tree
Showing 5 changed files with 414 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ private static Set<Expression> getGroupByExpressions(final Analysis analysis) {
return ImmutableSet.copyOf(analysis.getGroupByExpressions());
}

if (analysis.isJoin()) {
// Window bounds are not yet supported in the projection of windowed aggregates with joins.
// See https://github.com/confluentinc/ksql/issues/5931
return ImmutableSet.copyOf(analysis.getGroupByExpressions());
}

// Add in window bounds columns as implicit group by columns:
final AliasedDataSource source = Iterables.getOnlyElement(analysis.getFromDataSources());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ public Optional<Expression> visitColumnReference(
final UnqualifiedColumnReferenceExp node,
final Context<Void> ctx
) {
if (sourceSchemas.isJoin()) {
if (sourceSchemas.isJoin() && !node.getReference().isAggregate()) {
final SourceName sourceName =
sourceSchemas.sourcesWithField(Optional.empty(), node.getReference()).iterator().next();
return Optional.of(new UnqualifiedColumnReferenceExp(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
{
"version" : "5.5.1",
"timestamp" : 1596471687145,
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE TABLE A (ID STRING, REGIONID STRING) WITH (KAFKA_TOPIC='a', KEY='id', VALUE_FORMAT='json');",
"ddlCommand" : {
"@type" : "createTableV1",
"sourceName" : "A",
"schema" : "`ROWKEY` STRING KEY, `ID` STRING, `REGIONID` STRING",
"keyField" : "ID",
"timestampColumn" : null,
"topicName" : "a",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM B (ID STRING) WITH (KAFKA_TOPIC='b', VALUE_FORMAT='json');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "B",
"schema" : "`ROWKEY` STRING KEY, `ID` STRING",
"keyField" : null,
"timestampColumn" : null,
"topicName" : "b",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : null
},
"queryPlan" : null
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE TABLE TEST AS SELECT\n A.ID A_ID,\n COUNT(*) COUNT\nFROM B B\nLEFT OUTER JOIN A A ON ((A.ID = B.ID))\nWINDOW TUMBLING ( SIZE 1 MINUTES ) \nGROUP BY A.ID\nHAVING (COUNT(*) > 2)\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createTableV1",
"sourceName" : "TEST",
"schema" : "`ROWKEY` STRING KEY, `A_ID` STRING, `COUNT` BIGINT",
"keyField" : "A_ID",
"timestampColumn" : null,
"topicName" : "TEST",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"windowInfo" : {
"type" : "TUMBLING",
"size" : 60.000000000
}
},
"queryPlan" : {
"sources" : [ "A", "B" ],
"sink" : "TEST",
"physicalPlan" : {
"@type" : "tableSinkV1",
"properties" : {
"queryContext" : "TEST"
},
"source" : {
"@type" : "tableSelectV1",
"properties" : {
"queryContext" : "Aggregate/Project"
},
"source" : {
"@type" : "tableFilterV1",
"properties" : {
"queryContext" : "Aggregate/HavingFilter"
},
"source" : {
"@type" : "streamWindowedAggregateV1",
"properties" : {
"queryContext" : "Aggregate/Aggregate"
},
"source" : {
"@type" : "streamGroupByV1",
"properties" : {
"queryContext" : "Aggregate/GroupBy"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Aggregate/Prepare"
},
"source" : {
"@type" : "streamTableJoinV1",
"properties" : {
"queryContext" : "Join"
},
"joinType" : "LEFT",
"internalFormats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"leftSource" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "PrependAliasLeft"
},
"source" : {
"@type" : "streamSelectKeyV1",
"properties" : {
"queryContext" : "LeftSourceKeyed"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KafkaTopic_Left/Source"
},
"topicName" : "b",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : null,
"sourceSchema" : "`ROWKEY` STRING KEY, `ID` STRING"
},
"keyExpression" : "ID"
},
"selectExpressions" : [ "ID AS B_ID", "ROWTIME AS B_ROWTIME", "ROWKEY AS B_ROWKEY" ]
},
"rightSource" : {
"@type" : "tableSelectV1",
"properties" : {
"queryContext" : "PrependAliasRight"
},
"source" : {
"@type" : "tableSourceV1",
"properties" : {
"queryContext" : "KafkaTopic_Right/Source"
},
"topicName" : "a",
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"timestampColumn" : null,
"sourceSchema" : "`ROWKEY` STRING KEY, `ID` STRING, `REGIONID` STRING"
},
"selectExpressions" : [ "ID AS A_ID", "REGIONID AS A_REGIONID", "ROWTIME AS A_ROWTIME", "ROWKEY AS A_ROWKEY" ]
}
},
"selectExpressions" : [ "A_ID AS KSQL_INTERNAL_COL_0", "B_ROWTIME AS KSQL_INTERNAL_COL_1" ]
},
"internalFormats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"groupByExpressions" : [ "KSQL_INTERNAL_COL_0" ]
},
"internalFormats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"nonAggregateColumns" : [ "KSQL_INTERNAL_COL_0", "KSQL_INTERNAL_COL_1" ],
"aggregationFunctions" : [ "COUNT(KSQL_INTERNAL_COL_1)", "COUNT(KSQL_INTERNAL_COL_1)" ],
"windowExpression" : " TUMBLING ( SIZE 1 MINUTES ) "
},
"filterExpression" : "(KSQL_AGG_VARIABLE_1 > 2)"
},
"selectExpressions" : [ "KSQL_INTERNAL_COL_0 AS A_ID", "KSQL_AGG_VARIABLE_0 AS COUNT" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA",
"properties" : { }
},
"valueFormat" : {
"format" : "JSON",
"properties" : { }
},
"options" : [ ]
},
"topicName" : "TEST"
},
"queryId" : "CTAS_TEST_0"
}
} ],
"schemas" : {
"CTAS_TEST_0.KafkaTopic_Right.Source" : "STRUCT<ID VARCHAR, REGIONID VARCHAR> NOT NULL",
"CTAS_TEST_0.KafkaTopic_Left.Source" : "STRUCT<ID VARCHAR> NOT NULL",
"CTAS_TEST_0.Join.Left" : "STRUCT<B_ID VARCHAR, B_ROWTIME BIGINT, B_ROWKEY VARCHAR> NOT NULL",
"CTAS_TEST_0.Aggregate.GroupBy" : "STRUCT<KSQL_INTERNAL_COL_0 VARCHAR, KSQL_INTERNAL_COL_1 BIGINT> NOT NULL",
"CTAS_TEST_0.Aggregate.Aggregate.Materialize" : "STRUCT<KSQL_INTERNAL_COL_0 VARCHAR, KSQL_INTERNAL_COL_1 BIGINT, KSQL_AGG_VARIABLE_0 BIGINT, KSQL_AGG_VARIABLE_1 BIGINT> NOT NULL",
"CTAS_TEST_0.TEST" : "STRUCT<A_ID VARCHAR, COUNT BIGINT> NOT NULL"
},
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.new.api.enabled" : "false",
"ksql.streams.state.dir" : "/var/folders/7c/zftp0089471cylbb3rvw01040000gp/T/confluent4576063723780193454",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
Topologies:
Sub-topology: 0
Source: Join-repartition-source (topics: [Join-repartition])
--> Join
Processor: Join (stores: [KafkaTopic_Right-Reduce])
--> Aggregate-Prepare
<-- Join-repartition-source
Processor: Aggregate-Prepare (stores: [])
--> KSTREAM-FILTER-0000000014
<-- Join
Processor: KSTREAM-FILTER-0000000014 (stores: [])
--> Aggregate-GroupBy
<-- Aggregate-Prepare
Source: KSTREAM-SOURCE-0000000000 (topics: [a])
--> KTABLE-SOURCE-0000000001
Processor: Aggregate-GroupBy (stores: [])
--> Aggregate-GroupBy-repartition-filter
<-- KSTREAM-FILTER-0000000014
Processor: KTABLE-SOURCE-0000000001 (stores: [KafkaTopic_Right-Reduce])
--> KTABLE-TRANSFORMVALUES-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: Aggregate-GroupBy-repartition-filter (stores: [])
--> Aggregate-GroupBy-repartition-sink
<-- Aggregate-GroupBy
Processor: KTABLE-TRANSFORMVALUES-0000000002 (stores: [])
--> PrependAliasRight
<-- KTABLE-SOURCE-0000000001
Sink: Aggregate-GroupBy-repartition-sink (topic: Aggregate-GroupBy-repartition)
<-- Aggregate-GroupBy-repartition-filter
Processor: PrependAliasRight (stores: [])
--> none
<-- KTABLE-TRANSFORMVALUES-0000000002

Sub-topology: 1
Source: KSTREAM-SOURCE-0000000004 (topics: [b])
--> KSTREAM-TRANSFORMVALUES-0000000005
Processor: KSTREAM-TRANSFORMVALUES-0000000005 (stores: [])
--> KSTREAM-FILTER-0000000006
<-- KSTREAM-SOURCE-0000000004
Processor: KSTREAM-FILTER-0000000006 (stores: [])
--> KSTREAM-KEY-SELECT-0000000007
<-- KSTREAM-TRANSFORMVALUES-0000000005
Processor: KSTREAM-KEY-SELECT-0000000007 (stores: [])
--> PrependAliasLeft
<-- KSTREAM-FILTER-0000000006
Processor: PrependAliasLeft (stores: [])
--> Join-repartition-filter
<-- KSTREAM-KEY-SELECT-0000000007
Processor: Join-repartition-filter (stores: [])
--> Join-repartition-sink
<-- PrependAliasLeft
Sink: Join-repartition-sink (topic: Join-repartition)
<-- Join-repartition-filter

Sub-topology: 2
Source: Aggregate-GroupBy-repartition-source (topics: [Aggregate-GroupBy-repartition])
--> KSTREAM-AGGREGATE-0000000016
Processor: KSTREAM-AGGREGATE-0000000016 (stores: [Aggregate-Aggregate-Materialize])
--> Aggregate-Aggregate-ToOutputSchema
<-- Aggregate-GroupBy-repartition-source
Processor: Aggregate-Aggregate-ToOutputSchema (stores: [])
--> Aggregate-Aggregate-WindowSelect
<-- KSTREAM-AGGREGATE-0000000016
Processor: Aggregate-Aggregate-WindowSelect (stores: [])
--> Aggregate-HavingFilter-ApplyPredicate
<-- Aggregate-Aggregate-ToOutputSchema
Processor: Aggregate-HavingFilter-ApplyPredicate (stores: [])
--> Aggregate-HavingFilter-Filter
<-- Aggregate-Aggregate-WindowSelect
Processor: Aggregate-HavingFilter-Filter (stores: [])
--> Aggregate-HavingFilter-PostProcess
<-- Aggregate-HavingFilter-ApplyPredicate
Processor: Aggregate-HavingFilter-PostProcess (stores: [])
--> Aggregate-Project
<-- Aggregate-HavingFilter-Filter
Processor: Aggregate-Project (stores: [])
--> KTABLE-TOSTREAM-0000000026
<-- Aggregate-HavingFilter-PostProcess
Processor: KTABLE-TOSTREAM-0000000026 (stores: [])
--> KSTREAM-SINK-0000000027
<-- Aggregate-Project
Sink: KSTREAM-SINK-0000000027 (topic: TEST)
<-- KTABLE-TOSTREAM-0000000026

Loading

0 comments on commit 9452ab6

Please sign in to comment.