Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ObservableConnection.cleanupConnection() does a blocking call. #145

Closed
NiteshKant opened this issue Jun 19, 2014 · 0 comments
Closed

ObservableConnection.cleanupConnection() does a blocking call. #145

NiteshKant opened this issue Jun 19, 2014 · 0 comments
Labels
Milestone

Comments

@NiteshKant
Copy link
Member

ObservableConnection.cleanupConnection() removes the ReadTimeoutHandler, if present, from the pipeline to avoid timeouts when the pooled connection is idle.
Netty's ChannelPipeline.remove() invoked here blocks till completion if invoked from any thread but the eventloop associated with the pipeline in question.

The relevant code in netty can be found here

There will be situations in which this can cause a deadlock. Let us consider the following example

public class TestDL {

    public static void main(String[] args) {

        final HttpClient<ByteBuf, ByteBuf> client = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("localhost", 9999)
                                                        .config(new RxClient.ClientConfig.Builder()
                                                                        .readTimeout(1, TimeUnit.MINUTES)
                                                                        .build())
                                                        .build();

        RxNetty.createHttpServer(9999, new RequestHandler<ByteBuf, ByteBuf>() {
            @Override
            public Observable<Void> handle(HttpServerRequest<ByteBuf> request,
                                           final HttpServerResponse<ByteBuf> response) {
                String uri = request.getUri();
                if (uri.contains("fanout")) {
                    return Observable.zip(getBackendResponse(client), getBackendResponse(client),
                                          new Func2<ByteBuf, ByteBuf, ByteBuf>() {
                                              @Override
                                              public ByteBuf call(ByteBuf byteBuf, ByteBuf byteBuf2) {
                                                  return Unpooled.copiedBuffer(byteBuf, byteBuf2);
                                              }
                                          })
                                     .map(new Func1<ByteBuf, Void>() {
                                         @Override
                                         public Void call(ByteBuf byteBuf) {
                                             response.write(byteBuf);
                                             return null;
                                         }
                                     });
                } else {
                    return response.writeStringAndFlush("Welcome to the deadlock world!");
                }
            }
        }).start();

        RxNetty.createHttpGet("http://localhost:9999/fanout")
               .flatMap(new Func1<HttpClientResponse<ByteBuf>, Observable<ByteBuf>>() {
                   @Override
                   public Observable<ByteBuf> call(HttpClientResponse<ByteBuf> response) {
                       return response.getContent();
                   }
               })
               .toBlocking()
               .forEach(new Action1<ByteBuf>() {
                   @Override
                   public void call(ByteBuf byteBuf) {
                       System.out.println(byteBuf.toString(Charset.defaultCharset()));
                   }
               });
    }

    private static Observable<ByteBuf> getBackendResponse(HttpClient<ByteBuf, ByteBuf> client) {
        HttpClientRequest<ByteBuf> hello = HttpClientRequest.createGet("/hello");
        return client.submit(hello)
                      .flatMap(new Func1<HttpClientResponse<ByteBuf>, Observable<ByteBuf>>() {
                          @Override
                          public Observable<ByteBuf> call(HttpClientResponse<ByteBuf> response) {
                              return response.getContent();
                          }
                      });
    }
}

In the above example, "/fanout" request will trigger two outbound requests, the responses of which are zipped. The zip operator at the end unsubscribes from both the sources which in turn closes the underlying connection hence causing the ReadTimeoutHandler to be removed. Since, this unsubscription will be done from one of the eventloop of the client, one of the two connection close will be on a different eventloop and hence the remove() of ReadTimeoutHandler will be submitted to the eventloop and then the code will block for completion. Now, in a scenario where a few concurrent requests are being served by this server, it may so happen that the eventloop on which remove is scheduled, is itself waiting for some other remove to finish. In a convoluted scenario, these two eventloops may actually be waiting for each other hence causing a deadlock.

@NiteshKant NiteshKant added this to the 0.3.6 milestone Jun 19, 2014
@NiteshKant NiteshKant self-assigned this Jun 19, 2014
@NiteshKant NiteshKant added the bug label Jun 19, 2014
NiteshKant pushed a commit to NiteshKant/RxNetty that referenced this issue Jun 19, 2014
ChannelPipeline.remove() is a blocking call when not called from the associated eventloop.

Now, instead of removing the handler from the pipeline, it is deactivated on close and re-activated when the pipeline is used again.
NiteshKant added a commit that referenced this issue Jun 24, 2014
Fixes issue #145 (ReadTimeoutHandler remove causes deadlock)
@NiteshKant NiteshKant removed their assignment Aug 19, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant