Skip to content
This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

xhange 4.3.4, binance fix, okcoin test fix, NettyStreamingService tuning #141

Closed
wants to merge 11 commits into from
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
</repositories>

<properties>
<xchange.version>4.3.3</xchange.version>
<xchange.version>4.3.4</xchange.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.netty.util.concurrent.Future;
import io.reactivex.CompletableEmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -81,7 +83,7 @@ public NettyStreamingService(String apiUrl, int maxFramePayloadLength, Duration
this.retryDuration = retryDuration;
this.connectionTimeout = connectionTimeout;
this.uri = new URI(apiUrl);
this.eventLoopGroup = new NioEventLoopGroup();
this.eventLoopGroup = new NioEventLoopGroup(2);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Error parsing URI " + apiUrl, e);
}
Expand Down Expand Up @@ -123,54 +125,74 @@ public Completable connect() {
sslCtx = null;
}

final WebSocketClientHandler handler = getWebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders(), maxFramePayloadLength),
this::messageHandler);
final WebSocketClientHandler handler = getWebSocketClientHandler(
WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders(), maxFramePayloadLength), this::messageHandler);

Bootstrap b = new Bootstrap();
b.group(eventLoopGroup)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, java.lang.Math.toIntExact(connectionTimeout.toMillis()))
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
}

WebSocketClientExtensionHandler clientExtensionHandler = getWebSocketClientExtensionHandler();
List<ChannelHandler> handlers = new ArrayList<>(4);
handlers.add(new HttpClientCodec());
handlers.add(new HttpObjectAggregator(8192));
handlers.add(handler);
if (clientExtensionHandler != null) handlers.add(clientExtensionHandler);
p.addLast(handlers.toArray(new ChannelHandler[handlers.size()]));
}
});
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, java.lang.Math.toIntExact(connectionTimeout.toMillis()))
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
}

WebSocketClientExtensionHandler clientExtensionHandler = getWebSocketClientExtensionHandler();
List<ChannelHandler> handlers = new ArrayList<>(4);
handlers.add(new HttpClientCodec());
handlers.add(new HttpObjectAggregator(8192));
if (clientExtensionHandler != null) {
handlers.add(clientExtensionHandler);
}
handlers.add(handler);
p.addLast(handlers.toArray(new ChannelHandler[handlers.size()]));
}
});

b.connect(uri.getHost(), port).addListener((ChannelFuture future) -> {
webSocketChannel = future.channel();
if (future.isSuccess()) {
handler.handshakeFuture().addListener(f -> completable.onComplete());
handler.handshakeFuture().addListener(f -> {
if (f.isSuccess()) {
completable.onComplete();
} else {
handleError(completable, f.cause());
}
});
} else {
completable.onError(future.cause());
handleError(completable, future.cause());
}

});
} catch (Exception throwable) {
completable.onError(throwable);
}
catch (Exception throwable) {
handleError(completable, throwable);
}
});
}

protected void handleError(CompletableEmitter completable, Throwable t) {
isManualDisconnect = true;
ChannelFuture disconnect = webSocketChannel.disconnect();
disconnect.addListener(f -> {
if(f.isSuccess()) {
isManualDisconnect = false;
}
});
completable.onError(t);
}

public Completable disconnect() {
isManualDisconnect = true;
return Completable.create(completable -> {
if (webSocketChannel.isOpen()) {
CloseWebSocketFrame closeFrame = new CloseWebSocketFrame();
webSocketChannel.writeAndFlush(closeFrame).addListener(future -> {
channels = new ConcurrentHashMap<>();
eventLoopGroup.shutdownGracefully();
completable.onComplete();
});
}
Expand Down Expand Up @@ -217,7 +239,7 @@ public Observable<T> subscribeChannel(String channelName, Object... args) {
final String channelId = getSubscriptionUniqueId(channelName, args);
LOG.info("Subscribing to channel {}", channelId);

return Observable.<T>create(e -> {
return Observable.<T> create(e -> {
if (webSocketChannel == null || !webSocketChannel.isOpen()) {
e.onError(new NotConnectedException());
}
Expand All @@ -227,7 +249,8 @@ public Observable<T> subscribeChannel(String channelName, Object... args) {
channels.put(channelId, newSubscription);
try {
sendMessage(getSubscribeMessage(channelName, args));
} catch (IOException throwable) {
}
catch (IOException throwable) {
e.onError(throwable);
}
}
Expand Down Expand Up @@ -271,7 +294,6 @@ protected void handleError(T message, Throwable t) {
handleChannelError(channel, t);
}


protected void handleChannelMessage(String channel, T message) {
ObservableEmitter<T> emitter = channels.get(channel).emitter;
if (emitter == null) {
Expand All @@ -296,8 +318,7 @@ protected WebSocketClientExtensionHandler getWebSocketClientExtensionHandler() {
return WebSocketClientCompressionHandler.INSTANCE;
}

protected WebSocketClientHandler getWebSocketClientHandler(WebSocketClientHandshaker handshaker,
WebSocketClientHandler.WebSocketMessageHandler handler) {
protected WebSocketClientHandler getWebSocketClientHandler(WebSocketClientHandshaker handshaker, WebSocketClientHandler.WebSocketMessageHandler handler) {
return new NettyWebSocketClientHandler(handshaker, handler);
}

Expand All @@ -313,13 +334,10 @@ public void channelInactive(ChannelHandlerContext ctx) {
} else {
super.channelInactive(ctx);
LOG.info("Reopening websocket because it was closed by the host");
final Completable c = connect()
.doOnError(t -> LOG.warn("Problem with reconnect", t))
.retryWhen(new RetryWithDelay(retryDuration.toMillis()))
.doOnComplete(() -> {
LOG.info("Resubscribing channels");
resubscribeChannels();
});
final Completable c = connect().doOnError(t -> LOG.warn("Problem with reconnect", t)).retryWhen(new RetryWithDelay(retryDuration.toMillis())).doOnComplete(() -> {
LOG.info("Resubscribing channels");
resubscribeChannels();
});
c.subscribe();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
package info.bitrich.xchangestream.service.netty;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketClientHandler.class);
Expand Down Expand Up @@ -47,22 +56,26 @@ public void channelInactive(ChannelHandlerContext ctx) {
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
if (!handshaker.isHandshakeComplete()) {
handshaker.finishHandshake(ch, (FullHttpResponse) msg);
LOG.info("WebSocket Client connected!");
handshakeFuture.setSuccess();
try {
handshaker.finishHandshake(ch, (FullHttpResponse)msg);
LOG.info("WebSocket Client connected!");
handshakeFuture.setSuccess();
}
catch (WebSocketHandshakeException e) {
LOG.error("WebSocket Client failed to connect. {}", e.getMessage());
handshakeFuture.setFailure(e);
}
return;
}

if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;
throw new IllegalStateException(
"Unexpected FullHttpResponse (getStatus=" + response.status() +
", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
FullHttpResponse response = (FullHttpResponse)msg;
throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
}

WebSocketFrame frame = (WebSocketFrame) msg;
WebSocketFrame frame = (WebSocketFrame)msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
TextWebSocketFrame textFrame = (TextWebSocketFrame)frame;
handler.onMessage(textFrame.text());
} else if (frame instanceof PongWebSocketFrame) {
LOG.debug("WebSocket Client received pong");
Expand All @@ -74,7 +87,7 @@ public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
LOG.debug("", cause);
if (!handshakeFuture.isDone()) {
handshakeFuture.setFailure(cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BinanceStreamingExchange extends BinanceExchange implements StreamingExchange {
private static final String API_BASE_URI = "wss://stream.binance.com:9443/";

private BinanceStreamingService streamingService;
private BinanceStreamingMarketDataService streamingMarketDataService;

public BinanceStreamingExchange() {}
public BinanceStreamingExchange() {
}

/**
* Binance streaming API expects connections to multiple channels to be defined at connection time. To define the channels for this
Expand Down Expand Up @@ -64,27 +66,21 @@ private BinanceStreamingService createStreamingService(ProductSubscription subsc
return new BinanceStreamingService(path, subscription);
}

private static String buildSubscriptionStreams(ProductSubscription subscription) {
return buildSubscriptionStrings(subscription.getTicker(), "ticker") +
buildSubscriptionStrings(subscription.getOrderBook(), "depth") +
buildSubscriptionStrings(subscription.getTrades(), "trade");
public static String buildSubscriptionStreams(ProductSubscription subscription) {
return Stream.of(buildSubscriptionStrings(subscription.getTicker(), "ticker"),
buildSubscriptionStrings(subscription.getOrderBook(), "depth"),
buildSubscriptionStrings(subscription.getTrades(), "trade"))
.filter(s -> !s.isEmpty())
.collect(Collectors.joining("/"));
}

private static String buildSubscriptionStrings(List<CurrencyPair> currencyPairs, String subscriptionType) {
StringBuilder builder = new StringBuilder();
subscriptionStrings(currencyPairs).forEach(subscriptionString -> {
builder.append("/");
builder.append(subscriptionString);
builder.append("@");
builder.append(subscriptionType);
});
return builder.toString();
return subscriptionStrings(currencyPairs).map( s -> s + "@" + subscriptionType).collect(Collectors.joining("/"));
}

private static List<String> subscriptionStrings(List<CurrencyPair> currencyPairs) {
private static Stream<String> subscriptionStrings(List<CurrencyPair> currencyPairs) {
return currencyPairs.stream()
.map(pair -> String.join("", pair.toString().split("/")).toLowerCase())
.collect(Collectors.toList());
.map(pair -> String.join("", pair.toString().split("/")).toLowerCase());
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package info.bitrich.xchangestream.binance.dto;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.math.BigDecimal;

import org.knowm.xchange.binance.dto.marketdata.BinanceTicker24h;

import com.fasterxml.jackson.annotation.JsonProperty;

public class TickerBinanceWebsocketTransaction extends ProductBinanceWebSocketTransaction {

private final BinanceTicker24h ticker;
Expand Down Expand Up @@ -57,7 +57,8 @@ public TickerBinanceWebsocketTransaction(
closeTime,
firstId,
lastId,
count);
count,
symbol);
ticker.setCurrencyPair(currencyPair);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package info.bitrich.xchangestream.binance;

import info.bitrich.xchangestream.core.ProductSubscription;
import net.bytebuddy.agent.builder.AgentBuilder;
import org.junit.Assert;
import org.junit.Test;
import org.knowm.xchange.currency.CurrencyPair;

public class BinanceTest {

@Test
public void channelCreateUrlTest() {
ProductSubscription.ProductSubscriptionBuilder builder = ProductSubscription.create();
builder.addTicker(CurrencyPair.BTC_USD).addTicker(CurrencyPair.DASH_BTC);
String buildSubscriptionStreams = BinanceStreamingExchange.buildSubscriptionStreams(builder.build());
Assert.assertEquals("btcusd@ticker/dashbtc@ticker", buildSubscriptionStreams);

ProductSubscription.ProductSubscriptionBuilder builder2 = ProductSubscription.create();
builder2.addTicker(CurrencyPair.BTC_USD).addTicker(CurrencyPair.DASH_BTC).addOrderbook(CurrencyPair.ETH_BTC);
String buildSubscriptionStreams2 = BinanceStreamingExchange.buildSubscriptionStreams(builder2.build());
Assert.assertEquals("btcusd@ticker/dashbtc@ticker/ethbtc@depth", buildSubscriptionStreams2);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ public void testGetOrderBook() throws Exception {
when(okCoinStreamingService.subscribeChannel(any())).thenReturn(Observable.just(jsonNode));

List<LimitOrder> bids = new ArrayList<>();
bids.add(new LimitOrder(Order.OrderType.BID, new BigDecimal("0.922"), CurrencyPair.BTC_USD, null, null, new BigDecimal("819.9")));
bids.add(new LimitOrder(Order.OrderType.BID, new BigDecimal("0.085"), CurrencyPair.BTC_USD, null, null, new BigDecimal("818.63")));
bids.add(new LimitOrder(Order.OrderType.BID, new BigDecimal("0.922"), CurrencyPair.BTC_USD, null, new Date(1484602135246L), new BigDecimal("819.9")));
bids.add(new LimitOrder(Order.OrderType.BID, new BigDecimal("0.085"), CurrencyPair.BTC_USD, null, new Date(1484602135246L), new BigDecimal("818.63")));

List<LimitOrder> asks = new ArrayList<>();
asks.add(new LimitOrder(Order.OrderType.ASK, new BigDecimal("0.035"), CurrencyPair.BTC_USD, null, null, new BigDecimal("821.6")));
asks.add(new LimitOrder(Order.OrderType.ASK, new BigDecimal("5.18"), CurrencyPair.BTC_USD, null, null, new BigDecimal("821.65")));
asks.add(new LimitOrder(Order.OrderType.ASK, new BigDecimal("2.89"), CurrencyPair.BTC_USD, null, null, new BigDecimal("821.7")));
asks.add(new LimitOrder(Order.OrderType.ASK, new BigDecimal("0.035"), CurrencyPair.BTC_USD, null, new Date(1484602135246L), new BigDecimal("821.6")));
asks.add(new LimitOrder(Order.OrderType.ASK, new BigDecimal("5.18"), CurrencyPair.BTC_USD, null, new Date(1484602135246L), new BigDecimal("821.65")));
asks.add(new LimitOrder(Order.OrderType.ASK, new BigDecimal("2.89"), CurrencyPair.BTC_USD, null, new Date(1484602135246L), new BigDecimal("821.7")));

OrderBook expected = new OrderBook(new Date(1484602135246L), asks, bids);

Expand Down