Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the new Transport API for TCP/UDP/HTTP implementations #1046

Merged
merged 14 commits into from
Apr 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/docs/asciidoc/tcp-client.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ The following example uses `ProxyProvider`:
[source,java]
----
import reactor.netty.Connection;
import reactor.netty.tcp.ProxyProvider;
import reactor.netty.transport.ProxyProvider;
import reactor.netty.tcp.TcpClient;

public class Application {
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/reactor/netty/ChannelBindException.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,24 @@ public class ChannelBindException extends RuntimeException {
/**
* Build a {@link ChannelBindException}
*
* @param localAddress the local address
* @param bindAddress the local address
* @param cause the root cause
* @return a new {@link ChannelBindException}
* @since 0.9.7
*/
public static ChannelBindException fail(SocketAddress localAddress, @Nullable Throwable cause) {
Objects.requireNonNull(localAddress, "localAddress");
public static ChannelBindException fail(SocketAddress bindAddress, @Nullable Throwable cause) {
Objects.requireNonNull(bindAddress, "bindAddress");
if (cause instanceof java.net.BindException ||
// With epoll/kqueue transport it is
// io.netty.channel.unix.Errors$NativeIoException: bind(..) failed: Address already in use
(cause instanceof IOException && cause.getMessage() != null &&
cause.getMessage().contains("Address already in use"))) {
cause = null;
}
if (!(localAddress instanceof InetSocketAddress)) {
return new ChannelBindException(localAddress.toString(), -1, cause);
if (!(bindAddress instanceof InetSocketAddress)) {
return new ChannelBindException(bindAddress.toString(), -1, cause);
}
InetSocketAddress address = (InetSocketAddress) localAddress;
InetSocketAddress address = (InetSocketAddress) bindAddress;

return new ChannelBindException(address.getHostString(), address.getPort(), cause);
}
Expand Down
62 changes: 62 additions & 0 deletions src/main/java/reactor/netty/ChannelPipelineConfigurer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2011-Present VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty;

import io.netty.channel.Channel;

import javax.annotation.Nullable;
import java.net.SocketAddress;

import static reactor.netty.ReactorNetty.CompositeChannelPipelineConfigurer.compositeChannelPipelineConfigurer;

/**
* Configure the channel pipeline while initializing the channel.
*
* @author Violeta Georgieva
* @since 1.0.0
*/
@FunctionalInterface
public interface ChannelPipelineConfigurer {

/**
* Return a noop configurer
*
* @return a noop configurer
*/
static ChannelPipelineConfigurer emptyConfigurer(){
return ReactorNetty.NOOP_CONFIGURER;
}

/**
* Configure the channel pipeline while initializing the channel.
*
* @param connectionObserver the configured {@link ConnectionObserver}
* @param channel the channel
* @param remoteAddress the remote address
*/
void onChannelInit(ConnectionObserver connectionObserver, Channel channel, @Nullable SocketAddress remoteAddress);

/**
* Chain together another {@link ChannelPipelineConfigurer}
*
* @param other the next {@link ChannelPipelineConfigurer}
*
* @return a new composite {@link ChannelPipelineConfigurer}
*/
default ChannelPipelineConfigurer then(ChannelPipelineConfigurer other) {
return compositeChannelPipelineConfigurer(this, other);
}
}
3 changes: 1 addition & 2 deletions src/main/java/reactor/netty/NettyPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,14 @@ public interface NettyPipeline {
String HttpDecompressor = LEFT + "decompressor";
String HttpAggregator = LEFT + "httpAggregator";
String HttpTrafficHandler = LEFT + "httpTrafficHandler";
String HttpInitializer = LEFT + "httpInitializer";
String H2CUpgradeHandler = LEFT + "h2cUpgradeHandler";
String AccessLogHandler = LEFT + "accessLogHandler";
String OnChannelWriteIdle = LEFT + "onChannelWriteIdle";
String OnChannelReadIdle = LEFT + "onChannelReadIdle";
String ChunkedWriter = LEFT + "chunkedWriter";
String LoggingHandler = LEFT + "loggingHandler";
String CompressionHandler = LEFT + "compressionHandler";
String HttpMetricsHandler = LEFT + "httpMetricsHandler";
String SslMetricsHandler = LEFT + "sslMetricsHandler";
String ChannelMetricsHandler = LEFT + "channelMetricsHandler";
String ConnectMetricsHandler = LEFT + "connectMetricsHandler";
String WsCompressionHandler = LEFT + "wsCompressionHandler";
Expand Down
73 changes: 73 additions & 0 deletions src/main/java/reactor/netty/ReactorNetty.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package reactor.netty;

import java.net.SocketAddress;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.util.Objects;
Expand Down Expand Up @@ -488,6 +489,76 @@ public V call() throws Exception {
}
}

static final class CompositeChannelPipelineConfigurer implements ChannelPipelineConfigurer {

final ChannelPipelineConfigurer[] configurers;

CompositeChannelPipelineConfigurer(ChannelPipelineConfigurer[] configurers) {
this.configurers = configurers;
}

@Override
public void onChannelInit(ConnectionObserver connectionObserver, Channel channel, SocketAddress remoteAddress) {
for (ChannelPipelineConfigurer configurer : configurers) {
configurer.onChannelInit(connectionObserver, channel, remoteAddress);
}
}

static ChannelPipelineConfigurer compositeChannelPipelineConfigurer(
ChannelPipelineConfigurer configurer, ChannelPipelineConfigurer other) {

if (configurer == ChannelPipelineConfigurer.emptyConfigurer()) {
return other;
}

if (other == ChannelPipelineConfigurer.emptyConfigurer()) {
return configurer;
}

final ChannelPipelineConfigurer[] newConfigurers;
final ChannelPipelineConfigurer[] thizConfigurers;
final ChannelPipelineConfigurer[] otherConfigurers;
int length = 2;

if (configurer instanceof CompositeChannelPipelineConfigurer) {
thizConfigurers = ((CompositeChannelPipelineConfigurer) configurer).configurers;
length += thizConfigurers.length - 1;
}
else {
thizConfigurers = null;
}

if (other instanceof CompositeChannelPipelineConfigurer) {
otherConfigurers = ((CompositeChannelPipelineConfigurer)other).configurers;
length += otherConfigurers.length - 1;
}
else {
otherConfigurers = null;
}

newConfigurers = new ChannelPipelineConfigurer[length];

int pos;
if (thizConfigurers != null) {
pos = thizConfigurers.length;
System.arraycopy(thizConfigurers, 0, newConfigurers, 0, pos);
}
else {
pos = 1;
newConfigurers[0] = configurer;
}

if (otherConfigurers != null) {
System.arraycopy(otherConfigurers, 0, newConfigurers, pos, otherConfigurers.length);
}
else {
newConfigurers[pos] = other;
}

return new CompositeChannelPipelineConfigurer(newConfigurers);
}
}

static final class CompositeConnectionObserver implements ConnectionObserver {

final ConnectionObserver[] observers;
Expand Down Expand Up @@ -810,6 +881,8 @@ public synchronized Throwable fillInStackTrace() {
}
}

static final ChannelPipelineConfigurer NOOP_CONFIGURER = (observer, ch, address) -> {};

static final ConnectionObserver NOOP_LISTENER = (connection, newState) -> {};

static final Logger log = Loggers.getLogger(ReactorNetty.class);
Expand Down
Loading