diff --git a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java index 2684325e22e0..c2010a70c149 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/topic/TopicDeleteInjector.java @@ -34,7 +34,9 @@ import io.confluent.ksql.util.ExecutorUtil.RetryBehaviour; import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; +import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; /** * This {@code Injector} will delete the topic associated with a @@ -92,6 +94,7 @@ public ConfiguredStatement inject( final DataSource source = metastore.getSource(sourceName); if (source != null) { + checkTopicRefs(source); try { ExecutorUtil.executeWithRetries( () -> topicClient.deleteTopics(ImmutableList.of(source.getKafkaTopicName())), @@ -120,4 +123,24 @@ public ConfiguredStatement inject( return statement.withStatement(withoutDeleteText, withoutDelete); } + + private void checkTopicRefs(final DataSource source) { + final String topicName = source.getKafkaTopicName(); + final String sourceName = source.getName(); + final Map> sources = metastore.getAllDataSources(); + final String using = sources.values().stream() + .filter(s -> s.getKafkaTopicName().equals(topicName)) + .map(DataSource::getName) + .filter(name -> !sourceName.equals(name)) + .collect(Collectors.joining(", ")); + if (!using.isEmpty()) { + throw new KsqlException( + String.format( + "Refusing to delete topic. Found other data sources (%s) using topic %s", + using, + topicName + ) + ); + } + } } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicDeleteInjectorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicDeleteInjectorTest.java index 63d0d6213014..e8a71a9c46c9 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicDeleteInjectorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/topic/TopicDeleteInjectorTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -43,6 +44,8 @@ import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import org.junit.Before; import org.junit.Rule; @@ -55,12 +58,14 @@ @RunWith(MockitoJUnitRunner.class) public class TopicDeleteInjectorTest { + private static final String SOURCE_NAME = "SOMETHING"; + private static final String TOPIC_NAME = "something"; private static final ConfiguredStatement DROP_WITH_DELETE_TOPIC = givenStatement( "DROP STREAM SOMETHING DELETE TOPIC", - new DropStream(QualifiedName.of("SOMETHING"), false, true)); + new DropStream(QualifiedName.of(SOURCE_NAME), false, true)); private static final ConfiguredStatement DROP_WITHOUT_DELETE_TOPIC = givenStatement( "DROP STREAM SOMETHING", - new DropStream(QualifiedName.of("SOMETHING"), false, false)); + new DropStream(QualifiedName.of(SOURCE_NAME), false, false)); @Rule public final ExpectedException expectedException = ExpectedException.none(); @@ -80,8 +85,9 @@ public class TopicDeleteInjectorTest { public void setUp() { deleteInjector = new TopicDeleteInjector(metaStore, topicClient, registryClient); - when(metaStore.getSource("SOMETHING")).thenAnswer(inv -> source); - when(source.getKafkaTopicName()).thenReturn("something"); + when(metaStore.getSource(SOURCE_NAME)).thenAnswer(inv -> source); + when(source.getName()).thenReturn(SOURCE_NAME); + when(source.getKafkaTopicName()).thenReturn(TOPIC_NAME); when(source.getValueSerdeFactory()).thenReturn(new KsqlJsonSerdeFactory()); } @@ -162,6 +168,60 @@ public void shouldThrowExceptionIfSourceDoesNotExist() { deleteInjector.inject(dropStatement); } + @Test + public void shouldNotThrowIfNoOtherSourcesUsingTopic() { + // Given: + final ConfiguredStatement dropStatement = givenStatement( + "DROP SOMETHING DELETE TOPIC;", + new DropStream(QualifiedName.of(SOURCE_NAME), + true, + true) + ); + final DataSource other1 = givenSource("OTHER", "other"); + final Map> sources = new HashMap<>(); + sources.put(SOURCE_NAME, source); + sources.put("OTHER", other1); + when(metaStore.getAllDataSources()).thenReturn(sources); + + // When: + deleteInjector.inject(dropStatement); + } + + @Test + @SuppressWarnings("unchecked") + public void shouldThrowExceptionIfOtherSourcesUsingTopic() { + // Given: + final ConfiguredStatement dropStatement = givenStatement( + "DROP SOMETHING DELETE TOPIC;", + new DropStream(QualifiedName.of(SOURCE_NAME), + true, + true) + ); + final DataSource other1 = givenSource("OTHER1", TOPIC_NAME); + final DataSource other2 = givenSource("OTHER2", TOPIC_NAME); + final Map> sources = new HashMap<>(); + sources.put(SOURCE_NAME, source); + sources.put("OTHER1", other1); + sources.put("OTHER2", other2); + when(metaStore.getAllDataSources()).thenReturn(sources); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Refusing to delete topic. " + + "Found other data sources (OTHER1, OTHER2) using topic something"); + + // When: + deleteInjector.inject(dropStatement); + } + + private DataSource givenSource(final String name, final String topicName) { + final DataSource source = mock(DataSource.class); + when(source.getName()).thenReturn(name); + when(source.getKafkaTopicName()).thenReturn(topicName); + return source; + } + private static ConfiguredStatement givenStatement( final String text, final T statement