Skip to content

Commit

Permalink
fix: get rows returned metric working (#8230)
Browse files Browse the repository at this point in the history
  • Loading branch information
nateab authored Oct 5, 2021
1 parent c4a036a commit da4f71e
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import io.confluent.ksql.physical.scalablepush.PushQueryPreparer;
import io.confluent.ksql.physical.scalablepush.PushQueryQueuePopulator;
import io.confluent.ksql.physical.scalablepush.PushRouting.PushConnectionsHandle;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.query.CompletionHandler;
import io.confluent.ksql.query.LimitHandler;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.TransientQueryQueue;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.KsqlConstants.QuerySourceType;
import io.confluent.ksql.util.KsqlConstants.RoutingNodeType;
Expand All @@ -39,7 +39,7 @@ public class ScalablePushQueryMetadata implements PushQueryMetadata {
private volatile boolean closed = false;
private final LogicalSchema logicalSchema;
private final QueryId queryId;
private final BlockingRowQueue rowQueue;
private final TransientQueryQueue transientQueryQueue;
private final Optional<ScalablePushQueryMetrics> scalablePushQueryMetrics;
private final ResultType resultType;
private final PushQueryQueuePopulator pushQueryQueuePopulator;
Expand All @@ -59,7 +59,7 @@ public class ScalablePushQueryMetadata implements PushQueryMetadata {
public ScalablePushQueryMetadata(
final LogicalSchema logicalSchema,
final QueryId queryId,
final BlockingRowQueue blockingRowQueue,
final TransientQueryQueue transientQueryQueue,
final Optional<ScalablePushQueryMetrics> scalablePushQueryMetrics,
final ResultType resultType,
final PushQueryQueuePopulator pushQueryQueuePopulator,
Expand All @@ -70,7 +70,7 @@ public ScalablePushQueryMetadata(
) {
this.logicalSchema = logicalSchema;
this.queryId = queryId;
this.rowQueue = blockingRowQueue;
this.transientQueryQueue = transientQueryQueue;
this.scalablePushQueryMetrics = scalablePushQueryMetrics;
this.resultType = resultType;
this.pushQueryQueuePopulator = pushQueryQueuePopulator;
Expand Down Expand Up @@ -108,7 +108,7 @@ public void start() {

@Override
public void close() {
rowQueue.close();
transientQueryQueue.close();
startFuture.thenApply(handle -> {
handle.close();
return null;
Expand All @@ -123,18 +123,18 @@ public boolean isRunning() {

@Override
@SuppressFBWarnings(value = "EI_EXPOSE_REP")
public BlockingRowQueue getRowQueue() {
return rowQueue;
public TransientQueryQueue getRowQueue() {
return transientQueryQueue;
}

@Override
public void setLimitHandler(final LimitHandler limitHandler) {
rowQueue.setLimitHandler(limitHandler);
transientQueryQueue.setLimitHandler(limitHandler);
}

@Override
public void setCompletionHandler(final CompletionHandler completionHandler) {
rowQueue.setCompletionHandler(completionHandler);
transientQueryQueue.setCompletionHandler(completionHandler);
}

@Override
Expand Down Expand Up @@ -186,7 +186,7 @@ public RoutingNodeType getRoutingNodeType() {
}

public long getTotalRowsReturned() {
return rowQueue.size();
return transientQueryQueue.getTotalRowsQueued();
}

public long getTotalRowsProcessed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import io.confluent.ksql.internal.ScalablePushQueryMetrics;
import io.confluent.ksql.physical.scalablepush.PushQueryQueuePopulator;
import io.confluent.ksql.physical.scalablepush.PushRouting.PushConnectionsHandle;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.TransientQueryQueue;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.PushQueryMetadata.ResultType;
import java.util.Optional;
Expand All @@ -45,7 +45,7 @@ public class ScalablePushQueryMetadataTest {
@Mock
private LogicalSchema logicalSchema;
@Mock
private BlockingRowQueue blockingRowQueue;
private TransientQueryQueue transientQueryQueue;
@Mock
private PushQueryQueuePopulator populator;
@Mock
Expand All @@ -64,7 +64,7 @@ public void setUp() {
query = new ScalablePushQueryMetadata(
logicalSchema,
new QueryId("queryid"),
blockingRowQueue,
transientQueryQueue,
metrics,
ResultType.STREAM,
populator,
Expand Down

0 comments on commit da4f71e

Please sign in to comment.