Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PART-6] Add transaction tax report #385

Merged
29 changes: 25 additions & 4 deletions workers/loc.api/bfx.api.router/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
const BaseBfxApiRouter = require(
'bfx-report/workers/loc.api/bfx.api.router'
)
const Interrupter = require(
'bfx-report/workers/loc.api/interrupter'
)

const RateLimitChecker = require('./rate.limit.checker')

Expand Down Expand Up @@ -62,7 +65,7 @@ class BfxApiRouter extends BaseBfxApiRouter {
/**
* @override
*/
route (methodName, method) {
route (methodName, method, interrupter) {
if (
!methodName ||
methodName.startsWith('_')
Expand All @@ -83,9 +86,27 @@ class BfxApiRouter extends BaseBfxApiRouter {

if (rateLimitChecker.check()) {
// Cool down delay
return new Promise((resolve) => (
setTimeout(resolve, this._coolDownDelayMs))
).then(() => {
return new Promise((resolve) => {
const onceInterruptHandler = () => {
clearTimeout(timeout)
resolve()
}
const setTimeoutHandler = () => {
if (interrupter instanceof Interrupter) {
interrupter.offInterrupt(onceInterruptHandler)
}

resolve()
}

const timeout = setTimeout(setTimeoutHandler, this._coolDownDelayMs)

if (!(interrupter instanceof Interrupter)) {
return
}

interrupter.onceInterrupt(onceInterruptHandler)
}).then(() => {
rateLimitChecker.add()

return method()
Expand Down
5 changes: 4 additions & 1 deletion workers/loc.api/di/app.deps.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ const {
syncFactory,
processMessageManagerFactory,
syncUserStepDataFactory,
wsEventEmitterFactory
wsEventEmitterFactory,
interrupterFactory
} = require('./factories')
const Crypto = require('../sync/crypto')
const Authenticator = require('../sync/authenticator')
Expand Down Expand Up @@ -343,6 +344,8 @@ module.exports = ({
bind(TYPES.SyncInterrupter)
.to(SyncInterrupter)
.inSingletonScope()
bind(TYPES.InterrupterFactory)
.toFactory(interrupterFactory)
bind(TYPES.Movements)
.to(Movements)
bind(TYPES.WinLossVSAccountBalance)
Expand Down
4 changes: 3 additions & 1 deletion workers/loc.api/di/factories/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const syncFactory = require('./sync-factory')
const processMessageManagerFactory = require('./process-message-manager-factory')
const syncUserStepDataFactory = require('./sync-user-step-data-factory')
const wsEventEmitterFactory = require('./ws-event-emitter-factory')
const interrupterFactory = require('./interrupter-factory')

module.exports = {
migrationsFactory,
Expand All @@ -15,5 +16,6 @@ module.exports = {
syncFactory,
processMessageManagerFactory,
syncUserStepDataFactory,
wsEventEmitterFactory
wsEventEmitterFactory,
interrupterFactory
}
29 changes: 29 additions & 0 deletions workers/loc.api/di/factories/interrupter-factory.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
'use strict'

const {
AuthError
} = require('bfx-report/workers/loc.api/errors')

const TYPES = require('../types')

module.exports = (ctx) => {
const authenticator = ctx.container.get(TYPES.Authenticator)

return (params) => {
const { user, name } = params ?? {}

if (!user) {
throw new AuthError()
}

const interrupter = ctx.container.get(
TYPES.Interrupter
).setName(name)

authenticator.setInterrupterToUserSession(
user, interrupter
)

return interrupter
}
}
3 changes: 2 additions & 1 deletion workers/loc.api/di/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,6 @@ module.exports = {
SyncUserStepDataFactory: Symbol.for('SyncUserStepDataFactory'),
HTTPRequest: Symbol.for('HTTPRequest'),
SummaryByAsset: Symbol.for('SummaryByAsset'),
TransactionTaxReport: Symbol.for('TransactionTaxReport')
TransactionTaxReport: Symbol.for('TransactionTaxReport'),
InterrupterFactory: Symbol.for('InterrupterFactory')
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ module.exports = (
const res = await getDataFromApi({
getData: rService[name].bind(rService),
args,
callerName: 'REPORT_FILE_WRITER'
callerName: 'REPORT_FILE_WRITER',
shouldNotInterrupt: true
})

const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ module.exports = (
const res = await getDataFromApi({
getData: rService[name].bind(rService),
args,
callerName: 'REPORT_FILE_WRITER'
callerName: 'REPORT_FILE_WRITER',
shouldNotInterrupt: true
})
const {
timestamps,
Expand Down
17 changes: 17 additions & 0 deletions workers/loc.api/helpers/schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,22 @@ const paramsSchemaForUpdateSubAccount = {
}
}

const paramsSchemaForInterruptOperations = {
type: 'object',
properties: {
names: {
type: 'array',
minItems: 1,
items: {
type: 'string',
enum: [
'TRX_TAX_REPORT_INTERRUPTER'
]
}
}
}
}

const paramsSchemaForCandlesApi = {
...cloneDeep(baseParamsSchemaForCandlesApi),
properties: {
Expand Down Expand Up @@ -482,6 +498,7 @@ module.exports = {
paramsSchemaForEditCandlesConf,
paramsSchemaForCreateSubAccount,
paramsSchemaForUpdateSubAccount,
paramsSchemaForInterruptOperations,
paramsSchemaForBalanceHistoryApi,
paramsSchemaForWinLossApi,
paramsSchemaForWinLossVSAccountBalanceApi,
Expand Down
18 changes: 15 additions & 3 deletions workers/loc.api/service.report.framework.js
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,9 @@ class FrameworkReportService extends ReportService {

getPlatformStatus (space, args, cb) {
return this._responder(async () => {
const rest = this._getREST({})
const rest = this._getREST({}, {
interrupter: args?.interrupter
})

const res = await rest.status()
const isMaintenance = !Array.isArray(res) || !res[0]
Expand Down Expand Up @@ -463,6 +465,14 @@ class FrameworkReportService extends ReportService {
}, 'stopSyncNow', args, cb)
}

interruptOperations (space, args = {}, cb) {
return this._privResponder(() => {
checkParams(args, 'paramsSchemaForInterruptOperations', ['names'])

return this._authenticator.interruptOperations(args)
}, 'interruptOperations', args, cb)
}

getPublicTradesConf (space, args = {}, cb) {
return this._privResponder(() => {
return this._publicCollsConfAccessors
Expand Down Expand Up @@ -720,7 +730,8 @@ class FrameworkReportService extends ReportService {
(args) => this._getDataFromApi({
getData: (space, args) => super.getActivePositions(space, args),
args,
callerName: 'ACTIVE_POSITIONS_GETTER'
callerName: 'ACTIVE_POSITIONS_GETTER',
shouldNotInterrupt: true
}),
args,
{
Expand Down Expand Up @@ -760,7 +771,8 @@ class FrameworkReportService extends ReportService {
return super.getPositionsAudit(space, args)
},
args,
callerName: 'POSITIONS_AUDIT_GETTER'
callerName: 'POSITIONS_AUDIT_GETTER',
shouldNotInterrupt: true
}),
args,
{
Expand Down
102 changes: 99 additions & 3 deletions workers/loc.api/sync/authenticator/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ const {
isENetError,
isAuthError
} = require('bfx-report/workers/loc.api/helpers')
const Interrupter = require(
'bfx-report/workers/loc.api/interrupter'
)

const { serializeVal } = require('../dao/helpers')
const {
Expand Down Expand Up @@ -501,6 +504,7 @@ class Authenticator {
throw new AuthError()
}

await this._processInterrupters(user)
this.removeUserSession({
email,
isSubAccount,
Expand Down Expand Up @@ -750,7 +754,7 @@ class Authenticator {
) {
const session = this.getUserSessionByToken(
token,
isReturnedPassword
{ isReturnedPassword }
)
const { authToken, apiKey, apiSecret } = session ?? {}

Expand Down Expand Up @@ -1105,6 +1109,40 @@ class Authenticator {
return true
}

setInterrupterToUserSession (user, interrupter) {
const userSession = this.getUserSessionByEmail(
user,
{ shouldOrigObjRefBeReturned: true }
)

if (!userSession) {
throw new AuthError()
}

userSession.interrupters = userSession.interrupters instanceof Set
? userSession.interrupters
: new Set()

if (!(interrupter instanceof Interrupter)) {
return
}

interrupter.onceInterrupted(() => {
userSession.interrupters.delete(interrupter)
})

userSession.interrupters.add(interrupter)
}

async interruptOperations (args) {
await this._processInterrupters(
args?.auth,
args?.params?.names
)

return true
}

setUserSession (user) {
const {
token,
Expand Down Expand Up @@ -1135,17 +1173,35 @@ class Authenticator {
this.userTokenMapByEmail.set(tokenKey, token)
}

getUserSessionByToken (token, isReturnedPassword) {
getUserSessionByToken (token, opts) {
const {
isReturnedPassword,
shouldOrigObjRefBeReturned
} = opts ?? {}

const session = this.userSessions.get(token)

if (shouldOrigObjRefBeReturned) {
return session
}

return pickSessionProps(session, isReturnedPassword)
}

getUserSessionByEmail (user, isReturnedPassword) {
getUserSessionByEmail (user, opts) {
const {
isReturnedPassword,
shouldOrigObjRefBeReturned
} = opts ?? {}

const tokenKey = this._getTokenKeyByEmailField(user)
const token = this.userTokenMapByEmail.get(tokenKey)
const session = this.userSessions.get(token)

if (shouldOrigObjRefBeReturned) {
return session
}

return pickSessionProps(session, isReturnedPassword)
}

Expand Down Expand Up @@ -1518,6 +1574,46 @@ class Authenticator {
)
)
}

async _processInterrupters (user, names) {
const _names = Array.isArray(names)
? names
: [names]
const filteredName = _names.filter((name) => (
name &&
typeof name === 'string'
))
const userSession = this.getUserSessionByEmail(
user,
{ shouldOrigObjRefBeReturned: true }
)

if (
!(userSession?.interrupters instanceof Set) ||
userSession.interrupters.size === 0
) {
return []
}

const promises = []
const interrupters = [...userSession.interrupters]
.filter((int) => (
filteredName.length === 0 ||
filteredName.some((name) => name === int.name)
))

for (const interrupter of interrupters) {
userSession.interrupters.delete(interrupter)

if (!(interrupter instanceof Interrupter)) {
continue
}

promises.push(interrupter.interrupt())
}

return await Promise.all(promises)
}
}

decorateInjectable(Authenticator, depsTypes)
Expand Down
3 changes: 2 additions & 1 deletion workers/loc.api/sync/sub.account.api.data/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ class SubAccountApiData {
const res = await this.getDataFromApi({
getData: (space, args) => method(args),
args,
callerName: 'SUB_ACCOUNT_API_DATA'
callerName: 'SUB_ACCOUNT_API_DATA',
shouldNotInterrupt: true
})

resArr.push(res)
Expand Down
Loading