
Commit

feat: fail scalable push query if error is detected in subscribed persistent query (#7996)

* fix: add wiring

* chore: add comments for planning

* feat: fail query if rebalance or error detected

* fix: set error to true onError

* style: fix spacing

* test: manual testing

* test: manual testing

* test: manual testing

* chore: clean up config

* chore: remove unused field

* fix: fix test failures

* test: fix tests

* test: fix tests

* Trigger Build

* fix: address feedback

* style: fix checkstyle

* style: fix checkstyle violation

* fix: fix tests

* style: fix checkstyle

* fix: address pr feedback
nateab authored Aug 17, 2021
1 parent 75894bb commit eed501c
Showing 13 changed files with 156 additions and 14 deletions.
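Before the per-file hunks, here is a minimal sketch of the propagation path this commit wires up. The classes below (PushErrorPropagationSketch, Queue, Registry) are simplified stand-ins written only for illustration, not the real ksqlDB types; the comments name the actual classes changed in the diff, and the real constructors, generics, Vert.x context handling, and Kafka Streams listener plumbing are omitted.

// Minimal, self-contained sketch of the error propagation added by this commit.
import java.util.ArrayList;
import java.util.List;

public final class PushErrorPropagationSketch {

  // Stand-in for ProcessingQueue: remembers that the upstream persistent query errored.
  static final class Queue {
    private boolean hasError = false;

    synchronized void onError() {
      hasError = true;
    }

    synchronized boolean getHasError() {
      return hasError;
    }
  }

  // Stand-in for ScalablePushRegistry: fans the error out to every registered queue.
  static final class Registry {
    private final List<Queue> queues = new ArrayList<>();

    synchronized void register(final Queue queue) {
      queues.add(queue);
    }

    synchronized void onError() {
      for (final Queue queue : queues) {
        queue.onError();
      }
    }
  }

  public static void main(final String[] args) {
    final Registry registry = new Registry();
    final Queue queue = new Queue();
    registry.register(queue);

    // PersistentQueryMetadataImpl wraps its listener; when the persistent query
    // reports an error, the wrapper calls registry.onError().
    registry.onError();

    // PushPhysicalPlan checks the flag (via PushDataSourceOperator.hasError())
    // before emitting each row, and fails the push query once it is set.
    if (queue.getHasError()) {
      System.out.println("scalable push query fails: persistent query has error");
    }
  }
}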
File: ProcessingQueue.java
@@ -37,6 +37,7 @@ public class ProcessingQueue {
private final int queueSizeLimit;
private boolean closed = false;
private boolean droppedRows = false;
private boolean hasError = false;
private Runnable newRowCallback = () -> { };

public ProcessingQueue(final QueryId queryId) {
@@ -106,6 +107,14 @@ public synchronized boolean hasDroppedRows() {
return droppedRows;
}

public synchronized void onError() {
hasError = true;
}

public synchronized boolean getHasError() {
return hasError;
}

public QueryId getQueryId() {
return queryId;
}
File: PushPhysicalPlan.java
@@ -82,14 +82,8 @@ public boolean isClosed() {

private void maybeNext(final Publisher publisher) {
List<?> row;
while ((row = (List<?>)next()) != null) {
if (dataSourceOperator.droppedRows()) {
closeInternal();
publisher.reportDroppedRows();
break;
} else {
publisher.accept(row);
}
while (!isErrored(publisher) && (row = (List<?>)next()) != null) {
publisher.accept(row);
}
if (!closed) {
if (timer >= 0) {
@@ -103,6 +97,19 @@ private void maybeNext(final Publisher publisher) {
}
}

private boolean isErrored(final Publisher publisher) {
if (dataSourceOperator.droppedRows()) {
closeInternal();
publisher.reportDroppedRows();
return true;
} else if (dataSourceOperator.hasError()) {
closeInternal();
publisher.reportHasError();
return true;
}
return false;
}

private void open(final Publisher publisher) {
VertxUtils.checkContext(context);
dataSourceOperator.setNewRowCallback(() -> context.runOnContext(v -> maybeNext(publisher)));
@@ -151,5 +158,9 @@ public Publisher(final Context ctx) {
public void reportDroppedRows() {
sendError(new RuntimeException("Dropped rows"));
}

public void reportHasError() {
sendError(new RuntimeException("Persistent query has error"));
}
}
}
File: ScalablePushRegistry.java
@@ -151,6 +151,12 @@ private void handleRow(final Record<Object, GenericRow> record) {
}
}

public synchronized void onError() {
for (ProcessingQueue queue : processingQueues.values()) {
queue.onError();
}
}

@Override
public Processor<Object, GenericRow, Void, Void> get() {
return new PeekProcessor();
File: PeekStreamOperator.java
@@ -93,4 +93,9 @@ public void setNewRowCallback(final Runnable newRowCallback) {
public boolean droppedRows() {
return processingQueue.hasDroppedRows();
}

@Override
public boolean hasError() {
return processingQueue.getHasError();
}
}
File: PushDataSourceOperator.java
@@ -29,4 +29,6 @@ public interface PushDataSourceOperator {

// If rows have been dropped.
boolean droppedRows();

// If the subscribed persistent query has hit an error.
boolean hasError();
}
File: query output row queue (acceptRow / acceptRowNonBlocking)
@@ -135,10 +135,17 @@ public void acceptRow(final List<?> key, final GenericRow value) {
}
}

/**
* Accepts a row without blocking.
* @param key The key
* @param value The value
* @return true if the row was accepted or discarded for an acceptable reason, false if it was
* rejected because the queue was full.
*/
public boolean acceptRowNonBlocking(final List<?> key, final GenericRow value) {
try {
if (!callback.shouldQueue()) {
return false;
return true;
}

final KeyValue<List<?>, GenericRow> row = keyValue(key, value);
@@ -153,8 +160,9 @@ public boolean acceptRowNonBlocking(final List<?> key, final GenericRow value) {
} catch (final InterruptedException e) {
// Forced shutdown?
Thread.currentThread().interrupt();
return false;
}
return false;
return true;
}

public boolean isClosed() {
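One nuance of the acceptRowNonBlocking change above: after this diff, only a false return means the row was rejected because the queue was full; a row skipped by the callback or lost to an interrupted shutdown now counts as handled. The stand-in below is hypothetical (BoundedRowQueue and its fields are not ksqlDB types) and only illustrates how a caller is expected to read that return value.

import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical bounded queue with the same return contract as acceptRowNonBlocking:
// true = queued or intentionally skipped, false = rejected because the queue is full.
final class BoundedRowQueue {
  private final Deque<String> rows = new ArrayDeque<>();
  private final int limit;
  private boolean droppedRows = false;

  BoundedRowQueue(final int limit) {
    this.limit = limit;
  }

  boolean acceptRowNonBlocking(final String row) {
    if (rows.size() >= limit) {
      return false;
    }
    rows.add(row);
    return true;
  }

  void offer(final String row) {
    if (!acceptRowNonBlocking(row)) {
      // Only a full queue counts as dropped rows; the push query is later
      // failed with a "Dropped rows" error when this flag is observed.
      droppedRows = true;
    }
  }

  public static void main(final String[] args) {
    final BoundedRowQueue queue = new BoundedRowQueue(1);
    queue.offer("row1");
    queue.offer("row2");
    System.out.println("droppedRows=" + queue.droppedRows); // true
  }
}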
File: PersistentQueryMetadataImpl.java
@@ -31,6 +31,7 @@
import io.confluent.ksql.physical.scalablepush.ScalablePushRegistry;
import io.confluent.ksql.query.KafkaStreamsBuilder;
import io.confluent.ksql.query.MaterializationProviderBuilderFactory;
import io.confluent.ksql.query.QueryError;
import io.confluent.ksql.query.QueryErrorClassifier;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
@@ -40,6 +41,7 @@
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

@@ -105,7 +107,7 @@ public PersistentQueryMetadataImpl(
maxQueryErrorsQueueSize,
retryBackoffInitialMs,
retryBackoffMaxMs,
listener
new QueryListenerWrapper(listener, scalablePushRegistry)
);
this.sinkDataSource = requireNonNull(sinkDataSource, "sinkDataSource");
this.schemas = requireNonNull(schemas, "schemas");
@@ -226,4 +228,32 @@ public synchronized void stop() {
public Optional<ScalablePushRegistry> getScalablePushRegistry() {
return scalablePushRegistry;
}

private static final class QueryListenerWrapper implements Listener {
private final Listener listener;
private final Optional<ScalablePushRegistry> scalablePushRegistry;

private QueryListenerWrapper(final Listener listener,
final Optional<ScalablePushRegistry> scalablePushRegistry) {
this.listener = listener;
this.scalablePushRegistry = scalablePushRegistry;
}

@Override
public void onError(final QueryMetadata queryMetadata, final QueryError error) {
this.listener.onError(queryMetadata, error);
scalablePushRegistry.ifPresent(ScalablePushRegistry::onError);
}

@Override
public void onStateChange(final QueryMetadata queryMetadata, final State before,
final State after) {
this.listener.onStateChange(queryMetadata, before, after);
}

@Override
public void onClose(final QueryMetadata queryMetadata) {
this.listener.onClose(queryMetadata);
}
}
}
File: push query metadata (setLimitHandler / setUncaughtExceptionHandler)
@@ -97,7 +97,7 @@ public void setLimitHandler(final LimitHandler limitHandler) {

@Override
public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler handler) {
// We don't do anything special here since the persistent query handles its own errors
onException(handler::handle);
}

@Override
File: ProcessingQueueTest.java
@@ -63,4 +63,13 @@ public void shouldHitLimit() {
assertThat(queue.poll(), nullValue());
assertThat(queue.hasDroppedRows(), is(true));
}

@Test
public void shouldDefaultToFalseForHasError() {
// Given:
final ProcessingQueue queue = new ProcessingQueue(new QueryId("a"));

// Then:
assertThat(queue.getHasError(), is(false));
}
}
File: PushPhysicalPlanTest.java
@@ -96,7 +96,7 @@ public void shouldStopOnDroppedRows() throws InterruptedException {
final PushPhysicalPlan pushPhysicalPlan = new PushPhysicalPlan(root, logicalSchema, queryId,
scalablePushRegistry, pushDataSourceOperator, context);
doNothing().when(pushDataSourceOperator).setNewRowCallback(runnableCaptor.capture());
when(pushDataSourceOperator.droppedRows()).thenReturn(false, true);
when(pushDataSourceOperator.droppedRows()).thenReturn(false, false, true);

final BufferedPublisher<List<?>> publisher = pushPhysicalPlan.execute();
final TestSubscriber<List<?>> subscriber = new TestSubscriber<>();
@@ -124,6 +124,39 @@ public void shouldStopOnDroppedRows() throws InterruptedException {
assertThat(pushPhysicalPlan.isClosed(), is(true));
}

@Test
public void shouldStopOnHasError() throws InterruptedException {
final PushPhysicalPlan pushPhysicalPlan = new PushPhysicalPlan(root, logicalSchema, queryId,
scalablePushRegistry, pushDataSourceOperator, context);
doNothing().when(pushDataSourceOperator).setNewRowCallback(runnableCaptor.capture());
when(pushDataSourceOperator.hasError()).thenReturn(false, false, true);

final BufferedPublisher<List<?>> publisher = pushPhysicalPlan.execute();
final TestSubscriber<List<?>> subscriber = new TestSubscriber<>();
publisher.subscribe(subscriber);

context.owner().setPeriodic(50, timerId -> {
if (runnableCaptor.getValue() == null) {
return;
}
when(root.next()).thenReturn(ROW1, ROW2, null);

runnableCaptor.getValue().run();
runnableCaptor.getValue().run();

context.owner().cancelTimer(timerId);
});

while (subscriber.getError() == null) {
Thread.sleep(100);
}

assertThat(subscriber.getError().getMessage(), containsString("Persistent query has error"));
assertThat(subscriber.getValues().size(), is(1));
assertThat(subscriber.getValues().get(0), is(ROW1));
assertThat(pushPhysicalPlan.isClosed(), is(true));
}

public static class TestSubscriber<T> implements Subscriber<T> {

private Subscription sub;
File: ScalablePushRegistryTest.java
@@ -188,4 +188,17 @@ public void shouldCreate_noApplicationServer() {
// Then
assertThat(registry.isPresent(), is(false));
}

@Test
public void shouldCallOnErrorOnQueue() {
// Given
ScalablePushRegistry registry = new ScalablePushRegistry(locator, SCHEMA, false, false);
registry.register(processingQueue);

// When
registry.onError();

// Then:
verify(processingQueue).onError();
}
}
File: PeekStreamOperatorTest.java
@@ -35,6 +35,8 @@ public class PeekStreamOperatorTest {
private TableRow row2;
@Mock
private Runnable newRowCallback;
@Mock
private ProcessingQueue processingQueue;

@Test
public void shouldGetRowsFromOperator() {
@@ -57,4 +59,15 @@ public void shouldGetRowsFromOperator() {
locator.close();
verify(registry, times(1)).unregister(processingQueue);
}

@Test
public void shouldDefaultToFalseForHasErrorOnQueue() {
// Given:
final PeekStreamOperator locator = new PeekStreamOperator(registry, dataSourceNode, QUERY_ID);
// When:
locator.open();

// Then:
assertThat(locator.hasError(), is(false));
}
}
File: PersistentQueryMetadataTest.java
@@ -30,6 +30,7 @@
import io.confluent.ksql.execution.streams.materialization.MaterializationProvider;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.physical.scalablepush.ScalablePushRegistry;
import io.confluent.ksql.query.KafkaStreamsBuilder;
import io.confluent.ksql.query.MaterializationProviderBuilderFactory;
import io.confluent.ksql.query.QueryError;
@@ -91,6 +92,8 @@ public class PersistentQueryMetadataTest {
private ProcessingLogger processingLogger;
@Mock
private Listener listener;
@Mock
private ScalablePushRegistry scalablePushRegistry;

private PersistentQueryMetadata query;

@@ -125,7 +128,7 @@ public void setUp() {
0L,
0L,
listener,
Optional.empty()
Optional.of(scalablePushRegistry)
);

query.initialize();
