diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java index 80ea24867..d08572336 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingExchange.java @@ -7,6 +7,8 @@ import io.reactivex.Completable; import org.knowm.xchange.ExchangeSpecification; import org.knowm.xchange.gdax.GDAXExchange; +import org.knowm.xchange.gdax.dto.account.GDAXWebsocketAuthData; +import org.knowm.xchange.gdax.service.GDAXAccountServiceRaw; /** * GDAX Streaming Exchange. Connects to live WebSocket feed. @@ -28,7 +30,19 @@ protected void initServices() { public Completable connect(ProductSubscription... args) { if (args == null || args.length == 0) throw new UnsupportedOperationException("The ProductSubscription must be defined!"); - this.streamingService = new GDAXStreamingService(API_URI); + ExchangeSpecification exchangeSpec = getExchangeSpecification(); + GDAXWebsocketAuthData authData = null; + if ( exchangeSpec.getApiKey() != null ) { + try { + GDAXAccountServiceRaw rawAccountService = (GDAXAccountServiceRaw) getAccountService(); + authData = rawAccountService.getWebsocketAuthData(); + } + catch (Exception e) { + logger.warn("Failed attempting to acquire Websocket AuthData needed for private data on" + + " websocket. Will only receive public information via API", e); + } + } + this.streamingService = new GDAXStreamingService(API_URI, authData); this.streamingMarketDataService = new GDAXStreamingMarketDataService(this.streamingService); streamingService.subscribeMultipleCurrencyPairs(args); diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingMarketDataService.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingMarketDataService.java index 7410f51d3..806c7c2b5 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingMarketDataService.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingMarketDataService.java @@ -1,10 +1,18 @@ package info.bitrich.xchangestream.gdax; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import info.bitrich.xchangestream.core.StreamingMarketDataService; -import info.bitrich.xchangestream.gdax.dto.GDAXWebSocketTransaction; -import io.reactivex.Observable; +import static io.netty.util.internal.StringUtil.isNullOrEmpty; +import static org.knowm.xchange.gdax.GDAXAdapters.adaptOrderBook; +import static org.knowm.xchange.gdax.GDAXAdapters.adaptTicker; +import static org.knowm.xchange.gdax.GDAXAdapters.adaptTradeHistory; +import static org.knowm.xchange.gdax.GDAXAdapters.adaptTrades; + +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.dto.marketdata.OrderBook; import org.knowm.xchange.dto.marketdata.Ticker; @@ -13,14 +21,16 @@ import org.knowm.xchange.gdax.dto.marketdata.GDAXProductBook; import org.knowm.xchange.gdax.dto.marketdata.GDAXProductTicker; import org.knowm.xchange.gdax.dto.marketdata.GDAXTrade; +import org.knowm.xchange.gdax.dto.trade.GDAXFill; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.math.BigDecimal; -import java.util.*; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; -import static io.netty.util.internal.StringUtil.isNullOrEmpty; -import static org.knowm.xchange.gdax.GDAXAdapters.*; +import info.bitrich.xchangestream.core.StreamingMarketDataService; +import info.bitrich.xchangestream.gdax.dto.GDAXWebSocketTransaction; +import io.reactivex.Observable; /** * Created by luca on 4/3/17. @@ -142,7 +152,11 @@ public Observable getTrades(CurrencyPair currencyPair, Object... args) { .filter(message -> !isNullOrEmpty(message.getType()) && message.getType().equals("match") && message.getProductId().equals(channelName)) .map(s -> { - Trades adaptedTrades = adaptTrades(new GDAXTrade[]{s.toGDAXTrade()}, currencyPair); + Trades adaptedTrades = null; + if ( s.getUserId() != null ) + adaptedTrades = adaptTradeHistory(new GDAXFill[]{s.toGDAXFill()}); + else + adaptedTrades = adaptTrades(new GDAXTrade[]{s.toGDAXTrade()}, currencyPair); return adaptedTrades.getTrades().get(0); } ); diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java index 20bc346b5..e16398d5a 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/GDAXStreamingService.java @@ -1,7 +1,16 @@ package info.bitrich.xchangestream.gdax; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.knowm.xchange.gdax.dto.account.GDAXWebsocketAuthData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; + import info.bitrich.xchangestream.core.ProductSubscription; import info.bitrich.xchangestream.gdax.dto.GDAXWebSocketSubscriptionMessage; import info.bitrich.xchangestream.gdax.netty.WebSocketClientCompressionAllowClientNoContextHandler; @@ -11,12 +20,6 @@ import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler; import io.reactivex.Observable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; public class GDAXStreamingService extends JsonNettyStreamingService { private static final Logger LOG = LoggerFactory.getLogger(GDAXStreamingService.class); @@ -25,11 +28,13 @@ public class GDAXStreamingService extends JsonNettyStreamingService { private static final String SHARE_CHANNEL_NAME = "ALL"; private final Map> subscriptions = new HashMap<>(); private ProductSubscription product = null; + private GDAXWebsocketAuthData authData = null; private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler = null; - public GDAXStreamingService(String apiUrl) { + public GDAXStreamingService(String apiUrl,GDAXWebsocketAuthData authData) { super(apiUrl, Integer.MAX_VALUE); + this.authData = authData; } public ProductSubscription getProduct() { @@ -66,7 +71,7 @@ protected String getChannelNameFromMessage(JsonNode message) { @Override public String getSubscribeMessage(String channelName, Object... args) throws IOException { - GDAXWebSocketSubscriptionMessage subscribeMessage = new GDAXWebSocketSubscriptionMessage(SUBSCRIBE, product); + GDAXWebSocketSubscriptionMessage subscribeMessage = new GDAXWebSocketSubscriptionMessage(SUBSCRIBE, product, authData); ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.writeValueAsString(subscribeMessage); } @@ -74,7 +79,7 @@ public String getSubscribeMessage(String channelName, Object... args) throws IOE @Override public String getUnsubscribeMessage(String channelName) throws IOException { GDAXWebSocketSubscriptionMessage subscribeMessage = - new GDAXWebSocketSubscriptionMessage(UNSUBSCRIBE, new String[]{"level2", "matches", "ticker"}); + new GDAXWebSocketSubscriptionMessage(UNSUBSCRIBE, new String[]{"level2", "matches", "ticker"}, authData); ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.writeValueAsString(subscribeMessage); } diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessage.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessage.java index f40aed31b..8f137cd2a 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessage.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessage.java @@ -1,8 +1,10 @@ package info.bitrich.xchangestream.gdax.dto; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import info.bitrich.xchangestream.core.ProductSubscription; import org.knowm.xchange.currency.CurrencyPair; +import org.knowm.xchange.gdax.dto.account.GDAXWebsocketAuthData; import java.util.ArrayList; import java.util.HashMap; @@ -19,6 +21,12 @@ public class GDAXWebSocketSubscriptionMessage { public static final String PRODUCT_IDS = "product_ids"; public static final String NAME = "name"; + // if authenticating + public static final String SIGNATURE = "signature"; + public static final String KEY = "key"; + public static final String PASSPHRASE = "passphrase"; + public static final String TIMESTAMP = "timestamp"; + class GDAXProductSubsctiption { @JsonProperty(NAME) private String name; @@ -26,7 +34,7 @@ class GDAXProductSubsctiption { @JsonProperty(PRODUCT_IDS) private String[] productIds; - public GDAXProductSubsctiption(String name, String[] productIds) { + public GDAXProductSubsctiption(String name, String[] productIds, GDAXWebsocketAuthData authData) { this.name = name; this.productIds = productIds; } @@ -45,15 +53,31 @@ public String[] getProductIds() { @JsonProperty(CHANNELS) private GDAXProductSubsctiption[] channels; - - public GDAXWebSocketSubscriptionMessage(String type, ProductSubscription product) { + + @JsonInclude(JsonInclude.Include.NON_EMPTY) + @JsonProperty(SIGNATURE) + String signature; + + @JsonInclude(JsonInclude.Include.NON_EMPTY) + @JsonProperty(KEY) + String key; + + @JsonInclude(JsonInclude.Include.NON_EMPTY) + @JsonProperty(PASSPHRASE) + String passphrase; + + @JsonInclude(JsonInclude.Include.NON_EMPTY) + @JsonProperty(TIMESTAMP) + String timestamp; + + public GDAXWebSocketSubscriptionMessage(String type, ProductSubscription product, GDAXWebsocketAuthData authData) { this.type = type; - generateSubscriptionMessage(product); + generateSubscriptionMessage(product, authData); } - public GDAXWebSocketSubscriptionMessage(String type, String[] channelNames) { + public GDAXWebSocketSubscriptionMessage(String type, String[] channelNames, GDAXWebsocketAuthData authData) { this.type = type; - generateSubscriptionMessage(channelNames); + generateSubscriptionMessage(channelNames, authData); } private String[] generateProductIds(CurrencyPair[] pairs) { @@ -65,39 +89,48 @@ private String[] generateProductIds(CurrencyPair[] pairs) { return productIds.toArray(new String[productIds.size()]); } - private GDAXProductSubsctiption generateGDAXProduct(String name, CurrencyPair[] pairs) { + private GDAXProductSubsctiption generateGDAXProduct(String name, CurrencyPair[] pairs, GDAXWebsocketAuthData authData) { String[] productsIds; productsIds = generateProductIds(pairs); - return new GDAXProductSubsctiption(name, productsIds); + return new GDAXProductSubsctiption(name, productsIds, authData); } - private void generateSubscriptionMessage(String[] channelNames) { + private void generateSubscriptionMessage(String[] channelNames, GDAXWebsocketAuthData authData) { List channels = new ArrayList<>(3); for (String name : channelNames) { - channels.add(new GDAXProductSubsctiption(name, null)); + channels.add(new GDAXProductSubsctiption(name, null, authData)); } this.channels = channels.toArray(new GDAXProductSubsctiption[channels.size()]); } - private void generateSubscriptionMessage(ProductSubscription productSubscription) { + private void generateSubscriptionMessage(ProductSubscription productSubscription, GDAXWebsocketAuthData authData) { List channels = new ArrayList<>(3); Map> pairs = new HashMap<>(3); pairs.put("level2", productSubscription.getOrderBook()); pairs.put("ticker", productSubscription.getTicker()); pairs.put("matches", productSubscription.getTrades()); + if ( authData != null ) + pairs.put("user", productSubscription.getTrades()); for (Map.Entry> product : pairs.entrySet()) { List currencyPairs = product.getValue(); if (currencyPairs == null || currencyPairs.size() == 0) { continue; } - GDAXProductSubsctiption gdaxProduct = generateGDAXProduct(product.getKey(), product.getValue().toArray(new CurrencyPair[product.getValue().size()])); + GDAXProductSubsctiption gdaxProduct = generateGDAXProduct(product.getKey(), product.getValue().toArray(new CurrencyPair[product.getValue().size()]), authData); channels.add(gdaxProduct); } this.channels = channels.toArray(new GDAXProductSubsctiption[channels.size()]); + + if ( authData != null ) { + this.key = authData.getKey(); + this.passphrase = authData.getPassphrase(); + this.signature = authData.getSignature(); + this.timestamp = String.valueOf(authData.getTimestamp()); + } } public String getType() { diff --git a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketTransaction.java b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketTransaction.java index 295032991..26326b327 100644 --- a/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketTransaction.java +++ b/xchange-gdax/src/main/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketTransaction.java @@ -5,6 +5,7 @@ import org.knowm.xchange.gdax.dto.marketdata.GDAXProductStats; import org.knowm.xchange.gdax.dto.marketdata.GDAXProductTicker; import org.knowm.xchange.gdax.dto.marketdata.GDAXTrade; +import org.knowm.xchange.gdax.dto.trade.GDAXFill; import java.math.BigDecimal; import java.text.SimpleDateFormat; @@ -38,7 +39,12 @@ public class GDAXWebSocketTransaction { private final String reason; private final long tradeId; private final String makerOrderId; - private final String takenOrderId; + private final String takerOrderId; + + private final String takerUserId; + private final String userId; + private final String takerProfileId; + private final String profileId; public GDAXWebSocketTransaction( @JsonProperty("type") String type, @@ -65,13 +71,17 @@ public GDAXWebSocketTransaction( @JsonProperty("reason") String reason, @JsonProperty("trade_id") long tradeId, @JsonProperty("maker_order_id") String makerOrderId, - @JsonProperty("taken_order_id") String takenOrderId) { + @JsonProperty("taker_order_id") String takerOrderId, + @JsonProperty("taker_user_id") String takerUserId, + @JsonProperty("user_id") String userId, + @JsonProperty("taker_profile_id") String takerProfileId, + @JsonProperty("profile_id") String profileId) { this.remainingSize = remainingSize; this.reason = reason; this.tradeId = tradeId; this.makerOrderId = makerOrderId; - this.takenOrderId = takenOrderId; + this.takerOrderId = takerOrderId; this.type = type; this.orderId = orderId; this.orderType = orderType; @@ -92,6 +102,10 @@ public GDAXWebSocketTransaction( this.productId = productId; this.sequence = sequence; this.time = time; + this.takerUserId = takerUserId; + this.userId = userId; + this.takerProfileId = takerProfileId; + this.profileId = profileId; } private String[][] GDAXOrderBookChanges(String side, String[][] changes, SortedMap sideEntries, @@ -151,6 +165,11 @@ public GDAXTrade toGDAXTrade() { return new GDAXTrade(time, tradeId, price, size, side); } + public GDAXFill toGDAXFill() { + boolean taker = userId != null && takerUserId != null && userId.equals(takerUserId); + return new GDAXFill(String.valueOf(tradeId), productId, price, size, taker ? takerOrderId : makerOrderId, time, null, null, true, side); + } + public String getType() { return type; } @@ -235,8 +254,31 @@ public String getMakerOrderId() { return makerOrderId; } + /** + * @deprecated Use {@link #getTakerOrderId()} + */ public String getTakenOrderId() { - return takenOrderId; + return takerOrderId; + } + + public String getTakerOrderId() { + return takerOrderId; + } + + public String getTakerUserId() { + return takerUserId; + } + + public String getUserId() { + return userId; + } + + public String getTakerProfileId() { + return takerProfileId; + } + + public String getProfileId() { + return profileId; } @Override @@ -259,6 +301,14 @@ public String toString() { sb.append(", productId='").append(productId).append('\''); sb.append(", sequence=").append(sequence); sb.append(", time='").append(time).append('\''); + if ( userId != null ) + sb.append(", userId='").append(userId).append('\''); + if ( profileId != null ) + sb.append(", profileId='").append(profileId).append('\''); + if ( takerUserId != null ) + sb.append(", takerUserId='").append(takerUserId).append('\''); + if ( takerProfileId != null ) + sb.append(", takerProfileId='").append(takerProfileId).append('\''); sb.append('}'); return sb.toString(); } diff --git a/xchange-gdax/src/test/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessageTest.java b/xchange-gdax/src/test/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessageTest.java index 279ac81df..9b754f0da 100644 --- a/xchange-gdax/src/test/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessageTest.java +++ b/xchange-gdax/src/test/java/info/bitrich/xchangestream/gdax/dto/GDAXWebSocketSubscriptionMessageTest.java @@ -18,7 +18,7 @@ public void testWebSocketMessageSerialization() throws JsonProcessingException { ProductSubscription productSubscription = ProductSubscription.create().addOrderbook(CurrencyPair.BTC_USD) .addTrades(CurrencyPair.BTC_USD).addTicker(CurrencyPair.BTC_USD).build(); - GDAXWebSocketSubscriptionMessage message = new GDAXWebSocketSubscriptionMessage("subscribe", productSubscription); + GDAXWebSocketSubscriptionMessage message = new GDAXWebSocketSubscriptionMessage("subscribe", productSubscription, null); final ObjectMapper mapper = new ObjectMapper(); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);