Skip to content

Commit

Permalink
BESU-128: Fix WebSocket frames handling (hyperledger#210)
Browse files Browse the repository at this point in the history
* Updated WebSocketService (handle -> textMessageHandle)
* Updated WebSocketRequestHandler to accept String instead of Buffer

Signed-off-by: Lucas Saldanha <[email protected]>
  • Loading branch information
lucassaldanha authored Nov 22, 2019
1 parent 7052fda commit beb9d0b
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.ConcurrentLinkedDeque;

import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.http.WebSocket;
Expand Down Expand Up @@ -70,7 +69,7 @@ public JsonRpcSuccessEvent unsubscribe(final Subscription subscription) {

private JsonRpcSuccessEvent send(final String json) {

connection.writeBinaryMessage(Buffer.buffer(json));
connection.writeTextMessage(json);

WaitUtils.waitFor(() -> assertThat(receivedResponse).isEqualTo(true));

Expand Down
12 changes: 5 additions & 7 deletions besu/src/test/java/org/hyperledger/besu/RunnerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
Expand Down Expand Up @@ -320,13 +319,12 @@ private void syncFromGenesis(final SyncMode mode, final GenesisConfigFile genesi
WebSocketConfiguration.DEFAULT_WEBSOCKET_HOST,
"/",
ws -> {
ws.write(
Buffer.buffer(
"{\"id\": 1, \"method\": \"eth_subscribe\", \"params\": [\"syncing\"]}"));
ws.handler(
buffer -> {
ws.writeTextMessage(
"{\"id\": 1, \"method\": \"eth_subscribe\", \"params\": [\"syncing\"]}");
ws.textMessageHandler(
payload -> {
final boolean matches =
buffer.toString().equals("{\"jsonrpc\":\"2.0\",\"id\":2,\"result\":\"0x0\"}");
payload.equals("{\"jsonrpc\":\"2.0\",\"id\":2,\"result\":\"0x0\"}");
if (matches) {
future.complete();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,20 @@ public WebSocketRequestHandler(final Vertx vertx, final Map<String, JsonRpcMetho
this.methods = methods;
}

public void handle(final String id, final Buffer buffer) {
handle(Optional.empty(), id, buffer, Optional.empty());
public void handle(final String id, final String payload) {
handle(Optional.empty(), id, payload, Optional.empty());
}

public void handle(
final Optional<AuthenticationService> authenticationService,
final String id,
final Buffer buffer,
final String payload,
final Optional<User> user) {
vertx.executeBlocking(
future -> {
final WebSocketRpcRequest request;
try {
request = buffer.toJsonObject().mapTo(WebSocketRpcRequest.class);
request = Json.decodeValue(payload, WebSocketRpcRequest.class);
} catch (final IllegalArgumentException | DecodeException e) {
LOG.debug("Error mapping json to WebSocketRpcRequest", e);
future.complete(new JsonRpcErrorResponse(null, JsonRpcError.INVALID_REQUEST));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,19 @@ private Handler<ServerWebSocket> websocketHandler() {

LOG.debug("Websocket Connected ({})", socketAddressAsString(socketAddress));

websocket.handler(
buffer -> {
websocket.textMessageHandler(
payload -> {
LOG.debug(
"Received Websocket request {} ({})",
buffer.toString(),
payload,
socketAddressAsString(socketAddress));

AuthenticationUtils.getUser(
authenticationService,
token,
user ->
websocketRequestHandler.handle(
authenticationService, connectionId, buffer, user));
authenticationService, connectionId, payload, user));
});

websocket.closeHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.UUID;

import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.unit.Async;
Expand Down Expand Up @@ -93,7 +92,7 @@ public void handlerDeliversResponseSuccessfully(final TestContext context) {
context.assertEquals(Json.encode(expectedResponse), msg.body());
async.complete();
})
.completionHandler(v -> handler.handle(websocketId, Buffer.buffer(requestJson.toString())));
.completionHandler(v -> handler.handle(websocketId, requestJson.toString()));

async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
}
Expand All @@ -116,7 +115,7 @@ public void jsonDecodeFailureShouldRespondInvalidRequest(final TestContext conte
verifyZeroInteractions(jsonRpcMethodMock);
async.complete();
})
.completionHandler(v -> handler.handle(websocketId, Buffer.buffer("")));
.completionHandler(v -> handler.handle(websocketId, ""));

async.awaitSuccess(VERTX_AWAIT_TIMEOUT_MILLIS);
}
Expand All @@ -139,7 +138,7 @@ public void objectMapperFailureShouldRespondInvalidRequest(final TestContext con
verifyZeroInteractions(jsonRpcMethodMock);
async.complete();
})
.completionHandler(v -> handler.handle(websocketId, Buffer.buffer("{}")));
.completionHandler(v -> handler.handle(websocketId, "{}"));

async.awaitSuccess(VERTX_AWAIT_TIMEOUT_MILLIS);
}
Expand All @@ -163,7 +162,7 @@ public void absentMethodShouldRespondMethodNotFound(final TestContext context) {
context.assertEquals(Json.encode(expectedResponse), msg.body());
async.complete();
})
.completionHandler(v -> handler.handle(websocketId, Buffer.buffer(requestJson.toString())));
.completionHandler(v -> handler.handle(websocketId, requestJson.toString()));

async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
}
Expand All @@ -189,7 +188,7 @@ public void onExceptionProcessingRequestShouldRespondInternalError(final TestCon
context.assertEquals(Json.encode(expectedResponse), msg.body());
async.complete();
})
.completionHandler(v -> handler.handle(websocketId, Buffer.buffer(requestJson.toString())));
.completionHandler(v -> handler.handle(websocketId, requestJson.toString()));

async.awaitSuccess(WebSocketRequestHandlerTest.VERTX_AWAIT_TIMEOUT_MILLIS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.google.common.collect.Lists;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
Expand Down Expand Up @@ -209,7 +208,7 @@ public void websocketServiceWithBadHeaderAuthenticationToken(final TestContext c
options,
headers,
webSocket -> {
webSocket.write(Buffer.buffer(request));
webSocket.writeTextMessage(request);

webSocket.handler(
buffer -> {
Expand Down Expand Up @@ -246,7 +245,7 @@ public void websocketServiceWithGoodHeaderAuthenticationToken(final TestContext
options,
headers,
webSocket -> {
webSocket.write(Buffer.buffer(requestSub));
webSocket.writeTextMessage(requestSub);

webSocket.handler(
buffer -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verifyNoInteractions;

import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketMethodsFactory;
Expand All @@ -34,7 +35,9 @@
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketBase;
import io.vertx.core.http.WebSocketFrame;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
Expand Down Expand Up @@ -98,13 +101,13 @@ public void websocketServiceExecutesHandlerOnMessage(final TestContext context)
httpClient.websocket(
"/",
webSocket -> {
webSocket.write(Buffer.buffer(request));

webSocket.handler(
buffer -> {
context.assertEquals(expectedResponse, buffer.toString());
async.complete();
});

webSocket.writeTextMessage(request);
});

async.awaitSuccess(VERTX_AWAIT_TIMEOUT_MILLIS);
Expand Down Expand Up @@ -182,4 +185,34 @@ public void handleLoginRequestWithAuthDisabled() {
request.putHeader("Content-Type", "application/json; charset=utf-8");
request.end("{\"username\":\"user\",\"password\":\"pass\"}");
}

@Test
public void webSocketDoesNotToHandlePingPayloadAsJsonRpcRequest(final TestContext context) {
final Async async = context.async();

httpClient.webSocket(
"/",
result -> {
WebSocket websocket = result.result();

websocket.handler(
buffer -> {
final String payload = buffer.toString();
if (!payload.equals("foo")) {
context.fail("Only expected PONG response with same payload as PING request");
}
});

websocket.closeHandler(
h -> {
verifyNoInteractions(webSocketRequestHandlerSpy);
async.complete();
});

websocket.writeFrame(WebSocketFrame.pingFrame(Buffer.buffer("foo")));
websocket.close();
});

async.awaitSuccess(VERTX_AWAIT_TIMEOUT_MILLIS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.stream.Collectors;

import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.Json;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
Expand Down Expand Up @@ -77,8 +76,7 @@ public void shouldAddConnectionToMap(final TestContext context) {
})
.completionHandler(
v ->
webSocketRequestHandler.handle(
CONNECTION_ID_1, Buffer.buffer(Json.encode(subscribeRequestBody))));
webSocketRequestHandler.handle(CONNECTION_ID_1, Json.encode(subscribeRequestBody)));

async.awaitSuccess(ASYNC_TIMEOUT);
}
Expand Down Expand Up @@ -119,12 +117,12 @@ public void shouldAddMultipleConnectionsToMap(final TestContext context) {
.completionHandler(
v ->
webSocketRequestHandler.handle(
CONNECTION_ID_2, Buffer.buffer(Json.encode(subscribeRequestBody2))));
CONNECTION_ID_2, Json.encode(subscribeRequestBody2)));
})
.completionHandler(
v ->
webSocketRequestHandler.handle(
CONNECTION_ID_1, Buffer.buffer(Json.encode(subscribeRequestBody1))));
CONNECTION_ID_1, Json.encode(subscribeRequestBody1)));

async.awaitSuccess(ASYNC_TIMEOUT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.HashMap;

import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.Json;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
Expand Down Expand Up @@ -76,8 +75,7 @@ public void shouldRemoveConnectionWithSingleSubscriptionFromMap(final TestContex
})
.completionHandler(
v ->
webSocketRequestHandler.handle(
CONNECTION_ID, Buffer.buffer(Json.encode(unsubscribeRequestBody))));
webSocketRequestHandler.handle(CONNECTION_ID, Json.encode(unsubscribeRequestBody)));

async.awaitSuccess(ASYNC_TIMEOUT);
}
Expand Down Expand Up @@ -109,8 +107,7 @@ public void shouldRemoveSubscriptionAndKeepConnection(final TestContext context)
})
.completionHandler(
v ->
webSocketRequestHandler.handle(
CONNECTION_ID, Buffer.buffer(Json.encode(unsubscribeRequestBody))));
webSocketRequestHandler.handle(CONNECTION_ID, Json.encode(unsubscribeRequestBody)));

async.awaitSuccess(ASYNC_TIMEOUT);
}
Expand Down

0 comments on commit beb9d0b

Please sign in to comment.