Skip to content
This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

Commit

Permalink
Merge pull request #191 from badgerwithagun/fix-gdax-disconnect-npe
Browse files Browse the repository at this point in the history
- Bitfinex proxy support
- Various updated dependencies
- Performance improvements (ObjectMapper reuse and using treeToValue() calls)
- Fix for non-3-character tickers on Binance
- Reconnect/connection success events for some exchanges (GDAX being the main one)
  • Loading branch information
Flemingjp authored Oct 24, 2018
2 parents 09a7cf6 + 55513ff commit 368454c
Show file tree
Hide file tree
Showing 42 changed files with 636 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

public abstract class JsonNettyStreamingService extends NettyStreamingService<JsonNode> {
private static final Logger LOG = LoggerFactory.getLogger(JsonNettyStreamingService.class);
protected final ObjectMapper objectMapper = StreamingObjectMapperHelper.getObjectMapper();

public JsonNettyStreamingService(String apiUrl) {
super(apiUrl);
Expand All @@ -22,7 +23,6 @@ public JsonNettyStreamingService(String apiUrl, int maxFramePayloadLength) {
@Override
public void messageHandler(String message) {
LOG.debug("Received message: {}", message);
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode;

// Parse incoming message to JSON
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -35,9 +37,15 @@
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.internal.SocketUtils;
import io.reactivex.Completable;
import io.reactivex.CompletableEmitter;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;

Expand All @@ -61,12 +69,22 @@ public Subscription(ObservableEmitter<T> emitter, String channelName, Object[] a
private final int maxFramePayloadLength;
private final URI uri;
private boolean isManualDisconnect = false;
private boolean connectedSuccessfully = false;
private Channel webSocketChannel;
private Duration retryDuration;
private Duration connectionTimeout;
private final NioEventLoopGroup eventLoopGroup;
private volatile NioEventLoopGroup eventLoopGroup;
protected Map<String, Subscription> channels = new ConcurrentHashMap<>();
private boolean compressedMessages = false;
private final List<ObservableEmitter<Throwable>> reconnFailEmitters = new LinkedList<>();
private final List<ObservableEmitter<Object>> connectionSuccessEmitters = new LinkedList<>();

//debugging
private boolean acceptAllCertificates = false;
private boolean enableLoggingHandler = false;
private LogLevel loggingHandlerLevel = LogLevel.DEBUG;
private String socksProxyHost;
private Integer socksProxyPort;

public NettyStreamingService(String apiUrl) {
this(apiUrl, 65536);
Expand All @@ -82,7 +100,6 @@ public NettyStreamingService(String apiUrl, int maxFramePayloadLength, Duration
this.retryDuration = retryDuration;
this.connectionTimeout = connectionTimeout;
this.uri = new URI(apiUrl);
this.eventLoopGroup = new NioEventLoopGroup();
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Error parsing URI " + apiUrl, e);
}
Expand Down Expand Up @@ -119,37 +136,48 @@ public Completable connect() {
final boolean ssl = "wss".equalsIgnoreCase(scheme);
final SslContext sslCtx;
if (ssl) {
sslCtx = SslContextBuilder.forClient().build();
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
if (acceptAllCertificates) {
sslContextBuilder = sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
}
sslCtx = sslContextBuilder.build();
} else {
sslCtx = null;
}

final WebSocketClientHandler handler = getWebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders(), maxFramePayloadLength),
uri, WebSocketVersion.V13, null, true, getCustomHeaders(), maxFramePayloadLength),
this::messageHandler);

Bootstrap b = new Bootstrap();
eventLoopGroup = new NioEventLoopGroup(2);
b.group(eventLoopGroup)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, java.lang.Math.toIntExact(connectionTimeout.toMillis()))
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();

if (socksProxyHost != null) {
p.addLast(new Socks5ProxyHandler(SocketUtils.socketAddress(socksProxyHost, socksProxyPort)));
}
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
}

WebSocketClientExtensionHandler clientExtensionHandler = getWebSocketClientExtensionHandler();
List<ChannelHandler> handlers = new ArrayList<>(4);
handlers.add(new HttpClientCodec());

if (enableLoggingHandler) handlers.add(new LoggingHandler(loggingHandlerLevel));
if (compressedMessages) handlers.add(WebSocketClientCompressionHandler.INSTANCE);
handlers.add(new HttpObjectAggregator(8192));

if (clientExtensionHandler != null) {
handlers.add(clientExtensionHandler);
}

handlers.add(handler);
p.addLast(handlers.toArray(new ChannelHandler[handlers.size()]));
}
Expand All @@ -162,29 +190,65 @@ protected void initChannel(SocketChannel ch) {
if (f.isSuccess()) {
completable.onComplete();
} else {
completable.onError(f.cause());
handleError(completable, f.cause());
}
});
} else {
completable.onError(future.cause());
handleError(completable, future.cause());
}

});
} catch (Exception throwable) {
completable.onError(throwable);
handleError(completable, throwable);
}
}).doOnError(t -> {
LOG.warn("Problem with connection", t);
reconnFailEmitters.forEach(emitter -> emitter.onNext(t));
}).retryWhen(new RetryWithDelay(retryDuration.toMillis()))
.doOnComplete(() -> {
connectedSuccessfully = true;
LOG.warn("Resubscribing channels");
resubscribeChannels();

connectionSuccessEmitters.forEach(emitter -> {
emitter.onNext(new Object());
}
);

});
}

protected void handleError(CompletableEmitter completable, Throwable t) {
isManualDisconnect = true;
ChannelFuture disconnect = webSocketChannel.disconnect();
disconnect.addListener(f -> {
if(f.isSuccess()) {
isManualDisconnect = false;
}
});
completable.onError(t);
}

protected DefaultHttpHeaders getCustomHeaders() {
return new DefaultHttpHeaders();
}

public Completable disconnect() {
isManualDisconnect = true;
connectedSuccessfully = false;
return Completable.create(completable -> {
if (webSocketChannel.isOpen()) {
CloseWebSocketFrame closeFrame = new CloseWebSocketFrame();
webSocketChannel.writeAndFlush(closeFrame).addListener(future -> {
channels = new ConcurrentHashMap<>();
completable.onComplete();
eventLoopGroup.shutdownGracefully(2, 30, TimeUnit.SECONDS).addListener(f -> {
LOG.info("Disconnected");
completable.onComplete();
});
});
} else {
LOG.warn("Disconnect called but already disconnected");
completable.onComplete();
}
});
}
Expand Down Expand Up @@ -225,6 +289,14 @@ public void sendMessage(String message) {
}
}

public Observable<Throwable> subscribeReconnectFailure() {
return Observable.<Throwable>create(observableEmitter -> reconnFailEmitters.add(observableEmitter));
}

public Observable<Object> subscribeConnectionSuccess() {
return Observable.<Object>create(e -> connectionSuccessEmitters.add(e));
}

public Observable<T> subscribeChannel(String channelName, Object... args) {
final String channelId = getSubscriptionUniqueId(channelName, args);
LOG.info("Subscribing to channel {}", channelId);
Expand Down Expand Up @@ -285,7 +357,12 @@ protected void handleError(T message, Throwable t) {


protected void handleChannelMessage(String channel, T message) {
ObservableEmitter<T> emitter = channels.get(channel).emitter;
NettyStreamingService<T>.Subscription subscription = channels.get(channel);
if (subscription == null) {
LOG.debug("Channel has been closed {}.", channel);
return;
}
ObservableEmitter<T> emitter = subscription.emitter;
if (emitter == null) {
LOG.debug("No subscriber for channel {}.", channel);
return;
Expand All @@ -295,11 +372,12 @@ protected void handleChannelMessage(String channel, T message) {
}

protected void handleChannelError(String channel, Throwable t) {
if (!channel.contains(channel)) {
LOG.error("Unexpected channel's error: {}, {}.", channel, t);
NettyStreamingService<T>.Subscription subscription = channels.get(channel);
if (subscription == null) {
LOG.debug("Channel {} has been closed.", channel);
return;
}
ObservableEmitter<T> emitter = channels.get(channel).emitter;
ObservableEmitter<T> emitter = subscription.emitter;
if (emitter == null) {
LOG.debug("No subscriber for channel {}.", channel);
return;
Expand Down Expand Up @@ -328,15 +406,10 @@ public void channelInactive(ChannelHandlerContext ctx) {
isManualDisconnect = false;
} else {
super.channelInactive(ctx);
LOG.info("Reopening websocket because it was closed by the host");
final Completable c = connect()
.doOnError(t -> LOG.warn("Problem with reconnect", t))
.retryWhen(new RetryWithDelay(retryDuration.toMillis()))
.doOnComplete(() -> {
LOG.info("Resubscribing channels");
resubscribeChannels();
});
c.subscribe();
if (connectedSuccessfully) {
LOG.info("Reopening websocket because it was closed by the host");
eventLoopGroup.shutdownGracefully(2, 30, TimeUnit.SECONDS).addListener(f -> connect().subscribe());
}
}
}
}
Expand All @@ -346,4 +419,26 @@ public boolean isSocketOpen() {
}

public void useCompressedMessages(boolean compressedMessages) { this.compressedMessages = compressedMessages; }

public void setAcceptAllCertificates(boolean acceptAllCertificates) {
this.acceptAllCertificates = acceptAllCertificates;
}

public void setEnableLoggingHandler(boolean enableLoggingHandler) {
this.enableLoggingHandler = enableLoggingHandler;
}

public void setLoggingHandlerLevel(LogLevel loggingHandlerLevel) {
this.loggingHandlerLevel = loggingHandlerLevel;
}

public void setSocksProxyHost(String socksProxyHost) {
this.socksProxyHost = socksProxyHost;
}

public void setSocksProxyPort(Integer socksProxyPort) {
this.socksProxyPort = socksProxyPort;
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package info.bitrich.xchangestream.service.netty;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
* This class should be merged with ObjectMapperHelper from XStream..
*
* @author Nikita Belenkiy on 19/06/2018.
*/
public class StreamingObjectMapperHelper {

private static final ObjectMapper objectMapper;

static {
objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

private StreamingObjectMapperHelper() {

}

public static ObjectMapper getObjectMapper() {
return objectMapper;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception
if (!handshaker.isHandshakeComplete()) {
try {
handshaker.finishHandshake(ch, (FullHttpResponse)msg);
LOG.info("WebSocket Client connected!");
handshakeFuture.setSuccess();
LOG.info("WebSocket Client connected!");
handshakeFuture.setSuccess();
}
catch (WebSocketHandshakeException e) {
LOG.error("WebSocket Client failed to connect. {}", e.getMessage());
Expand Down Expand Up @@ -97,4 +97,4 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
}
ctx.close();
}
}
}
4 changes: 2 additions & 2 deletions service-pusher/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
<dependency>
<groupId>com.pusher</groupId>
<artifactId>pusher-java-client</artifactId>
<version>1.4.0</version>
<version>1.8.0</version>
</dependency>
</dependencies>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import io.reactivex.Completable;
import io.reactivex.Observable;
import org.knowm.xchange.binance.BinanceExchange;
import org.knowm.xchange.binance.service.BinanceMarketDataService;
import org.knowm.xchange.currency.CurrencyPair;
Expand Down Expand Up @@ -57,6 +58,16 @@ public boolean isAlive() {
return streamingService!= null && streamingService.isSocketOpen();
}

@Override
public Observable<Throwable> reconnectFailure() {
return streamingService.subscribeReconnectFailure();
}

@Override
public Observable<Object> connectionSuccess() {
return streamingService.subscribeConnectionSuccess();
}

@Override
public StreamingMarketDataService getStreamingMarketDataService() {
return streamingMarketDataService;
Expand Down
Loading

0 comments on commit 368454c

Please sign in to comment.