From 74f5c40320db806f48aeaa22778b8049cee5c4bf Mon Sep 17 00:00:00 2001 From: bharris Date: Fri, 6 Apr 2018 10:03:38 -0700 Subject: [PATCH 1/4] [gdax] Enabling of 'user' channel to receive fill messages. --- .../gdax/GDAXStreamingExchange.java | 2 +- .../gdax/GDAXStreamingMarketDataService.java | 34 +++++++---- .../gdax/GDAXStreamingService.java | 37 +++++++++--- .../dto/GDAXWebSocketSubscriptionMessage.java | 57 ++++++++++++++---- .../gdax/dto/GDAXWebSocketTransaction.java | 58 +++++++++++++++++-- .../GDAXWebSocketSubscriptionMessageTest.java | 2 +- 6 files changed, 153 insertions(+), 37 deletions(-) diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java index 80ea24867..7d912efc5 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java @@ -28,7 +28,7 @@ protected void initServices() { public Completable connect(ProductSubscription... args) { if (args == null || args.length == 0) throw new UnsupportedOperationException("The ProductSubscription must be defined!"); - this.streamingService = new GDAXStreamingService(API_URI); + this.streamingService = new GDAXStreamingService(API_URI, this); this.streamingMarketDataService = new GDAXStreamingMarketDataService(this.streamingService); streamingService.subscribeMultipleCurrencyPairs(args); diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingMarketDataService.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingMarketDataService.java index 7410f51d3..806c7c2b5 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingMarketDataService.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingMarketDataService.java @@ -1,10 +1,18 @@ package info.bitrich.xchangestream.gdax; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import info.bitrich.xchangestream.core.StreamingMarketDataService; -import info.bitrich.xchangestream.gdax.dto.GDAXWebSocketTransaction; -import io.reactivex.Observable; +import static io.netty.util.internal.StringUtil.isNullOrEmpty; +import static org.knowm.xchange.gdax.GDAXAdapters.adaptOrderBook; +import static org.knowm.xchange.gdax.GDAXAdapters.adaptTicker; +import static org.knowm.xchange.gdax.GDAXAdapters.adaptTradeHistory; +import static org.knowm.xchange.gdax.GDAXAdapters.adaptTrades; + +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.dto.marketdata.OrderBook; import org.knowm.xchange.dto.marketdata.Ticker; @@ -13,14 +21,16 @@ import org.knowm.xchange.gdax.dto.marketdata.GDAXProductBook; import org.knowm.xchange.gdax.dto.marketdata.GDAXProductTicker; import org.knowm.xchange.gdax.dto.marketdata.GDAXTrade; +import org.knowm.xchange.gdax.dto.trade.GDAXFill; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigDecimal; -import java.util.*; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; -import static io.netty.util.internal.StringUtil.isNullOrEmpty; -import static org.knowm.xchange.gdax.GDAXAdapters.*; +import info.bitrich.xchangestream.core.StreamingMarketDataService; +import info.bitrich.xchangestream.gdax.dto.GDAXWebSocketTransaction; +import io.reactivex.Observable; /** * Created by luca on 4/3/17. @@ -142,7 +152,11 @@ public Observable getTrades(CurrencyPair currencyPair, Object... args) { .filter(message -> !isNullOrEmpty(message.getType()) && message.getType().equals("match") && message.getProductId().equals(channelName)) .map(s -> { - Trades adaptedTrades = adaptTrades(new GDAXTrade[]{s.toGDAXTrade()}, currencyPair); + Trades adaptedTrades = null; + if ( s.getUserId() != null ) + adaptedTrades = adaptTradeHistory(new GDAXFill[]{s.toGDAXFill()}); + else + adaptedTrades = adaptTrades(new GDAXTrade[]{s.toGDAXTrade()}, currencyPair); return adaptedTrades.getTrades().get(0); } ); diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java index 20bc346b5..dd782afdc 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java @@ -1,7 +1,18 @@ package info.bitrich.xchangestream.gdax; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.knowm.xchange.ExchangeSpecification; +import org.knowm.xchange.gdax.dto.account.GDAXWebsocketAuthData; +import org.knowm.xchange.gdax.service.GDAXAccountServiceRaw; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; + import info.bitrich.xchangestream.core.ProductSubscription; import info.bitrich.xchangestream.gdax.dto.GDAXWebSocketSubscriptionMessage; import info.bitrich.xchangestream.gdax.netty.WebSocketClientCompressionAllowClientNoContextHandler; @@ -11,12 +22,6 @@ import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler; import io.reactivex.Observable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; public class GDAXStreamingService extends JsonNettyStreamingService { private static final Logger LOG = LoggerFactory.getLogger(GDAXStreamingService.class); @@ -25,11 +30,13 @@ public class GDAXStreamingService extends JsonNettyStreamingService { private static final String SHARE_CHANNEL_NAME = "ALL"; private final Map> subscriptions = new HashMap<>(); private ProductSubscription product = null; + GDAXStreamingExchange exchange; private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler = null; - public GDAXStreamingService(String apiUrl) { + public GDAXStreamingService(String apiUrl, GDAXStreamingExchange exchange) { super(apiUrl, Integer.MAX_VALUE); + this.exchange = exchange; } public ProductSubscription getProduct() { @@ -66,15 +73,27 @@ protected String getChannelNameFromMessage(JsonNode message) { @Override public String getSubscribeMessage(String channelName, Object... args) throws IOException { - GDAXWebSocketSubscriptionMessage subscribeMessage = new GDAXWebSocketSubscriptionMessage(SUBSCRIBE, product); + ExchangeSpecification exchangeSpec = exchange.getExchangeSpecification(); + GDAXWebsocketAuthData authData = null; + if ( exchangeSpec.getApiKey() != null ) { + GDAXAccountServiceRaw rawAccountService = (GDAXAccountServiceRaw) exchange.getAccountService(); + authData = rawAccountService.getWebsocketAuthData(); + } + GDAXWebSocketSubscriptionMessage subscribeMessage = new GDAXWebSocketSubscriptionMessage(SUBSCRIBE, product, authData); ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.writeValueAsString(subscribeMessage); } @Override public String getUnsubscribeMessage(String channelName) throws IOException { + ExchangeSpecification exchangeSpec = exchange.getExchangeSpecification(); + GDAXWebsocketAuthData authData = null; + if ( exchangeSpec.getApiKey() != null ) { + GDAXAccountServiceRaw rawAccountService = (GDAXAccountServiceRaw) exchange.getAccountService(); + authData = rawAccountService.getWebsocketAuthData(); + } GDAXWebSocketSubscriptionMessage subscribeMessage = - new GDAXWebSocketSubscriptionMessage(UNSUBSCRIBE, new String[]{"level2", "matches", "ticker"}); + new GDAXWebSocketSubscriptionMessage(UNSUBSCRIBE, new String[]{"level2", "matches", "ticker"}, authData); ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.writeValueAsString(subscribeMessage); } diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessage.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessage.java index f40aed31b..8f137cd2a 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessage.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessage.java @@ -1,8 +1,10 @@ package info.bitrich.xchangestream.gdax.dto; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import info.bitrich.xchangestream.core.ProductSubscription; import org.knowm.xchange.currency.CurrencyPair; +import org.knowm.xchange.gdax.dto.account.GDAXWebsocketAuthData; import java.util.ArrayList; import java.util.HashMap; @@ -19,6 +21,12 @@ public class GDAXWebSocketSubscriptionMessage { public static final String PRODUCT_IDS = "product_ids"; public static final String NAME = "name"; + // if authenticating + public static final String SIGNATURE = "signature"; + public static final String KEY = "key"; + public static final String PASSPHRASE = "passphrase"; + public static final String TIMESTAMP = "timestamp"; + class GDAXProductSubsctiption { @JsonProperty(NAME) private String name; @@ -26,7 +34,7 @@ class GDAXProductSubsctiption { @JsonProperty(PRODUCT_IDS) private String[] productIds; - public GDAXProductSubsctiption(String name, String[] productIds) { + public GDAXProductSubsctiption(String name, String[] productIds, GDAXWebsocketAuthData authData) { this.name = name; this.productIds = productIds; } @@ -45,15 +53,31 @@ public String[] getProductIds() { @JsonProperty(CHANNELS) private GDAXProductSubsctiption[] channels; - - public GDAXWebSocketSubscriptionMessage(String type, ProductSubscription product) { + + @JsonInclude(JsonInclude.Include.NON_EMPTY) + @JsonProperty(SIGNATURE) + String signature; + + @JsonInclude(JsonInclude.Include.NON_EMPTY) + @JsonProperty(KEY) + String key; + + @JsonInclude(JsonInclude.Include.NON_EMPTY) + @JsonProperty(PASSPHRASE) + String passphrase; + + @JsonInclude(JsonInclude.Include.NON_EMPTY) + @JsonProperty(TIMESTAMP) + String timestamp; + + public GDAXWebSocketSubscriptionMessage(String type, ProductSubscription product, GDAXWebsocketAuthData authData) { this.type = type; - generateSubscriptionMessage(product); + generateSubscriptionMessage(product, authData); } - public GDAXWebSocketSubscriptionMessage(String type, String[] channelNames) { + public GDAXWebSocketSubscriptionMessage(String type, String[] channelNames, GDAXWebsocketAuthData authData) { this.type = type; - generateSubscriptionMessage(channelNames); + generateSubscriptionMessage(channelNames, authData); } private String[] generateProductIds(CurrencyPair[] pairs) { @@ -65,39 +89,48 @@ private String[] generateProductIds(CurrencyPair[] pairs) { return productIds.toArray(new String[productIds.size()]); } - private GDAXProductSubsctiption generateGDAXProduct(String name, CurrencyPair[] pairs) { + private GDAXProductSubsctiption generateGDAXProduct(String name, CurrencyPair[] pairs, GDAXWebsocketAuthData authData) { String[] productsIds; productsIds = generateProductIds(pairs); - return new GDAXProductSubsctiption(name, productsIds); + return new GDAXProductSubsctiption(name, productsIds, authData); } - private void generateSubscriptionMessage(String[] channelNames) { + private void generateSubscriptionMessage(String[] channelNames, GDAXWebsocketAuthData authData) { List channels = new ArrayList<>(3); for (String name : channelNames) { - channels.add(new GDAXProductSubsctiption(name, null)); + channels.add(new GDAXProductSubsctiption(name, null, authData)); } this.channels = channels.toArray(new GDAXProductSubsctiption[channels.size()]); } - private void generateSubscriptionMessage(ProductSubscription productSubscription) { + private void generateSubscriptionMessage(ProductSubscription productSubscription, GDAXWebsocketAuthData authData) { List channels = new ArrayList<>(3); Map> pairs = new HashMap<>(3); pairs.put("level2", productSubscription.getOrderBook()); pairs.put("ticker", productSubscription.getTicker()); pairs.put("matches", productSubscription.getTrades()); + if ( authData != null ) + pairs.put("user", productSubscription.getTrades()); for (Map.Entry> product : pairs.entrySet()) { List currencyPairs = product.getValue(); if (currencyPairs == null || currencyPairs.size() == 0) { continue; } - GDAXProductSubsctiption gdaxProduct = generateGDAXProduct(product.getKey(), product.getValue().toArray(new CurrencyPair[product.getValue().size()])); + GDAXProductSubsctiption gdaxProduct = generateGDAXProduct(product.getKey(), product.getValue().toArray(new CurrencyPair[product.getValue().size()]), authData); channels.add(gdaxProduct); } this.channels = channels.toArray(new GDAXProductSubsctiption[channels.size()]); + + if ( authData != null ) { + this.key = authData.getKey(); + this.passphrase = authData.getPassphrase(); + this.signature = authData.getSignature(); + this.timestamp = String.valueOf(authData.getTimestamp()); + } } public String getType() { diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketTransaction.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketTransaction.java index 295032991..26326b327 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketTransaction.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketTransaction.java @@ -5,6 +5,7 @@ import org.knowm.xchange.gdax.dto.marketdata.GDAXProductStats; import org.knowm.xchange.gdax.dto.marketdata.GDAXProductTicker; import org.knowm.xchange.gdax.dto.marketdata.GDAXTrade; +import org.knowm.xchange.gdax.dto.trade.GDAXFill; import java.math.BigDecimal; import java.text.SimpleDateFormat; @@ -38,7 +39,12 @@ public class GDAXWebSocketTransaction { private final String reason; private final long tradeId; private final String makerOrderId; - private final String takenOrderId; + private final String takerOrderId; + + private final String takerUserId; + private final String userId; + private final String takerProfileId; + private final String profileId; public GDAXWebSocketTransaction( @JsonProperty("type") String type, @@ -65,13 +71,17 @@ public GDAXWebSocketTransaction( @JsonProperty("reason") String reason, @JsonProperty("trade_id") long tradeId, @JsonProperty("maker_order_id") String makerOrderId, - @JsonProperty("taken_order_id") String takenOrderId) { + @JsonProperty("taker_order_id") String takerOrderId, + @JsonProperty("taker_user_id") String takerUserId, + @JsonProperty("user_id") String userId, + @JsonProperty("taker_profile_id") String takerProfileId, + @JsonProperty("profile_id") String profileId) { this.remainingSize = remainingSize; this.reason = reason; this.tradeId = tradeId; this.makerOrderId = makerOrderId; - this.takenOrderId = takenOrderId; + this.takerOrderId = takerOrderId; this.type = type; this.orderId = orderId; this.orderType = orderType; @@ -92,6 +102,10 @@ public GDAXWebSocketTransaction( this.productId = productId; this.sequence = sequence; this.time = time; + this.takerUserId = takerUserId; + this.userId = userId; + this.takerProfileId = takerProfileId; + this.profileId = profileId; } private String[][] GDAXOrderBookChanges(String side, String[][] changes, SortedMap sideEntries, @@ -151,6 +165,11 @@ public GDAXTrade toGDAXTrade() { return new GDAXTrade(time, tradeId, price, size, side); } + public GDAXFill toGDAXFill() { + boolean taker = userId != null && takerUserId != null && userId.equals(takerUserId); + return new GDAXFill(String.valueOf(tradeId), productId, price, size, taker ? takerOrderId : makerOrderId, time, null, null, true, side); + } + public String getType() { return type; } @@ -235,8 +254,31 @@ public String getMakerOrderId() { return makerOrderId; } + /** + * @deprecated Use {@link #getTakerOrderId()} + */ public String getTakenOrderId() { - return takenOrderId; + return takerOrderId; + } + + public String getTakerOrderId() { + return takerOrderId; + } + + public String getTakerUserId() { + return takerUserId; + } + + public String getUserId() { + return userId; + } + + public String getTakerProfileId() { + return takerProfileId; + } + + public String getProfileId() { + return profileId; } @Override @@ -259,6 +301,14 @@ public String toString() { sb.append(", productId='").append(productId).append('\''); sb.append(", sequence=").append(sequence); sb.append(", time='").append(time).append('\''); + if ( userId != null ) + sb.append(", userId='").append(userId).append('\''); + if ( profileId != null ) + sb.append(", profileId='").append(profileId).append('\''); + if ( takerUserId != null ) + sb.append(", takerUserId='").append(takerUserId).append('\''); + if ( takerProfileId != null ) + sb.append(", takerProfileId='").append(takerProfileId).append('\''); sb.append('}'); return sb.toString(); } diff --git a/xchange-gdax/src/test/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessageTest.java b/xchange-gdax/src/test/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessageTest.java index 279ac81df..9b754f0da 100644 --- a/xchange-gdax/src/test/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessageTest.java +++ b/xchange-gdax/src/test/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessageTest.java @@ -18,7 +18,7 @@ public void testWebSocketMessageSerialization() throws JsonProcessingException { ProductSubscription productSubscription = ProductSubscription.create().addOrderbook(CurrencyPair.BTC_USD) .addTrades(CurrencyPair.BTC_USD).addTicker(CurrencyPair.BTC_USD).build(); - GDAXWebSocketSubscriptionMessage message = new GDAXWebSocketSubscriptionMessage("subscribe", productSubscription); + GDAXWebSocketSubscriptionMessage message = new GDAXWebSocketSubscriptionMessage("subscribe", productSubscription, null); final ObjectMapper mapper = new ObjectMapper(); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); From 50a77e637ba8dfad6f7dff74990b93cde2a714db Mon Sep 17 00:00:00 2001 From: bharris Date: Fri, 6 Apr 2018 10:09:56 -0700 Subject: [PATCH 2/4] [gdax] Enabling of 'user' channel to receive fill messages. (reverted from commit 74f5c40320db806f48aeaa22778b8049cee5c4bf) --- .../gdax/GDAXStreamingExchange.java | 2 +- .../gdax/GDAXStreamingMarketDataService.java | 34 ++++------- .../gdax/GDAXStreamingService.java | 37 +++--------- .../dto/GDAXWebSocketSubscriptionMessage.java | 57 ++++-------------- .../gdax/dto/GDAXWebSocketTransaction.java | 58 ++----------------- .../GDAXWebSocketSubscriptionMessageTest.java | 2 +- 6 files changed, 37 insertions(+), 153 deletions(-) diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java index 7d912efc5..80ea24867 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java @@ -28,7 +28,7 @@ protected void initServices() { public Completable connect(ProductSubscription... args) { if (args == null || args.length == 0) throw new UnsupportedOperationException("The ProductSubscription must be defined!"); - this.streamingService = new GDAXStreamingService(API_URI, this); + this.streamingService = new GDAXStreamingService(API_URI); this.streamingMarketDataService = new GDAXStreamingMarketDataService(this.streamingService); streamingService.subscribeMultipleCurrencyPairs(args); diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingMarketDataService.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingMarketDataService.java index 806c7c2b5..7410f51d3 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingMarketDataService.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingMarketDataService.java @@ -1,18 +1,10 @@ package info.bitrich.xchangestream.gdax; -import static io.netty.util.internal.StringUtil.isNullOrEmpty; -import static org.knowm.xchange.gdax.GDAXAdapters.adaptOrderBook; -import static org.knowm.xchange.gdax.GDAXAdapters.adaptTicker; -import static org.knowm.xchange.gdax.GDAXAdapters.adaptTradeHistory; -import static org.knowm.xchange.gdax.GDAXAdapters.adaptTrades; - -import java.math.BigDecimal; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; - +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import info.bitrich.xchangestream.core.StreamingMarketDataService; +import info.bitrich.xchangestream.gdax.dto.GDAXWebSocketTransaction; +import io.reactivex.Observable; import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.dto.marketdata.OrderBook; import org.knowm.xchange.dto.marketdata.Ticker; @@ -21,16 +13,14 @@ import org.knowm.xchange.gdax.dto.marketdata.GDAXProductBook; import org.knowm.xchange.gdax.dto.marketdata.GDAXProductTicker; import org.knowm.xchange.gdax.dto.marketdata.GDAXTrade; -import org.knowm.xchange.gdax.dto.trade.GDAXFill; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; +import java.math.BigDecimal; +import java.util.*; -import info.bitrich.xchangestream.core.StreamingMarketDataService; -import info.bitrich.xchangestream.gdax.dto.GDAXWebSocketTransaction; -import io.reactivex.Observable; +import static io.netty.util.internal.StringUtil.isNullOrEmpty; +import static org.knowm.xchange.gdax.GDAXAdapters.*; /** * Created by luca on 4/3/17. @@ -152,11 +142,7 @@ public Observable getTrades(CurrencyPair currencyPair, Object... args) { .filter(message -> !isNullOrEmpty(message.getType()) && message.getType().equals("match") && message.getProductId().equals(channelName)) .map(s -> { - Trades adaptedTrades = null; - if ( s.getUserId() != null ) - adaptedTrades = adaptTradeHistory(new GDAXFill[]{s.toGDAXFill()}); - else - adaptedTrades = adaptTrades(new GDAXTrade[]{s.toGDAXTrade()}, currencyPair); + Trades adaptedTrades = adaptTrades(new GDAXTrade[]{s.toGDAXTrade()}, currencyPair); return adaptedTrades.getTrades().get(0); } ); diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java index dd782afdc..20bc346b5 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java @@ -1,18 +1,7 @@ package info.bitrich.xchangestream.gdax; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import org.knowm.xchange.ExchangeSpecification; -import org.knowm.xchange.gdax.dto.account.GDAXWebsocketAuthData; -import org.knowm.xchange.gdax.service.GDAXAccountServiceRaw; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; - import info.bitrich.xchangestream.core.ProductSubscription; import info.bitrich.xchangestream.gdax.dto.GDAXWebSocketSubscriptionMessage; import info.bitrich.xchangestream.gdax.netty.WebSocketClientCompressionAllowClientNoContextHandler; @@ -22,6 +11,12 @@ import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler; import io.reactivex.Observable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; public class GDAXStreamingService extends JsonNettyStreamingService { private static final Logger LOG = LoggerFactory.getLogger(GDAXStreamingService.class); @@ -30,13 +25,11 @@ public class GDAXStreamingService extends JsonNettyStreamingService { private static final String SHARE_CHANNEL_NAME = "ALL"; private final Map> subscriptions = new HashMap<>(); private ProductSubscription product = null; - GDAXStreamingExchange exchange; private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler = null; - public GDAXStreamingService(String apiUrl, GDAXStreamingExchange exchange) { + public GDAXStreamingService(String apiUrl) { super(apiUrl, Integer.MAX_VALUE); - this.exchange = exchange; } public ProductSubscription getProduct() { @@ -73,27 +66,15 @@ protected String getChannelNameFromMessage(JsonNode message) { @Override public String getSubscribeMessage(String channelName, Object... args) throws IOException { - ExchangeSpecification exchangeSpec = exchange.getExchangeSpecification(); - GDAXWebsocketAuthData authData = null; - if ( exchangeSpec.getApiKey() != null ) { - GDAXAccountServiceRaw rawAccountService = (GDAXAccountServiceRaw) exchange.getAccountService(); - authData = rawAccountService.getWebsocketAuthData(); - } - GDAXWebSocketSubscriptionMessage subscribeMessage = new GDAXWebSocketSubscriptionMessage(SUBSCRIBE, product, authData); + GDAXWebSocketSubscriptionMessage subscribeMessage = new GDAXWebSocketSubscriptionMessage(SUBSCRIBE, product); ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.writeValueAsString(subscribeMessage); } @Override public String getUnsubscribeMessage(String channelName) throws IOException { - ExchangeSpecification exchangeSpec = exchange.getExchangeSpecification(); - GDAXWebsocketAuthData authData = null; - if ( exchangeSpec.getApiKey() != null ) { - GDAXAccountServiceRaw rawAccountService = (GDAXAccountServiceRaw) exchange.getAccountService(); - authData = rawAccountService.getWebsocketAuthData(); - } GDAXWebSocketSubscriptionMessage subscribeMessage = - new GDAXWebSocketSubscriptionMessage(UNSUBSCRIBE, new String[]{"level2", "matches", "ticker"}, authData); + new GDAXWebSocketSubscriptionMessage(UNSUBSCRIBE, new String[]{"level2", "matches", "ticker"}); ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.writeValueAsString(subscribeMessage); } diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessage.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessage.java index 8f137cd2a..f40aed31b 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessage.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessage.java @@ -1,10 +1,8 @@ package info.bitrich.xchangestream.gdax.dto; -import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import info.bitrich.xchangestream.core.ProductSubscription; import org.knowm.xchange.currency.CurrencyPair; -import org.knowm.xchange.gdax.dto.account.GDAXWebsocketAuthData; import java.util.ArrayList; import java.util.HashMap; @@ -21,12 +19,6 @@ public class GDAXWebSocketSubscriptionMessage { public static final String PRODUCT_IDS = "product_ids"; public static final String NAME = "name"; - // if authenticating - public static final String SIGNATURE = "signature"; - public static final String KEY = "key"; - public static final String PASSPHRASE = "passphrase"; - public static final String TIMESTAMP = "timestamp"; - class GDAXProductSubsctiption { @JsonProperty(NAME) private String name; @@ -34,7 +26,7 @@ class GDAXProductSubsctiption { @JsonProperty(PRODUCT_IDS) private String[] productIds; - public GDAXProductSubsctiption(String name, String[] productIds, GDAXWebsocketAuthData authData) { + public GDAXProductSubsctiption(String name, String[] productIds) { this.name = name; this.productIds = productIds; } @@ -53,31 +45,15 @@ public String[] getProductIds() { @JsonProperty(CHANNELS) private GDAXProductSubsctiption[] channels; - - @JsonInclude(JsonInclude.Include.NON_EMPTY) - @JsonProperty(SIGNATURE) - String signature; - - @JsonInclude(JsonInclude.Include.NON_EMPTY) - @JsonProperty(KEY) - String key; - - @JsonInclude(JsonInclude.Include.NON_EMPTY) - @JsonProperty(PASSPHRASE) - String passphrase; - - @JsonInclude(JsonInclude.Include.NON_EMPTY) - @JsonProperty(TIMESTAMP) - String timestamp; - - public GDAXWebSocketSubscriptionMessage(String type, ProductSubscription product, GDAXWebsocketAuthData authData) { + + public GDAXWebSocketSubscriptionMessage(String type, ProductSubscription product) { this.type = type; - generateSubscriptionMessage(product, authData); + generateSubscriptionMessage(product); } - public GDAXWebSocketSubscriptionMessage(String type, String[] channelNames, GDAXWebsocketAuthData authData) { + public GDAXWebSocketSubscriptionMessage(String type, String[] channelNames) { this.type = type; - generateSubscriptionMessage(channelNames, authData); + generateSubscriptionMessage(channelNames); } private String[] generateProductIds(CurrencyPair[] pairs) { @@ -89,48 +65,39 @@ private String[] generateProductIds(CurrencyPair[] pairs) { return productIds.toArray(new String[productIds.size()]); } - private GDAXProductSubsctiption generateGDAXProduct(String name, CurrencyPair[] pairs, GDAXWebsocketAuthData authData) { + private GDAXProductSubsctiption generateGDAXProduct(String name, CurrencyPair[] pairs) { String[] productsIds; productsIds = generateProductIds(pairs); - return new GDAXProductSubsctiption(name, productsIds, authData); + return new GDAXProductSubsctiption(name, productsIds); } - private void generateSubscriptionMessage(String[] channelNames, GDAXWebsocketAuthData authData) { + private void generateSubscriptionMessage(String[] channelNames) { List channels = new ArrayList<>(3); for (String name : channelNames) { - channels.add(new GDAXProductSubsctiption(name, null, authData)); + channels.add(new GDAXProductSubsctiption(name, null)); } this.channels = channels.toArray(new GDAXProductSubsctiption[channels.size()]); } - private void generateSubscriptionMessage(ProductSubscription productSubscription, GDAXWebsocketAuthData authData) { + private void generateSubscriptionMessage(ProductSubscription productSubscription) { List channels = new ArrayList<>(3); Map> pairs = new HashMap<>(3); pairs.put("level2", productSubscription.getOrderBook()); pairs.put("ticker", productSubscription.getTicker()); pairs.put("matches", productSubscription.getTrades()); - if ( authData != null ) - pairs.put("user", productSubscription.getTrades()); for (Map.Entry> product : pairs.entrySet()) { List currencyPairs = product.getValue(); if (currencyPairs == null || currencyPairs.size() == 0) { continue; } - GDAXProductSubsctiption gdaxProduct = generateGDAXProduct(product.getKey(), product.getValue().toArray(new CurrencyPair[product.getValue().size()]), authData); + GDAXProductSubsctiption gdaxProduct = generateGDAXProduct(product.getKey(), product.getValue().toArray(new CurrencyPair[product.getValue().size()])); channels.add(gdaxProduct); } this.channels = channels.toArray(new GDAXProductSubsctiption[channels.size()]); - - if ( authData != null ) { - this.key = authData.getKey(); - this.passphrase = authData.getPassphrase(); - this.signature = authData.getSignature(); - this.timestamp = String.valueOf(authData.getTimestamp()); - } } public String getType() { diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketTransaction.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketTransaction.java index 26326b327..295032991 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketTransaction.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketTransaction.java @@ -5,7 +5,6 @@ import org.knowm.xchange.gdax.dto.marketdata.GDAXProductStats; import org.knowm.xchange.gdax.dto.marketdata.GDAXProductTicker; import org.knowm.xchange.gdax.dto.marketdata.GDAXTrade; -import org.knowm.xchange.gdax.dto.trade.GDAXFill; import java.math.BigDecimal; import java.text.SimpleDateFormat; @@ -39,12 +38,7 @@ public class GDAXWebSocketTransaction { private final String reason; private final long tradeId; private final String makerOrderId; - private final String takerOrderId; - - private final String takerUserId; - private final String userId; - private final String takerProfileId; - private final String profileId; + private final String takenOrderId; public GDAXWebSocketTransaction( @JsonProperty("type") String type, @@ -71,17 +65,13 @@ public GDAXWebSocketTransaction( @JsonProperty("reason") String reason, @JsonProperty("trade_id") long tradeId, @JsonProperty("maker_order_id") String makerOrderId, - @JsonProperty("taker_order_id") String takerOrderId, - @JsonProperty("taker_user_id") String takerUserId, - @JsonProperty("user_id") String userId, - @JsonProperty("taker_profile_id") String takerProfileId, - @JsonProperty("profile_id") String profileId) { + @JsonProperty("taken_order_id") String takenOrderId) { this.remainingSize = remainingSize; this.reason = reason; this.tradeId = tradeId; this.makerOrderId = makerOrderId; - this.takerOrderId = takerOrderId; + this.takenOrderId = takenOrderId; this.type = type; this.orderId = orderId; this.orderType = orderType; @@ -102,10 +92,6 @@ public GDAXWebSocketTransaction( this.productId = productId; this.sequence = sequence; this.time = time; - this.takerUserId = takerUserId; - this.userId = userId; - this.takerProfileId = takerProfileId; - this.profileId = profileId; } private String[][] GDAXOrderBookChanges(String side, String[][] changes, SortedMap sideEntries, @@ -165,11 +151,6 @@ public GDAXTrade toGDAXTrade() { return new GDAXTrade(time, tradeId, price, size, side); } - public GDAXFill toGDAXFill() { - boolean taker = userId != null && takerUserId != null && userId.equals(takerUserId); - return new GDAXFill(String.valueOf(tradeId), productId, price, size, taker ? takerOrderId : makerOrderId, time, null, null, true, side); - } - public String getType() { return type; } @@ -254,31 +235,8 @@ public String getMakerOrderId() { return makerOrderId; } - /** - * @deprecated Use {@link #getTakerOrderId()} - */ public String getTakenOrderId() { - return takerOrderId; - } - - public String getTakerOrderId() { - return takerOrderId; - } - - public String getTakerUserId() { - return takerUserId; - } - - public String getUserId() { - return userId; - } - - public String getTakerProfileId() { - return takerProfileId; - } - - public String getProfileId() { - return profileId; + return takenOrderId; } @Override @@ -301,14 +259,6 @@ public String toString() { sb.append(", productId='").append(productId).append('\''); sb.append(", sequence=").append(sequence); sb.append(", time='").append(time).append('\''); - if ( userId != null ) - sb.append(", userId='").append(userId).append('\''); - if ( profileId != null ) - sb.append(", profileId='").append(profileId).append('\''); - if ( takerUserId != null ) - sb.append(", takerUserId='").append(takerUserId).append('\''); - if ( takerProfileId != null ) - sb.append(", takerProfileId='").append(takerProfileId).append('\''); sb.append('}'); return sb.toString(); } diff --git a/xchange-gdax/src/test/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessageTest.java b/xchange-gdax/src/test/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessageTest.java index 9b754f0da..279ac81df 100644 --- a/xchange-gdax/src/test/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessageTest.java +++ b/xchange-gdax/src/test/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessageTest.java @@ -18,7 +18,7 @@ public void testWebSocketMessageSerialization() throws JsonProcessingException { ProductSubscription productSubscription = ProductSubscription.create().addOrderbook(CurrencyPair.BTC_USD) .addTrades(CurrencyPair.BTC_USD).addTicker(CurrencyPair.BTC_USD).build(); - GDAXWebSocketSubscriptionMessage message = new GDAXWebSocketSubscriptionMessage("subscribe", productSubscription, null); + GDAXWebSocketSubscriptionMessage message = new GDAXWebSocketSubscriptionMessage("subscribe", productSubscription); final ObjectMapper mapper = new ObjectMapper(); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); From beb0a9d276a1c02041fbe048a558004fa43c088d Mon Sep 17 00:00:00 2001 From: bharris Date: Fri, 6 Apr 2018 10:11:34 -0700 Subject: [PATCH 3/4] [gdax] Enabling of 'user' channel to receive fill messages. (reverted from commit 74f5c40320db806f48aeaa22778b8049cee5c4bf) (reverted from commit 50a77e637ba8dfad6f7dff74990b93cde2a714db) --- .../gdax/GDAXStreamingExchange.java | 2 +- .../gdax/GDAXStreamingMarketDataService.java | 34 +++++++---- .../gdax/GDAXStreamingService.java | 37 +++++++++--- .../dto/GDAXWebSocketSubscriptionMessage.java | 57 ++++++++++++++---- .../gdax/dto/GDAXWebSocketTransaction.java | 58 +++++++++++++++++-- .../GDAXWebSocketSubscriptionMessageTest.java | 2 +- 6 files changed, 153 insertions(+), 37 deletions(-) diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java index 80ea24867..7d912efc5 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java @@ -28,7 +28,7 @@ protected void initServices() { public Completable connect(ProductSubscription... args) { if (args == null || args.length == 0) throw new UnsupportedOperationException("The ProductSubscription must be defined!"); - this.streamingService = new GDAXStreamingService(API_URI); + this.streamingService = new GDAXStreamingService(API_URI, this); this.streamingMarketDataService = new GDAXStreamingMarketDataService(this.streamingService); streamingService.subscribeMultipleCurrencyPairs(args); diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingMarketDataService.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingMarketDataService.java index 7410f51d3..806c7c2b5 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingMarketDataService.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingMarketDataService.java @@ -1,10 +1,18 @@ package info.bitrich.xchangestream.gdax; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import info.bitrich.xchangestream.core.StreamingMarketDataService; -import info.bitrich.xchangestream.gdax.dto.GDAXWebSocketTransaction; -import io.reactivex.Observable; +import static io.netty.util.internal.StringUtil.isNullOrEmpty; +import static org.knowm.xchange.gdax.GDAXAdapters.adaptOrderBook; +import static org.knowm.xchange.gdax.GDAXAdapters.adaptTicker; +import static org.knowm.xchange.gdax.GDAXAdapters.adaptTradeHistory; +import static org.knowm.xchange.gdax.GDAXAdapters.adaptTrades; + +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.dto.marketdata.OrderBook; import org.knowm.xchange.dto.marketdata.Ticker; @@ -13,14 +21,16 @@ import org.knowm.xchange.gdax.dto.marketdata.GDAXProductBook; import org.knowm.xchange.gdax.dto.marketdata.GDAXProductTicker; import org.knowm.xchange.gdax.dto.marketdata.GDAXTrade; +import org.knowm.xchange.gdax.dto.trade.GDAXFill; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigDecimal; -import java.util.*; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; -import static io.netty.util.internal.StringUtil.isNullOrEmpty; -import static org.knowm.xchange.gdax.GDAXAdapters.*; +import info.bitrich.xchangestream.core.StreamingMarketDataService; +import info.bitrich.xchangestream.gdax.dto.GDAXWebSocketTransaction; +import io.reactivex.Observable; /** * Created by luca on 4/3/17. @@ -142,7 +152,11 @@ public Observable getTrades(CurrencyPair currencyPair, Object... args) { .filter(message -> !isNullOrEmpty(message.getType()) && message.getType().equals("match") && message.getProductId().equals(channelName)) .map(s -> { - Trades adaptedTrades = adaptTrades(new GDAXTrade[]{s.toGDAXTrade()}, currencyPair); + Trades adaptedTrades = null; + if ( s.getUserId() != null ) + adaptedTrades = adaptTradeHistory(new GDAXFill[]{s.toGDAXFill()}); + else + adaptedTrades = adaptTrades(new GDAXTrade[]{s.toGDAXTrade()}, currencyPair); return adaptedTrades.getTrades().get(0); } ); diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java index 20bc346b5..dd782afdc 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java @@ -1,7 +1,18 @@ package info.bitrich.xchangestream.gdax; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.knowm.xchange.ExchangeSpecification; +import org.knowm.xchange.gdax.dto.account.GDAXWebsocketAuthData; +import org.knowm.xchange.gdax.service.GDAXAccountServiceRaw; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; + import info.bitrich.xchangestream.core.ProductSubscription; import info.bitrich.xchangestream.gdax.dto.GDAXWebSocketSubscriptionMessage; import info.bitrich.xchangestream.gdax.netty.WebSocketClientCompressionAllowClientNoContextHandler; @@ -11,12 +22,6 @@ import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler; import io.reactivex.Observable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; public class GDAXStreamingService extends JsonNettyStreamingService { private static final Logger LOG = LoggerFactory.getLogger(GDAXStreamingService.class); @@ -25,11 +30,13 @@ public class GDAXStreamingService extends JsonNettyStreamingService { private static final String SHARE_CHANNEL_NAME = "ALL"; private final Map> subscriptions = new HashMap<>(); private ProductSubscription product = null; + GDAXStreamingExchange exchange; private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler = null; - public GDAXStreamingService(String apiUrl) { + public GDAXStreamingService(String apiUrl, GDAXStreamingExchange exchange) { super(apiUrl, Integer.MAX_VALUE); + this.exchange = exchange; } public ProductSubscription getProduct() { @@ -66,15 +73,27 @@ protected String getChannelNameFromMessage(JsonNode message) { @Override public String getSubscribeMessage(String channelName, Object... args) throws IOException { - GDAXWebSocketSubscriptionMessage subscribeMessage = new GDAXWebSocketSubscriptionMessage(SUBSCRIBE, product); + ExchangeSpecification exchangeSpec = exchange.getExchangeSpecification(); + GDAXWebsocketAuthData authData = null; + if ( exchangeSpec.getApiKey() != null ) { + GDAXAccountServiceRaw rawAccountService = (GDAXAccountServiceRaw) exchange.getAccountService(); + authData = rawAccountService.getWebsocketAuthData(); + } + GDAXWebSocketSubscriptionMessage subscribeMessage = new GDAXWebSocketSubscriptionMessage(SUBSCRIBE, product, authData); ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.writeValueAsString(subscribeMessage); } @Override public String getUnsubscribeMessage(String channelName) throws IOException { + ExchangeSpecification exchangeSpec = exchange.getExchangeSpecification(); + GDAXWebsocketAuthData authData = null; + if ( exchangeSpec.getApiKey() != null ) { + GDAXAccountServiceRaw rawAccountService = (GDAXAccountServiceRaw) exchange.getAccountService(); + authData = rawAccountService.getWebsocketAuthData(); + } GDAXWebSocketSubscriptionMessage subscribeMessage = - new GDAXWebSocketSubscriptionMessage(UNSUBSCRIBE, new String[]{"level2", "matches", "ticker"}); + new GDAXWebSocketSubscriptionMessage(UNSUBSCRIBE, new String[]{"level2", "matches", "ticker"}, authData); ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.writeValueAsString(subscribeMessage); } diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessage.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessage.java index f40aed31b..8f137cd2a 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessage.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessage.java @@ -1,8 +1,10 @@ package info.bitrich.xchangestream.gdax.dto; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import info.bitrich.xchangestream.core.ProductSubscription; import org.knowm.xchange.currency.CurrencyPair; +import org.knowm.xchange.gdax.dto.account.GDAXWebsocketAuthData; import java.util.ArrayList; import java.util.HashMap; @@ -19,6 +21,12 @@ public class GDAXWebSocketSubscriptionMessage { public static final String PRODUCT_IDS = "product_ids"; public static final String NAME = "name"; + // if authenticating + public static final String SIGNATURE = "signature"; + public static final String KEY = "key"; + public static final String PASSPHRASE = "passphrase"; + public static final String TIMESTAMP = "timestamp"; + class GDAXProductSubsctiption { @JsonProperty(NAME) private String name; @@ -26,7 +34,7 @@ class GDAXProductSubsctiption { @JsonProperty(PRODUCT_IDS) private String[] productIds; - public GDAXProductSubsctiption(String name, String[] productIds) { + public GDAXProductSubsctiption(String name, String[] productIds, GDAXWebsocketAuthData authData) { this.name = name; this.productIds = productIds; } @@ -45,15 +53,31 @@ public String[] getProductIds() { @JsonProperty(CHANNELS) private GDAXProductSubsctiption[] channels; - - public GDAXWebSocketSubscriptionMessage(String type, ProductSubscription product) { + + @JsonInclude(JsonInclude.Include.NON_EMPTY) + @JsonProperty(SIGNATURE) + String signature; + + @JsonInclude(JsonInclude.Include.NON_EMPTY) + @JsonProperty(KEY) + String key; + + @JsonInclude(JsonInclude.Include.NON_EMPTY) + @JsonProperty(PASSPHRASE) + String passphrase; + + @JsonInclude(JsonInclude.Include.NON_EMPTY) + @JsonProperty(TIMESTAMP) + String timestamp; + + public GDAXWebSocketSubscriptionMessage(String type, ProductSubscription product, GDAXWebsocketAuthData authData) { this.type = type; - generateSubscriptionMessage(product); + generateSubscriptionMessage(product, authData); } - public GDAXWebSocketSubscriptionMessage(String type, String[] channelNames) { + public GDAXWebSocketSubscriptionMessage(String type, String[] channelNames, GDAXWebsocketAuthData authData) { this.type = type; - generateSubscriptionMessage(channelNames); + generateSubscriptionMessage(channelNames, authData); } private String[] generateProductIds(CurrencyPair[] pairs) { @@ -65,39 +89,48 @@ private String[] generateProductIds(CurrencyPair[] pairs) { return productIds.toArray(new String[productIds.size()]); } - private GDAXProductSubsctiption generateGDAXProduct(String name, CurrencyPair[] pairs) { + private GDAXProductSubsctiption generateGDAXProduct(String name, CurrencyPair[] pairs, GDAXWebsocketAuthData authData) { String[] productsIds; productsIds = generateProductIds(pairs); - return new GDAXProductSubsctiption(name, productsIds); + return new GDAXProductSubsctiption(name, productsIds, authData); } - private void generateSubscriptionMessage(String[] channelNames) { + private void generateSubscriptionMessage(String[] channelNames, GDAXWebsocketAuthData authData) { List channels = new ArrayList<>(3); for (String name : channelNames) { - channels.add(new GDAXProductSubsctiption(name, null)); + channels.add(new GDAXProductSubsctiption(name, null, authData)); } this.channels = channels.toArray(new GDAXProductSubsctiption[channels.size()]); } - private void generateSubscriptionMessage(ProductSubscription productSubscription) { + private void generateSubscriptionMessage(ProductSubscription productSubscription, GDAXWebsocketAuthData authData) { List channels = new ArrayList<>(3); Map> pairs = new HashMap<>(3); pairs.put("level2", productSubscription.getOrderBook()); pairs.put("ticker", productSubscription.getTicker()); pairs.put("matches", productSubscription.getTrades()); + if ( authData != null ) + pairs.put("user", productSubscription.getTrades()); for (Map.Entry> product : pairs.entrySet()) { List currencyPairs = product.getValue(); if (currencyPairs == null || currencyPairs.size() == 0) { continue; } - GDAXProductSubsctiption gdaxProduct = generateGDAXProduct(product.getKey(), product.getValue().toArray(new CurrencyPair[product.getValue().size()])); + GDAXProductSubsctiption gdaxProduct = generateGDAXProduct(product.getKey(), product.getValue().toArray(new CurrencyPair[product.getValue().size()]), authData); channels.add(gdaxProduct); } this.channels = channels.toArray(new GDAXProductSubsctiption[channels.size()]); + + if ( authData != null ) { + this.key = authData.getKey(); + this.passphrase = authData.getPassphrase(); + this.signature = authData.getSignature(); + this.timestamp = String.valueOf(authData.getTimestamp()); + } } public String getType() { diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketTransaction.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketTransaction.java index 295032991..26326b327 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketTransaction.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketTransaction.java @@ -5,6 +5,7 @@ import org.knowm.xchange.gdax.dto.marketdata.GDAXProductStats; import org.knowm.xchange.gdax.dto.marketdata.GDAXProductTicker; import org.knowm.xchange.gdax.dto.marketdata.GDAXTrade; +import org.knowm.xchange.gdax.dto.trade.GDAXFill; import java.math.BigDecimal; import java.text.SimpleDateFormat; @@ -38,7 +39,12 @@ public class GDAXWebSocketTransaction { private final String reason; private final long tradeId; private final String makerOrderId; - private final String takenOrderId; + private final String takerOrderId; + + private final String takerUserId; + private final String userId; + private final String takerProfileId; + private final String profileId; public GDAXWebSocketTransaction( @JsonProperty("type") String type, @@ -65,13 +71,17 @@ public GDAXWebSocketTransaction( @JsonProperty("reason") String reason, @JsonProperty("trade_id") long tradeId, @JsonProperty("maker_order_id") String makerOrderId, - @JsonProperty("taken_order_id") String takenOrderId) { + @JsonProperty("taker_order_id") String takerOrderId, + @JsonProperty("taker_user_id") String takerUserId, + @JsonProperty("user_id") String userId, + @JsonProperty("taker_profile_id") String takerProfileId, + @JsonProperty("profile_id") String profileId) { this.remainingSize = remainingSize; this.reason = reason; this.tradeId = tradeId; this.makerOrderId = makerOrderId; - this.takenOrderId = takenOrderId; + this.takerOrderId = takerOrderId; this.type = type; this.orderId = orderId; this.orderType = orderType; @@ -92,6 +102,10 @@ public GDAXWebSocketTransaction( this.productId = productId; this.sequence = sequence; this.time = time; + this.takerUserId = takerUserId; + this.userId = userId; + this.takerProfileId = takerProfileId; + this.profileId = profileId; } private String[][] GDAXOrderBookChanges(String side, String[][] changes, SortedMap sideEntries, @@ -151,6 +165,11 @@ public GDAXTrade toGDAXTrade() { return new GDAXTrade(time, tradeId, price, size, side); } + public GDAXFill toGDAXFill() { + boolean taker = userId != null && takerUserId != null && userId.equals(takerUserId); + return new GDAXFill(String.valueOf(tradeId), productId, price, size, taker ? takerOrderId : makerOrderId, time, null, null, true, side); + } + public String getType() { return type; } @@ -235,8 +254,31 @@ public String getMakerOrderId() { return makerOrderId; } + /** + * @deprecated Use {@link #getTakerOrderId()} + */ public String getTakenOrderId() { - return takenOrderId; + return takerOrderId; + } + + public String getTakerOrderId() { + return takerOrderId; + } + + public String getTakerUserId() { + return takerUserId; + } + + public String getUserId() { + return userId; + } + + public String getTakerProfileId() { + return takerProfileId; + } + + public String getProfileId() { + return profileId; } @Override @@ -259,6 +301,14 @@ public String toString() { sb.append(", productId='").append(productId).append('\''); sb.append(", sequence=").append(sequence); sb.append(", time='").append(time).append('\''); + if ( userId != null ) + sb.append(", userId='").append(userId).append('\''); + if ( profileId != null ) + sb.append(", profileId='").append(profileId).append('\''); + if ( takerUserId != null ) + sb.append(", takerUserId='").append(takerUserId).append('\''); + if ( takerProfileId != null ) + sb.append(", takerProfileId='").append(takerProfileId).append('\''); sb.append('}'); return sb.toString(); } diff --git a/xchange-gdax/src/test/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessageTest.java b/xchange-gdax/src/test/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessageTest.java index 279ac81df..9b754f0da 100644 --- a/xchange-gdax/src/test/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessageTest.java +++ b/xchange-gdax/src/test/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessageTest.java @@ -18,7 +18,7 @@ public void testWebSocketMessageSerialization() throws JsonProcessingException { ProductSubscription productSubscription = ProductSubscription.create().addOrderbook(CurrencyPair.BTC_USD) .addTrades(CurrencyPair.BTC_USD).addTicker(CurrencyPair.BTC_USD).build(); - GDAXWebSocketSubscriptionMessage message = new GDAXWebSocketSubscriptionMessage("subscribe", productSubscription); + GDAXWebSocketSubscriptionMessage message = new GDAXWebSocketSubscriptionMessage("subscribe", productSubscription, null); final ObjectMapper mapper = new ObjectMapper(); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); From a280d6125afc45ca3aad9f416071647fbf12b673 Mon Sep 17 00:00:00 2001 From: bharris Date: Wed, 16 May 2018 13:43:08 -0700 Subject: [PATCH 4/4] [gdax] Refactored obtaining of Websocket Auth Data to occur in the connect() call. --- .../gdax/GDAXStreamingExchange.java | 16 ++++++++++++++- .../gdax/GDAXStreamingService.java | 20 +++---------------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java index 7d912efc5..d08572336 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java @@ -7,6 +7,8 @@ import io.reactivex.Completable; import org.knowm.xchange.ExchangeSpecification; import org.knowm.xchange.gdax.GDAXExchange; +import org.knowm.xchange.gdax.dto.account.GDAXWebsocketAuthData; +import org.knowm.xchange.gdax.service.GDAXAccountServiceRaw; /** * GDAX Streaming Exchange. Connects to live WebSocket feed. @@ -28,7 +30,19 @@ protected void initServices() { public Completable connect(ProductSubscription... args) { if (args == null || args.length == 0) throw new UnsupportedOperationException("The ProductSubscription must be defined!"); - this.streamingService = new GDAXStreamingService(API_URI, this); + ExchangeSpecification exchangeSpec = getExchangeSpecification(); + GDAXWebsocketAuthData authData = null; + if ( exchangeSpec.getApiKey() != null ) { + try { + GDAXAccountServiceRaw rawAccountService = (GDAXAccountServiceRaw) getAccountService(); + authData = rawAccountService.getWebsocketAuthData(); + } + catch (Exception e) { + logger.warn("Failed attempting to acquire Websocket AuthData needed for private data on" + + " websocket. Will only receive public information via API", e); + } + } + this.streamingService = new GDAXStreamingService(API_URI, authData); this.streamingMarketDataService = new GDAXStreamingMarketDataService(this.streamingService); streamingService.subscribeMultipleCurrencyPairs(args); diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java index dd782afdc..e16398d5a 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java @@ -4,9 +4,7 @@ import java.util.HashMap; import java.util.Map; -import org.knowm.xchange.ExchangeSpecification; import org.knowm.xchange.gdax.dto.account.GDAXWebsocketAuthData; -import org.knowm.xchange.gdax.service.GDAXAccountServiceRaw; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,13 +28,13 @@ public class GDAXStreamingService extends JsonNettyStreamingService { private static final String SHARE_CHANNEL_NAME = "ALL"; private final Map> subscriptions = new HashMap<>(); private ProductSubscription product = null; - GDAXStreamingExchange exchange; + private GDAXWebsocketAuthData authData = null; private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler = null; - public GDAXStreamingService(String apiUrl, GDAXStreamingExchange exchange) { + public GDAXStreamingService(String apiUrl,GDAXWebsocketAuthData authData) { super(apiUrl, Integer.MAX_VALUE); - this.exchange = exchange; + this.authData = authData; } public ProductSubscription getProduct() { @@ -73,12 +71,6 @@ protected String getChannelNameFromMessage(JsonNode message) { @Override public String getSubscribeMessage(String channelName, Object... args) throws IOException { - ExchangeSpecification exchangeSpec = exchange.getExchangeSpecification(); - GDAXWebsocketAuthData authData = null; - if ( exchangeSpec.getApiKey() != null ) { - GDAXAccountServiceRaw rawAccountService = (GDAXAccountServiceRaw) exchange.getAccountService(); - authData = rawAccountService.getWebsocketAuthData(); - } GDAXWebSocketSubscriptionMessage subscribeMessage = new GDAXWebSocketSubscriptionMessage(SUBSCRIBE, product, authData); ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.writeValueAsString(subscribeMessage); @@ -86,12 +78,6 @@ public String getSubscribeMessage(String channelName, Object... args) throws IOE @Override public String getUnsubscribeMessage(String channelName) throws IOException { - ExchangeSpecification exchangeSpec = exchange.getExchangeSpecification(); - GDAXWebsocketAuthData authData = null; - if ( exchangeSpec.getApiKey() != null ) { - GDAXAccountServiceRaw rawAccountService = (GDAXAccountServiceRaw) exchange.getAccountService(); - authData = rawAccountService.getWebsocketAuthData(); - } GDAXWebSocketSubscriptionMessage subscribeMessage = new GDAXWebSocketSubscriptionMessage(UNSUBSCRIBE, new String[]{"level2", "matches", "ticker"}, authData); ObjectMapper objectMapper = new ObjectMapper();