Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix opened positions consideration in balances and win/loss reports #420

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions workers/loc.api/sync/balance.history/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,12 @@ class BalanceHistory {
}

const _mts = mtsMoment.valueOf() - 1
const symbSeparator = symb.length > 3
? ':'
: ''

const price = this.currencyConverter.getPriceFromData(
symb,
`t${symb}${symbSeparator}USD`,
_mts,
{ candles, currenciesSynonymous }
)
Expand Down Expand Up @@ -312,7 +315,7 @@ class BalanceHistory {
candles,
mts,
timeframe,
`t${currency}USD`,
currency,
currenciesSynonymous
)

Expand Down Expand Up @@ -379,7 +382,7 @@ class BalanceHistory {
candles,
mts,
timeframe,
`t${symb}USD`,
symb,
currenciesSynonymous
)

Expand Down
32 changes: 32 additions & 0 deletions workers/loc.api/sync/dao/dao.better.sqlite.js
Original file line number Diff line number Diff line change
Expand Up @@ -1262,6 +1262,38 @@ class BetterSqliteDAO extends DAO {
params
}, { withWorkerThreads: true })
}

/**
* @override
*/
getActivePositionsAtStart (args) {
const {
userId,
start = 0
} = args ?? {}

if (!Number.isInteger(userId)) {
throw new AuthError()
}

const _sort = getOrderQuery([['mtsUpdate', -1], ['id', -1]])
const params = { user_id: userId, mtsUpdate: start }

const sql = `\
SELECT *, max(mtsUpdate) FROM ${this.TABLES_NAMES.POSITIONS_SNAPSHOT}
WHERE user_id = $user_id AND mtsUpdate <= $mtsUpdate AND id NOT IN (
SELECT id FROM ${this.TABLES_NAMES.POSITIONS_HISTORY}
WHERE user_id = $user_id AND mtsUpdate <= $mtsUpdate
)
GROUP BY id
${_sort}`

return this.query({
action: MAIN_DB_WORKER_ACTIONS.ALL,
sql,
params
}, { withWorkerThreads: true })
}
}

decorateInjectable(BetterSqliteDAO, depsTypes)
Expand Down
5 changes: 5 additions & 0 deletions workers/loc.api/sync/dao/dao.js
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ class DAO {
* @abstract
*/
async getLastFinishedSyncQueueJob () { throw new ImplementationError() }

/**
* @abstract
*/
async getActivePositionsAtStart () { throw new ImplementationError() }
}

decorateInjectable(DAO)
Expand Down
168 changes: 119 additions & 49 deletions workers/loc.api/sync/positions.snapshot/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ const {
} = require('bfx-report/workers/loc.api/helpers')
const {
groupByTimeframe,
getStartMtsByTimeframe
getMtsGroupedByTimeframe,
calcGroupedData
} = require('../helpers')

const { decorateInjectable } = require('../../di/utils')
Expand Down Expand Up @@ -557,46 +558,57 @@ class PositionsSnapshot {
}
}

_calcPlFromPositionsSnapshots (positionsHistory) {
return (
positionsSnapshots = [],
args = {}
) => {
const { mts, timeframe } = args

// Need to filter duplicate and closed positions as it can be for
// week and month and year timeframe in daily positions snapshots
// if daily timeframe no need to filter it
const positions = this._filterPositionsSnapshots(
_getPLByTimeframe (activePositionsAtStart) {
let prevActivePositions = activePositionsAtStart ?? []

return ({
positionsHistoryGroupedByTimeframe = {},
plGroupedByTimeframe = {},
mtsGroupedByTimeframe: { mts } = {}
} = {}) => {
const positionsSnapshots = [
...plGroupedByTimeframe?.res ?? [],
...prevActivePositions
]
prevActivePositions = this._filterPositionsSnapshots(
positionsSnapshots,
positionsHistory,
timeframe,
positionsHistoryGroupedByTimeframe?.res ?? [],
mts
)

return positions.reduce((accum, curr) => {
const { plUsd } = { ...curr }
const symb = 'USD'
const accumPLUsd = prevActivePositions.reduce((accum, curr) => {
const { plUsd } = curr ?? {}

if (!Number.isFinite(plUsd)) {
return accum
}

return {
...accum,
[symb]: Number.isFinite(accum[symb])
? accum[symb] + plUsd
: plUsd
}
accum.USD = Number.isFinite(accum.USD)
? accum.USD + plUsd
: plUsd

return accum
}, {})

return accumPLUsd
}
}

_aggregatePositionsSnapshots () {
return (data = []) => data.reduce((accum, curr = {}) => {
if (!Array.isArray(accum.res)) {
accum.res = []
}

accum.res.push(curr)

return accum
}, {})
}

_filterPositionsSnapshots (
positionsSnapshots,
positionsHistory,
timeframe,
mts
positionsHistory
) {
if (
!Array.isArray(positionsSnapshots) ||
Expand All @@ -609,10 +621,7 @@ class PositionsSnapshot {
if (
Number.isFinite(position?.id) &&
accum.every((item) => item?.id !== position?.id) &&
(
timeframe === 'day' ||
!this._isClosedPosition(positionsHistory, mts, position?.id)
)
!this._isClosedPosition(positionsHistory, position?.id)
) {
accum.push(position)
}
Expand All @@ -621,17 +630,51 @@ class PositionsSnapshot {
}, [])
}

_isClosedPosition (positionsHistory, mts, id) {
_isClosedPosition (positionsHistory, id) {
return (
Array.isArray(positionsHistory) &&
positionsHistory.length > 0 &&
positionsHistory.some((item) => (
item.id === id &&
item.mts === mts
item.id === id
))
)
}

async _getActivePositionsAtStart (args) {
const user = args?.auth ?? {}
const start = args?.params?.start

const emptyRes = []

if (
!Number.isFinite(start) ||
start <= 0
) {
return emptyRes
}

const activePositionsSnapshot = await this.dao.getActivePositionsAtStart({
userId: user._id, start
})

if (
!Array.isArray(activePositionsSnapshot) ||
activePositionsSnapshot.length === 0
) {
return emptyRes
}

const {
positionsSnapshot
} = await this._getCalculatedPositions(
activePositionsSnapshot,
null,
{ isNotTickersRequired: true }
)

return positionsSnapshot
}

async getPLSnapshot ({
auth = {},
params = {}
Expand Down Expand Up @@ -669,36 +712,63 @@ class PositionsSnapshot {
isExcludePrivate: true
}
)
const activePositionsAtStartPromise = this
._getActivePositionsAtStart(args)

const [
dailyPositionsSnapshots,
positionsHistory
positionsHistory,
activePositionsAtStart
] = await Promise.all([
dailyPositionsSnapshotsPromise,
positionsHistoryPromise
positionsHistoryPromise,
activePositionsAtStartPromise
])

const positionsHistoryNormByMts = positionsHistory.map((pos) => {
if (Number.isFinite(pos?.mtsUpdate)) {
pos.mts = getStartMtsByTimeframe(
pos.mtsUpdate,
timeframe
)
}

return pos
})

const plGroupedByTimeframePromise = await groupByTimeframe(
const positionsHistoryGroupedByTimeframePromise = groupByTimeframe(
positionsHistory,
{ timeframe, start, end },
this.FOREX_SYMBS,
'mtsUpdate',
this.positionsSnapshotSymbolFieldName,
this._aggregatePositionsSnapshots()
)
const plGroupedByTimeframePromise = groupByTimeframe(
dailyPositionsSnapshots,
{ timeframe, start, end },
this.FOREX_SYMBS,
'mtsUpdate',
this.positionsSnapshotSymbolFieldName,
this._calcPlFromPositionsSnapshots(positionsHistoryNormByMts)
this._aggregatePositionsSnapshots()
)

const [
positionsHistoryGroupedByTimeframe,
plGroupedByTimeframe
] = await Promise.all([
positionsHistoryGroupedByTimeframePromise,
plGroupedByTimeframePromise
])

const mtsGroupedByTimeframe = getMtsGroupedByTimeframe(
start,
end,
timeframe,
true
)

const res = await calcGroupedData(
{
positionsHistoryGroupedByTimeframe,
plGroupedByTimeframe,
mtsGroupedByTimeframe
},
true,
this._getPLByTimeframe(activePositionsAtStart),
true
)

return plGroupedByTimeframePromise
return res
}

async getSyncedPositionsSnapshot (args) {
Expand Down