Skip to content

Commit

Permalink
[websocket] send messages with a queue each after each (#638)
Browse files Browse the repository at this point in the history
Fixes #632

Co-authored-by: Jonah Graham <[email protected]>
  • Loading branch information
ivy-lli and jonahgraham authored Jun 28, 2022
1 parent 286d022 commit 19500c5
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@
package org.eclipse.lsp4j.websocket.jakarta;

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

import jakarta.websocket.SendHandler;
import jakarta.websocket.SendResult;
import jakarta.websocket.Session;

import org.eclipse.lsp4j.jsonrpc.JsonRpcException;
Expand All @@ -31,6 +35,8 @@ public class WebSocketMessageConsumer implements MessageConsumer {

private final Session session;
private final MessageJsonHandler jsonHandler;
private final ConcurrentLinkedQueue<String> messageQueue = new ConcurrentLinkedQueue<>();
private final WebSocketSendHandler handler = new WebSocketSendHandler();

public WebSocketMessageConsumer(Session session, MessageJsonHandler jsonHandler) {
this.session = session;
Expand All @@ -55,7 +61,8 @@ protected void sendMessage(String message) throws IOException {
if (session.isOpen()) {
int length = message.length();
if (length <= session.getMaxTextMessageBufferSize()) {
session.getAsyncRemote().sendText(message);
messageQueue.add(message);
handler.handleNextMessage();
} else {
int currentOffset = 0;
while (currentOffset < length) {
Expand All @@ -69,4 +76,20 @@ protected void sendMessage(String message) throws IOException {
}
}

private class WebSocketSendHandler implements SendHandler {
private final AtomicBoolean isSending = new AtomicBoolean();

@Override
public void onResult(SendResult result) {
isSending.set(false);
handleNextMessage();
}

void handleNextMessage() {
if (session.isOpen() && !messageQueue.isEmpty() && isSending.compareAndSet(false, true)) {
session.getAsyncRemote().sendText(messageQueue.poll(), this);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,23 @@ public void testNotifications() throws Exception {
Assert.assertEquals("123456", client.result);
}

@Test
public void testManyConcurrentNotifications() throws Exception {
String expectedResult = "";
for (char i = 'a'; i <= 'z'; i ++) {
String x = Character.toString(i);
client.server.notify(x);
expectedResult += x;
}
int expectedResultLenght = expectedResult.length();
try {
await(() -> server.result.length() == expectedResultLenght);
} catch (Error e) {
// discard this error so that the nice error displays in the assertEquals
}
Assert.assertEquals(expectedResult, server.result);
}

@Test
public void testChunkedNotification() throws Exception {
StringBuilder messageBuilder = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
package org.eclipse.lsp4j.websocket;

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

import javax.websocket.SendHandler;
import javax.websocket.SendResult;
import javax.websocket.Session;

import org.eclipse.lsp4j.jsonrpc.JsonRpcException;
Expand All @@ -30,6 +34,8 @@ public class WebSocketMessageConsumer implements MessageConsumer {

private final Session session;
private final MessageJsonHandler jsonHandler;
private final ConcurrentLinkedQueue<String> messageQueue = new ConcurrentLinkedQueue<>();
private final WebSocketSendHandler handler = new WebSocketSendHandler();

public WebSocketMessageConsumer(Session session, MessageJsonHandler jsonHandler) {
this.session = session;
Expand All @@ -54,7 +60,8 @@ protected void sendMessage(String message) throws IOException {
if (session.isOpen()) {
int length = message.length();
if (length <= session.getMaxTextMessageBufferSize()) {
session.getAsyncRemote().sendText(message);
messageQueue.add(message);
handler.handleNextMessage();
} else {
int currentOffset = 0;
while (currentOffset < length) {
Expand All @@ -68,4 +75,20 @@ protected void sendMessage(String message) throws IOException {
}
}

private class WebSocketSendHandler implements SendHandler {
private final AtomicBoolean isSending = new AtomicBoolean();

@Override
public void onResult(SendResult result) {
isSending.set(false);
handleNextMessage();
}

void handleNextMessage() {
if (session.isOpen() && !messageQueue.isEmpty() && isSending.compareAndSet(false, true)) {
session.getAsyncRemote().sendText(messageQueue.poll(), this);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,23 @@ public void testNotifications() throws Exception {
Assert.assertEquals("123456", client.result);
}

@Test
public void testManyConcurrentNotifications() throws Exception {
String expectedResult = "";
for (char i = 'a'; i <= 'z'; i ++) {
String x = Character.toString(i);
client.server.notify(x);
expectedResult += x;
}
int expectedResultLenght = expectedResult.length();
try {
await(() -> server.result.length() == expectedResultLenght);
} catch (Error e) {
// discard this error so that the nice error displays in the assertEquals
}
Assert.assertEquals(expectedResult, server.result);
}

@Test
public void testChunkedNotification() throws Exception {
StringBuilder messageBuilder = new StringBuilder();
Expand Down

0 comments on commit 19500c5

Please sign in to comment.