From 4b7d1ccfcb396632ec7d1315a18642d9a2c767b1 Mon Sep 17 00:00:00 2001 From: Tomasz Nguyen Date: Fri, 16 Apr 2021 13:39:49 +0100 Subject: [PATCH] feat: emit an error reason before closing websocket Reasons for closing websocket are limited to 123 bytes. We can, however, emit a final message before closing the socket - we're using that here to send a non-truncated version of the error to display in the UI --- .../rest/server/resources/streaming/SessionUtil.java | 9 ++++++++- .../server/resources/streaming/SessionUtilTest.java | 11 +++++++++++ .../confluent/ksql/rest/entity/KsqlRequestTest.java | 4 ++-- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/SessionUtil.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/SessionUtil.java index 0d0cedf3b2f5..8086fa0e094c 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/SessionUtil.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/SessionUtil.java @@ -15,6 +15,8 @@ package io.confluent.ksql.rest.server.resources.streaming; +import com.google.common.collect.ImmutableMap; +import io.confluent.ksql.rest.ApiJsonMapper; import io.vertx.core.http.ServerWebSocket; import java.nio.charset.StandardCharsets; import org.slf4j.Logger; @@ -37,7 +39,12 @@ static void closeSilently( final int code, final String message) { try { - webSocket.close((short) code, truncate(message)); + final ImmutableMap finalMessage = ImmutableMap.of( + "error", + message != null ? message : "" + ); + final String json = ApiJsonMapper.INSTANCE.get().writeValueAsString(finalMessage); + webSocket.writeFinalTextFrame(json).close((short) code, truncate(message)); } catch (final Exception e) { LOG.info("Exception caught closing websocket", e); } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/SessionUtilTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/SessionUtilTest.java index 4b42de9c33bf..db68fd393075 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/SessionUtilTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/resources/streaming/SessionUtilTest.java @@ -23,8 +23,10 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import io.vertx.core.http.ServerWebSocket; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; @@ -42,12 +44,18 @@ public class SessionUtilTest { @Captor private ArgumentCaptor reasonCaptor; + @Before + public void setUp() { + when(websocket.writeFinalTextFrame(any(String.class))).thenReturn(websocket); + } + @Test public void shouldCloseQuietly() throws Exception { // Given: doThrow(new RuntimeException("Boom")).when(websocket) .close(any(Short.class), any(String.class)); + // When: SessionUtil.closeSilently(websocket, INVALID_MESSAGE_TYPE.code(), "reason"); @@ -65,6 +73,7 @@ public void shouldNotTruncateShortReasons() throws Exception { SessionUtil.closeSilently(websocket, INVALID_MESSAGE_TYPE.code(), reason); // Then: + verify(websocket).writeFinalTextFrame(any(String.class)); verify(websocket).close(codeCaptor.capture(), reasonCaptor.capture()); assertThat(reasonCaptor.getValue(), is(reason)); } @@ -80,6 +89,7 @@ public void shouldTruncateMessageLongerThanCloseReasonAllows() throws Exception SessionUtil.closeSilently(websocket, INVALID_MESSAGE_TYPE.code(), reason); // Then: + verify(websocket).writeFinalTextFrame(any(String.class)); verify(websocket).close(codeCaptor.capture(), reasonCaptor.capture()); assertThat(reasonCaptor.getValue(), is( "A long message that is longer than the maximum size that the CloseReason class " @@ -99,6 +109,7 @@ public void shouldTruncateLongMessageWithMultiByteChars() throws Exception { SessionUtil.closeSilently(websocket, INVALID_MESSAGE_TYPE.code(), reason); // Then: + verify(websocket).writeFinalTextFrame(any(String.class)); verify(websocket).close(codeCaptor.capture(), reasonCaptor.capture()); assertThat(reasonCaptor.getValue(), is( "A long message that is longer than the maximum size that the CloseReason class will " diff --git a/ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java b/ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java index 0e365e58fd77..03489aee9546 100644 --- a/ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java +++ b/ksqldb-rest-model/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java @@ -159,7 +159,7 @@ public void shouldSerializeToJson() { final String jsonRequest = serialize(A_REQUEST); // Then: - assertThat(jsonRequest, is(A_JSON_REQUEST_WITH_NULL_COMMAND_NUMBER)); + assertThat(deserialize(jsonRequest), is(deserialize(A_JSON_REQUEST_WITH_NULL_COMMAND_NUMBER))); } @Test @@ -168,7 +168,7 @@ public void shouldSerializeToJsonWithCommandNumber() { final String jsonRequest = serialize(A_REQUEST_WITH_COMMAND_NUMBER); // Then: - assertThat(jsonRequest, is(A_JSON_REQUEST_WITH_COMMAND_NUMBER)); + assertThat(deserialize(jsonRequest), is(deserialize(A_JSON_REQUEST_WITH_COMMAND_NUMBER))); } @Test