diff --git a/ksql-engine/src/test/java/io/confluent/ksql/materialization/ks/KsStateStoreTest.java b/ksql-engine/src/test/java/io/confluent/ksql/materialization/ks/KsStateStoreTest.java index 7674d1e02d62..92c2fc4811dc 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/materialization/ks/KsStateStoreTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/materialization/ks/KsStateStoreTest.java @@ -65,7 +65,7 @@ public class KsStateStoreTest { .build(); @Rule - public final Timeout timeout = Timeout.seconds(1); + public final Timeout timeout = Timeout.seconds(10); @Rule public final ExpectedException expectedException = ExpectedException.none(); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java index 56607d7a65d5..b7801a373773 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java @@ -40,9 +40,11 @@ import io.confluent.ksql.util.TransientQueryMetadata; import io.confluent.ksql.version.metrics.ActivenessRegistrar; import java.time.Duration; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import javax.ws.rs.Consumes; import javax.ws.rs.POST; import javax.ws.rs.Path; @@ -232,24 +234,29 @@ private Response handlePrintTopic( final PreparedStatement statement ) { final PrintTopic printTopic = statement.getStatement(); - final String topicName = printTopic.getTopic().toString(); + final String topicName = printTopic.getTopic(); if (!serviceContext.getTopicClient().isTopicExists(topicName)) { - String reverseSuggestion = ""; - final String nameReversedCase = reverseCase(topicName); - if (serviceContext.getTopicClient().isTopicExists(nameReversedCase)) { - reverseSuggestion = "Did you mean '" + nameReversedCase + "'?" + System.lineSeparator(); - } + final Collection possibleAlternatives = + findPossibleTopicMatches(topicName, serviceContext); + + final String reverseSuggestion = possibleAlternatives.isEmpty() + ? "" + : possibleAlternatives.stream() + .map(name -> "\tprint " + name + ";") + .collect(Collectors.joining( + System.lineSeparator(), + System.lineSeparator() + "Did you mean:" + System.lineSeparator(), + "" + )); + throw new KsqlRestException( - Errors.badRequest(String.format( - "Could not find topic '%s', " - + "or the KSQL user does not have permissions to list the topic." - + System.lineSeparator() + Errors.badRequest( + "Could not find topic '" + topicName + "', " + + "or the KSQL user does not have permissions to list the topic. " + + "Topic names are case-sensitive." + reverseSuggestion - + "KSQL will treat unquoted topic names as uppercase." - + System.lineSeparator() - + "To print a case-sensitive topic use quotes, for example: print \'Topic\';", - topicName))); + )); } final Map propertiesWithOverrides = @@ -267,17 +274,13 @@ private Response handlePrintTopic( return Response.ok().entity(topicStreamWriter).build(); } - private String reverseCase(final String str) { - final char[] chars = str.toCharArray(); - for (int i = 0; i < chars.length; i++) { - final char c = chars[i]; - if (Character.isUpperCase(c)) { - chars[i] = Character.toLowerCase(c); - } else { - chars[i] = Character.toUpperCase(c); - } - } - return new String(chars); + private static Collection findPossibleTopicMatches( + final String topicName, + final ServiceContext serviceContext + ) { + return serviceContext.getTopicClient().listTopicNames().stream() + .filter(name -> name.equalsIgnoreCase(topicName)) + .collect(Collectors.toSet()); } } diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java index 6f4f424dd22c..5b6fdf1cc79e 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResourceTest.java @@ -20,23 +20,21 @@ import static io.confluent.ksql.rest.entity.KsqlErrorMessageMatchers.errorMessage; import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionErrorMessage; import static io.confluent.ksql.rest.server.resources.KsqlRestExceptionMatchers.exceptionStatusCode; -import static org.easymock.EasyMock.anyLong; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.anyString; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.mock; -import static org.easymock.EasyMock.niceMock; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.reset; -import static org.easymock.EasyMock.verify; -import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import io.confluent.ksql.GenericRow; import io.confluent.ksql.KsqlExecutionContext.ExecuteResult; import io.confluent.ksql.engine.KsqlEngine; @@ -47,8 +45,6 @@ import io.confluent.ksql.parser.tree.PrintTopic; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.Statement; -import io.confluent.ksql.planner.PlanSourceExtractorVisitor; -import io.confluent.ksql.planner.plan.OutputNode; import io.confluent.ksql.rest.Errors; import io.confluent.ksql.rest.entity.KsqlErrorMessage; import io.confluent.ksql.rest.entity.KsqlRequest; @@ -87,10 +83,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; -import org.easymock.EasyMock; -import org.easymock.EasyMockRunner; -import org.easymock.Mock; -import org.easymock.MockType; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpStatus.Code; import org.hamcrest.Matchers; import org.junit.Before; @@ -98,8 +91,10 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; -@RunWith(EasyMockRunner.class) +@RunWith(MockitoJUnitRunner.class) public class StreamedQueryResourceTest { private static final Duration DISCONNECT_CHECK_INTERVAL = Duration.ofMillis(1000); @@ -115,38 +110,34 @@ public class StreamedQueryResourceTest { @Rule public final ExpectedException expectedException = ExpectedException.none(); - - @Mock(MockType.NICE) + @Mock private KsqlEngine mockKsqlEngine; - @Mock(MockType.NICE) + @Mock private ServiceContext serviceContext; - @Mock(MockType.NICE) + @Mock private KafkaTopicClient mockKafkaTopicClient; - @Mock(MockType.NICE) + @Mock private StatementParser mockStatementParser; @Mock private CommandQueue commandQueue; - @Mock(MockType.NICE) + @Mock private ActivenessRegistrar activenessRegistrar; @Mock private Consumer queryCloseCallback; - @Mock(MockType.NICE) + @Mock private KsqlAuthorizationValidator authorizationValidator; private StreamedQueryResource testResource; - private final static String queryString = "SELECT * FROM test_stream;"; + private final static String queryString = "SELECT * FROM test_stream EMIT CHANGES;"; private final static String printString = "Print TEST_TOPIC;"; private final static String topicName = "test_stream"; private PreparedStatement statement; @Before public void setup() { - expect(serviceContext.getTopicClient()).andReturn(mockKafkaTopicClient); - expect(mockKsqlEngine.hasActiveQueries()).andReturn(false); + when(serviceContext.getTopicClient()).thenReturn(mockKafkaTopicClient); statement = PreparedStatement.of("s", mock(Statement.class)); - expect(mockStatementParser.parseSingleStatement(queryString)) - .andReturn(statement); - replay(mockKsqlEngine, mockStatementParser); + when(mockStatementParser.parseSingleStatement(queryString)).thenReturn(statement); testResource = new StreamedQueryResource( mockKsqlEngine, @@ -199,11 +190,8 @@ public void shouldThrowOnHandleStatementIfNotConfigured() { @Test public void shouldReturn400OnBadStatement() { // Given: - reset(mockStatementParser); - expect(mockStatementParser.parseSingleStatement(anyString())) - .andThrow(new IllegalArgumentException("some error message")); - - replay(mockStatementParser); + when(mockStatementParser.parseSingleStatement(any())) + .thenThrow(new IllegalArgumentException("some error message")); // Expect expectedException.expect(KsqlRestException.class); @@ -220,10 +208,7 @@ public void shouldReturn400OnBadStatement() { } @Test - public void shouldNotWaitIfCommandSequenceNumberSpecified() { - // Given: - replay(commandQueue); - + public void shouldNotWaitIfCommandSequenceNumberSpecified() throws Exception { // When: testResource.streamQuery( serviceContext, @@ -231,17 +216,11 @@ public void shouldNotWaitIfCommandSequenceNumberSpecified() { ); // Then: - verify(commandQueue); + verify(commandQueue, never()).ensureConsumedPast(anyLong(), any()); } @Test public void shouldWaitIfCommandSequenceNumberSpecified() throws Exception { - // Given: - commandQueue.ensureConsumedPast(eq(3L), anyObject()); - expectLastCall(); - - replay(commandQueue); - // When: testResource.streamQuery( serviceContext, @@ -249,17 +228,15 @@ public void shouldWaitIfCommandSequenceNumberSpecified() throws Exception { ); // Then: - verify(commandQueue); + verify(commandQueue).ensureConsumedPast(eq(3L), any()); } @Test public void shouldReturnServiceUnavailableIfTimeoutWaitingForCommandSequenceNumber() throws Exception { // Given: - commandQueue.ensureConsumedPast(anyLong(), anyObject()); - expectLastCall().andThrow(new TimeoutException("whoops")); - - replay(commandQueue); + doThrow(new TimeoutException("whoops")) + .when(commandQueue).ensureConsumedPast(anyLong(), any()); // Expect expectedException.expect(KsqlRestException.class); @@ -308,27 +285,12 @@ public void shouldStreamRowsCorrectly() throws Throwable { rowQueuePopulatorThread.start(); final KafkaStreams mockKafkaStreams = mock(KafkaStreams.class); - mockKafkaStreams.start(); - expectLastCall(); - mockKafkaStreams.setUncaughtExceptionHandler(anyObject(Thread.UncaughtExceptionHandler.class)); - expectLastCall(); - mockKafkaStreams.cleanUp(); - expectLastCall(); - mockKafkaStreams.close(); - expectLastCall(); - - final OutputNode mockOutputNode = niceMock(OutputNode.class); - expect(mockOutputNode.accept(anyObject(PlanSourceExtractorVisitor.class), anyObject())) - .andReturn(null); final Map requestStreamsProperties = Collections.emptyMap(); - reset(mockStatementParser); statement = PreparedStatement.of("query", mock(Query.class)); - expect(mockStatementParser.parseSingleStatement(queryString)) - .andReturn(statement); - - reset(mockKsqlEngine); + when(mockStatementParser.parseSingleStatement(queryString)) + .thenReturn(statement); final TransientQueryMetadata transientQueryMetadata = new TransientQueryMetadata( @@ -344,12 +306,10 @@ public void shouldStreamRowsCorrectly() throws Throwable { Collections.emptyMap(), Collections.emptyMap(), queryCloseCallback); - reset(mockOutputNode); - expect(mockKsqlEngine.execute(serviceContext, - ConfiguredStatement.of(statement, requestStreamsProperties, VALID_CONFIG))) - .andReturn(ExecuteResult.of(transientQueryMetadata)); - replay(mockKsqlEngine, mockStatementParser, mockKafkaStreams, mockOutputNode); + when(mockKsqlEngine.execute(serviceContext, + ConfiguredStatement.of(statement, requestStreamsProperties, VALID_CONFIG))) + .thenReturn(ExecuteResult.of(transientQueryMetadata)); final Response response = testResource.streamQuery( @@ -398,7 +358,10 @@ public void shouldStreamRowsCorrectly() throws Throwable { rowQueuePopulatorThread.join(); // Definitely want to make sure that the Kafka Streams instance has been closed and cleaned up - verify(mockKafkaStreams); + verify(mockKafkaStreams).start(); + verify(mockKafkaStreams).setUncaughtExceptionHandler(any()); + verify(mockKafkaStreams).cleanUp(); + verify(mockKafkaStreams).close(); // If one of the other threads has somehow managed to throw an exception without breaking things up until this // point, we throw that exception now in the main thread and cause the test to fail @@ -470,35 +433,26 @@ public void write(final int b) throws IOException { @Test public void shouldUpdateTheLastRequestTime() { - // Given: - activenessRegistrar.updateLastRequestTime(); - EasyMock.expectLastCall(); - - EasyMock.replay(activenessRegistrar); - - // When: + /// When: testResource.streamQuery( serviceContext, new KsqlRequest(queryString, Collections.emptyMap(), null) ); // Then: - EasyMock.verify(activenessRegistrar); + verify(activenessRegistrar).updateLastRequestTime(); } @Test public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() { // Given: - reset(mockStatementParser, authorizationValidator); - statement = PreparedStatement.of("query", mock(Query.class)); - expect(mockStatementParser.parseSingleStatement(queryString)) - .andReturn(statement); - authorizationValidator.checkAuthorization(anyObject(), anyObject(), anyObject()); - expectLastCall().andThrow( - new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName))); + when(mockStatementParser.parseSingleStatement(queryString)) + .thenReturn(statement); - replay(mockStatementParser, authorizationValidator); + doThrow( + new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName))) + .when(authorizationValidator).checkAuthorization(any(), any(), any()); // When: final Response response = testResource.streamQuery( @@ -518,19 +472,13 @@ public void shouldReturnForbiddenKafkaAccessIfKsqlTopicAuthorizationException() @Test public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationException() { // Given: - reset(mockStatementParser, authorizationValidator); - statement = PreparedStatement.of("query", mock(Query.class)); - expect(mockStatementParser.parseSingleStatement(queryString)) - .andReturn(statement); - authorizationValidator.checkAuthorization(anyObject(), anyObject(), anyObject()); - expectLastCall().andThrow( - new KsqlException( - "", - new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName) - ))); - - replay(mockStatementParser, authorizationValidator); + when(mockStatementParser.parseSingleStatement(queryString)) + .thenReturn(statement); + doThrow(new KsqlException( + "", + new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName)))) + .when(authorizationValidator).checkAuthorization(any(), any(), any()); // When: final Response response = testResource.streamQuery( @@ -550,18 +498,15 @@ public void shouldReturnForbiddenKafkaAccessIfRootCauseKsqlTopicAuthorizationExc } @Test - public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationException() throws Exception { + public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationException() { // Given: - reset(mockStatementParser, authorizationValidator); - statement = PreparedStatement.of("print", mock(PrintTopic.class)); - expect(mockStatementParser.parseSingleStatement(printString)) - .andReturn(statement); - authorizationValidator.checkAuthorization(anyObject(), anyObject(), anyObject()); - expectLastCall().andThrow( - new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName))); + when(mockStatementParser.parseSingleStatement(printString)) + .thenReturn(statement); - replay(mockStatementParser, authorizationValidator); + doThrow( + new KsqlTopicAuthorizationException(AclOperation.READ, Collections.singleton(topicName))) + .when(authorizationValidator).checkAuthorization(any(), any(), any()); // When: final Response response = testResource.streamQuery( @@ -575,4 +520,47 @@ public void shouldReturnForbiddenKafkaAccessIfPrintTopicKsqlTopicAuthorizationEx assertEquals(response.getStatus(), expected.getStatus()); assertEquals(response.getEntity(), expected.getEntity()); } + + @Test + public void shouldSuggestAlternativesIfPrintTopicDoesNotExist() { + // Given: + final PrintTopic cmd = mock(PrintTopic.class); + when(cmd.getTopic()).thenReturn("TEST_TOPIC"); + statement = PreparedStatement.of("print", cmd); + when(mockStatementParser.parseSingleStatement(any())) + .thenReturn(statement); + + when(mockKafkaTopicClient.isTopicExists(any())).thenReturn(false); + when(mockKafkaTopicClient.listTopicNames()).thenReturn(ImmutableSet.of( + "aTopic", + "test_topic", + "Test_Topic" + )); + + // Then: + expectedException.expect(KsqlRestException.class); + expectedException.expect(exceptionStatusCode(is(HttpStatus.Code.BAD_REQUEST))); + expectedException.expect(exceptionErrorMessage( + errorMessage(containsString( + "Could not find topic 'TEST_TOPIC', " + + "or the KSQL user does not have permissions to list the topic. " + + "Topic names are case-sensitive." + + System.lineSeparator() + + "Did you mean:" + )))); + + expectedException.expect(exceptionErrorMessage( + errorMessage(containsString("\tprint test_topic;" + )))); + + expectedException.expect(exceptionErrorMessage( + errorMessage(containsString("\tprint Test_Topic;" + )))); + + // When: + testResource.streamQuery( + serviceContext, + new KsqlRequest(printString, Collections.emptyMap(), null) + ); + } }