Skip to content

Commit

Permalink
fix: don't cleanup topics on engine close (#4658)
Browse files Browse the repository at this point in the history
Co-authored-by: Rohan <[email protected]>
Co-authored-by: Andy Coates <[email protected]>
  • Loading branch information
3 people authored and stevenpyzhang committed Mar 4, 2020
1 parent 00cb299 commit be9a97b
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public ExecuteResult execute(

@Override
public void close() {
allLiveQueries.forEach(QueryMetadata::close);
allLiveQueries.forEach(QueryMetadata::stop);
engineMetrics.close();
aggregateMetricsCollector.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,9 @@ public String getSchemasDescription() {
public PersistenceSchemas getPersistenceSchemas() {
return persistenceSchemas;
}

@Override
public void stop() {
doClose(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryMetadata {
public abstract class QueryMetadata {

private static final Logger LOG = LoggerFactory.getLogger(QueryMetadata.class);

Expand Down Expand Up @@ -148,14 +148,34 @@ public boolean hasEverBeenStarted() {
return everStarted;
}


/**
* Stops the query without cleaning up the external resources
* so that it can be resumed when we call {@link #start()}.
*
*/
public abstract void stop();

/**
* Closes the {@code QueryMetadata} and cleans up any of
* the resources associated with it (e.g. internal topics,
* schemas, etc...).
*
* @see QueryMetadata#stop()
*/
public void close() {
kafkaStreams.close();
doClose(true);
closeCallback.accept(this);
}

kafkaStreams.cleanUp();
protected void doClose(final boolean cleanUp) {
kafkaStreams.close(Duration.ofMillis(closeTimeout));

queryStateListener.ifPresent(QueryStateListener::close);
if (cleanUp) {
kafkaStreams.cleanUp();
}

closeCallback.accept(this);
queryStateListener.ifPresent(QueryStateListener::close);
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

Expand Down Expand Up @@ -470,6 +471,63 @@ public void shouldCleanUpInternalTopicsOnClose() {
verify(topicClient).deleteInternalTopics(query.getQueryApplicationId());
}

@Test
public void shouldCleanUpInternalTopicsOnEngineCloseForTransientQueries() {
// Given:
final QueryMetadata query = KsqlEngineTestUtil.executeQuery(
serviceContext,
ksqlEngine,
"select * from test1 EMIT CHANGES;",
KSQL_CONFIG, Collections.emptyMap()
);

query.start();

// When:
ksqlEngine.close();

// Then:
verify(topicClient).deleteInternalTopics(query.getQueryApplicationId());
}

@Test
public void shouldNotCleanUpInternalTopicsOnEngineCloseForPersistentQueries() {
// Given:
final List<QueryMetadata> query = KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"create stream persistent as select * from test1 EMIT CHANGES;",
KSQL_CONFIG, Collections.emptyMap()
);

query.get(0).start();

// When:
ksqlEngine.close();

// Then (there are no transient queries, so no internal topics should be deleted):
verify(topicClient, never()).deleteInternalTopics(any());
}

@Test
public void shouldCleanUpInternalTopicsOnQueryCloseForPersistentQueries() {
// Given:
final List<QueryMetadata> query = KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"create stream persistent as select * from test1 EMIT CHANGES;",
KSQL_CONFIG, Collections.emptyMap()
);

query.get(0).start();

// When:
query.get(0).close();

// Then (there are no transient queries, so no internal topics should be deleted):
verify(topicClient).deleteInternalTopics(query.get(0).getQueryApplicationId());
}

@Test
public void shouldNotCleanUpInternalTopicsOnCloseIfQueryNeverStarted() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -57,9 +59,11 @@ public class QueryMetadataTest {
@Mock
private Consumer<QueryMetadata> closeCallback;
private QueryMetadata query;
private boolean cleanUp;

@Before
public void setup() {
cleanUp = false;
query = new QueryMetadata(
"foo",
kafkaStreams,
Expand All @@ -71,8 +75,13 @@ public void setup() {
topoplogy,
Collections.emptyMap(),
Collections.emptyMap(),
closeCallback
);
closeCallback,
closeTimeout) {
@Override
public void stop() {
doClose(cleanUp);
}
};
}

@Test
Expand Down Expand Up @@ -134,6 +143,24 @@ public void shouldCloseKStreamsAppOnCloseThenCloseCallback() {
inOrder.verify(closeCallback).accept(query);
}

@Test
public void shouldNotCallCloseCallbackOnStop() {
// When:
query.stop();

// Then:
verifyNoMoreInteractions(closeCallback);
}

@Test
public void shouldCallKafkaStreamsCloseOnStop() {
// When:
query.stop();

// Then:
verify(kafkaStreams).close(Duration.ofMillis(closeTimeout));
}

@Test
public void shouldCleanUpKStreamsAppAfterCloseOnClose() {
// When:
Expand All @@ -145,6 +172,27 @@ public void shouldCleanUpKStreamsAppAfterCloseOnClose() {
inOrder.verify(kafkaStreams).cleanUp();
}

@Test
public void shouldNotCleanUpKStreamsAppOnStop() {
// When:
query.stop();

// Then:
verify(kafkaStreams, never()).cleanUp();
}

@Test
public void shouldCallCleanupOnStopIfCleanup() {
// Given:
cleanUp = true;

// When:
query.stop();

// Then:
verify(kafkaStreams).cleanUp();
}

@Test
public void shouldReturnSources() {
assertThat(query.getSourceNames(), is(SOME_SOURCES));
Expand Down

0 comments on commit be9a97b

Please sign in to comment.