diff --git a/pom.xml b/pom.xml index dffe896cb..f0be6996b 100644 --- a/pom.xml +++ b/pom.xml @@ -78,7 +78,7 @@ - 4.3.3 + 4.3.4 UTF-8 UTF-8 1.8 diff --git a/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java b/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java index 24750fc64..f1089c98a 100644 --- a/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java +++ b/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/NettyStreamingService.java @@ -9,6 +9,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import io.netty.util.concurrent.Future; +import io.reactivex.CompletableEmitter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +83,7 @@ public NettyStreamingService(String apiUrl, int maxFramePayloadLength, Duration this.retryDuration = retryDuration; this.connectionTimeout = connectionTimeout; this.uri = new URI(apiUrl); - this.eventLoopGroup = new NioEventLoopGroup(); + this.eventLoopGroup = new NioEventLoopGroup(2); } catch (URISyntaxException e) { throw new IllegalArgumentException("Error parsing URI " + apiUrl, e); } @@ -123,47 +125,66 @@ public Completable connect() { sslCtx = null; } - final WebSocketClientHandler handler = getWebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker( - uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders(), maxFramePayloadLength), - this::messageHandler); + final WebSocketClientHandler handler = getWebSocketClientHandler( + WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders(), maxFramePayloadLength), this::messageHandler); Bootstrap b = new Bootstrap(); b.group(eventLoopGroup) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, java.lang.Math.toIntExact(connectionTimeout.toMillis())) - .channel(NioSocketChannel.class) - .handler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel ch) { - ChannelPipeline p = ch.pipeline(); - if (sslCtx != null) { - p.addLast(sslCtx.newHandler(ch.alloc(), host, port)); - } - - WebSocketClientExtensionHandler clientExtensionHandler = getWebSocketClientExtensionHandler(); - List handlers = new ArrayList<>(4); - handlers.add(new HttpClientCodec()); - handlers.add(new HttpObjectAggregator(8192)); - handlers.add(handler); - if (clientExtensionHandler != null) handlers.add(clientExtensionHandler); - p.addLast(handlers.toArray(new ChannelHandler[handlers.size()])); - } - }); + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, java.lang.Math.toIntExact(connectionTimeout.toMillis())) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline p = ch.pipeline(); + if (sslCtx != null) { + p.addLast(sslCtx.newHandler(ch.alloc(), host, port)); + } + + WebSocketClientExtensionHandler clientExtensionHandler = getWebSocketClientExtensionHandler(); + List handlers = new ArrayList<>(4); + handlers.add(new HttpClientCodec()); + handlers.add(new HttpObjectAggregator(8192)); + if (clientExtensionHandler != null) { + handlers.add(clientExtensionHandler); + } + handlers.add(handler); + p.addLast(handlers.toArray(new ChannelHandler[handlers.size()])); + } + }); b.connect(uri.getHost(), port).addListener((ChannelFuture future) -> { webSocketChannel = future.channel(); if (future.isSuccess()) { - handler.handshakeFuture().addListener(f -> completable.onComplete()); + handler.handshakeFuture().addListener(f -> { + if (f.isSuccess()) { + completable.onComplete(); + } else { + handleError(completable, f.cause()); + } + }); } else { - completable.onError(future.cause()); + handleError(completable, future.cause()); } }); - } catch (Exception throwable) { - completable.onError(throwable); + } + catch (Exception throwable) { + handleError(completable, throwable); } }); } + protected void handleError(CompletableEmitter completable, Throwable t) { + isManualDisconnect = true; + ChannelFuture disconnect = webSocketChannel.disconnect(); + disconnect.addListener(f -> { + if(f.isSuccess()) { + isManualDisconnect = false; + } + }); + completable.onError(t); + } + public Completable disconnect() { isManualDisconnect = true; return Completable.create(completable -> { @@ -171,6 +192,7 @@ public Completable disconnect() { CloseWebSocketFrame closeFrame = new CloseWebSocketFrame(); webSocketChannel.writeAndFlush(closeFrame).addListener(future -> { channels = new ConcurrentHashMap<>(); + eventLoopGroup.shutdownGracefully(); completable.onComplete(); }); } @@ -217,7 +239,7 @@ public Observable subscribeChannel(String channelName, Object... args) { final String channelId = getSubscriptionUniqueId(channelName, args); LOG.info("Subscribing to channel {}", channelId); - return Observable.create(e -> { + return Observable. create(e -> { if (webSocketChannel == null || !webSocketChannel.isOpen()) { e.onError(new NotConnectedException()); } @@ -227,7 +249,8 @@ public Observable subscribeChannel(String channelName, Object... args) { channels.put(channelId, newSubscription); try { sendMessage(getSubscribeMessage(channelName, args)); - } catch (IOException throwable) { + } + catch (IOException throwable) { e.onError(throwable); } } @@ -271,7 +294,6 @@ protected void handleError(T message, Throwable t) { handleChannelError(channel, t); } - protected void handleChannelMessage(String channel, T message) { ObservableEmitter emitter = channels.get(channel).emitter; if (emitter == null) { @@ -296,8 +318,7 @@ protected WebSocketClientExtensionHandler getWebSocketClientExtensionHandler() { return WebSocketClientCompressionHandler.INSTANCE; } - protected WebSocketClientHandler getWebSocketClientHandler(WebSocketClientHandshaker handshaker, - WebSocketClientHandler.WebSocketMessageHandler handler) { + protected WebSocketClientHandler getWebSocketClientHandler(WebSocketClientHandshaker handshaker, WebSocketClientHandler.WebSocketMessageHandler handler) { return new NettyWebSocketClientHandler(handshaker, handler); } @@ -313,13 +334,10 @@ public void channelInactive(ChannelHandlerContext ctx) { } else { super.channelInactive(ctx); LOG.info("Reopening websocket because it was closed by the host"); - final Completable c = connect() - .doOnError(t -> LOG.warn("Problem with reconnect", t)) - .retryWhen(new RetryWithDelay(retryDuration.toMillis())) - .doOnComplete(() -> { - LOG.info("Resubscribing channels"); - resubscribeChannels(); - }); + final Completable c = connect().doOnError(t -> LOG.warn("Problem with reconnect", t)).retryWhen(new RetryWithDelay(retryDuration.toMillis())).doOnComplete(() -> { + LOG.info("Resubscribing channels"); + resubscribeChannels(); + }); c.subscribe(); } } diff --git a/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/WebSocketClientHandler.java b/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/WebSocketClientHandler.java index 719642272..471db256e 100644 --- a/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/WebSocketClientHandler.java +++ b/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/WebSocketClientHandler.java @@ -1,12 +1,21 @@ package info.bitrich.xchangestream.service.netty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import io.netty.channel.*; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.websocketx.*; +import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException; import io.netty.util.CharsetUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class WebSocketClientHandler extends SimpleChannelInboundHandler { private static final Logger LOG = LoggerFactory.getLogger(WebSocketClientHandler.class); @@ -47,22 +56,26 @@ public void channelInactive(ChannelHandlerContext ctx) { public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { Channel ch = ctx.channel(); if (!handshaker.isHandshakeComplete()) { - handshaker.finishHandshake(ch, (FullHttpResponse) msg); - LOG.info("WebSocket Client connected!"); - handshakeFuture.setSuccess(); + try { + handshaker.finishHandshake(ch, (FullHttpResponse)msg); + LOG.info("WebSocket Client connected!"); + handshakeFuture.setSuccess(); + } + catch (WebSocketHandshakeException e) { + LOG.error("WebSocket Client failed to connect. {}", e.getMessage()); + handshakeFuture.setFailure(e); + } return; } if (msg instanceof FullHttpResponse) { - FullHttpResponse response = (FullHttpResponse) msg; - throw new IllegalStateException( - "Unexpected FullHttpResponse (getStatus=" + response.status() + - ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')'); + FullHttpResponse response = (FullHttpResponse)msg; + throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')'); } - WebSocketFrame frame = (WebSocketFrame) msg; + WebSocketFrame frame = (WebSocketFrame)msg; if (frame instanceof TextWebSocketFrame) { - TextWebSocketFrame textFrame = (TextWebSocketFrame) frame; + TextWebSocketFrame textFrame = (TextWebSocketFrame)frame; handler.onMessage(textFrame.text()); } else if (frame instanceof PongWebSocketFrame) { LOG.debug("WebSocket Client received pong"); @@ -74,7 +87,7 @@ public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - cause.printStackTrace(); + LOG.debug("", cause); if (!handshakeFuture.isDone()) { handshakeFuture.setFailure(cause); } diff --git a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java index 365088371..129d9455f 100644 --- a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java +++ b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/BinanceStreamingExchange.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.stream.Collectors; +import java.util.stream.Stream; public class BinanceStreamingExchange extends BinanceExchange implements StreamingExchange { private static final String API_BASE_URI = "wss://stream.binance.com:9443/"; @@ -16,7 +17,8 @@ public class BinanceStreamingExchange extends BinanceExchange implements Streami private BinanceStreamingService streamingService; private BinanceStreamingMarketDataService streamingMarketDataService; - public BinanceStreamingExchange() {} + public BinanceStreamingExchange() { + } /** * Binance streaming API expects connections to multiple channels to be defined at connection time. To define the channels for this @@ -64,27 +66,21 @@ private BinanceStreamingService createStreamingService(ProductSubscription subsc return new BinanceStreamingService(path, subscription); } - private static String buildSubscriptionStreams(ProductSubscription subscription) { - return buildSubscriptionStrings(subscription.getTicker(), "ticker") + - buildSubscriptionStrings(subscription.getOrderBook(), "depth") + - buildSubscriptionStrings(subscription.getTrades(), "trade"); + public static String buildSubscriptionStreams(ProductSubscription subscription) { + return Stream.of(buildSubscriptionStrings(subscription.getTicker(), "ticker"), + buildSubscriptionStrings(subscription.getOrderBook(), "depth"), + buildSubscriptionStrings(subscription.getTrades(), "trade")) + .filter(s -> !s.isEmpty()) + .collect(Collectors.joining("/")); } private static String buildSubscriptionStrings(List currencyPairs, String subscriptionType) { - StringBuilder builder = new StringBuilder(); - subscriptionStrings(currencyPairs).forEach(subscriptionString -> { - builder.append("/"); - builder.append(subscriptionString); - builder.append("@"); - builder.append(subscriptionType); - }); - return builder.toString(); + return subscriptionStrings(currencyPairs).map( s -> s + "@" + subscriptionType).collect(Collectors.joining("/")); } - private static List subscriptionStrings(List currencyPairs) { + private static Stream subscriptionStrings(List currencyPairs) { return currencyPairs.stream() - .map(pair -> String.join("", pair.toString().split("/")).toLowerCase()) - .collect(Collectors.toList()); + .map(pair -> String.join("", pair.toString().split("/")).toLowerCase()); } } diff --git a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/dto/TickerBinanceWebsocketTransaction.java b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/dto/TickerBinanceWebsocketTransaction.java index d285516dc..098d6fda0 100644 --- a/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/dto/TickerBinanceWebsocketTransaction.java +++ b/xchange-binance/src/main/java/info/bitrich/xchangestream/binance/dto/TickerBinanceWebsocketTransaction.java @@ -1,11 +1,11 @@ package info.bitrich.xchangestream.binance.dto; -import com.fasterxml.jackson.annotation.JsonProperty; - import java.math.BigDecimal; import org.knowm.xchange.binance.dto.marketdata.BinanceTicker24h; +import com.fasterxml.jackson.annotation.JsonProperty; + public class TickerBinanceWebsocketTransaction extends ProductBinanceWebSocketTransaction { private final BinanceTicker24h ticker; @@ -57,7 +57,8 @@ public TickerBinanceWebsocketTransaction( closeTime, firstId, lastId, - count); + count, + symbol); ticker.setCurrencyPair(currencyPair); } diff --git a/xchange-binance/src/test/java/info/bitrich/xchangestream/binance/BinanceTest.java b/xchange-binance/src/test/java/info/bitrich/xchangestream/binance/BinanceTest.java new file mode 100644 index 000000000..ab33584ce --- /dev/null +++ b/xchange-binance/src/test/java/info/bitrich/xchangestream/binance/BinanceTest.java @@ -0,0 +1,25 @@ +package info.bitrich.xchangestream.binance; + +import info.bitrich.xchangestream.core.ProductSubscription; +import net.bytebuddy.agent.builder.AgentBuilder; +import org.junit.Assert; +import org.junit.Test; +import org.knowm.xchange.currency.CurrencyPair; + +public class BinanceTest { + + @Test + public void channelCreateUrlTest() { + ProductSubscription.ProductSubscriptionBuilder builder = ProductSubscription.create(); + builder.addTicker(CurrencyPair.BTC_USD).addTicker(CurrencyPair.DASH_BTC); + String buildSubscriptionStreams = BinanceStreamingExchange.buildSubscriptionStreams(builder.build()); + Assert.assertEquals("btcusd@ticker/dashbtc@ticker", buildSubscriptionStreams); + + ProductSubscription.ProductSubscriptionBuilder builder2 = ProductSubscription.create(); + builder2.addTicker(CurrencyPair.BTC_USD).addTicker(CurrencyPair.DASH_BTC).addOrderbook(CurrencyPair.ETH_BTC); + String buildSubscriptionStreams2 = BinanceStreamingExchange.buildSubscriptionStreams(builder2.build()); + Assert.assertEquals("btcusd@ticker/dashbtc@ticker/ethbtc@depth", buildSubscriptionStreams2); + } + + +} diff --git a/xchange-okcoin/src/test/java/info/bitrich/xchangestream/okcoin/OkCoinStreamingMarketDataServiceTest.java b/xchange-okcoin/src/test/java/info/bitrich/xchangestream/okcoin/OkCoinStreamingMarketDataServiceTest.java index 00d3826f5..662ecebae 100644 --- a/xchange-okcoin/src/test/java/info/bitrich/xchangestream/okcoin/OkCoinStreamingMarketDataServiceTest.java +++ b/xchange-okcoin/src/test/java/info/bitrich/xchangestream/okcoin/OkCoinStreamingMarketDataServiceTest.java @@ -42,13 +42,13 @@ public void testGetOrderBook() throws Exception { when(okCoinStreamingService.subscribeChannel(any())).thenReturn(Observable.just(jsonNode)); List bids = new ArrayList<>(); - bids.add(new LimitOrder(Order.OrderType.BID, new BigDecimal("0.922"), CurrencyPair.BTC_USD, null, null, new BigDecimal("819.9"))); - bids.add(new LimitOrder(Order.OrderType.BID, new BigDecimal("0.085"), CurrencyPair.BTC_USD, null, null, new BigDecimal("818.63"))); + bids.add(new LimitOrder(Order.OrderType.BID, new BigDecimal("0.922"), CurrencyPair.BTC_USD, null, new Date(1484602135246L), new BigDecimal("819.9"))); + bids.add(new LimitOrder(Order.OrderType.BID, new BigDecimal("0.085"), CurrencyPair.BTC_USD, null, new Date(1484602135246L), new BigDecimal("818.63"))); List asks = new ArrayList<>(); - asks.add(new LimitOrder(Order.OrderType.ASK, new BigDecimal("0.035"), CurrencyPair.BTC_USD, null, null, new BigDecimal("821.6"))); - asks.add(new LimitOrder(Order.OrderType.ASK, new BigDecimal("5.18"), CurrencyPair.BTC_USD, null, null, new BigDecimal("821.65"))); - asks.add(new LimitOrder(Order.OrderType.ASK, new BigDecimal("2.89"), CurrencyPair.BTC_USD, null, null, new BigDecimal("821.7"))); + asks.add(new LimitOrder(Order.OrderType.ASK, new BigDecimal("0.035"), CurrencyPair.BTC_USD, null, new Date(1484602135246L), new BigDecimal("821.6"))); + asks.add(new LimitOrder(Order.OrderType.ASK, new BigDecimal("5.18"), CurrencyPair.BTC_USD, null, new Date(1484602135246L), new BigDecimal("821.65"))); + asks.add(new LimitOrder(Order.OrderType.ASK, new BigDecimal("2.89"), CurrencyPair.BTC_USD, null, new Date(1484602135246L), new BigDecimal("821.7"))); OrderBook expected = new OrderBook(new Date(1484602135246L), asks, bids);