WebSocket JSR356 implementation not honoring javadoc of MessageHandler on Whole<Reader> #4475

Closed
stolsvik opened this issue Jan 14, 2020 · 9 comments

@stolsvik

stolsvik commented Jan 14, 2020

Jetty version
jetty-9.4.25.v20191220

Java version
1.8.0_112

OS type/version
Ubuntu 18.04.3 LTS

Description
JSR 356 WebSocket's MessageHandler's JavaDoc states "Each web socket session uses no more than one thread at a time to call its MessageHandlers. This means that, provided each message handler instance is used to handle messages for one web socket session, at most one thread at a time can be calling any of its methods."

This has also held for everything I've tried as long as the registered listener is a Whole<String>. However, since I'm parsing JSON using Jackson, it hit me that I could as well use a Whole<Reader> and feed that directly into the Jackson ObjectReader. All of a sudden, I started to get protocol errors from my library. It depends on a HELLO message, containing auth, being sent before any other.

From the logs, it seems that with Whole<Reader>, two threads are delivering messages concurrently - and they are even sometimes coming in the wrong order, which is what triggers the failure.

This is a set of log lines showing concurrent processing of messages, but at least in the right order (HELLO before SEND):
    00:42:07.573 [qtp833240229-150] INFO c.s.m.w.impl.MatsSocketSession - WebSocket received message [org.eclipse.jetty.websocket.common.message.MessageReader@106ebd8d] on MatsSocketSessionId [null], WebSocket SessionId:3, this:MatsSocketSession@3cdf2ae9 {}
    00:42:07.573 [qtp833240229-151] INFO c.s.m.w.impl.MatsSocketSession - WebSocket received message [org.eclipse.jetty.websocket.common.message.MessageReader@2dd4f197] on MatsSocketSessionId [null], WebSocket SessionId:3, this:MatsSocketSession@3cdf2ae9 {}
    00:42:07.573 [qtp833240229-150] INFO c.s.m.w.impl.MatsSocketSession - Messages: [[HELLO:NEW]->null,tid:MatsSocket_start_mJrl0b,cid:GImeQ9akra] {}
    00:42:07.573 [qtp833240229-150] INFO c.s.m.w.MatsTestWebsocketServer - Resolving Authorization header to principal for header [DummyAuth:1578958947562]. {}
    00:42:07.573 [qtp833240229-150] INFO c.s.m.w.impl.MatsSocketSession - MatsSocket HELLO! {matssocket.type=HELLO, matssocket.subType=NEW}
    00:42:07.573 [qtp833240229-151] INFO c.s.m.w.impl.MatsSocketSession - Messages: [[SEND]->Test.single,tid:SEND_3J3yyp,cid:null] {}
    00:42:07.573 [qtp833240229-151] INFO c.s.m.w.impl.MatsSocketSession - \- SEND to:[Test.single], reply:[null], msg:[{}]. {matssocket.type=SEND}

Here's a different set where they come both concurrent and in the wrong order (SEND before HELLO), thus the SEND is triggering close of socket, even while the concurrent HELLO is being processed.
    00:42:07.619 [qtp349259569-144] INFO c.s.m.w.impl.MatsSocketSession - WebSocket received message [org.eclipse.jetty.websocket.common.message.MessageReader@13d88ad8] on MatsSocketSessionId [null], WebSocket SessionId:4, this:MatsSocketSession@f5142d7 {}
    00:42:07.619 [qtp349259569-99] INFO c.s.m.w.impl.MatsSocketSession - WebSocket received message [org.eclipse.jetty.websocket.common.message.MessageReader@4ad307b0] on MatsSocketSessionId [null], WebSocket SessionId:4, this:MatsSocketSession@f5142d7 {}
    00:42:07.620 [qtp349259569-99] INFO c.s.m.w.impl.MatsSocketSession - Messages: [[SEND]->Test.single,tid:SEND_3nj9E9,cid:null] {}
    00:42:07.620 [qtp349259569-144] INFO c.s.m.w.impl.MatsSocketSession - Messages: [[HELLO:NEW]->null,tid:MatsSocket_start_OCjlh2,cid:MFSqZtWVXL] {}
    00:42:07.620 [qtp349259569-99] ERROR c.s.m.w.impl.MatsSocketSession - We have not got Authorization header! {}
    00:42:07.620 [qtp349259569-144] INFO c.s.m.w.MatsTestWebsocketServer - Resolving Authorization header to principal for header [DummyAuth:1578958947613]. {}
    00:42:07.620 [qtp349259569-99] INFO c.s.m.w.impl.DefaultMatsSocketServer - Closing WebSocket SessionId [4]: code: [VIOLATED_POLICY], reason:[Missing Authorization header] {}
    00:42:07.620 [qtp349259569-144] INFO c.s.m.w.impl.MatsSocketSession - MatsSocket HELLO! {matssocket.type=HELLO, matssocket.subType=NEW}

By literally only changing Whole<Reader> to Whole<String> and also doing Reader->String in the onMessage method, I get these lines for the same test - notice how it is a) in order, b) the first message is finished before the next comes in, and c) it is even the same thread that does the processing for those two messages.
    01:01:57.205 [qtp1068945248-79] INFO c.s.m.w.impl.MatsSocketSession - WebSocket received message [[{"t":"HELLO","clv":"MatsSocket.js,v0.8.9; User-Agent: Unknown","ts":1578960117203,"an":"TestApp","av":"1.2.3","auth":"DummyAuth:1578960137196","cid":"WTvoxdiO8Y","tid":"MatsSocket_start_mKnhFA","st":"NEW"}]] on MatsSocketSessionId [null], WebSocket SessionId:4, this:MatsSocketSession@3185190d {}
    01:01:57.205 [qtp1068945248-79] INFO c.s.m.w.impl.MatsSocketSession - Messages: [[HELLO:NEW]->null,tid:MatsSocket_start_mKnhFA,cid:WTvoxdiO8Y] {}
    01:01:57.205 [qtp1068945248-79] INFO c.s.m.w.MatsTestWebsocketServer - Resolving Authorization header to principal for header [DummyAuth:1578960137196]. {}
    01:01:57.205 [qtp1068945248-79] INFO c.s.m.w.impl.MatsSocketSession - MatsSocket HELLO! {matssocket.type=HELLO, matssocket.subType=NEW}
    01:01:57.223 [qtp1068945248-79] INFO c.s.m.w.impl.MatsSocketSession - WebSocket received message [[{"eid":"Test.single","tid":"SEND_g1BBep","msg":{},"t":"SEND","cmcts":1578960117196,"cmseq":0}]] on MatsSocketSessionId [M5Q5oSpeow25hjY0], WebSocket SessionId:4, this:MatsSocketSession@3185190d {matssocket.principal=Mr. Dummy Auth, matssocket.sessionId=M5Q5oSpeow25hjY0}
    01:01:57.223 [qtp1068945248-79] INFO c.s.m.w.impl.MatsSocketSession - Messages: [[SEND]->Test.single,tid:SEND_g1BBep,cid:null] {matssocket.principal=Mr. Dummy Auth, matssocket.sessionId=M5Q5oSpeow25hjY0}
    01:01:57.223 [qtp1068945248-79] INFO c.s.m.w.impl.MatsSocketSession - \- SEND to:[Test.single], reply:[null], msg:[{}]. {matssocket.principal=Mr. Dummy Auth, matssocket.sessionId=M5Q5oSpeow25hjY0, matssocket.type=SEND}

@lachlan-roberts lachlan-roberts self-assigned this Jan 14, 2020
@joakime
Contributor

joakime commented Jan 14, 2020

Putting on my javax.websocket spec hat for a second ...

When you are using Whole<Reader> (or Whole<InputStream>) the messages are delivered in a dispatched scenario, meaning they are mandated (by spec) to go to a new thread for the call to onMessage(Reader message).

All of the frames that make up the Reader content can come from multiple threads, as those are not part of the call to onMessage(Reader message).
When you return from onMessage(Reader message) the only signal that you are done is when the final frame (Frame with FIN==true) is read completely from the underlying data that makes up the Reader.

The container implementation is now free to read the next frame and trigger the next onMessage(Reader message) via another thread dispatch.

The Reader interface exists for streaming content: a websocket session that opens a reader and keeps it open for a long period of time (think VoIP, WebRTC, video games, etc.).

Putting on my Jetty hat now ...

If you are using Reader to access lots of small message content (each message under 40MB) then you are going to see many of these kinds of behaviors.
Plus you are needlessly adding a large amount of pressure for your websocket session. (Each message is a new thread!)

Using Whole<String> or Partial<String> messages is always going to be a better choice for a websocket session that has many messages.

Does Jetty have a bug here?

Possibly, as the strict interpretation of Whole<Reader> is that it shouldn't progress unless onMessage(Reader message) is dispatched AND the final Frame's bytes are read from the underlying stream.
However, this has proven to be VERY problematic with certain libraries (Jackson for example stops reading from the stream once the JSON is complete, never going to EOF for the stream).
As we got many many bugs about the rigid interpretation of the javax.websocket MessageHandler spec it was decided that once the final Frame (FIN==true) is sent to the underlying Stream that means the stream is done.
What happens if the Message is in a single websocket Frame that is itself FIN==true? The initial onMessage(Reader message) satisfies both the dispatch and the final frame requirement.
This has waffled over the years, having the pendulum swing back and forth between the interpretations of rigid spec and the loose spec.

TL;DR;

It comes down to, how the final frame of data for the stream is handled.
Is it "is read" (meaning EOF) as the strict interpretation suggests?
Or is it "made available" (meaning the stream has reached the final frame) as the loose interpretation suggests?

The reason you see proper behavior with String vs Reader is ...

  • that String waits for return from onMessage(String) to process the next bytes
  • while Reader is dispatched on first frame to onMessage(Reader) which does not and cannot wait for return.
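
To make the "is read" versus "made available" distinction concrete, here is a minimal JDK-only sketch. `EofTrackingReader` and `readFirstJsonObject` are hypothetical names invented for illustration, and the brace-counting loop is only a stand-in for a parser that stops once the JSON value is complete (it ignores braces inside strings). It is not Jackson or Jetty code.

```java
import java.io.FilterReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;

/** Hypothetical helper: remembers whether the wrapped Reader ever reported EOF. */
final class EofTrackingReader extends FilterReader {
    private boolean eofSeen;

    EofTrackingReader(Reader in) {
        super(in);
    }

    @Override
    public int read() throws IOException {
        int c = super.read();
        if (c == -1) {
            eofSeen = true;
        }
        return c;
    }

    @Override
    public int read(char[] buf, int off, int len) throws IOException {
        int n = super.read(buf, off, len);
        if (n == -1) {
            eofSeen = true;
        }
        return n;
    }

    boolean eofSeen() {
        return eofSeen;
    }

    /** Stand-in parser: consumes up to the brace that closes the first object, then stops. */
    static String readFirstJsonObject(Reader reader) {
        StringBuilder sb = new StringBuilder();
        int depth = 0;
        try {
            int c;
            while ((c = reader.read()) != -1) {
                sb.append((char) c);
                if (c == '{') {
                    depth++;
                } else if (c == '}' && --depth == 0) {
                    break; // value complete: stop without reading to EOF
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        EofTrackingReader reader = new EofTrackingReader(new StringReader("{\"t\":\"HELLO\"}   "));
        System.out.println("parsed:   " + readFirstJsonObject(reader));
        System.out.println("EOF seen: " + reader.eofSeen());
    }
}
```

A container that waits for EOF to be read would never see it from a consumer like this one, which is the situation the strict interpretation runs into.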

@lachlan-roberts
Contributor

@joakime maybe we should at least block until onMessage(Reader) has returned before going on to read more frames (after we have appended the content of a fin frame + EOF). Otherwise how is the application to tell the correct order of the frames? If the messages are slow to process and fast to read we could have many calls to onMessage(Reader) running concurrently all for different messages and you don't know the order.

@joakime
Contributor

joakime commented Jan 14, 2020

@lachlan-roberts you cannot wait for the return of onMessage(Reader) to process more frames.

As this is a valid implementation ...

    @Override
    public void onMessage(Reader reader)
    {
        JSON json = parseJson(reader);
        processJson(json);
    }

(Yes, I realize that this could be done with a Whole<JSON> too, but the point still remains about reading the stream within that dispatch)

You cannot process more frames if you wait for the onMessage(Reader reader) to return.
That kind of implementation is also why the spec mandates that you dispatch to a new thread for processing of that message event.

What if this message consists of 100 frames? You read the initial TEXT data frame (FIN==false), initiated the dispatch to onMessage(Reader reader), and now you wait for it to finish, but you cannot read while you wait.
That's why the Frame with FIN==true is critical.
It's really the only place to wait for further message processing.
But what do you wait on?
The addition of that final frame to the set of buffers that make up the stream? (the loose interpretation, and works with bad libs that don't read to EOF, like Jackson)
Or the read of that final buffer to EOF? (the strict interpretation, which never works with bad libs that don't read to EOF)

lachlan-roberts added a commit that referenced this issue Jan 14, 2020
Signed-off-by: Lachlan Roberts <[email protected]>
@lachlan-roberts
Contributor

@joakime I mean that we keep reading new frames as long as we don't see fin==true, but once we do see fin==true we should not read the next frame until onMessage() exits.

See my comments in 26d12b9 for a demo of the problem and a PoC fix.
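
The idea in this comment can be sketched without Jetty. The following is a simplified model and not the code from the referenced commit: `SequentialDispatchSketch`, `deliver` and `demo` are hypothetical names, fragmentation is not modelled, and each list element stands for a message whose fin==true frame has already arrived. Each message is still dispatched to its own thread, but the loop that plays the role of the frame reader waits for the handler to return before moving on.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical model of a frame-reading loop; not Jetty code. */
final class SequentialDispatchSketch {

    /** Dispatches each message on a new thread, then blocks until that handler has returned. */
    static void deliver(List<String> messages, Consumer<String> onMessage) {
        for (String message : messages) {
            CountDownLatch handlerReturned = new CountDownLatch(1);
            Thread dispatch = new Thread(() -> {
                try {
                    onMessage.accept(message);
                } finally {
                    handlerReturned.countDown();
                }
            });
            dispatch.start();
            try {
                handlerReturned.await(); // do not "read the next frame" yet
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Sends HELLO then SEND; returns the order handled and records the peak handler concurrency. */
    static List<String> demo(AtomicInteger maxConcurrent) {
        List<String> handled = new CopyOnWriteArrayList<>();
        AtomicInteger active = new AtomicInteger();
        deliver(Arrays.asList("HELLO", "SEND"), message -> {
            int now = active.incrementAndGet();
            maxConcurrent.accumulateAndGet(now, Math::max);
            handled.add(message);
            active.decrementAndGet();
        });
        return handled;
    }

    public static void main(String[] args) {
        AtomicInteger max = new AtomicInteger();
        System.out.println(demo(max) + ", max concurrent handlers: " + max.get());
    }
}
```

With the `await()` in place, the handler for SEND cannot start until the handler for HELLO has returned, so the order seen by the application matches the order on the wire.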

@joakime
Contributor

joakime commented Jan 14, 2020

@lachlan-roberts now do it again with fragmented messages.
(TEXT Frame [Fin=false] + CONTINUATION Frame [Fin=false] + CONTINUATION Frame [Fin=true])

lachlan-roberts added a commit that referenced this issue Jan 14, 2020
@lachlan-roberts
Contributor

@joakime It still works for fragmented messages. 16c406b

lachlan-roberts added a commit that referenced this issue Jan 14, 2020
@stolsvik
Author

stolsvik commented Jan 14, 2020

Well, at least the Whole<Reader> seems to work as I hoped: It streams the bytes from the actual socket? So that if the JSON in question is large (which they seldom are, but can be), I do not needlessly allocate String objects, rather getting the bytes (chars) streamed directly from the socket into the deserializer. Is this correct?

I obviously have not been as far down into the details here as you guys, and might be wading completely out of my depth, but I don't quite get the problem with the return of onMessage: If I do not read the entire message before returning, then could you not just dump the rest on the floor until you've read to the end of that message? I've obviously "had my fill", so to speak, so the rest is not of interest to me. If this was e.g. a megabyte of spaces after the final "}", the String (in Whole) would also contain that megabyte, and I would effectively not read that either.

What I feel is rather obviously a non-compliance compared to what the JavaDoc there explicitly tells me, is that there are two threads running through that method at the same time. And what is rather baffling is that they quite often come in the opposite order of how they were sent (that log line you see in my post is literally the first operation in the method). I also do not see why it would be good to be so eager to get rid of these messages instead of queuing them, or not reading more from the TCP socket, or something: They come in over a TCP pipe, in a specific order. To be able to invoke my method on two different threads in the opposite order you must have parsed a chunk from the socket that contained two messages, made a Reader out of them both, and sent off two threads to my method concurrently?

Anyway, you mentioned Whole<JSON>, which got me to think and leads me over to a question that is user support: Would the most efficient approach here be to register a TextStream-Decoder that decoded directly into my target type? Would I also then be assured that the onMessage invocations came in one by one, and in the correct order?

With regard to the return direction (sending/encoding), is there a difference in efficiency between using a TextStream-Encoder, and just getting the getBasicRemote().getSendWriter() and piping it in there?

(PS: It is evidently not possible to add a decoder with generics like this: class EnvelopeListDecoder implements TextStream<List<MatsSocketEnvelopeDto>>?)

@lachlan-roberts
Contributor

@stolsvik using the stream still copies the bytes from the WebSocket frame to accumulate in the MessageInputStream, so they are not being streamed directly from the socket.

Yes if you return from onMessage the correct behaviour should be to just dump the remaining bytes until the end of the message.
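
As a rough illustration of what "dump the remaining bytes" means for a Reader, here is a minimal JDK-only sketch. `DrainSketch`, `discardRemaining` and `demo` are hypothetical names; this is not how Jetty implements it internally, only the general technique of reading and discarding until EOF.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;

/** Hypothetical helper; shows the technique only, not Jetty's implementation. */
final class DrainSketch {

    /** Reads and discards everything left in the reader; returns how many chars were dropped. */
    static long discardRemaining(Reader reader) {
        char[] scratch = new char[1024];
        long discarded = 0;
        try {
            int n;
            while ((n = reader.read(scratch)) != -1) {
                discarded += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return discarded;
    }

    /** Simulates a handler that consumes only "{}" and leaves three trailing spaces unread. */
    static long demo() {
        Reader message = new StringReader("{}   ");
        try {
            message.read(); // '{'
            message.read(); // '}'
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The handler has "returned" here; the leftover content is dropped.
        return discardRemaining(message);
    }

    public static void main(String[] args) {
        System.out.println("discarded chars: " + demo());
    }
}
```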

There is a non-compliance compared to the JavaDoc here. Jetty is reading the messages in the right order; the problem is that it is not waiting for you to finish with the first one before reading/delivering the next one. So you can end up with multiple concurrent calls to onMessage, and in your case it's just a race as to which you see first.

Using the Decoder.TextStream isn't going to be any more efficient than just doing it yourself with the Reader in onMessage.

I am proposing a fix with PR #4486, so if you want you can test with that branch and let us know if it fixes the problem for you.

lachlan-roberts added a commit that referenced this issue Jan 23, 2020
…mMessageOrder

Issue #4475 - fix WebSocket streaming message ordering
@lachlan-roberts
Contributor

This will be fixed in the 9.4.27 release.

This is not a problem for jetty-10, as we only receive the next frame once the callback has been succeeded, and there is logic to only succeed a fin==true frame's callback once onMessage has exited.
