Skip to content

Commit

Permalink
fix: fix test failures
Browse files Browse the repository at this point in the history
  • Loading branch information
nateab committed Aug 13, 2021
1 parent 29fcf5b commit 0cce482
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ private void maybeNext(final Publisher publisher) {

private boolean isErrored(final Publisher publisher) {
if (dataSourceOperator.droppedRows()) {
System.out.println("dropped rows");
closeInternal();
publisher.reportDroppedRows();
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,14 @@ public void onError(final QueryMetadata queryMetadata, final QueryError error) {

@Override
public void onStateChange(final QueryMetadata queryMetadata, final State before,
final State after) { }
final State after) {
listener.onStateChange(queryMetadata, before, after);
}

@Override
public void onClose(final QueryMetadata queryMetadata) { }
public void onClose(final QueryMetadata queryMetadata) {
listener.onClose(queryMetadata);
}
}
);
this.sinkDataSource = requireNonNull(sinkDataSource, "sinkDataSource");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public void shouldStopOnDroppedRows() throws InterruptedException {
scalablePushRegistry, pushDataSourceOperator, context);
doNothing().when(pushDataSourceOperator).setNewRowCallback(runnableCaptor.capture());
when(pushDataSourceOperator.droppedRows()).thenReturn(false, true);
// when(pushDataSourceOperator.hasError()).thenReturn(false, false);

final BufferedPublisher<List<?>> publisher = pushPhysicalPlan.execute();
final TestSubscriber<List<?>> subscriber = new TestSubscriber<>();
Expand Down Expand Up @@ -146,7 +147,7 @@ public void shouldStopOnHasError() throws InterruptedException {

context.owner().cancelTimer(timerId);
});
//

while (subscriber.getError() == null) {
Thread.sleep(100);
}
Expand Down

0 comments on commit 0cce482

Please sign in to comment.