From c616b1bd41ff92919e5bd3eb5354a79d16b6e055 Mon Sep 17 00:00:00 2001 From: Vladimir Voronkov Date: Wed, 15 Nov 2023 15:06:35 +0200 Subject: [PATCH 01/16] Add ability to sync candles once per day --- .../factories/sync-user-step-data-factory.js | 1 + .../sync/data.inserter/data.checker/index.js | 23 +++++++++++++++---- .../sync.user.step.manager/index.js | 9 +++++--- .../sync.user.step.data.js | 8 +++++++ 4 files changed, 34 insertions(+), 7 deletions(-) diff --git a/workers/loc.api/di/factories/sync-user-step-data-factory.js b/workers/loc.api/di/factories/sync-user-step-data-factory.js index 9ca198666..99cdd3a8d 100644 --- a/workers/loc.api/di/factories/sync-user-step-data-factory.js +++ b/workers/loc.api/di/factories/sync-user-step-data-factory.js @@ -9,6 +9,7 @@ module.exports = (ctx) => { ) const paramsOrder = [ + 'syncedAt', 'symbol', 'timeframe', 'baseStart', diff --git a/workers/loc.api/sync/data.inserter/data.checker/index.js b/workers/loc.api/sync/data.inserter/data.checker/index.js index 7b099dafc..7c4d51fb6 100644 --- a/workers/loc.api/sync/data.inserter/data.checker/index.js +++ b/workers/loc.api/sync/data.inserter/data.checker/index.js @@ -407,7 +407,8 @@ class DataChecker { ) const shouldFreshSyncBeAdded = this._shouldFreshSyncBeAdded( syncUserStepData, - currMts + currMts, + { dayOfYear: 1 } ) if ( @@ -502,7 +503,8 @@ class DataChecker { ) { const { measure = 'minutes', - allowedTimeDiff = 60 + allowedTimeDiff = 60, + dayOfYear } = allowedDiff ?? {} const baseEnd = ( @@ -517,15 +519,28 @@ class DataChecker { ) ? syncUserStepData.currEnd : 0 + const syncedAt = syncUserStepData.hasSyncedAt + ? syncUserStepData.syncedAt + : 0 const momentBaseEnd = moment.utc(baseEnd) const momentCurrEnd = moment.utc(currEnd) const momentCurrMts = moment.utc(currMts) + const momentSyncedAt = moment.utc(syncedAt) const momentMaxEnd = moment.max(momentBaseEnd, momentCurrEnd) const momentDiff = momentCurrMts.diff(momentMaxEnd, measure) - - return momentDiff > allowedTimeDiff + const yearsDiff = momentCurrMts.year() - momentSyncedAt.year() + const dayOfYearDiff = momentCurrMts.dayOfYear() - momentSyncedAt.dayOfYear() + + return ( + momentDiff > allowedTimeDiff && + ( + !Number.isFinite(dayOfYear) || + yearsDiff >= 1 || + dayOfYearDiff >= dayOfYear + ) + ) } _wasStartPointChanged ( diff --git a/workers/loc.api/sync/data.inserter/sync.user.step.manager/index.js b/workers/loc.api/sync/data.inserter/sync.user.step.manager/index.js index 2e060e6bb..44ac032b1 100644 --- a/workers/loc.api/sync/data.inserter/sync.user.step.manager/index.js +++ b/workers/loc.api/sync/data.inserter/sync.user.step.manager/index.js @@ -326,7 +326,8 @@ class SyncUserStepManager { currStart, currEnd, isBaseStepReady = false, - isCurrStepReady = false + isCurrStepReady = false, + syncedAt } = syncUserStepInfo ?? {} const baseStart = this._getMinStart(_baseStart) @@ -348,7 +349,8 @@ class SyncUserStepManager { baseEnd: baseEnd ?? currMts, isBaseStepReady, symbol, - timeframe + timeframe, + syncedAt }) return { @@ -365,7 +367,8 @@ class SyncUserStepManager { isBaseStepReady, isCurrStepReady, symbol, - timeframe + timeframe, + syncedAt }) if (!isCurrStepReady) { diff --git a/workers/loc.api/sync/data.inserter/sync.user.step.manager/sync.user.step.data.js b/workers/loc.api/sync/data.inserter/sync.user.step.manager/sync.user.step.data.js index c3cea5630..91ffafa58 100644 --- a/workers/loc.api/sync/data.inserter/sync.user.step.manager/sync.user.step.data.js +++ b/workers/loc.api/sync/data.inserter/sync.user.step.manager/sync.user.step.data.js @@ -9,6 +9,7 @@ const { decorateInjectable } = require('../../../di/utils') */ class SyncUserStepData { constructor () { + this.syncedAt = null this.symbol = null this.timeframe = null this.baseStart = null @@ -23,6 +24,7 @@ class SyncUserStepData { } /** + * @param {?number} [syncedAt] - Used to specify synced mts point * @param {?string} [symbol] - Used to specify synced symbol, can be `_ALL` if all ones are syncing * @param {?string} [timeframe] - Used to specify synced timeframe, eg. for candles collection * @param {?number} [baseStart] - Used to specify base start mts point to continue first sync @@ -34,6 +36,7 @@ class SyncUserStepData { */ setParams (params = {}) { const { + syncedAt = this.syncedAt, symbol = this.symbol, timeframe = this.timeframe, baseStart = this.baseStart, @@ -44,6 +47,7 @@ class SyncUserStepData { isCurrStepReady = this.isCurrStepReady } = params ?? {} + this.syncedAt = syncedAt this.symbol = symbol this.timeframe = timeframe this.baseStart = baseStart @@ -117,6 +121,10 @@ class SyncUserStepData { get hasCurrStart () { return Number.isInteger(this.currStart) } + + get hasSyncedAt () { + return Number.isInteger(this.syncedAt) + } } decorateInjectable(SyncUserStepData) From 3291c401404494afa8674ccd31dabf5bff66ae9d Mon Sep 17 00:00:00 2001 From: Vladimir Voronkov Date: Mon, 20 Nov 2023 14:04:45 +0200 Subject: [PATCH 02/16] Add debug logger to currency converter --- workers/loc.api/sync/currency.converter/index.js | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/workers/loc.api/sync/currency.converter/index.js b/workers/loc.api/sync/currency.converter/index.js index 22a138c89..1db45f31e 100644 --- a/workers/loc.api/sync/currency.converter/index.js +++ b/workers/loc.api/sync/currency.converter/index.js @@ -30,7 +30,8 @@ const depsTypes = (TYPES) => [ TYPES.SyncSchema, TYPES.FOREX_SYMBS, TYPES.ALLOWED_COLLS, - TYPES.SYNC_API_METHODS + TYPES.SYNC_API_METHODS, + TYPES.Logger ] class CurrencyConverter { constructor ( @@ -40,7 +41,8 @@ class CurrencyConverter { syncSchema, FOREX_SYMBS, ALLOWED_COLLS, - SYNC_API_METHODS + SYNC_API_METHODS, + logger ) { this.rService = rService this.getDataFromApi = getDataFromApi @@ -49,6 +51,7 @@ class CurrencyConverter { this.FOREX_SYMBS = FOREX_SYMBS this.ALLOWED_COLLS = ALLOWED_COLLS this.SYNC_API_METHODS = SYNC_API_METHODS + this.logger = logger this._COLL_NAMES = { PUBLIC_TRADES: 'publicTrades', @@ -114,6 +117,8 @@ class CurrencyConverter { return this.currenciesSynonymous } } catch (err) { + this.logger.debug(err) + return this.currenciesSynonymous } } From ad9eb77e20d81bb105fed3565f83b289d74deac7 Mon Sep 17 00:00:00 2001 From: Vladimir Voronkov Date: Mon, 20 Nov 2023 14:07:50 +0200 Subject: [PATCH 03/16] Set session auth for checking new pub data --- workers/loc.api/sync/data.inserter/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workers/loc.api/sync/data.inserter/index.js b/workers/loc.api/sync/data.inserter/index.js index 04b35617b..ba52c6835 100644 --- a/workers/loc.api/sync/data.inserter/index.js +++ b/workers/loc.api/sync/data.inserter/index.js @@ -284,7 +284,7 @@ class DataInserter extends EventEmitter { await this.wsEventEmitter .emitSyncingStep('CHECKING_NEW_PUBLIC_DATA') const methodCollMap = await this.dataChecker - .checkNewPublicData() + .checkNewPublicData(this._sessionAuth) await this.syncTempTablesManager .createTempDBStructureForCurrSync(methodCollMap) const size = methodCollMap.size From a951597628e67f7e779ec95949247a5fd5a57bcb Mon Sep 17 00:00:00 2001 From: Vladimir Voronkov Date: Mon, 20 Nov 2023 14:11:04 +0200 Subject: [PATCH 04/16] Add ability to reference specific user currencies when candles sync --- .../sync/data.inserter/data.checker/index.js | 52 +++++++++++++++---- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/workers/loc.api/sync/data.inserter/data.checker/index.js b/workers/loc.api/sync/data.inserter/data.checker/index.js index 7c4d51fb6..d704b8873 100644 --- a/workers/loc.api/sync/data.inserter/data.checker/index.js +++ b/workers/loc.api/sync/data.inserter/data.checker/index.js @@ -95,14 +95,17 @@ class DataChecker { return filterMethodCollMap(methodCollMap) } - async checkNewPublicData () { + /* + * `sessionAuth` can be empty + */ + async checkNewPublicData (sessionAuth) { const methodCollMap = this._getMethodCollMap() if (this._isInterrupted) { return filterMethodCollMap(methodCollMap, true) } - await this._checkNewDataPublicArrObjType(methodCollMap) + await this._checkNewDataPublicArrObjType(sessionAuth, methodCollMap) await this._checkNewPublicUpdatableData(methodCollMap) return filterMethodCollMap(methodCollMap, true) @@ -179,7 +182,7 @@ class DataChecker { schema.start.push(freshSyncUserStepData) } - async _checkNewDataPublicArrObjType (methodCollMap) { + async _checkNewDataPublicArrObjType (sessionAuth, methodCollMap) { for (const [method, schema] of methodCollMap) { if (this._isInterrupted) { return @@ -191,7 +194,11 @@ class DataChecker { this._resetSyncSchemaProps(schema) if (schema.name === this.ALLOWED_COLLS.CANDLES) { - await this._checkNewCandlesData(method, schema) + await this._checkNewCandlesData( + method, + schema, + sessionAuth + ) } if ( schema.name === this.ALLOWED_COLLS.PUBLIC_TRADES || @@ -336,20 +343,24 @@ class DataChecker { */ async _checkNewCandlesData ( method, - schema + schema, + sessionAuth ) { if (this._isInterrupted) { return } const currMts = Date.now() - const firstLedgerMts = await this._getFirstLedgerMts() + const usersFilter = this._getUsersFilter(sessionAuth) + const firstLedgerMts = await this._getFirstLedgerMts(usersFilter) if (!Number.isInteger(firstLedgerMts)) { return } - const uniqueSymbsSet = await this._getUniqueSymbsFromLedgers() + const uniqueSymbsSet = await this._getUniqueSymbsFromLedgers( + usersFilter + ) const candlesPairsSet = new Set() for (const symbol of uniqueSymbsSet) { @@ -568,8 +579,11 @@ class DataChecker { return momentDiff > allowedTimeDiff } - async _getFirstLedgerMts () { - const firstElemFilter = { $not: { currency: 'USD' } } + async _getFirstLedgerMts (usersFilter) { + const firstElemFilter = { + ...usersFilter, + $not: { currency: 'USD' } + } const firstElemOrder = [['mts', 1]] const tempLedgersTableName = SyncTempTablesManager.getTempTableName( @@ -615,9 +629,12 @@ class DataChecker { return mts } - async _getUniqueSymbsFromLedgers () { + async _getUniqueSymbsFromLedgers (usersFilter) { const ledgerParams = { - filter: { $not: { currency: this.FOREX_SYMBS } }, + filter: { + ...usersFilter, + $not: { currency: this.FOREX_SYMBS } + }, isDistinct: true, projection: ['currency'] } @@ -698,6 +715,19 @@ class DataChecker { this._methodCollMap = this.syncSchema .getMethodCollMap(methodCollMap) } + + _getUsersFilter (sessionAuth) { + if ( + !(sessionAuth instanceof Map) || + sessionAuth.size === 0 + ) { + return {} + } + + const userIds = [...sessionAuth.keys()] + + return { $in: { user_id: userIds } } + } } decorateInjectable(DataChecker, depsTypes) From 5e84e98285caa566c809704cafbe6e887ae9f599 Mon Sep 17 00:00:00 2001 From: Vladimir Voronkov Date: Wed, 22 Nov 2023 15:54:08 +0200 Subject: [PATCH 05/16] Add ability to use SyncUserStepManager with candles considering user --- .../sync.user.step.manager/index.js | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/workers/loc.api/sync/data.inserter/sync.user.step.manager/index.js b/workers/loc.api/sync/data.inserter/sync.user.step.manager/index.js index 44ac032b1..a6bd5e4cf 100644 --- a/workers/loc.api/sync/data.inserter/sync.user.step.manager/index.js +++ b/workers/loc.api/sync/data.inserter/sync.user.step.manager/index.js @@ -211,13 +211,17 @@ class SyncUserStepManager { const defaultStart = this._getMinStart(_defaultStart) const hasUserIdField = ( - typeof model?.user_id === 'string' && Number.isInteger(userId) ) + const hasUserIdFieldInModel = ( + typeof model?.user_id === 'string' + ) const hasSubUserIdField = ( - typeof model?.subUserId === 'string' && Number.isInteger(subUserId) ) + const hasSubUserIdFieldInModel = ( + typeof model?.subUserId === 'string' + ) const hasSymbolField = ( symbolFieldName && typeof model?.[symbolFieldName] === 'string' && @@ -230,13 +234,19 @@ class SyncUserStepManager { timeframe && typeof timeframe === 'string' ) - const shouldCollBePublic = !hasUserIdField + const shouldCollBePublic = ( + !hasUserIdField || + !hasUserIdFieldInModel + ) const shouldTableDataBeFetched = ( dateFieldName && typeof dateFieldName === 'string' ) - if (isPublic(syncSchema?.type) !== shouldCollBePublic) { + if ( + tableName !== this.TABLES_NAMES.CANDLES && + isPublic(syncSchema?.type) !== shouldCollBePublic + ) { throw new LastSyncedInfoGettingError() } @@ -257,8 +267,8 @@ class SyncUserStepManager { : {} const dataFilter = merge( {}, - userIdFilter, - subUserIdFilter, + hasUserIdFieldInModel ? userIdFilter : {}, + subUserIdFilter ? hasSubUserIdFieldInModel : {}, symbolFilter, timeframeFilter ) @@ -267,7 +277,8 @@ class SyncUserStepManager { this.TABLES_NAMES.SYNC_USER_STEPS, { collName, - ...userIdFilter + ...userIdFilter, + ...subUserIdFilter }, [['syncedAt', -1]] ) From cbc9342267e1e12a4f102bb705581b8ad438c282 Mon Sep 17 00:00:00 2001 From: Vladimir Voronkov Date: Wed, 22 Nov 2023 15:54:39 +0200 Subject: [PATCH 06/16] Add ability to use DataChecker with candles considering user --- .../sync/data.inserter/data.checker/index.js | 91 ++++++++++--------- 1 file changed, 48 insertions(+), 43 deletions(-) diff --git a/workers/loc.api/sync/data.inserter/data.checker/index.js b/workers/loc.api/sync/data.inserter/data.checker/index.js index d704b8873..8d983930a 100644 --- a/workers/loc.api/sync/data.inserter/data.checker/index.js +++ b/workers/loc.api/sync/data.inserter/data.checker/index.js @@ -96,16 +96,16 @@ class DataChecker { } /* - * `sessionAuth` can be empty + * `authMap` can be empty */ - async checkNewPublicData (sessionAuth) { + async checkNewPublicData (authMap) { const methodCollMap = this._getMethodCollMap() if (this._isInterrupted) { return filterMethodCollMap(methodCollMap, true) } - await this._checkNewDataPublicArrObjType(sessionAuth, methodCollMap) + await this._checkNewDataPublicArrObjType(authMap, methodCollMap) await this._checkNewPublicUpdatableData(methodCollMap) return filterMethodCollMap(methodCollMap, true) @@ -182,7 +182,7 @@ class DataChecker { schema.start.push(freshSyncUserStepData) } - async _checkNewDataPublicArrObjType (sessionAuth, methodCollMap) { + async _checkNewDataPublicArrObjType (authMap, methodCollMap) { for (const [method, schema] of methodCollMap) { if (this._isInterrupted) { return @@ -194,11 +194,21 @@ class DataChecker { this._resetSyncSchemaProps(schema) if (schema.name === this.ALLOWED_COLLS.CANDLES) { - await this._checkNewCandlesData( - method, - schema, - sessionAuth + // If `authMap` is empty sync candles for all users + const _authMap = ( + !(authMap instanceof Map) || + authMap.size === 0 ) + ? new Map([['ALL', {}]]) + : authMap + + for (const authItem of _authMap) { + await this._checkNewCandlesData( + method, + schema, + authItem[1] + ) + } } if ( schema.name === this.ALLOWED_COLLS.PUBLIC_TRADES || @@ -344,14 +354,19 @@ class DataChecker { async _checkNewCandlesData ( method, schema, - sessionAuth + auth ) { if (this._isInterrupted) { return } + const { _id: userId, subUser } = auth ?? {} + const { _id: subUserId } = subUser ?? {} + const usersFilter = Number.isInteger(userId) + ? { $eq: { user_id: userId } } + : {} + const currMts = Date.now() - const usersFilter = this._getUsersFilter(sessionAuth) const firstLedgerMts = await this._getFirstLedgerMts(usersFilter) if (!Number.isInteger(firstLedgerMts)) { @@ -400,7 +415,9 @@ class DataChecker { collName: method, symbol, timeframe: CANDLES_TIMEFRAME, - defaultStart: firstLedgerMts + defaultStart: firstLedgerMts, + userId, + subUserId } ) @@ -409,7 +426,6 @@ class DataChecker { !syncUserStepData.isCurrStepReady ) { schema.hasNewData = true - schema.start.push(syncUserStepData) } const wasStartPointChanged = this._wasStartPointChanged( @@ -422,36 +438,38 @@ class DataChecker { { dayOfYear: 1 } ) - if ( - !wasStartPointChanged && - !shouldFreshSyncBeAdded - ) { - continue - } - - const freshSyncUserStepData = this.syncUserStepDataFactory({ - ...syncUserStepData.getParams(), - isBaseStepReady: true, - isCurrStepReady: true - }) - if (wasStartPointChanged) { - freshSyncUserStepData.setParams({ + syncUserStepData.setParams({ baseStart: firstLedgerMts, - baseEnd: syncUserStepData.baseStart, isBaseStepReady: false }) + + schema.hasNewData = true } if (shouldFreshSyncBeAdded) { - freshSyncUserStepData.setParams({ + syncUserStepData.setParams({ currStart: lastElemMtsFromTables, currEnd: currMts, isCurrStepReady: false }) + + schema.hasNewData = true } - schema.hasNewData = true - schema.start.push(freshSyncUserStepData) + if (!schema.hasNewData) { + continue + } + + // To keep flow: one candles request per currency + if (!syncUserStepData.isBaseStepReady) { + syncUserStepData.setParams({ + baseEnd: syncUserStepData.currEnd, + isCurrStepReady: true + }) + } + + syncUserStepData.auth = auth + schema.start.push(syncUserStepData) } } @@ -715,19 +733,6 @@ class DataChecker { this._methodCollMap = this.syncSchema .getMethodCollMap(methodCollMap) } - - _getUsersFilter (sessionAuth) { - if ( - !(sessionAuth instanceof Map) || - sessionAuth.size === 0 - ) { - return {} - } - - const userIds = [...sessionAuth.keys()] - - return { $in: { user_id: userIds } } - } } decorateInjectable(DataChecker, depsTypes) From 1b99bc36d67023fc985b5c64673a01f425ee5e8a Mon Sep 17 00:00:00 2001 From: Vladimir Voronkov Date: Wed, 22 Nov 2023 16:05:00 +0200 Subject: [PATCH 07/16] Add ability to use DataInserter with candles considering user --- workers/loc.api/sync/data.inserter/index.js | 64 ++++++++++++++++----- 1 file changed, 49 insertions(+), 15 deletions(-) diff --git a/workers/loc.api/sync/data.inserter/index.js b/workers/loc.api/sync/data.inserter/index.js index ba52c6835..006b16466 100644 --- a/workers/loc.api/sync/data.inserter/index.js +++ b/workers/loc.api/sync/data.inserter/index.js @@ -284,7 +284,7 @@ class DataInserter extends EventEmitter { await this.wsEventEmitter .emitSyncingStep('CHECKING_NEW_PUBLIC_DATA') const methodCollMap = await this.dataChecker - .checkNewPublicData(this._sessionAuth) + .checkNewPublicData(this._auth) await this.syncTempTablesManager .createTempDBStructureForCurrSync(methodCollMap) const size = methodCollMap.size @@ -417,7 +417,14 @@ class DataInserter extends EventEmitter { return } - const { userId, subUserId } = this._getUserIds(auth) + const hasCandlesSection = schema.name === this.ALLOWED_COLLS.CANDLES + const _auth = ( + hasCandlesSection && + syncUserStepData?.auth + ) + ? syncUserStepData.auth + : auth + const { userId, subUserId } = this._getUserIds(_auth) await this.syncUserStepManager.updateOrInsertSyncInfoForCurrColl({ collName: methodApi, userId, @@ -436,7 +443,6 @@ class DataInserter extends EventEmitter { hasTimeframe, areAllSymbolsRequired } = syncUserStepData - const hasCandlesSection = schema.name === this.ALLOWED_COLLS.CANDLES const params = {} @@ -926,19 +932,47 @@ class DataInserter extends EventEmitter { continue } - const promise = this.syncUserStepManager.updateOrInsertSyncInfoForCurrColl({ - collName, - syncedAt, - ...this.syncUserStepManager.wereStepsSynced( - schema.start, - { - shouldNotMtsBeChecked: isUpdatable(schema?.type), - shouldStartMtsBeChecked: schema?.name === this.ALLOWED_COLLS.STATUS_MESSAGES - } - ) - }, { doNotQueueQuery: true }) + const startWithAuth = schema.start + .filter((syncUserStepData) => syncUserStepData?.auth) + const startWithoutAuth = schema.start + .filter((syncUserStepData) => !syncUserStepData?.auth) + + if (startWithAuth.length > 0) { + for (const syncUserStepData of startWithAuth) { + const { userId, subUserId } = this._getUserIds(syncUserStepData.auth) + + const promise = this.syncUserStepManager.updateOrInsertSyncInfoForCurrColl({ + collName, + userId, + subUserId, + syncedAt, + ...this.syncUserStepManager.wereStepsSynced( + [syncUserStepData], + { + shouldNotMtsBeChecked: isUpdatable(schema?.type), + shouldStartMtsBeChecked: schema?.name === this.ALLOWED_COLLS.STATUS_MESSAGES + } + ) + }, { doNotQueueQuery: true }) + + updatesForPubCollsPromises.push(promise) + } + } + if (startWithoutAuth.length > 0) { + const promise = this.syncUserStepManager.updateOrInsertSyncInfoForCurrColl({ + collName, + syncedAt, + ...this.syncUserStepManager.wereStepsSynced( + schema.start, + { + shouldNotMtsBeChecked: isUpdatable(schema?.type), + shouldStartMtsBeChecked: schema?.name === this.ALLOWED_COLLS.STATUS_MESSAGES + } + ) + }, { doNotQueueQuery: true }) - updatesForPubCollsPromises.push(promise) + updatesForPubCollsPromises.push(promise) + } } await Promise.all(updatesForPubCollsPromises) From 2b9e60bc8ef572b3a86aabf01eaf19520d923497 Mon Sep 17 00:00:00 2001 From: Vladimir Voronkov Date: Thu, 23 Nov 2023 15:48:39 +0200 Subject: [PATCH 08/16] Fix base end mts for candles sync --- workers/loc.api/sync/data.inserter/data.checker/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workers/loc.api/sync/data.inserter/data.checker/index.js b/workers/loc.api/sync/data.inserter/data.checker/index.js index 8d983930a..e6a90cb17 100644 --- a/workers/loc.api/sync/data.inserter/data.checker/index.js +++ b/workers/loc.api/sync/data.inserter/data.checker/index.js @@ -463,7 +463,7 @@ class DataChecker { // To keep flow: one candles request per currency if (!syncUserStepData.isBaseStepReady) { syncUserStepData.setParams({ - baseEnd: syncUserStepData.currEnd, + baseEnd: syncUserStepData.currEnd ?? currMts, isCurrStepReady: true }) } From 7f40fd2d50bd0165ff93e4bb7964217e5a735298 Mon Sep 17 00:00:00 2001 From: Vladimir Voronkov Date: Thu, 23 Nov 2023 15:51:14 +0200 Subject: [PATCH 09/16] Fix user filter for db sync info operations --- .../sync.user.step.manager/index.js | 42 ++++++++++++++----- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/workers/loc.api/sync/data.inserter/sync.user.step.manager/index.js b/workers/loc.api/sync/data.inserter/sync.user.step.manager/index.js index a6bd5e4cf..3aaa7c812 100644 --- a/workers/loc.api/sync/data.inserter/sync.user.step.manager/index.js +++ b/workers/loc.api/sync/data.inserter/sync.user.step.manager/index.js @@ -162,14 +162,16 @@ class SyncUserStepManager { const userIdFilter = hasUserIdField ? { $eq: { user_id: userId } } - : {} + : { $isNull: ['user_id'] } const subUserIdFilter = hasSubUserIdField ? { $eq: { subUserId } } - : {} + : { $isNull: ['subUserId'] } const filter = merge( { $eq: { collName } }, - userIdFilter, - subUserIdFilter + this._mergeUserFilters( + userIdFilter, + subUserIdFilter + ) ) const updateRes = await this.dao.updateCollBy( @@ -255,10 +257,10 @@ class SyncUserStepManager { const userIdFilter = hasUserIdField ? { $eq: { user_id: userId } } - : {} + : { $isNull: ['user_id'] } const subUserIdFilter = hasSubUserIdField ? { $eq: { subUserId } } - : {} + : { $isNull: ['subUserId'] } const symbolFilter = hasSymbolField ? { $eq: { [symbolFieldName]: symbol } } : {} @@ -267,8 +269,10 @@ class SyncUserStepManager { : {} const dataFilter = merge( {}, - hasUserIdFieldInModel ? userIdFilter : {}, - subUserIdFilter ? hasSubUserIdFieldInModel : {}, + this._mergeUserFilters( + hasUserIdFieldInModel ? userIdFilter : {}, + hasSubUserIdFieldInModel ? subUserIdFilter : {} + ), symbolFilter, timeframeFilter ) @@ -277,8 +281,10 @@ class SyncUserStepManager { this.TABLES_NAMES.SYNC_USER_STEPS, { collName, - ...userIdFilter, - ...subUserIdFilter + ...this._mergeUserFilters( + userIdFilter, + subUserIdFilter + ) }, [['syncedAt', -1]] ) @@ -474,6 +480,22 @@ class SyncUserStepManager { ? MIN_START_MTS : start } + + _mergeUserFilters (userIdFilter, subUserIdFilter) { + const $isNull = [ + ...userIdFilter?.$isNull ?? [], + ...subUserIdFilter?.$isNull ?? [] + ] + const $isNullObj = $isNull.length > 0 + ? { $isNull } + : {} + + return merge( + userIdFilter, + subUserIdFilter, + $isNullObj + ) + } } decorateInjectable(SyncUserStepManager, depsTypes) From e71eeb42b4666639467dbd9cdb2a22dfddd43bef Mon Sep 17 00:00:00 2001 From: Vladimir Voronkov Date: Thu, 23 Nov 2023 15:53:37 +0200 Subject: [PATCH 10/16] Fix check have-colls-been-synced for candles --- .../loc.api/sync/sync.colls.manager/index.js | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/workers/loc.api/sync/sync.colls.manager/index.js b/workers/loc.api/sync/sync.colls.manager/index.js index 33816ae4c..57099b3b3 100644 --- a/workers/loc.api/sync/sync.colls.manager/index.js +++ b/workers/loc.api/sync/sync.colls.manager/index.js @@ -91,6 +91,7 @@ class SyncCollsManager { for (const [method, schema] of this._methodCollMap) { const { + name, type, isSyncRequiredAtLeastOnce } = schema @@ -101,7 +102,10 @@ class SyncCollsManager { ) { continue } - if (isPublic(type)) { + if ( + isPublic(type) && + name !== this.TABLES_NAMES.CANDLES + ) { const isDone = completedColls.some((completedColl) => ( completedColl?.isBaseStepReady && completedColl?.collName === method @@ -260,11 +264,21 @@ class SyncCollsManager { ) } - _getCompletedCollsBy (filter = {}) { - return this.dao.getElemsInCollBy( + async _getCompletedCollsBy (filter = {}) { + const completedColls = await this.dao.getElemsInCollBy( this.TABLES_NAMES.SYNC_USER_STEPS, { filter } ) + + if (!Array.isArray(completedColls)) { + return [] + } + + // Considers candles referenced to specific user + return completedColls.filter((completedColl) => ( + completedColl.collName !== this.SYNC_API_METHODS.CANDLES || + Number.isInteger(completedColl.user_id) + )) } } From f03efd666aa9c0cc03625ec8e7bd3d24e2857724 Mon Sep 17 00:00:00 2001 From: Vladimir Voronkov Date: Thu, 23 Nov 2023 16:05:14 +0200 Subject: [PATCH 11/16] Set candles limit 20 reqs/min --- workers/loc.api/bfx.api.router/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workers/loc.api/bfx.api.router/index.js b/workers/loc.api/bfx.api.router/index.js index 400fede08..a82ee22bf 100644 --- a/workers/loc.api/bfx.api.router/index.js +++ b/workers/loc.api/bfx.api.router/index.js @@ -43,7 +43,7 @@ class BfxApiRouter extends BaseBfxApiRouter { ['fundingTrades', 90], ['trades', 15], ['statusMessages', 90], - ['candles', 30], + ['candles', 20], ['orderTrades', 90], ['orderHistory', 90], ['activeOrders', 90], From b182605ef72c1f794474d5a2bdb2df332a299c3c Mon Sep 17 00:00:00 2001 From: Vladimir Voronkov Date: Thu, 23 Nov 2023 16:06:38 +0200 Subject: [PATCH 12/16] Add approximate candles sync time estimation --- workers/loc.api/sync/data.inserter/index.js | 13 +++++++++++-- workers/loc.api/sync/progress/index.js | 13 +++++++++++-- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/workers/loc.api/sync/data.inserter/index.js b/workers/loc.api/sync/data.inserter/index.js index 006b16466..b8befa895 100644 --- a/workers/loc.api/sync/data.inserter/index.js +++ b/workers/loc.api/sync/data.inserter/index.js @@ -60,7 +60,8 @@ const depsTypes = (TYPES) => [ TYPES.WSEventEmitter, TYPES.GetDataFromApi, TYPES.SyncTempTablesManager, - TYPES.SyncUserStepManager + TYPES.SyncUserStepManager, + TYPES.Progress ] class DataInserter extends EventEmitter { constructor ( @@ -79,7 +80,8 @@ class DataInserter extends EventEmitter { wsEventEmitter, getDataFromApi, syncTempTablesManager, - syncUserStepManager + syncUserStepManager, + progress ) { super() @@ -99,6 +101,7 @@ class DataInserter extends EventEmitter { this.getDataFromApi = getDataFromApi this.syncTempTablesManager = syncTempTablesManager this.syncUserStepManager = syncUserStepManager + this.progress = progress this._asyncProgressHandlers = [] this._auth = null @@ -302,6 +305,12 @@ class DataInserter extends EventEmitter { const { type, start } = schema ?? {} + if (schema.name === this.ALLOWED_COLLS.CANDLES) { + // Considers 10 reqs/min for candles + const leftTime = Math.floor((60 / 10) * start.length * 1000) + this.progress.setCandlesLeftTime(leftTime) + } + for (const syncUserStepData of start) { if (isInsertableArrObj(schema?.type, { isPublic: true })) { await this._insertApiData( diff --git a/workers/loc.api/sync/progress/index.js b/workers/loc.api/sync/progress/index.js index 16f8c7321..91b17b90b 100644 --- a/workers/loc.api/sync/progress/index.js +++ b/workers/loc.api/sync/progress/index.js @@ -41,6 +41,7 @@ class Progress extends EventEmitter { this._state = null this._hasNotProgressChanged = true this._leftTime = null + this._candlesLeftTime = null this._prevEstimatedLeftTime = Date.now() } @@ -145,7 +146,7 @@ class Progress extends EventEmitter { return this } - async _estimateSyncTime (params) { + _estimateSyncTime (params) { const { error, progress, @@ -218,6 +219,7 @@ class Progress extends EventEmitter { } = params ?? {} if (!hasNotProgressChanged) { + this._candlesLeftTime = null this._prevEstimatedLeftTime = nowMts this._leftTime = Math.floor((spentTime / progress) * (100 - progress)) @@ -233,7 +235,10 @@ class Progress extends EventEmitter { this._prevEstimatedLeftTime = nowMts this._leftTime = leftTime > 0 ? leftTime - : null + : this._candlesLeftTime ?? this._calcLeftTime({ + ...params, + hasNotProgressChanged: false + }) return this._leftTime } @@ -304,6 +309,10 @@ class Progress extends EventEmitter { ) ) } + + setCandlesLeftTime (leftTime) { + this._candlesLeftTime = leftTime + } } decorateInjectable(Progress, depsTypes) From 17619842c2bb196072c0ef6dbeca909ab0b30d60 Mon Sep 17 00:00:00 2001 From: Vladimir Voronkov Date: Thu, 23 Nov 2023 16:30:01 +0200 Subject: [PATCH 13/16] Fix bfx auth token refreshing --- workers/loc.api/sync/authenticator/index.js | 33 +++++++++++++-------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/workers/loc.api/sync/authenticator/index.js b/workers/loc.api/sync/authenticator/index.js index d69d06228..718290be0 100644 --- a/workers/loc.api/sync/authenticator/index.js +++ b/workers/loc.api/sync/authenticator/index.js @@ -1082,19 +1082,25 @@ class Authenticator { } = user ?? {} const tokenKey = this._getTokenKeyByEmailField(user) + const userSession = this.userSessions.get(token) ?? {} const authTokenRefreshInterval = authToken ? this.setupAuthTokenRefreshInterval(user) : null this.userSessions.set( - token, { - ...user, - authTokenFn: () => { - return this.userSessions.get(token)?.authToken - }, - authTokenRefreshInterval, - authTokenInvalidateIntervals: new Map() - } + token, + Object.assign( + userSession, + user, + { + authTokenFn: () => { + return this.userSessions.get(token)?.authToken + }, + authTokenRefreshInterval, + authTokenInvalidateIntervals: userSession + ?.authTokenInvalidateIntervals ?? new Map() + } + ) ) this.userTokenMapByEmail.set(tokenKey, token) } @@ -1133,9 +1139,8 @@ class Authenticator { ? token : this.userTokenMapByEmail.get(tokenKey) - this.userTokenMapByEmail.delete(tokenKey) - const session = this.userSessions.get(_token) ?? {} + this.userTokenMapByEmail.delete(this._getTokenKeyByEmailField(session)) const { authTokenRefreshInterval, authTokenInvalidateIntervals = new Map() @@ -1272,8 +1277,12 @@ class Authenticator { const { authTokenInvalidateIntervals } = userSession let count = 0 + if (authTokenInvalidateIntervals.has(authToken)) { + return + } + const authTokenInvalidateInterval = setInterval(async () => { - const session = this.userSessions.get(token) + const session = this.userSessions.get(token) ?? userSession try { count += 1 @@ -1288,7 +1297,7 @@ class Authenticator { } catch (err) { if (count >= 3) { clearInterval(authTokenInvalidateInterval) - session.authTokenInvalidateIntervals?.delete(authToken) + session?.authTokenInvalidateIntervals?.delete(authToken) } this.logger.debug(err) From 648017050f8e8470ae07bc790898d4c39e273e33 Mon Sep 17 00:00:00 2001 From: Vladimir Voronkov Date: Fri, 24 Nov 2023 12:08:38 +0200 Subject: [PATCH 14/16] Set 10 reqs per min for bfx api trades endpoint --- workers/loc.api/bfx.api.router/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workers/loc.api/bfx.api.router/index.js b/workers/loc.api/bfx.api.router/index.js index a82ee22bf..5a011cd60 100644 --- a/workers/loc.api/bfx.api.router/index.js +++ b/workers/loc.api/bfx.api.router/index.js @@ -41,7 +41,7 @@ class BfxApiRouter extends BaseBfxApiRouter { ['payInvoiceList', 90], ['accountTrades', 90], ['fundingTrades', 90], - ['trades', 15], + ['trades', 10], ['statusMessages', 90], ['candles', 20], ['orderTrades', 90], From b291eea98942f993dc6bdb984f19e856c4f11c3b Mon Sep 17 00:00:00 2001 From: Vladimir Voronkov Date: Tue, 28 Nov 2023 15:25:54 +0200 Subject: [PATCH 15/16] Bump version up to 4.12.0 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 3e15365db..8b7ff1c72 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bfx-reports-framework", - "version": "4.11.0", + "version": "4.12.0", "description": "Bitfinex reports framework", "main": "worker.js", "license": "Apache-2.0", From 06de3cd12bcfad3068067d59687c3da5dbaa055e Mon Sep 17 00:00:00 2001 From: Vladimir Voronkov Date: Tue, 28 Nov 2023 15:26:19 +0200 Subject: [PATCH 16/16] Update bfx-report-ui sub-module --- bfx-report-ui | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bfx-report-ui b/bfx-report-ui index a11ddc2e5..c707a10f3 160000 --- a/bfx-report-ui +++ b/bfx-report-ui @@ -1 +1 @@ -Subproject commit a11ddc2e57543e83ac17741b7c34d37abe3c2618 +Subproject commit c707a10f3b064fb1ecff263ea7e7a2efc3167fad