feat: add stream pull queries to HTTP/2
John Roesler committed Sep 15, 2021
1 parent d59d33c commit dfe4e0a
Showing 42 changed files with 1,165 additions and 837 deletions.

Large diffs are not rendered by default (2 files).

--------------------------------------------------------------------------------

@@ -56,7 +56,6 @@
 import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
-import org.apache.kafka.clients.admin.Admin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -85,7 +84,6 @@ public static KsqlContext create(
     final ServiceInfo serviceInfo = ServiceInfo.create(ksqlConfig);
     final KsqlEngine engine = new KsqlEngine(
         serviceContext,
-        () -> Admin.create(ksqlConfig.getKsqlAdminClientConfigProps()),
         processingLogContext,
         functionRegistry,
         serviceInfo,

--------------------------------------------------------------------------------

@@ -18,6 +18,7 @@
 import static io.confluent.ksql.metastore.model.DataSource.DataSourceType;

 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

@@ -107,6 +108,7 @@
 import java.util.OptionalInt;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -366,7 +368,8 @@ ScalablePushQueryMetadata executeScalablePushQuery(
   @SuppressWarnings("OptionalGetWithoutIsPresent") // Known to be non-empty
   TransientQueryMetadata executeTransientQuery(
       final ConfiguredStatement<Query> statement,
-      final boolean excludeTombstones
+      final boolean excludeTombstones,
+      final Optional<ImmutableMap<TopicPartition, Long>> endOffsets
   ) {
     final ExecutorPlans plans = planQuery(statement, statement.getStatement(),
         Optional.empty(), Optional.empty(), engineContext.getMetaStore());

@@ -391,7 +394,8 @@ TransientQueryMetadata executeTransientQuery(
         outputNode.getSchema(),
         outputNode.getLimit(),
         outputNode.getWindowInfo(),
-        excludeTombstones
+        excludeTombstones,
+        endOffsets
     );
   }
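
(Aside: a minimal sketch of how a caller builds the new endOffsets argument; the
topic name and offset values below are illustrative, not taken from this commit.)

    // Hypothetical example: end offsets for a two-partition source topic.
    final ImmutableMap<TopicPartition, Long> endOffsets = ImmutableMap.of(
        new TopicPartition("input-topic", 0), 42L,
        new TopicPartition("input-topic", 1), 17L);

    // A stream pull query passes the offsets at which it should complete;
    // a push query passes Optional.empty() and runs until cancelled or limited.
    executeTransientQuery(statement, excludeTombstones, Optional.of(endOffsets));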

--------------------------------------------------------------------------------

@@ -79,15 +79,12 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
-import java.util.function.Supplier;
 import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
 import org.apache.kafka.clients.admin.ListOffsetsOptions;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;

@@ -107,12 +104,9 @@ public class KsqlEngine implements KsqlExecutionContext, Closeable {
   private final EngineContext primaryContext;
   private final QueryCleanupService cleanupService;
   private final OrphanedTransientQueryCleaner orphanedTransientQueryCleaner;
-  private final KsqlConfig ksqlConfig;
-  private final Admin persistentAdminClient;

   public KsqlEngine(
       final ServiceContext serviceContext,
-      final Supplier<Admin> adminSupplier,
       final ProcessingLogContext processingLogContext,
       final FunctionRegistry functionRegistry,
       final ServiceInfo serviceInfo,

@@ -131,7 +125,6 @@ public KsqlEngine(
             serviceInfo.customMetricsTags(),
             serviceInfo.metricsExtension()
         ),
-        adminSupplier,
         queryIdGenerator,
         ksqlConfig,
         queryEventListeners

@@ -145,7 +138,6 @@ public KsqlEngine(
       final String serviceId,
       final MutableMetaStore metaStore,
       final Function<KsqlEngine, KsqlEngineMetrics> engineMetricsFactory,
-      final Supplier<Admin> adminSupplier,
       final QueryIdGenerator queryIdGenerator,
       final KsqlConfig ksqlConfig,
       final List<QueryEventListener> queryEventListeners

@@ -181,8 +173,6 @@ public KsqlEngine(
         1000,
         TimeUnit.MILLISECONDS
     );
-    this.ksqlConfig = ksqlConfig;
-    this.persistentAdminClient = adminSupplier.get();

     cleanupService.startAsync();
   }

@@ -314,7 +304,7 @@ public TransientQueryMetadata executeTransientQuery(
     try {
       final TransientQueryMetadata query = EngineExecutor
           .create(primaryContext, serviceContext, statement.getSessionConfig())
-          .executeTransientQuery(statement, excludeTombstones);
+          .executeTransientQuery(statement, excludeTombstones, Optional.empty());
       return query;
     } catch (final KsqlStatementException e) {
       throw e;

@@ -364,15 +354,16 @@ public StreamPullQueryMetadata createStreamPullQuery(
         .putAll(statementOrig.getSessionConfig().getOverrides())
         .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
         .put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
-        .put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100)
+        // There's no point in EOS, since this query only produces side effects.
+        .put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE)
         .build()
     );
+    final ImmutableMap<TopicPartition, Long> endOffsets =
+        getQueryInputEndOffsets(analysis, serviceContext.getAdminClient());
+
     final TransientQueryMetadata transientQueryMetadata = EngineExecutor
         .create(primaryContext, serviceContext, statement.getSessionConfig())
-        .executeTransientQuery(statement, excludeTombstones);
-
-    final ImmutableMap<TopicPartition, Long> endOffsets =
-        getQueryInputEndOffsets(analysis, statement, serviceContext.getAdminClient());
+        .executeTransientQuery(statement, excludeTombstones, Optional.of(endOffsets));

     QueryLogger.info(
         "Streaming stream pull query results '{}'",

@@ -382,35 +373,8 @@
     return new StreamPullQueryMetadata(transientQueryMetadata, endOffsets);
   }

-  public boolean passedEndOffsets(final StreamPullQueryMetadata streamPullQueryMetadata) {
-
-    try {
-      final ListConsumerGroupOffsetsResult result =
-          persistentAdminClient.listConsumerGroupOffsets(
-              streamPullQueryMetadata.getTransientQueryMetadata().getQueryApplicationId()
-          );
-
-      final Map<TopicPartition, OffsetAndMetadata> metadataMap =
-          result.partitionsToOffsetAndMetadata().get();
-
-      final Map<TopicPartition, Long> endOffsets = streamPullQueryMetadata.getEndOffsets();
-
-      for (final Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
-        final OffsetAndMetadata offsetAndMetadata = metadataMap.get(entry.getKey());
-        if (offsetAndMetadata == null || offsetAndMetadata.offset() < entry.getValue()) {
-          log.debug("SPQ waiting on " + entry + " at " + offsetAndMetadata);
-          return false;
-        }
-      }
-      return true;
-    } catch (final ExecutionException | InterruptedException e) {
-      throw new KsqlException(e);
-    }
-  }
-
   private ImmutableMap<TopicPartition, Long> getQueryInputEndOffsets(
       final ImmutableAnalysis analysis,
-      final ConfiguredStatement<Query> statement,
       final Admin admin) {

     final String sourceTopicName = analysis.getFrom().getDataSource().getKafkaTopicName();

@@ -419,21 +383,9 @@ private ImmutableMap<TopicPartition, Long> getQueryInputEndOffsets(
         sourceTopicName
     );

-    final Object processingGuarantee = statement
-        .getSessionConfig()
-        .getConfig(true)
-        .getKsqlStreamConfigProps()
-        .get(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
-
-    final IsolationLevel isolationLevel =
-        StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee)
-            ? IsolationLevel.READ_UNCOMMITTED
-            : IsolationLevel.READ_COMMITTED;
-
     return getEndOffsets(
         admin,
-        topicDescription,
-        isolationLevel
+        topicDescription
     );
   }

@@ -459,8 +411,7 @@ private TopicDescription getTopicDescription(final Admin admin, final String sou

   private ImmutableMap<TopicPartition, Long> getEndOffsets(
       final Admin admin,
-      final TopicDescription topicDescription,
-      final IsolationLevel isolationLevel) {
+      final TopicDescription topicDescription) {
     final Map<TopicPartition, OffsetSpec> topicPartitions =
         topicDescription
             .partitions()

@@ -471,7 +422,7 @@ private ImmutableMap<TopicPartition, Long> getEndOffsets(
     final ListOffsetsResult listOffsetsResult = admin.listOffsets(
         topicPartitions,
         new ListOffsetsOptions(
-            isolationLevel
+            IsolationLevel.READ_UNCOMMITTED
         )
     );

@@ -557,7 +508,7 @@ public void close(final boolean closeQueries) {

     try {
       cleanupService.stopAsync().awaitTerminated(30, TimeUnit.SECONDS);
-    } catch (TimeoutException e) {
+    } catch (final TimeoutException e) {
       log.warn("Timed out while closing cleanup service. "
           + "External resources for the following applications may be orphaned: {}",
           cleanupService.pendingApplicationIds()

@@ -566,7 +517,6 @@ public void close(final boolean closeQueries) {

     engineMetrics.close();
     aggregateMetricsCollector.shutdown();
-    persistentAdminClient.close();
   }

   @Override
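
(Aside: the end-offset lookup reduces to the standard Admin#listOffsets API. A
self-contained sketch; the helper name and parameter list are assumed for
illustration, but the Kafka client calls are real.)

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutionException;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.ListOffsetsOptions;
    import org.apache.kafka.clients.admin.ListOffsetsResult;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.common.IsolationLevel;
    import org.apache.kafka.common.TopicPartition;

    // Ask for the latest (end) offset of every partition, reading uncommitted
    // data to match the query's forced at-least-once processing guarantee.
    static Map<TopicPartition, Long> fetchEndOffsets(
        final Admin admin,
        final List<TopicPartition> partitions
    ) throws ExecutionException, InterruptedException {
      final Map<TopicPartition, OffsetSpec> request = partitions.stream()
          .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
      final ListOffsetsResult result = admin.listOffsets(
          request, new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED));
      return result.all().get().entrySet().stream()
          .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
    }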

--------------------------------------------------------------------------------

@@ -168,7 +168,7 @@ public TransientQueryMetadata executeTransientQuery(
         engineContext,
         serviceContext,
         statement.getSessionConfig()
-    ).executeTransientQuery(statement, excludeTombstones);
+    ).executeTransientQuery(statement, excludeTombstones, Optional.empty());
   }

   @Override

--------------------------------------------------------------------------------

@@ -40,6 +40,12 @@ public interface BlockingRowQueue {
    */
   void setLimitHandler(LimitHandler limitHandler);

+  /**
+   * Sets a handler that will be called when the query completes.
+   * Replaces any previous handler.
+   */
+  void setCompletionHandler(CompletionHandler completionHandler);
+
   /**
    * Sets the callback that will be called any time a new row is accepted into the queue.
    *

--------------------------------------------------------------------------------

@@ -15,14 +15,10 @@

 package io.confluent.ksql.query;

-public interface LimitQueueCallback extends QueueCallback {
+public interface CompletionHandler {

   /**
-   * Sets the limit handler that will be called when the limit is reached.
-   *
-   * <p>Replaces any previous handler.
-   *
-   * @param limitHandler the handler.
+   * Fired when the query is complete
    */
-  void setLimitHandler(LimitHandler limitHandler);
+  void complete();
 }

--------------------------------------------------------------------------------

This file was deleted.


--------------------------------------------------------------------------------

@@ -82,6 +82,12 @@ public void setLimitHandler(final LimitHandler limitHandler) {
     this.limitHandler = limitHandler;
   }

+  @Override
+  public void setCompletionHandler(final CompletionHandler completionHandler) {
+    // not currently used in pull queries, although future refactoring might be able to
+    // take advantage of this mechanism.
+  }
+
   @Override
   public void setQueuedCallback(final Runnable queuedCallback) {
     final Runnable parent = this.queuedCallback;