Skip to content
This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

[bitfinex] Added some methods of authenticated api (version 2) #273

Merged
merged 9 commits into from
Jan 29, 2019
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package info.bitrich.xchangestream.service.netty;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -42,4 +44,12 @@ public void messageHandler(String message) {
handleMessage(jsonNode);
}
}

protected void sendObjectMessage(Object message) {
try {
sendMessage(objectMapper.writeValueAsString(message));
} catch (JsonProcessingException e) {
LOG.error("Error creating json message: {}", e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package info.bitrich.xchangestream.bitfinex;

import com.fasterxml.jackson.databind.JsonNode;

import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthBalance;
import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthOrder;
import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthPreTrade;
import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthTrade;

import io.reactivex.annotations.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.util.stream.Stream;

import static java.util.stream.StreamSupport.stream;

class BitfinexStreamingAdapters {

private static final Logger LOG = LoggerFactory.getLogger(BitfinexStreamingAdapters.class);

@Nullable
static BitfinexWebSocketAuthPreTrade adaptPreTrade(JsonNode preTrade) {
if (preTrade.size() < 12) {
LOG.error("addPreTrade unexpected record size={}, record={}", preTrade.size(), preTrade.toString());
return null;
}
long id = preTrade.get(0).longValue();
String pair = preTrade.get(1).textValue();
long mtsCreate = preTrade.get(2).longValue();
long orderId = preTrade.get(3).longValue();
BigDecimal execAmount = preTrade.get(4).decimalValue();
BigDecimal execPrice = preTrade.get(5).decimalValue();
String orderType = preTrade.get(6).textValue();
BigDecimal orderPrice = preTrade.get(7).decimalValue();
int maker = preTrade.get(8).intValue();
BitfinexWebSocketAuthPreTrade preTradeObject = new BitfinexWebSocketAuthPreTrade(id, pair, mtsCreate, orderId,
execAmount, execPrice, orderType, orderPrice, maker);
LOG.debug("New pre trade: {}", preTradeObject);
return preTradeObject;
}

@Nullable
static BitfinexWebSocketAuthTrade adaptTrade(JsonNode trade) {
if (trade.size() < 11) {
LOG.error("addTrade unexpected record size={}, record={}", trade.size(), trade.toString());
return null;
}
long id = trade.get(0).longValue();
String pair = trade.get(1).textValue();
long mtsCreate = trade.get(2).longValue();
long orderId = trade.get(3).longValue();
BigDecimal execAmount = trade.get(4).decimalValue();
BigDecimal execPrice = trade.get(5).decimalValue();
String orderType = trade.get(6).textValue();
BigDecimal orderPrice = trade.get(7).decimalValue();
int maker = trade.get(8).intValue();
BigDecimal fee = trade.get(9).decimalValue();
String currency = trade.get(10).textValue();
BitfinexWebSocketAuthTrade tradeObject = new BitfinexWebSocketAuthTrade(
id, pair, mtsCreate, orderId, execAmount, execPrice, orderType, orderPrice, maker, fee, currency
);
LOG.debug("New trade: {}", tradeObject);
return tradeObject;
}

static Stream<BitfinexWebSocketAuthOrder> adaptOrders(JsonNode orders) {
Iterable<JsonNode> iterator = () -> orders.iterator();
return stream(iterator.spliterator(), false)
.filter(o -> o.size() >= 32)
.map(BitfinexStreamingAdapters::createOrderObject)
.peek(o -> LOG.debug("New order: {}", o));
}

@Nullable
static BitfinexWebSocketAuthOrder adaptOrder(JsonNode order) {
BitfinexWebSocketAuthOrder orderObject = createOrderObject(order);
if (orderObject == null) {
return null;
}
LOG.debug("Updated order: {}", orderObject);
return orderObject;
}

@Nullable
static BitfinexWebSocketAuthBalance adaptBalance(JsonNode balance) {
BitfinexWebSocketAuthBalance balanceObject = createBalanceObject(balance);
if (balanceObject == null) {
return null;
}
LOG.debug("Balance: {}", balanceObject);
return balanceObject;
}

static Stream<BitfinexWebSocketAuthBalance> adaptBalances(JsonNode balances) {
Iterable<JsonNode> iterator = () -> balances.iterator();
return stream(iterator.spliterator(), false)
.filter(o -> o.size() >= 5)
.map(BitfinexStreamingAdapters::createBalanceObject)
.peek(o -> LOG.debug("Balance: {}", o));
}

@Nullable
static private BitfinexWebSocketAuthBalance createBalanceObject(JsonNode balance) {
if (balance.size() < 5) {
LOG.error("createBalanceObject unexpected record size={}, record={}", balance.size(), balance.toString());
return null;
}

String walletType = balance.get(0).textValue();
String currency = balance.get(1).textValue();
BigDecimal balanceValue = balance.get(2).decimalValue();
BigDecimal unsettledInterest = balance.get(3).decimalValue();
BigDecimal balanceAvailable = balance.get(4).asText().equals("null") ? null : balance.get(4).decimalValue();

return new BitfinexWebSocketAuthBalance(walletType, currency, balanceValue, unsettledInterest, balanceAvailable);
}

@Nullable
static private BitfinexWebSocketAuthOrder createOrderObject(JsonNode order) {
if (order.size() < 32) {
LOG.error("createOrderObject unexpected record size={}, record={}", order.size(), order.toString());
return null;
}

long id = order.get(0).longValue();
long groupId = order.get(1).longValue();
long cid = order.get(2).longValue();
String symbol = order.get(3).textValue();
long mtsCreate = order.get(4).longValue();
long mtsUpdate = order.get(5).longValue();
BigDecimal amount = order.get(6).decimalValue();
BigDecimal amountOrig = order.get(7).decimalValue();
String type = order.get(8).textValue();
String typePrev = order.get(9).textValue();
int flags = order.get(12).intValue();
String orderStatus = order.get(13).textValue();
BigDecimal price = order.get(16).decimalValue();
BigDecimal priceAvg = order.get(17).decimalValue();
BigDecimal priceTrailing = order.get(18).decimalValue();
BigDecimal priceAuxLimit = order.get(19).decimalValue();
long placedId = order.get(25).longValue();

return new BitfinexWebSocketAuthOrder(
id, groupId, cid, symbol, mtsCreate, mtsUpdate, amount, amountOrig,
type, typePrev, orderStatus, price, priceAvg, priceTrailing,
priceAuxLimit, placedId, flags
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,38 @@

import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.core.StreamingMarketDataService;

import io.reactivex.Completable;
import io.reactivex.Observable;

import org.apache.commons.lang3.StringUtils;
import org.knowm.xchange.ExchangeSpecification;
import org.knowm.xchange.bitfinex.v1.BitfinexExchange;

/**
* Created by Lukas Zaoralek on 7.11.17.
*/
public class BitfinexStreamingExchange extends BitfinexExchange implements StreamingExchange {
private static final String API_URI = "wss://api.bitfinex.com/ws/2";

static final String API_URI = "wss://api.bitfinex.com/ws/2";

private BitfinexStreamingService streamingService;
private BitfinexStreamingMarketDataService streamingMarketDataService;

public BitfinexStreamingExchange() {
}

@Override
protected void initServices() {
super.initServices();
streamingService = createStreamingService();
streamingMarketDataService = new BitfinexStreamingMarketDataService(streamingService);
this.streamingService = createStreamingService();
this.streamingMarketDataService = new BitfinexStreamingMarketDataService(streamingService);
}

private BitfinexStreamingService createStreamingService() {
BitfinexStreamingService streamingService = new BitfinexStreamingService(API_URI);
BitfinexStreamingService streamingService = new BitfinexStreamingService(API_URI, getNonceFactory());
applyStreamingSpecification(getExchangeSpecification(), streamingService);
if (StringUtils.isNotEmpty(exchangeSpecification.getApiKey())) {
streamingService.setApiKey(exchangeSpecification.getApiKey());
streamingService.setApiSecret(exchangeSpecification.getSecretKey());
}
return streamingService;
}

Expand Down Expand Up @@ -67,11 +71,14 @@ public ExchangeSpecification getDefaultExchangeSpecification() {
}

@Override
public StreamingMarketDataService getStreamingMarketDataService() {
public BitfinexStreamingMarketDataService getStreamingMarketDataService() {
return streamingMarketDataService;
}

@Override
public void useCompressedMessages(boolean compressedMessages) { streamingService.useCompressedMessages(compressedMessages); }

public boolean isAuthenticatedAlive() {
return streamingService != null && streamingService.isAuthenticated();
}
}
Original file line number Diff line number Diff line change
@@ -1,33 +1,45 @@
package info.bitrich.xchangestream.bitfinex;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.bitfinex.dto.*;

import info.bitrich.xchangestream.bitfinex.dto.BitfinexOrderbook;
import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthBalance;
import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthOrder;
import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthPreTrade;
import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketAuthTrade;
import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketOrderbookTransaction;
import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketSnapshotOrderbook;
import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketSnapshotTrades;
import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketTickerTransaction;
import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketTradesTransaction;
import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebSocketUpdateOrderbook;
import info.bitrich.xchangestream.bitfinex.dto.BitfinexWebsocketUpdateTrade;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;

import io.reactivex.Observable;

import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.dto.marketdata.Trades;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

import static org.knowm.xchange.bitfinex.v1.BitfinexAdapters.*;
import static org.knowm.xchange.bitfinex.v1.BitfinexAdapters.adaptOrderBook;
import static org.knowm.xchange.bitfinex.v1.BitfinexAdapters.adaptTicker;
import static org.knowm.xchange.bitfinex.v1.BitfinexAdapters.adaptTrades;

/**
* Created by Lukas Zaoralek on 7.11.17.
*/
public class BitfinexStreamingMarketDataService implements StreamingMarketDataService {
private static final Logger LOG = LoggerFactory.getLogger(BitfinexStreamingMarketDataService.class);

private final BitfinexStreamingService service;

private Map<CurrencyPair, BitfinexOrderbook> orderbooks = new HashMap<>();
private final Map<CurrencyPair, BitfinexOrderbook> orderbooks = new HashMap<>();

public BitfinexStreamingMarketDataService(BitfinexStreamingService service) {
this.service = service;
Expand Down Expand Up @@ -95,4 +107,20 @@ public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
return adaptedTrades.getTrades();
});
}

public Observable<BitfinexWebSocketAuthOrder> getRawAuthenticatedOrders() {
return service.getAuthenticatedOrders();
}

public Observable<BitfinexWebSocketAuthPreTrade> getRawAuthenticatedPreTrades() {
return service.getAuthenticatedPreTrades();
}

public Observable<BitfinexWebSocketAuthTrade> getRawAuthenticatedTrades() {
return service.getAuthenticatedTrades();
}

public Observable<BitfinexWebSocketAuthBalance> getRawAuthenticatedBalances() {
return service.getAuthenticatedBalances();
}
}
Loading