From 3722fbff6a33fb3c894ff93389a91bb2eb9a78c2 Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Sun, 16 Sep 2018 09:13:08 -0700 Subject: [PATCH 1/9] fix #30 Strips query parameters from template matching (#433) --- .../ipc/netty/http/server/HttpPredicate.java | 15 +++++++- .../http/server/UriPathTemplateTest.java | 37 +++++++++++++++++++ 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/src/main/java/reactor/ipc/netty/http/server/HttpPredicate.java b/src/main/java/reactor/ipc/netty/http/server/HttpPredicate.java index 61130c6ff5..c59a84f11a 100644 --- a/src/main/java/reactor/ipc/netty/http/server/HttpPredicate.java +++ b/src/main/java/reactor/ipc/netty/http/server/HttpPredicate.java @@ -266,13 +266,23 @@ static final class UriPathTemplate { private final Pattern uriPattern; + static String filterQueryParams(String uri) { + int hasQuery = uri.lastIndexOf("?"); + if (hasQuery != -1) { + return uri.substring(0, hasQuery); + } + else { + return uri; + } + } + /** * Creates a new {@code UriPathTemplate} from the given {@code uriPattern}. * * @param uriPattern The pattern to be used by the template */ - public UriPathTemplate(String uriPattern) { - String s = "^" + uriPattern; + UriPathTemplate(String uriPattern) { + String s = "^" + filterQueryParams(uriPattern); Matcher m = NAME_SPLAT_PATTERN.matcher(s); while (m.find()) { @@ -347,6 +357,7 @@ final Map match(String uri) { } private Matcher matcher(String uri) { + uri = filterQueryParams(uri); Matcher m = matchers.get(uri); if (null == m) { m = uriPattern.matcher(uri); diff --git a/src/test/java/reactor/ipc/netty/http/server/UriPathTemplateTest.java b/src/test/java/reactor/ipc/netty/http/server/UriPathTemplateTest.java index bdab207f9b..7ff7a352bf 100644 --- a/src/test/java/reactor/ipc/netty/http/server/UriPathTemplateTest.java +++ b/src/test/java/reactor/ipc/netty/http/server/UriPathTemplateTest.java @@ -27,9 +27,46 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasEntry; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; public class UriPathTemplateTest { + @Test + public void patternShouldMatchPathWithOnlyLetters() { + UriPathTemplate uriPathTemplate = new UriPathTemplate("/test/{order}"); + // works as expected + assertThat(uriPathTemplate.match("/test/1").get("order"), is("1")); + } + + @Test + public void patternShouldMatchPathWithDots() { + UriPathTemplate uriPathTemplate = new UriPathTemplate("/test/{order}"); + // does not match, the dot in the segment parameter breaks matching + // expected: a map containing {"order": "2.0"}, found: empty map + assertThat(uriPathTemplate.match("/test/2.0").get("order"), is("2.0")); + } + + @Test + public void staticPatternShouldMatchPathWithQueryParams() { + UriPathTemplate uriPathTemplate = new UriPathTemplate("/test/3"); + // does not match, the query parameter breaks matching + // expected: true, found: false + assertTrue(uriPathTemplate.matches("/test/3?q=reactor")); +// assertThat(uriPathTemplate.matches("/test/3?q=reactor"), is(true)); + } + + @Test + public void parameterizedPatternShouldMatchPathWithQueryParams() { + UriPathTemplate uriPathTemplate = new UriPathTemplate("/test/{order}"); + // does not match, the query parameter breaks matching + // expected: a map containing {"order": "3"}, found: a map containing {"order": "3?q=reactor"} + assertEquals("3", + uriPathTemplate.match("/test/3?q=reactor") + .get("order")); +// assertThat(uriPathTemplate.match("/test/3?q=reactor").get("order"), is("3")); + } + @Test public void staticPathShouldBeMatched() { UriPathTemplate template = new UriPathTemplate("/comments"); From 8aad03d84c50abdd90f429ad4413b9ac564abfaa Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Tue, 16 Oct 2018 14:57:32 -0700 Subject: [PATCH 2/9] Defer Pooled connection operation creation to after channelActive This is an attempt to fix an out of order http client request issue when SSL is enabled. --- .../channel/PooledClientContextHandler.java | 3 - .../ipc/netty/http/client/HttpClient.java | 13 ++++- .../java/reactor/ipc/netty/tcp/TcpClient.java | 56 ++++++++++++++++++- 3 files changed, 63 insertions(+), 9 deletions(-) diff --git a/src/main/java/reactor/ipc/netty/channel/PooledClientContextHandler.java b/src/main/java/reactor/ipc/netty/channel/PooledClientContextHandler.java index 31368b6b92..7a1f96dc95 100644 --- a/src/main/java/reactor/ipc/netty/channel/PooledClientContextHandler.java +++ b/src/main/java/reactor/ipc/netty/channel/PooledClientContextHandler.java @@ -199,9 +199,6 @@ final void connectOrAcquire(CHANNEL c) { if (log.isDebugEnabled()) { log.debug(format(c, "Acquired active channel")); } - if (createOperations(c, null) == null) { - setFuture(pool.acquire()); - } } diff --git a/src/main/java/reactor/ipc/netty/http/client/HttpClient.java b/src/main/java/reactor/ipc/netty/http/client/HttpClient.java index 8fe61c3e11..612235e614 100644 --- a/src/main/java/reactor/ipc/netty/http/client/HttpClient.java +++ b/src/main/java/reactor/ipc/netty/http/client/HttpClient.java @@ -38,7 +38,6 @@ import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.logging.LoggingHandler; - import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; @@ -405,7 +404,7 @@ protected ContextHandler doHandler(BiFunction onSetup) { - return ContextHandler.newClientContext(sink, + ContextHandler h = ContextHandler.newClientContext(sink, options, loggingHandler, secure, @@ -416,7 +415,15 @@ protected ContextHandler doHandler(BiFunction newHandler(BiFunction { + if (f.isSuccess()) { + Channel c = (Channel) f.getNow(); + if (!c.isOpen()) return; + + c.attr(ACTIVE) + .get() + .subscribe(null, null, () -> { + if (c.eventLoop() + .inEventLoop()) { + contextHandler.createOperations( + c, + null); + } + else { + c.eventLoop() + .execute(() -> contextHandler.createOperations( + c, + null)); + } + }); + + } + })); } }); } @@ -216,7 +244,7 @@ protected ContextHandler doHandler(BiFunction onSetup) { - return ContextHandler.newClientContext(sink, + ContextHandler h = ContextHandler.newClientContext(sink, options, loggingHandler, secure, @@ -224,13 +252,35 @@ protected ContextHandler doHandler(BiFunction ChannelOperations.bind(ch, handler, c)); + + if (handler == null) { + h.onPipeline(ACTIVE_CONFIGURATOR); + } + + return h; } - protected static final ChannelOperations.OnNew EMPTY = (a,b,c) -> null; static final LoggingHandler loggingHandler = new LoggingHandler(TcpClient.class); + static final AttributeKey> ACTIVE = AttributeKey.valueOf( + "$POOLED_ACTIVE_EVENT_DISPATCHER"); + + protected static final BiConsumer> ACTIVE_CONFIGURATOR = (p, h) -> { + p.channel() + .attr(ACTIVE) + .compareAndSet(null, DirectProcessor.create()); + }; + + protected static final ChannelOperations.OnNew EMPTY = (a,b,c) -> { + a.attr(ACTIVE) + .get() + .onComplete(); + + return null; + }; + public static final class Builder { private Consumer> options; From b6c198316925f7a23ba9dddf7b6f6e6e6db41696 Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Mon, 22 Oct 2018 16:41:46 -0700 Subject: [PATCH 3/9] Try race fix --- .../ipc/netty/channel/ContextHandler.java | 11 +++- .../java/reactor/ipc/netty/tcp/TcpClient.java | 55 ++++++++++++++----- 2 files changed, 48 insertions(+), 18 deletions(-) diff --git a/src/main/java/reactor/ipc/netty/channel/ContextHandler.java b/src/main/java/reactor/ipc/netty/channel/ContextHandler.java index 854b2f9322..3e60a6e005 100644 --- a/src/main/java/reactor/ipc/netty/channel/ContextHandler.java +++ b/src/main/java/reactor/ipc/netty/channel/ContextHandler.java @@ -243,8 +243,13 @@ public final ContextHandler autoCreateOperations(boolean autoCreateOper } } - channel.pipeline() - .get(ChannelOperationsHandler.class).lastContext = this; + ChannelOperationsHandler h = channel.pipeline() + .get(ChannelOperationsHandler.class); + + if (h == null) { + return null; + } + h.lastContext = this; channel.eventLoop().execute(op::onHandlerStart); } @@ -300,7 +305,7 @@ protected void doStarted(Channel channel) { } @Override - protected void initChannel(CHANNEL ch) throws Exception { + protected void initChannel(CHANNEL ch) { accept(ch); } diff --git a/src/main/java/reactor/ipc/netty/tcp/TcpClient.java b/src/main/java/reactor/ipc/netty/tcp/TcpClient.java index 85aa6b92bf..8148923137 100644 --- a/src/main/java/reactor/ipc/netty/tcp/TcpClient.java +++ b/src/main/java/reactor/ipc/netty/tcp/TcpClient.java @@ -40,6 +40,7 @@ import reactor.ipc.netty.NettyContext; import reactor.ipc.netty.NettyInbound; import reactor.ipc.netty.NettyOutbound; +import reactor.ipc.netty.channel.AbortedException; import reactor.ipc.netty.channel.ChannelOperations; import reactor.ipc.netty.channel.ContextHandler; import reactor.ipc.netty.options.ClientOptions; @@ -202,24 +203,13 @@ protected Mono newHandler(BiFunction { if (f.isSuccess()) { Channel c = (Channel) f.getNow(); - if (!c.isOpen()) return; + + ActiveChannelOperationFactory acof = + new ActiveChannelOperationFactory(contextHandler, c, sink); c.attr(ACTIVE) .get() - .subscribe(null, null, () -> { - if (c.eventLoop() - .inEventLoop()) { - contextHandler.createOperations( - c, - null); - } - else { - c.eventLoop() - .execute(() -> contextHandler.createOperations( - c, - null)); - } - }); + .subscribe(null, acof, acof); } })); @@ -227,6 +217,41 @@ protected Mono newHandler(BiFunction { + + final ContextHandler contextHandler; + final Channel c; + final MonoSink sink; + + ActiveChannelOperationFactory(ContextHandler contextHandler, + Channel c, + MonoSink sink) { + this.sink = sink; + this.contextHandler = contextHandler; + this.c = c; + } + + @Override + public void accept(Throwable throwable) { + sink.error(throwable); + } + + @Override + public void run() { + if (c.eventLoop() + .inEventLoop()) { + if (contextHandler.createOperations(c, null) == null) { + sink.error(new AbortedException("Connection has been closed")); + } + } + else { + c.eventLoop() + .execute(this); + } + } + } + /** * Create a {@link ContextHandler} for {@link Bootstrap#handler()} * From ff9a8b382e3cb5bf7e6fa28039fdd8913c0351cc Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Fri, 26 Oct 2018 07:28:41 -0700 Subject: [PATCH 4/9] Do not propagate an exception - just close without error --- src/main/java/reactor/ipc/netty/tcp/TcpClient.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/java/reactor/ipc/netty/tcp/TcpClient.java b/src/main/java/reactor/ipc/netty/tcp/TcpClient.java index 8148923137..15e9bfd398 100644 --- a/src/main/java/reactor/ipc/netty/tcp/TcpClient.java +++ b/src/main/java/reactor/ipc/netty/tcp/TcpClient.java @@ -40,7 +40,6 @@ import reactor.ipc.netty.NettyContext; import reactor.ipc.netty.NettyInbound; import reactor.ipc.netty.NettyOutbound; -import reactor.ipc.netty.channel.AbortedException; import reactor.ipc.netty.channel.ChannelOperations; import reactor.ipc.netty.channel.ContextHandler; import reactor.ipc.netty.options.ClientOptions; @@ -234,7 +233,7 @@ static final class ActiveChannelOperationFactory implements Runnable, @Override public void accept(Throwable throwable) { - sink.error(throwable); + sink.success(); } @Override @@ -242,7 +241,7 @@ public void run() { if (c.eventLoop() .inEventLoop()) { if (contextHandler.createOperations(c, null) == null) { - sink.error(new AbortedException("Connection has been closed")); + sink.success(); } } else { From 5a8fe36dfaee4bf5b97a154c020b3d21a36d47a4 Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Fri, 26 Oct 2018 15:26:05 -0700 Subject: [PATCH 5/9] Revert error drop --- src/main/java/reactor/ipc/netty/tcp/TcpClient.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/reactor/ipc/netty/tcp/TcpClient.java b/src/main/java/reactor/ipc/netty/tcp/TcpClient.java index 15e9bfd398..7041107106 100644 --- a/src/main/java/reactor/ipc/netty/tcp/TcpClient.java +++ b/src/main/java/reactor/ipc/netty/tcp/TcpClient.java @@ -40,6 +40,7 @@ import reactor.ipc.netty.NettyContext; import reactor.ipc.netty.NettyInbound; import reactor.ipc.netty.NettyOutbound; +import reactor.ipc.netty.channel.AbortedException; import reactor.ipc.netty.channel.ChannelOperations; import reactor.ipc.netty.channel.ContextHandler; import reactor.ipc.netty.options.ClientOptions; @@ -233,7 +234,7 @@ static final class ActiveChannelOperationFactory implements Runnable, @Override public void accept(Throwable throwable) { - sink.success(); + sink.error(throwable); } @Override @@ -241,7 +242,7 @@ public void run() { if (c.eventLoop() .inEventLoop()) { if (contextHandler.createOperations(c, null) == null) { - sink.success(); + sink.error(new AbortedException("Failed to acquire")); } } else { From d33cb2c74f9b81a511f2c341ae75e9f952d3d797 Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Fri, 26 Oct 2018 15:38:25 -0700 Subject: [PATCH 6/9] update netty version --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 33c68da506..ca5ee902b8 100644 --- a/build.gradle +++ b/build.gradle @@ -58,7 +58,7 @@ ext { assertJVersion = '3.6.1' // Libraries - nettyVersion = '4.1.29.Final' + nettyVersion = '4.1.30.Final' jacksonDatabindVersion = '2.5.1' // Testing From f3da469eb717ca10773b81a1b58f9ea61c263a2c Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Fri, 26 Oct 2018 15:56:11 -0700 Subject: [PATCH 7/9] Do not re-acquire (loss of original listener) - leave aborted return --- .../reactor/ipc/netty/channel/PooledClientContextHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/reactor/ipc/netty/channel/PooledClientContextHandler.java b/src/main/java/reactor/ipc/netty/channel/PooledClientContextHandler.java index 7a1f96dc95..c05602cbc2 100644 --- a/src/main/java/reactor/ipc/netty/channel/PooledClientContextHandler.java +++ b/src/main/java/reactor/ipc/netty/channel/PooledClientContextHandler.java @@ -190,9 +190,9 @@ final void connectOrAcquire(CHANNEL c) { if (!c.isActive()) { if (log.isDebugEnabled()) { - log.debug(format(c, "Immediately aborted pooled channel, re-acquiring new channel")); + log.debug(format(c, "Immediately aborted pooled channel")); } - setFuture(pool.acquire()); +// setFuture(pool.acquire()); return; } From 962e03d13b8dbffbfea0c444452ddc327c26b406 Mon Sep 17 00:00:00 2001 From: Spring Buildmaster Date: Fri, 26 Oct 2018 23:05:15 +0000 Subject: [PATCH 8/9] [artifactory-release] Release version 0.7.11.RELEASE --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index d3f121eea4..63b1455cca 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.7.11.BUILD-SNAPSHOT \ No newline at end of file +version=0.7.11.RELEASE \ No newline at end of file From 6a5032b25ff7fcfd285190670f91c2cd0fe7e757 Mon Sep 17 00:00:00 2001 From: Spring Buildmaster Date: Fri, 26 Oct 2018 23:05:20 +0000 Subject: [PATCH 9/9] [artifactory-release] Next development version 0.7.12.BUILD-SNAPSHOT --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 63b1455cca..07bdba9750 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.7.11.RELEASE \ No newline at end of file +version=0.7.12.BUILD-SNAPSHOT \ No newline at end of file