-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: introduce branches for pull queries on streams #8045
Conversation
This change implements part of [KLIP-53](https://github.com/confluentinc/ksql/blob/master/design-proposals/klip-53-pull-queries-on-streams.md)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Quick PR to start incrementally introducing stream pull queries.
I tried to do it all in one pass, but the three different endpoints really explodes the code complexity.
My plan is to start with this PR, then to add the feature to one endpoint at a time, complete with tests for those endpoints. That way, after each commit, for any given endpoint, the feature will be fully functional or produce a nice error message.
@@ -46,6 +46,7 @@ private KsqlConstants() { | |||
public static final String KSQL_QUERY_SOURCE_TAG = "query_source"; | |||
public static final String KSQL_QUERY_PLAN_TYPE_TAG = "query_plan_type"; | |||
public static final String KSQL_QUERY_ROUTING_TYPE_TAG = "query_routing_type"; | |||
public static final String KSQL_DATA_SOURCE_TYPE_TAG = "data_source_type"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding a metric tag for the data source type
@@ -210,7 +213,8 @@ PullQueryResult executeTablePullQuery( | |||
physicalPlan.getQueryId(), pullQueryQueue); | |||
final PullQueryResult result = new PullQueryResult(physicalPlan.getOutputSchema(), populator, | |||
physicalPlan.getQueryId(), pullQueryQueue, pullQueryMetrics, physicalPlan.getSourceType(), | |||
physicalPlan.getPlanType(), routingNodeType, physicalPlan::getRowsReadFromDataSource); | |||
physicalPlan.getPlanType(), routingNodeType, DataSourceType.KTABLE, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In all the pull request metrics, we are introducing the data type tag, so that we will be able to distinguish between table and stream pull requests.
throw new KsqlException("Unexpected data source type for table pull query: " | ||
+ source.getDataSourceType() + " " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is now just serving as a last-minute check on the source type. As far as we know, though, all code paths leading here will only send us table queries now, so it makes sense to reword the error.
) { | ||
final MetricsKey key = new MetricsKey(sourceType, planType, routingNodeType); | ||
final MetricsKey key = new MetricsKey(sourceType, planType, routingNodeType, dataSourceType); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's where we're actually adding the new type tag. Question: is this a compatible change?
final MetricsKey that = (MetricsKey) o; | ||
return sourceType == that.sourceType && planType == that.planType | ||
&& routingNodeType == that.routingNodeType && dataSourceType == that.dataSourceType; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Switched this out for auto-generated equals/hashcode/toString.
+ routingNodeTypeName() + "-" | ||
+ dataSourceTypeName(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The data source needs to be part of the variant name now.
final DataSource dataSource = analysis.getFrom().getDataSource(); | ||
final DataSource.DataSourceType dataSourceType = dataSource.getDataSourceType(); | ||
switch (dataSourceType) { | ||
case KTABLE: | ||
return createTablePullQueryPublisher( | ||
analysis, | ||
context, | ||
serviceContext, | ||
statement, | ||
pullQueryMetrics, | ||
workerExecutor, | ||
metricsCallbackHolder | ||
); | ||
case KSTREAM: | ||
throw new KsqlStatementException( | ||
"Pull queries are not supported on streams.", | ||
statement.getStatementText() | ||
); | ||
default: | ||
throw new KsqlStatementException( | ||
"Unexpected data source type for pull query: " + dataSourceType, | ||
statement.getStatementText() | ||
); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the main contribution of this PR: Detect the source type up front and add a switch statement so that we'll have a place to add the stream pull query in later.
) { | ||
final Metrics metrics = pullMetrics.getMetrics(); | ||
final Map<String, String> tags = ImmutableMap.<String, String>builder() | ||
.putAll(CUSTOM_TAGS_WITH_SERVICE_ID) | ||
.put(KsqlConstants.KSQL_QUERY_SOURCE_TAG, sourceType.name().toLowerCase()) | ||
.put(KsqlConstants.KSQL_QUERY_PLAN_TYPE_TAG, planType.name().toLowerCase()) | ||
.put(KsqlConstants.KSQL_QUERY_ROUTING_TYPE_TAG, routingNodeType.name().toLowerCase()) | ||
.put(KsqlConstants.KSQL_DATA_SOURCE_TYPE_TAG, dataSourceType.name().toLowerCase()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fact that we have to modify the expectations of this test is making me nervous that this might not be a compatible change.
It would be safer, though more verbose, to just add a whole new set of identical metrics for stream pull query metrics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if this is compatible. I would think it should be but might be worth checking. I think this is preferable to creating a whole new set of metrics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My understanding is that:
- for DD, the metrics tag matching is not necessarily exhaustive: i.e. if you have three tags, and the DD collection rule only sets two of them, it would collect all metrics that have the matching values for these two tags. So that should be compatible.
- for telemetry, my low-confidence memory is that it is similar to DD.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cc @cadonna who may help verifying here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
another question that is interesting to answer is that what would break if it's not compatible? do we just clobber our old metrics or ignore old-format metrics that get emitted? that might be acceptable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, upon further reflection (and upon actually trying to use these metrics for stream pull queries as well), I've decided that it's both too risky and not useful enough to do this by adding this tag, so I've pulled the new tag from this PR.
When I add stream pull queries, I'll just add new metrics to measure them.
) { | ||
final Metrics metrics = pullMetrics.getMetrics(); | ||
final Map<String, String> tags = ImmutableMap.<String, String>builder() | ||
.putAll(CUSTOM_TAGS_WITH_SERVICE_ID) | ||
.put(KsqlConstants.KSQL_QUERY_SOURCE_TAG, sourceType.name().toLowerCase()) | ||
.put(KsqlConstants.KSQL_QUERY_PLAN_TYPE_TAG, planType.name().toLowerCase()) | ||
.put(KsqlConstants.KSQL_QUERY_ROUTING_TYPE_TAG, routingNodeType.name().toLowerCase()) | ||
.put(KsqlConstants.KSQL_DATA_SOURCE_TYPE_TAG, dataSourceType.name().toLowerCase()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if this is compatible. I would think it should be but might be worth checking. I think this is preferable to creating a whole new set of metrics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM too pending the metrics question
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
This change implements part of KLIP-53
Reviewer checklist