Content that is not subscribed eagerly is lost because RxNetty starts publishing the content to a PublishSubject immediately after the HttpClientResponse onNext() call.
When the content is subscribed only after the first Observable completes, which is when the content itself completes, there is nothing left to receive. When the subscription happens inside the Rx chain that processes the HttpClientResponse, via flatMap, the content is correctly subscribed.
This was done to eliminate the need to cache the content in order to enable lazy subscription to the content Observable. However, it is quite limiting: if the code applies an operator that waits for onComplete of the source (the Observable returned by RxNetty.createHttpGet() in this example), such as single() or .toBlocking().toFuture().get(), the content is silently ignored.
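The difference between eager and late subscription can be illustrated with a minimal, stdlib-only sketch. It does not use RxJava or RxNetty: `HotContent` is a hypothetical stand-in for a non-buffering subject, and the class and method names are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a hot, non-buffering subject; not RxJava's PublishSubject. */
final class HotContent<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    /** Delivers to the current subscribers only; nothing is kept for later ones. */
    void emit(T item) {
        for (Consumer<T> s : subscribers) {
            s.accept(item);
        }
    }
}

final class HotContentDemo {
    /** Subscribes before the content is emitted, as an eager subscription would. */
    static List<String> eager() {
        HotContent<String> content = new HotContent<>();
        List<String> received = new ArrayList<>();
        content.subscribe(received::add);
        content.emit("chunk-1");
        content.emit("chunk-2");
        return received;
    }

    /** Subscribes after the content has already been emitted. */
    static List<String> late() {
        HotContent<String> content = new HotContent<>();
        List<String> received = new ArrayList<>();
        content.emit("chunk-1");
        content.emit("chunk-2");
        content.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println("eager: " + eager());
        System.out.println("late:  " + late());
    }
}
```

In this model the late subscriber receives nothing and no error is raised, which mirrors the silent loss described above.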
Solution
We need two things to be fixed:
The main Observable, returned by the RxNetty.createHttpXXX() methods and by HttpClient.submit(), must not wait for the content to complete before it completes. It should complete when the HttpClientResponse is delivered (there can only ever be one of these).
The content Observable must cache the content and allow lazy subscription.
Drawbacks
The main drawback of doing this is that the content has to be cached. What happens if the user never subscribes to the content?
Two things should be done:
Have a method ignoreContent() on HttpClientResponse that indicates that the user will never subscribe to the content. When it is called, the content is not cached in the subject.
Detect such leaks (ignoreContent() is not called and the content is not subscribed) and log them verbosely.
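The bookkeeping behind these two points could look roughly like the sketch below. It is only an illustration under stated assumptions: `ContentHolderSketch`, `onContent()`, `cachedChunks()` and `leakWarning()` are hypothetical names, subscription is simplified to handing over the cached chunks, and nothing here is taken from the RxNetty code base.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical holder illustrating ignoreContent() and leak detection; not RxNetty code. */
final class ContentHolderSketch {
    private final List<String> cached = new ArrayList<>();
    private boolean ignored;
    private boolean subscribed;

    /** Declares that the content will never be subscribed, so nothing is cached. */
    void ignoreContent() {
        ignored = true;
        cached.clear();
    }

    /** Called for every incoming chunk; caches it unless the content was ignored. */
    void onContent(String chunk) {
        if (!ignored) {
            cached.add(chunk);
        }
    }

    /** Simplified subscription: hands over whatever was cached so far. */
    List<String> subscribe() {
        subscribed = true;
        List<String> out = new ArrayList<>(cached);
        cached.clear();
        return out;
    }

    int cachedChunks() {
        return cached.size();
    }

    /** Returns a message worth logging when content was neither ignored nor subscribed. */
    Optional<String> leakWarning() {
        if (!ignored && !subscribed && !cached.isEmpty()) {
            return Optional.of("content never subscribed; " + cached.size() + " chunk(s) still cached");
        }
        return Optional.empty();
    }
}
```

A real implementation would need to decide when to run the leak check (for example when the connection is released); that choice is left open here.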
How about HttpServerRequest?
Since RequestHandler gives a callback with an HttpServerRequest and not an Observable<HttpServerRequest>, this does not seem to be a big issue. However, it has the same problem: if the content is not eagerly subscribed (inside RequestHandler.handle()), it will lose data.
The final solution was around the following two classes:
HttpContentHolder: applied to both HttpClientResponse and HttpServerRequest, providing access to the content and also a method to ignore the content.
UnicastContentSubject: a subject implementation that allows only a single subscription.
The UnicastContentSubject buffers the content until a subscription arrives. It has a timeout, configured at creation or updated at runtime (before subscription). When the timeout expires, the subject is disposed and any subscription arriving after that simply errors out.
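The buffering and single-subscription behavior described above can be sketched with plain Java. This is not the actual UnicastContentSubject: `UnicastBuffer` and its methods are hypothetical, the timeout is modeled as an explicit `dispose()` call that a timer task would make, and errors are modeled as exceptions thrown from `subscribe()`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical single-subscriber buffer; a sketch of the idea, not RxNetty's class. */
final class UnicastBuffer<T> {
    private final Queue<T> buffer = new ArrayDeque<>();
    private Consumer<T> subscriber;
    private boolean disposed;

    /** Buffers the item until a subscriber exists; drops it once disposed. */
    synchronized void emit(T item) {
        if (disposed) {
            return;
        }
        if (subscriber == null) {
            buffer.add(item);
        } else {
            subscriber.accept(item);
        }
    }

    /** Allows exactly one subscription and replays whatever was buffered. */
    synchronized void subscribe(Consumer<T> s) {
        if (disposed) {
            throw new IllegalStateException("content disposed before subscription");
        }
        if (subscriber != null) {
            throw new IllegalStateException("only one subscription is allowed");
        }
        subscriber = s;
        T item;
        while ((item = buffer.poll()) != null) {
            s.accept(item);
        }
    }

    /** What a timeout task would call: release the buffer if nobody subscribed in time. */
    synchronized void dispose() {
        if (subscriber == null) {
            disposed = true;
            buffer.clear();
        }
    }
}

final class UnicastBufferDemo {
    /** Content emitted before the subscription is replayed, later content flows through. */
    static List<String> lateSubscription() {
        UnicastBuffer<String> content = new UnicastBuffer<>();
        content.emit("chunk-1");
        content.emit("chunk-2");
        List<String> received = new ArrayList<>();
        content.subscribe(received::add);
        content.emit("chunk-3");
        return received;
    }

    /** A subscription arriving after disposal fails instead of silently receiving nothing. */
    static boolean failsAfterDispose() {
        UnicastBuffer<String> content = new UnicastBuffer<>();
        content.emit("chunk-1");
        content.dispose();
        try {
            content.subscribe(item -> { });
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }
}
```

Failing loudly after disposal is the key design difference from the old behavior, where a late subscriber simply saw no content.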
Change in behavior
Old Behavior
Before this fix, the Observable returned from HttpClient.submit() completed only after the content of the response completed.
The content could be lost (this issue) if it was subscribed outside the onNext call of HttpClientResponse.
New Behavior
After this fix, the Observable returned from HttpClient.submit() completes immediately after the single onNext callback that delivers the HttpClientResponse.
The content can be subscribed outside the onNext call of HttpClientResponse until the content timeout described above expires.
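HttpClientResponse expects users to eagerly subscribe to the content when they receive the HttpClientResponse. This means that one cannot subscribe to the content after the response Observable has completed. Instead, the content has to be subscribed inside the Rx chain that processes the HttpClientResponse, for example via flatMap.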