Feedback part 2
AlanConfluent committed Jan 20, 2021
1 parent 6f5d288 commit 55d3da3
Showing 7 changed files with 378 additions and 192 deletions.
@@ -40,8 +40,8 @@
import io.confluent.ksql.physical.pull.HARouting;
import io.confluent.ksql.physical.pull.PullPhysicalPlan;
import io.confluent.ksql.physical.pull.PullPhysicalPlanBuilder;
import io.confluent.ksql.physical.pull.PullQueryResult;
import io.confluent.ksql.physical.pull.PullQueryQueuePopulator;
import io.confluent.ksql.physical.pull.PullQueryResult;
import io.confluent.ksql.planner.LogicalPlanNode;
import io.confluent.ksql.planner.LogicalPlanner;
import io.confluent.ksql.planner.plan.DataSourceNode;
@@ -54,6 +54,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,7 +135,6 @@ public CompletableFuture<Void> handlePullQuery(
locations, pullQueryQueue);
completableFuture.complete(null);
} catch (Throwable t) {
pullQueryQueue.close();
completableFuture.completeExceptionally(t);
}
});
@@ -324,56 +324,15 @@ private static void forwardTo(
KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_SKIP_FORWARDING, true,
KsqlRequestConfig.KSQL_REQUEST_INTERNAL_REQUEST, true,
KsqlRequestConfig.KSQL_REQUEST_QUERY_PULL_PARTITIONS, partitions);
final AtomicInteger processedRows = new AtomicInteger(0);
final AtomicReference<Header> header = new AtomicReference<>();
final RestResponse<Integer> response = serviceContext
.getKsqlClient()
.makeQueryRequest(
owner.location(),
statement.getStatementText(),
statement.getSessionConfig().getOverrides(),
requestProperties,
streamedRows -> {
if (streamedRows == null || streamedRows.isEmpty()) {
return;
}
final List<PullQueryRow> rows = new ArrayList<>();

// If this is the first row overall, skip the header
final int previousProcessedRows = processedRows.getAndAdd(streamedRows.size());
for (int i = 0; i < streamedRows.size(); i++) {
final StreamedRow row = streamedRows.get(i);
if (i == 0 && previousProcessedRows == 0) {
final Optional<Header> optionalHeader = row.getHeader();
optionalHeader.ifPresent(h -> validateSchema(outputSchema, h.getSchema(), owner));
optionalHeader.ifPresent(header::set);
continue;
}

if (row.getErrorMessage().isPresent()) {
throw new KsqlStatementException(
row.getErrorMessage().get().getMessage(),
statement.getStatementText()
);
}

if (!row.getRow().isPresent()) {
throw new KsqlServerException(String.format(
"Forwarding pull query request [%s, %s, %s] to node %s failed due to "
+ "missing row data.",
statement.getStatement(), statement.getSessionConfig().getOverrides(),
requestProperties, owner));
}

final List<?> r = row.getRow().get().getColumns();
Preconditions.checkNotNull(header.get());
rows.add(rowFactory.apply(r, header.get().getSchema()));
}

if (!pullQueryQueue.acceptRows(rows)) {
LOG.info("Failed to queue all rows");
}
}
streamedRowsHandler(owner, statement, requestProperties, pullQueryQueue, rowFactory,
outputSchema)
);

if (response.isErroneous()) {
@@ -393,6 +352,59 @@ private static void forwardTo(
}
}

private static Consumer<List<StreamedRow>> streamedRowsHandler(
final KsqlNode owner,
final ConfiguredStatement<Query> statement,
final Map<String, Object> requestProperties,
final PullQueryQueue pullQueryQueue,
final BiFunction<List<?>, LogicalSchema, PullQueryRow> rowFactory,
final LogicalSchema outputSchema
) {
final AtomicInteger processedRows = new AtomicInteger(0);
final AtomicReference<Header> header = new AtomicReference<>();
return streamedRows -> {
if (streamedRows == null || streamedRows.isEmpty()) {
return;
}
final List<PullQueryRow> rows = new ArrayList<>();

// If this is the first row overall, skip the header
final int previousProcessedRows = processedRows.getAndAdd(streamedRows.size());
for (int i = 0; i < streamedRows.size(); i++) {
final StreamedRow row = streamedRows.get(i);
if (i == 0 && previousProcessedRows == 0) {
final Optional<Header> optionalHeader = row.getHeader();
optionalHeader.ifPresent(h -> validateSchema(outputSchema, h.getSchema(), owner));
optionalHeader.ifPresent(header::set);
continue;
}

if (row.getErrorMessage().isPresent()) {
throw new KsqlStatementException(
row.getErrorMessage().get().getMessage(),
statement.getStatementText()
);
}

if (!row.getRow().isPresent()) {
throw new KsqlServerException(String.format(
"Forwarding pull query request [%s, %s, %s] to node %s failed due to "
+ "missing row data.",
statement.getStatement(), statement.getSessionConfig().getOverrides(),
requestProperties, owner));
}

final List<?> r = row.getRow().get().getColumns();
Preconditions.checkNotNull(header.get());
rows.add(rowFactory.apply(r, header.get().getSchema()));
}

if (!pullQueryQueue.acceptRows(rows)) {
LOG.info("Failed to queue all rows");
}
};
}

private static void validateSchema(
final LogicalSchema expectedSchema,
final LogicalSchema forwardedSchema,
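
The refactor above pulls the inline streamed-rows lambda out of forwardTo into streamedRowsHandler, a factory method that returns a Consumer capturing the per-request state (the processed-row counter and header reference) it needs. Below is a minimal sketch of that pattern only, not ksqlDB's actual classes; the Header and Row records and the sink parameter are hypothetical stand-ins for StreamedRow and the queue.

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public final class HandlerFactoryExample {

  // Hypothetical stand-ins for StreamedRow's header and data portions.
  record Header(String schema) { }
  record Row(Header header, List<?> columns) { }

  // Returns a stateful handler: the counter and header reference are captured by the
  // lambda, so each call to this factory yields an independent, per-request handler.
  static Consumer<List<Row>> streamedRowsHandler(final Consumer<List<?>> sink) {
    final AtomicInteger processedRows = new AtomicInteger(0);
    final AtomicReference<Header> header = new AtomicReference<>();
    return rows -> {
      if (rows == null || rows.isEmpty()) {
        return;
      }
      final int previouslyProcessed = processedRows.getAndAdd(rows.size());
      for (int i = 0; i < rows.size(); i++) {
        final Row row = rows.get(i);
        // The very first row of the whole response carries the header, not data.
        if (i == 0 && previouslyProcessed == 0) {
          header.set(row.header());
          continue;
        }
        sink.accept(row.columns());
      }
    };
  }

  public static void main(String[] args) {
    final Consumer<List<Row>> handler =
        streamedRowsHandler(cols -> System.out.println("row: " + cols));
    final Header h = new Header("K INT, V STRING");
    handler.accept(List.of(new Row(h, null), new Row(null, List.of(1, "a"))));
    handler.accept(List.of(new Row(null, List.of(2, "b"))));
  }
}

Because the AtomicInteger and AtomicReference are created per call, each forwarded request gets its own handler state instead of sharing it through fields on the enclosing class.
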
@@ -30,12 +30,12 @@
* This queue allows for results to be streamed back to the client when running pull queries.
* Streaming behavior is important when dealing with large results since we don't want to hold it
* all in memory at once.
* <p>
* New rows are produced and enqueued by PullPhysicalPlan if the request is being handled locally
*
* <p>New rows are produced and enqueued by PullPhysicalPlan if the request is being handled locally
* or HARouting if the request must be forwarded to another node. This is done with the method
* acceptRow and may block the caller if the queue is at capacity.
* <p>
* Rows are consumed by the request thread of the endpoint. This is done with the various poll
*
* <p>Rows are consumed by the request thread of the endpoint. This is done with the various poll
* methods.
*/
public class PullQueryQueue implements BlockingRowQueue {
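
A minimal sketch of the bounded producer/consumer behaviour the Javadoc describes, using a plain java.util.concurrent.ArrayBlockingQueue rather than PullQueryQueue's actual API; the producer stands in for PullPhysicalPlan or HARouting calling acceptRow (which may block at capacity), and the consumer stands in for the endpoint's request thread draining rows with the poll methods.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public final class BoundedQueueExample {

  public static void main(String[] args) throws InterruptedException {
    // Small capacity so the producer blocks instead of buffering the whole result in memory.
    final BlockingQueue<List<?>> queue = new ArrayBlockingQueue<>(10);

    // Producer thread: enqueues rows, blocking whenever the queue is full.
    final Thread producer = new Thread(() -> {
      try {
        for (int i = 0; i < 100; i++) {
          queue.put(List.of(i, "value-" + i));
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    // Consumer: drains rows as they arrive and streams them onward.
    final List<List<?>> drained = new ArrayList<>();
    while (drained.size() < 100) {
      final List<?> row = queue.poll(100, TimeUnit.MILLISECONDS);
      if (row != null) {
        drained.add(row);
      }
    }
    producer.join();
    System.out.println("streamed " + drained.size() + " rows");
  }
}
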
@@ -82,8 +82,8 @@ public void setQueuedCallback(final Runnable queuedCallback) {
final Runnable parent = this.queuedCallback;

this.queuedCallback = () -> {
parent.run();
queuedCallback.run();
parent.run();
queuedCallback.run();
};
}

@@ -162,7 +162,7 @@ public boolean acceptRows(final List<PullQueryRow> tableRows) {
for (PullQueryRow row : tableRows) {
if (!acceptRow(row)) {
return false;
};
}
}
return true;
}
@@ -71,8 +71,6 @@ public void setQueryHandle(final QueryHandle queryHandle, final boolean isPullQu
// This allows us to hit the limit without having to queue one last row
if (queue.isEmpty()) {
ctx.runOnContext(v -> sendComplete());
} else {
ctx.runOnContext(v -> doSend());
}
});
this.queryHandle = queryHandle;