Skip to content

Commit

Permalink
feat(recovery): improve Coinbase trade recovery and added recover a…
Browse files Browse the repository at this point in the history
…ction

- Added `recover` action for recovering missing trades within a specified range using CLI through pm2.
- Enhanced `getMissingTrades` in `Coinbase` to recover trades faster.
- Implemented `clearRange` in `FilesStorage` to handle the removal of trades within a specified time range.
  • Loading branch information
Tucsky committed Dec 20, 2024
1 parent 85450f0 commit 396737b
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 80 deletions.
53 changes: 53 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const Server = require('./src/server')
const alertService = require('./src/services/alert')
const { saveConnections } = require('./src/services/connections')
const socketService = require('./src/services/socket')
const FilesStorage = require('./src/storage/files')

/* Load available exchanges
*/
Expand Down Expand Up @@ -104,6 +105,58 @@ if (process.env.pmx) {
reply(`FAILED to disconnect ${markets} (${err.message})`)
})
})

tx2.action('recover', async function (params, reply) {
if (!params || params.split(' ').length < 3) {
reply('Invalid parameters. Expected format: <exchange:pair> <from> <to>')
return
}

const [market, from, to] = params.split(' ')
const [id, pair] = market.match(/([^:]*):(.*)/).slice(1, 3)
const exchange = server.exchanges.find(
e => e?.id === id
)

if (!exchange) {
reply(`Unknown exchange ${id}`)
return
}

if (typeof exchange?.getMissingTrades === 'function') {
try {
const range = {
pair,
from: +new Date(from),
to: +new Date(to)
}

/**
* @type {FilesStorage}
*/
const fileStorage = server.storages.find(s => s.constructor.name === 'FilesStorage')

if (fileStorage) {
fileStorage.clearRange(market, range.from, range.to)
}

const recoveredCount = await exchange.getMissingTrades(range)

if (recoveredCount) {
reply(`${recoveredCount} trades recovered`)
} else {
reply(`no trade were recovered`)
}
} catch (error) {
const message = `[${id}.recoverTrades] something went wrong while recovering ${pair}'s missing trades`
console.error(message, error.message)
reply(message, error.message)
}
} else {
reply(`Can't getMissingTrades on ${id}`)
}
})

tx2.action('triggeralert', function (user, reply) {
// offline webpush testing
try {
Expand Down
27 changes: 14 additions & 13 deletions src/exchange.js
Original file line number Diff line number Diff line change
Expand Up @@ -512,11 +512,23 @@ class Exchange extends EventEmitter {
return this.promisesOfApiReconnections[api.id]
}

registerRangeForRecovery(range) {
console.log(
`[${this.id}.registerRangeForRecovery] register range (${range.pair}: ${new Date(+range.from).toISOString()}, ${new Date(+range.to).toISOString()}, ${getHms(range.from - range.to)}, ${range.missEstimate} estimated miss)`
)

this.recoveryRanges.push(range)

if (!recovering[this.id]) {
this.recoverNextRange()
}
}

/**
* Register a range for async recovery
* @param {Connection} connection to recover
*/
registerRangeForRecovery(connection) {
recoverSinceLastTrade(connection) {
if (!connection.timestamp) {
return
}
Expand All @@ -535,18 +547,7 @@ class Exchange extends EventEmitter {
missEstimate: connection.lastConnectionMissEstimate
}

console.log(
`[${this.id}.registerRangeForRecovery] register range (${range.pair}: ${new Date(+range.from).toISOString()}, ${new Date(+range.to).toISOString()}, ${getHms(range.from - range.to)}, ${range.missEstimate} estimated miss)`
)

this.recoveryRanges.push(range)

if (!recovering[this.id]) {
console.log(
`[${this.id}.registerRangeForRecovery] exchange isn't recovering yet -> start recovering`
)
this.recoverNextRange()
}
this.registerRangeForRecovery(range)
}

async recoverNextRange(sequencial) {
Expand Down
158 changes: 92 additions & 66 deletions src/exchanges/coinbase.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,77 +142,103 @@ class Coinbase extends Exchange {
side: trade.side === 'buy' || trade.side === 'BUY' ? 'sell' : 'buy'
}
}

async getMissingTrades(range, totalRecovered = 0) {
const endpoint = `https://api.coinbase.com/api/v3/brokerage/market/products/${
range.pair
}/ticker?limit=100&end=${Math.round(range.to / 1000)}&start=${Math.round(
range.from / 1000
)}`

if (+new Date() - range.to < 10000) {
// coinbase api lags a lot
// wait 10s before fetching initial results
await sleep(10000)
let endpoint
if (!range.earliestTradeId) {
endpoint = `https://api.coinbase.com/api/v3/brokerage/market/products/${
range.pair
}/ticker?limit=100&end=${Math.round(range.to / 1000)}&start=${Math.round(
range.from / 1000
)}`

// If close to current time, wait to allow trades to accumulate
if (+new Date() - range.to < 10000) {
await sleep(10000)
}
} else {
endpoint = `https://api.exchange.coinbase.com/products/${range.pair}/trades?limit=1000&after=${range.earliestTradeId}`
}

return axios
.get(endpoint)
.then(response => {
return response.data.trades
})
.then(data => {
if (data.length) {
const earliestTradeId = data[data.length - 1].trade_id
const earliestTradeTime = +new Date(data[data.length - 1].time)

const trades = data
.map(trade => this.formatTrade(trade, range.pair))
.filter(
a => a.timestamp >= range.from + 1 && a.timestamp < range.to
)

if (trades.length) {
this.emitTrades(null, trades)

totalRecovered += trades.length
range.to = trades[trades.length - 1].timestamp
}

const remainingMissingTime = range.to - range.from

if (
trades.length &&
remainingMissingTime > 1000 &&
earliestTradeTime >= range.from
) {
console.log(
`[${this.id}.recoverMissingTrades] +${trades.length} ${
range.pair
} ... but theres more (${getHms(remainingMissingTime)} remaining)`
)
return this.waitBeforeContinueRecovery().then(() =>
this.getMissingTrades(range, totalRecovered)
)
} else {
console.log(
`[${this.id}.recoverMissingTrades] +${trades.length} ${
range.pair
} (${getHms(remainingMissingTime)} remaining)`
)
}
}

return totalRecovered
})
.catch(err => {
console.error(
`[${this.id}] failed to get missing trades on ${range.pair}`,
err.message
try {
const response = await axios.get(endpoint)
const rawData = Array.isArray(response.data)
? response.data
: response.data.trades || []
if (!rawData.length) {
console.log(
`[${this.id}.recoverMissingTrades] no more trades for ${range.pair}`
)

return totalRecovered
})
}

const trades = rawData
.map(trade => this.formatTrade(trade, range.pair))
.filter(a => a.timestamp >= range.from + 1 && a.timestamp < range.to)

if (trades.length) {
this.emitTrades(null, trades)
totalRecovered += trades.length
range.to = trades[trades.length - 1].timestamp
}

const remainingMissingTime = range.to - range.from

const earliestRawTrade = rawData[rawData.length - 1]
const earliestTradeTime = +new Date(earliestRawTrade.time)

if (!range.earliestTradeId) {
if (
trades.length &&
remainingMissingTime > 1000 &&
earliestTradeTime >= range.from
) {
range.earliestTradeId = parseInt(earliestRawTrade.trade_id, 10)
console.log(
`[${this.id}.recoverMissingTrades] +${trades.length} ${
range.pair
} ... switching to Exchange API (${getHms(
remainingMissingTime
)} remaining)`
)
await this.waitBeforeContinueRecovery()
return this.getMissingTrades(range, totalRecovered)
} else {
console.log(
`[${this.id}.recoverMissingTrades] +${trades.length} ${
range.pair
} (${getHms(remainingMissingTime)} remaining)`
)
return totalRecovered
}
}

if (range.earliestTradeId) {
if (remainingMissingTime > 1000 && earliestTradeTime >= range.from) {
range.earliestTradeId = earliestRawTrade.trade_id
console.log(
`[${this.id}.recoverMissingTrades] +${trades.length} ${
range.pair
} ... but there's more (${getHms(remainingMissingTime)} remaining)`
)
await this.waitBeforeContinueRecovery()
return this.getMissingTrades(range, totalRecovered)
} else {
// No more needed or no more available
console.log(
`[${this.id}.recoverMissingTrades] +${trades.length} ${
range.pair
} (${getHms(remainingMissingTime)} remaining)`
)
return totalRecovered
}
}
} catch (err) {
console.error(
`[${this.id}] failed to get missing trades on ${range.pair}`,
err.message
)
return totalRecovered
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ class Server extends EventEmitter {
registerConnection(id, exchange.id, pair, apiLength)

if (typeof exchange.getMissingTrades === 'function') {
exchange.registerRangeForRecovery(connections[id])
exchange.recoverSinceLastTrade(connections[id])
}

let lastPing = ''
Expand Down
58 changes: 58 additions & 0 deletions src/storage/files.js
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,64 @@ class FilesStorage {
})
}

async clearRange(identifier, from, to) {
const startInterval =
Math.floor(from / config.filesInterval) * config.filesInterval
const endInterval =
Math.floor(to / config.filesInterval) * config.filesInterval

for (
let ts = startInterval;
ts <= endInterval;
ts += config.filesInterval
) {
const basePath = this.getBackupFilename(identifier, new Date(ts))

let stat = await this.statFile(basePath)
let finalPath = basePath
if (!stat) {
// Try gz
stat = await this.statFile(basePath + '.gz')
if (stat) {
finalPath = basePath + '.gz'
} else {
// No file at all, skip
continue
}
}

let fileContent = await this.readFile(finalPath)
if (!fileContent) {
continue
}

let lines = fileContent.split('\n')
if (lines[lines.length - 1] === '') {
// Remove trailing empty line if any
lines.pop()
}

// Parse and sort by timestamp (just in case, though they are likely already sorted)
// Format: "timestamp price volume side"
lines = lines.map(line => line.trim()).filter(line => line !== '')
lines.sort((a, b) => {
const aTs = Number(a.split(' ')[0])
const bTs = Number(b.split(' ')[0])
return aTs - bTs
})

// Filter out trades that intersect with [from, to]
lines = lines.filter(line => {
const tradeTs = Number(line.split(' ')[0])
return tradeTs < from || tradeTs > to
})

// Write the updated content back to the file
const newContent = lines.length ? lines.join('\n') + '\n' : ''
await this.writeFile(finalPath, newContent)
}
}

fetch() {
// unsupported
console.error(
Expand Down

0 comments on commit 396737b

Please sign in to comment.