Skip to content

Commit

Permalink
Issue #4475 - proof of concept
Browse files Browse the repository at this point in the history
Signed-off-by: Lachlan Roberts <[email protected]>
  • Loading branch information
lachlan-roberts committed Jan 14, 2020
1 parent a7b9df9 commit 26d12b9
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.Reader;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import javax.websocket.CloseReason;
import javax.websocket.Endpoint;
import javax.websocket.MessageHandler;
Expand Down Expand Up @@ -190,7 +191,23 @@ public void onTextFrame(ByteBuffer buffer, boolean fin) throws IOException
}
else if (wrapper.wantsStreams())
{
final MessageReader stream = new MessageReader(new MessageInputStream());
final CountDownLatch completed = new CountDownLatch(1);
final MessageReader stream = new MessageReader(new MessageInputStream())
{
@Override
public void messageComplete()
{
super.messageComplete();
try
{
completed.await();
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
};
activeMessage = stream;

dispatch(new Runnable()
Expand All @@ -201,6 +218,7 @@ public void run()
{
MessageHandler.Whole<Reader> handler = (Whole<Reader>)wrapper.getHandler();
handler.onMessage(stream);
completed.countDown();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@
import java.io.Writer;
import java.net.URI;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.websocket.ClientEndpoint;
import javax.websocket.ContainerProvider;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
Expand All @@ -36,11 +40,15 @@
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand All @@ -49,6 +57,8 @@ public class TextStreamTest
private static final String PATH = "/echo";
private static final String CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";

private static CompletableFuture<QueuedTextStreamer> queuedTextStreamerFuture = new CompletableFuture<>();

private Server server;
private ServerConnector connector;
private WebSocketContainer wsClient;
Expand All @@ -62,8 +72,8 @@ public void prepare() throws Exception

ServletContextHandler context = new ServletContextHandler(server, "/", true, false);
ServerContainer container = WebSocketServerContainerInitializer.configureContext(context);
ServerEndpointConfig config = ServerEndpointConfig.Builder.create(ServerTextStreamer.class, PATH).build();
container.addEndpoint(config);
container.addEndpoint(ServerEndpointConfig.Builder.create(ServerTextStreamer.class, PATH).build());
container.addEndpoint(ServerEndpointConfig.Builder.create(QueuedTextStreamer.class, "/test").build());

server.start();

Expand Down Expand Up @@ -125,6 +135,26 @@ public void testMoreThanLargestMessageOneByteAtATime() throws Exception
assertArrayEquals(data, client.getEcho());
}

@Test
public void test() throws Exception
{
URI uri = URI.create("ws://localhost:" + connector.getLocalPort() + "/test");
ClientTextStreamer client = new ClientTextStreamer();
Session session = wsClient.connectToServer(client, uri);

final int numLoops = 20;
for (int i = 0; i < numLoops; i++)
session.getBasicRemote().sendText(Integer.toString(i));

This comment has been minimized.

Copy link
@joakime

joakime Jan 14, 2020

Contributor

Now send these as a complex message.

A TEXT Frame (FIN==false) with "Attention " + i
A CONTINUATION Frame (FIN == false) with ".. you will receive message ..."
A CONTINUATION Frame (FIN == true) with " message #" + i

session.close();

QueuedTextStreamer queuedTextStreamer = queuedTextStreamerFuture.get(5, TimeUnit.SECONDS);
for (int i = 0; i < numLoops; i++)
{
String msg = queuedTextStreamer.messages.poll(5, TimeUnit.SECONDS);
assertThat(msg, Matchers.is(Integer.toString(i)));

This comment has been minimized.

Copy link
@lachlan-roberts

lachlan-roberts Jan 14, 2020

Author Contributor

@joakime currently in 9.4.x these messages from 0-19 will be received in a completely random order.

The rough changes I made to JsrEndpointEventDriver above show how this can be improved by just waiting for the thread to exit onMessage(reader) before continuing to read new frames. With these changes all the frames are received in the right order.

This comment has been minimized.

Copy link
@joakime

joakime Jan 14, 2020

Contributor

This works with simple messages, but not fragmented ones.

}
}

private char[] randomChars(int size)
{
char[] data = new char[size];
Expand Down Expand Up @@ -183,4 +213,34 @@ public void echo(Session session, Reader input) throws IOException
}
}
}

public static class QueuedTextStreamer extends Endpoint implements MessageHandler.Whole<Reader>
{
private BlockingArrayQueue<String> messages = new BlockingArrayQueue<>();

public QueuedTextStreamer()
{
queuedTextStreamerFuture.complete(this);
}

@Override
public void onOpen(Session session, EndpointConfig config)
{
session.addMessageHandler(this);
}

@Override
public void onMessage(Reader input)
{
try
{
Thread.sleep(Math.abs(new Random().nextLong() % 200));

This comment has been minimized.

Copy link
@joakime

joakime Jan 14, 2020

Contributor

Get rid of this Thread.sleep!

This comment has been minimized.

Copy link
@lachlan-roberts

lachlan-roberts Jan 14, 2020

Author Contributor

If I remove this Thread.sleep I get the correct order of frames received when I remove the fix. This seems to enough delay and randomness to jumble the order the frames are received.

messages.add(IO.toString(input));

This comment has been minimized.

Copy link
@joakime

joakime Jan 14, 2020

Contributor

Make a variant of this test where you do not read to EOF.

}
catch (Exception e)
{
e.printStackTrace();
}
}
}
}

0 comments on commit 26d12b9

Please sign in to comment.