diff --git a/bfx-report-ui b/bfx-report-ui index a11ddc2e5..c707a10f3 160000 --- a/bfx-report-ui +++ b/bfx-report-ui @@ -1 +1 @@ -Subproject commit a11ddc2e57543e83ac17741b7c34d37abe3c2618 +Subproject commit c707a10f3b064fb1ecff263ea7e7a2efc3167fad diff --git a/package.json b/package.json index 3e15365db..8b7ff1c72 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bfx-reports-framework", - "version": "4.11.0", + "version": "4.12.0", "description": "Bitfinex reports framework", "main": "worker.js", "license": "Apache-2.0", diff --git a/workers/loc.api/bfx.api.router/index.js b/workers/loc.api/bfx.api.router/index.js index 400fede08..5a011cd60 100644 --- a/workers/loc.api/bfx.api.router/index.js +++ b/workers/loc.api/bfx.api.router/index.js @@ -41,9 +41,9 @@ class BfxApiRouter extends BaseBfxApiRouter { ['payInvoiceList', 90], ['accountTrades', 90], ['fundingTrades', 90], - ['trades', 15], + ['trades', 10], ['statusMessages', 90], - ['candles', 30], + ['candles', 20], ['orderTrades', 90], ['orderHistory', 90], ['activeOrders', 90], diff --git a/workers/loc.api/di/factories/sync-user-step-data-factory.js b/workers/loc.api/di/factories/sync-user-step-data-factory.js index 9ca198666..99cdd3a8d 100644 --- a/workers/loc.api/di/factories/sync-user-step-data-factory.js +++ b/workers/loc.api/di/factories/sync-user-step-data-factory.js @@ -9,6 +9,7 @@ module.exports = (ctx) => { ) const paramsOrder = [ + 'syncedAt', 'symbol', 'timeframe', 'baseStart', diff --git a/workers/loc.api/sync/authenticator/index.js b/workers/loc.api/sync/authenticator/index.js index d69d06228..718290be0 100644 --- a/workers/loc.api/sync/authenticator/index.js +++ b/workers/loc.api/sync/authenticator/index.js @@ -1082,19 +1082,25 @@ class Authenticator { } = user ?? {} const tokenKey = this._getTokenKeyByEmailField(user) + const userSession = this.userSessions.get(token) ?? {} const authTokenRefreshInterval = authToken ? this.setupAuthTokenRefreshInterval(user) : null this.userSessions.set( - token, { - ...user, - authTokenFn: () => { - return this.userSessions.get(token)?.authToken - }, - authTokenRefreshInterval, - authTokenInvalidateIntervals: new Map() - } + token, + Object.assign( + userSession, + user, + { + authTokenFn: () => { + return this.userSessions.get(token)?.authToken + }, + authTokenRefreshInterval, + authTokenInvalidateIntervals: userSession + ?.authTokenInvalidateIntervals ?? new Map() + } + ) ) this.userTokenMapByEmail.set(tokenKey, token) } @@ -1133,9 +1139,8 @@ class Authenticator { ? token : this.userTokenMapByEmail.get(tokenKey) - this.userTokenMapByEmail.delete(tokenKey) - const session = this.userSessions.get(_token) ?? {} + this.userTokenMapByEmail.delete(this._getTokenKeyByEmailField(session)) const { authTokenRefreshInterval, authTokenInvalidateIntervals = new Map() @@ -1272,8 +1277,12 @@ class Authenticator { const { authTokenInvalidateIntervals } = userSession let count = 0 + if (authTokenInvalidateIntervals.has(authToken)) { + return + } + const authTokenInvalidateInterval = setInterval(async () => { - const session = this.userSessions.get(token) + const session = this.userSessions.get(token) ?? userSession try { count += 1 @@ -1288,7 +1297,7 @@ class Authenticator { } catch (err) { if (count >= 3) { clearInterval(authTokenInvalidateInterval) - session.authTokenInvalidateIntervals?.delete(authToken) + session?.authTokenInvalidateIntervals?.delete(authToken) } this.logger.debug(err) diff --git a/workers/loc.api/sync/currency.converter/index.js b/workers/loc.api/sync/currency.converter/index.js index 22a138c89..1db45f31e 100644 --- a/workers/loc.api/sync/currency.converter/index.js +++ b/workers/loc.api/sync/currency.converter/index.js @@ -30,7 +30,8 @@ const depsTypes = (TYPES) => [ TYPES.SyncSchema, TYPES.FOREX_SYMBS, TYPES.ALLOWED_COLLS, - TYPES.SYNC_API_METHODS + TYPES.SYNC_API_METHODS, + TYPES.Logger ] class CurrencyConverter { constructor ( @@ -40,7 +41,8 @@ class CurrencyConverter { syncSchema, FOREX_SYMBS, ALLOWED_COLLS, - SYNC_API_METHODS + SYNC_API_METHODS, + logger ) { this.rService = rService this.getDataFromApi = getDataFromApi @@ -49,6 +51,7 @@ class CurrencyConverter { this.FOREX_SYMBS = FOREX_SYMBS this.ALLOWED_COLLS = ALLOWED_COLLS this.SYNC_API_METHODS = SYNC_API_METHODS + this.logger = logger this._COLL_NAMES = { PUBLIC_TRADES: 'publicTrades', @@ -114,6 +117,8 @@ class CurrencyConverter { return this.currenciesSynonymous } } catch (err) { + this.logger.debug(err) + return this.currenciesSynonymous } } diff --git a/workers/loc.api/sync/data.inserter/data.checker/index.js b/workers/loc.api/sync/data.inserter/data.checker/index.js index 7b099dafc..e6a90cb17 100644 --- a/workers/loc.api/sync/data.inserter/data.checker/index.js +++ b/workers/loc.api/sync/data.inserter/data.checker/index.js @@ -95,14 +95,17 @@ class DataChecker { return filterMethodCollMap(methodCollMap) } - async checkNewPublicData () { + /* + * `authMap` can be empty + */ + async checkNewPublicData (authMap) { const methodCollMap = this._getMethodCollMap() if (this._isInterrupted) { return filterMethodCollMap(methodCollMap, true) } - await this._checkNewDataPublicArrObjType(methodCollMap) + await this._checkNewDataPublicArrObjType(authMap, methodCollMap) await this._checkNewPublicUpdatableData(methodCollMap) return filterMethodCollMap(methodCollMap, true) @@ -179,7 +182,7 @@ class DataChecker { schema.start.push(freshSyncUserStepData) } - async _checkNewDataPublicArrObjType (methodCollMap) { + async _checkNewDataPublicArrObjType (authMap, methodCollMap) { for (const [method, schema] of methodCollMap) { if (this._isInterrupted) { return @@ -191,7 +194,21 @@ class DataChecker { this._resetSyncSchemaProps(schema) if (schema.name === this.ALLOWED_COLLS.CANDLES) { - await this._checkNewCandlesData(method, schema) + // If `authMap` is empty sync candles for all users + const _authMap = ( + !(authMap instanceof Map) || + authMap.size === 0 + ) + ? new Map([['ALL', {}]]) + : authMap + + for (const authItem of _authMap) { + await this._checkNewCandlesData( + method, + schema, + authItem[1] + ) + } } if ( schema.name === this.ALLOWED_COLLS.PUBLIC_TRADES || @@ -336,20 +353,29 @@ class DataChecker { */ async _checkNewCandlesData ( method, - schema + schema, + auth ) { if (this._isInterrupted) { return } + const { _id: userId, subUser } = auth ?? {} + const { _id: subUserId } = subUser ?? {} + const usersFilter = Number.isInteger(userId) + ? { $eq: { user_id: userId } } + : {} + const currMts = Date.now() - const firstLedgerMts = await this._getFirstLedgerMts() + const firstLedgerMts = await this._getFirstLedgerMts(usersFilter) if (!Number.isInteger(firstLedgerMts)) { return } - const uniqueSymbsSet = await this._getUniqueSymbsFromLedgers() + const uniqueSymbsSet = await this._getUniqueSymbsFromLedgers( + usersFilter + ) const candlesPairsSet = new Set() for (const symbol of uniqueSymbsSet) { @@ -389,7 +415,9 @@ class DataChecker { collName: method, symbol, timeframe: CANDLES_TIMEFRAME, - defaultStart: firstLedgerMts + defaultStart: firstLedgerMts, + userId, + subUserId } ) @@ -398,7 +426,6 @@ class DataChecker { !syncUserStepData.isCurrStepReady ) { schema.hasNewData = true - schema.start.push(syncUserStepData) } const wasStartPointChanged = this._wasStartPointChanged( @@ -407,39 +434,42 @@ class DataChecker { ) const shouldFreshSyncBeAdded = this._shouldFreshSyncBeAdded( syncUserStepData, - currMts + currMts, + { dayOfYear: 1 } ) - if ( - !wasStartPointChanged && - !shouldFreshSyncBeAdded - ) { - continue - } - - const freshSyncUserStepData = this.syncUserStepDataFactory({ - ...syncUserStepData.getParams(), - isBaseStepReady: true, - isCurrStepReady: true - }) - if (wasStartPointChanged) { - freshSyncUserStepData.setParams({ + syncUserStepData.setParams({ baseStart: firstLedgerMts, - baseEnd: syncUserStepData.baseStart, isBaseStepReady: false }) + + schema.hasNewData = true } if (shouldFreshSyncBeAdded) { - freshSyncUserStepData.setParams({ + syncUserStepData.setParams({ currStart: lastElemMtsFromTables, currEnd: currMts, isCurrStepReady: false }) + + schema.hasNewData = true } - schema.hasNewData = true - schema.start.push(freshSyncUserStepData) + if (!schema.hasNewData) { + continue + } + + // To keep flow: one candles request per currency + if (!syncUserStepData.isBaseStepReady) { + syncUserStepData.setParams({ + baseEnd: syncUserStepData.currEnd ?? currMts, + isCurrStepReady: true + }) + } + + syncUserStepData.auth = auth + schema.start.push(syncUserStepData) } } @@ -502,7 +532,8 @@ class DataChecker { ) { const { measure = 'minutes', - allowedTimeDiff = 60 + allowedTimeDiff = 60, + dayOfYear } = allowedDiff ?? {} const baseEnd = ( @@ -517,15 +548,28 @@ class DataChecker { ) ? syncUserStepData.currEnd : 0 + const syncedAt = syncUserStepData.hasSyncedAt + ? syncUserStepData.syncedAt + : 0 const momentBaseEnd = moment.utc(baseEnd) const momentCurrEnd = moment.utc(currEnd) const momentCurrMts = moment.utc(currMts) + const momentSyncedAt = moment.utc(syncedAt) const momentMaxEnd = moment.max(momentBaseEnd, momentCurrEnd) const momentDiff = momentCurrMts.diff(momentMaxEnd, measure) - - return momentDiff > allowedTimeDiff + const yearsDiff = momentCurrMts.year() - momentSyncedAt.year() + const dayOfYearDiff = momentCurrMts.dayOfYear() - momentSyncedAt.dayOfYear() + + return ( + momentDiff > allowedTimeDiff && + ( + !Number.isFinite(dayOfYear) || + yearsDiff >= 1 || + dayOfYearDiff >= dayOfYear + ) + ) } _wasStartPointChanged ( @@ -553,8 +597,11 @@ class DataChecker { return momentDiff > allowedTimeDiff } - async _getFirstLedgerMts () { - const firstElemFilter = { $not: { currency: 'USD' } } + async _getFirstLedgerMts (usersFilter) { + const firstElemFilter = { + ...usersFilter, + $not: { currency: 'USD' } + } const firstElemOrder = [['mts', 1]] const tempLedgersTableName = SyncTempTablesManager.getTempTableName( @@ -600,9 +647,12 @@ class DataChecker { return mts } - async _getUniqueSymbsFromLedgers () { + async _getUniqueSymbsFromLedgers (usersFilter) { const ledgerParams = { - filter: { $not: { currency: this.FOREX_SYMBS } }, + filter: { + ...usersFilter, + $not: { currency: this.FOREX_SYMBS } + }, isDistinct: true, projection: ['currency'] } diff --git a/workers/loc.api/sync/data.inserter/index.js b/workers/loc.api/sync/data.inserter/index.js index 04b35617b..b8befa895 100644 --- a/workers/loc.api/sync/data.inserter/index.js +++ b/workers/loc.api/sync/data.inserter/index.js @@ -60,7 +60,8 @@ const depsTypes = (TYPES) => [ TYPES.WSEventEmitter, TYPES.GetDataFromApi, TYPES.SyncTempTablesManager, - TYPES.SyncUserStepManager + TYPES.SyncUserStepManager, + TYPES.Progress ] class DataInserter extends EventEmitter { constructor ( @@ -79,7 +80,8 @@ class DataInserter extends EventEmitter { wsEventEmitter, getDataFromApi, syncTempTablesManager, - syncUserStepManager + syncUserStepManager, + progress ) { super() @@ -99,6 +101,7 @@ class DataInserter extends EventEmitter { this.getDataFromApi = getDataFromApi this.syncTempTablesManager = syncTempTablesManager this.syncUserStepManager = syncUserStepManager + this.progress = progress this._asyncProgressHandlers = [] this._auth = null @@ -284,7 +287,7 @@ class DataInserter extends EventEmitter { await this.wsEventEmitter .emitSyncingStep('CHECKING_NEW_PUBLIC_DATA') const methodCollMap = await this.dataChecker - .checkNewPublicData() + .checkNewPublicData(this._auth) await this.syncTempTablesManager .createTempDBStructureForCurrSync(methodCollMap) const size = methodCollMap.size @@ -302,6 +305,12 @@ class DataInserter extends EventEmitter { const { type, start } = schema ?? {} + if (schema.name === this.ALLOWED_COLLS.CANDLES) { + // Considers 10 reqs/min for candles + const leftTime = Math.floor((60 / 10) * start.length * 1000) + this.progress.setCandlesLeftTime(leftTime) + } + for (const syncUserStepData of start) { if (isInsertableArrObj(schema?.type, { isPublic: true })) { await this._insertApiData( @@ -417,7 +426,14 @@ class DataInserter extends EventEmitter { return } - const { userId, subUserId } = this._getUserIds(auth) + const hasCandlesSection = schema.name === this.ALLOWED_COLLS.CANDLES + const _auth = ( + hasCandlesSection && + syncUserStepData?.auth + ) + ? syncUserStepData.auth + : auth + const { userId, subUserId } = this._getUserIds(_auth) await this.syncUserStepManager.updateOrInsertSyncInfoForCurrColl({ collName: methodApi, userId, @@ -436,7 +452,6 @@ class DataInserter extends EventEmitter { hasTimeframe, areAllSymbolsRequired } = syncUserStepData - const hasCandlesSection = schema.name === this.ALLOWED_COLLS.CANDLES const params = {} @@ -926,19 +941,47 @@ class DataInserter extends EventEmitter { continue } - const promise = this.syncUserStepManager.updateOrInsertSyncInfoForCurrColl({ - collName, - syncedAt, - ...this.syncUserStepManager.wereStepsSynced( - schema.start, - { - shouldNotMtsBeChecked: isUpdatable(schema?.type), - shouldStartMtsBeChecked: schema?.name === this.ALLOWED_COLLS.STATUS_MESSAGES - } - ) - }, { doNotQueueQuery: true }) + const startWithAuth = schema.start + .filter((syncUserStepData) => syncUserStepData?.auth) + const startWithoutAuth = schema.start + .filter((syncUserStepData) => !syncUserStepData?.auth) + + if (startWithAuth.length > 0) { + for (const syncUserStepData of startWithAuth) { + const { userId, subUserId } = this._getUserIds(syncUserStepData.auth) + + const promise = this.syncUserStepManager.updateOrInsertSyncInfoForCurrColl({ + collName, + userId, + subUserId, + syncedAt, + ...this.syncUserStepManager.wereStepsSynced( + [syncUserStepData], + { + shouldNotMtsBeChecked: isUpdatable(schema?.type), + shouldStartMtsBeChecked: schema?.name === this.ALLOWED_COLLS.STATUS_MESSAGES + } + ) + }, { doNotQueueQuery: true }) + + updatesForPubCollsPromises.push(promise) + } + } + if (startWithoutAuth.length > 0) { + const promise = this.syncUserStepManager.updateOrInsertSyncInfoForCurrColl({ + collName, + syncedAt, + ...this.syncUserStepManager.wereStepsSynced( + schema.start, + { + shouldNotMtsBeChecked: isUpdatable(schema?.type), + shouldStartMtsBeChecked: schema?.name === this.ALLOWED_COLLS.STATUS_MESSAGES + } + ) + }, { doNotQueueQuery: true }) - updatesForPubCollsPromises.push(promise) + updatesForPubCollsPromises.push(promise) + } } await Promise.all(updatesForPubCollsPromises) diff --git a/workers/loc.api/sync/data.inserter/sync.user.step.manager/index.js b/workers/loc.api/sync/data.inserter/sync.user.step.manager/index.js index 2e060e6bb..3aaa7c812 100644 --- a/workers/loc.api/sync/data.inserter/sync.user.step.manager/index.js +++ b/workers/loc.api/sync/data.inserter/sync.user.step.manager/index.js @@ -162,14 +162,16 @@ class SyncUserStepManager { const userIdFilter = hasUserIdField ? { $eq: { user_id: userId } } - : {} + : { $isNull: ['user_id'] } const subUserIdFilter = hasSubUserIdField ? { $eq: { subUserId } } - : {} + : { $isNull: ['subUserId'] } const filter = merge( { $eq: { collName } }, - userIdFilter, - subUserIdFilter + this._mergeUserFilters( + userIdFilter, + subUserIdFilter + ) ) const updateRes = await this.dao.updateCollBy( @@ -211,13 +213,17 @@ class SyncUserStepManager { const defaultStart = this._getMinStart(_defaultStart) const hasUserIdField = ( - typeof model?.user_id === 'string' && Number.isInteger(userId) ) + const hasUserIdFieldInModel = ( + typeof model?.user_id === 'string' + ) const hasSubUserIdField = ( - typeof model?.subUserId === 'string' && Number.isInteger(subUserId) ) + const hasSubUserIdFieldInModel = ( + typeof model?.subUserId === 'string' + ) const hasSymbolField = ( symbolFieldName && typeof model?.[symbolFieldName] === 'string' && @@ -230,13 +236,19 @@ class SyncUserStepManager { timeframe && typeof timeframe === 'string' ) - const shouldCollBePublic = !hasUserIdField + const shouldCollBePublic = ( + !hasUserIdField || + !hasUserIdFieldInModel + ) const shouldTableDataBeFetched = ( dateFieldName && typeof dateFieldName === 'string' ) - if (isPublic(syncSchema?.type) !== shouldCollBePublic) { + if ( + tableName !== this.TABLES_NAMES.CANDLES && + isPublic(syncSchema?.type) !== shouldCollBePublic + ) { throw new LastSyncedInfoGettingError() } @@ -245,10 +257,10 @@ class SyncUserStepManager { const userIdFilter = hasUserIdField ? { $eq: { user_id: userId } } - : {} + : { $isNull: ['user_id'] } const subUserIdFilter = hasSubUserIdField ? { $eq: { subUserId } } - : {} + : { $isNull: ['subUserId'] } const symbolFilter = hasSymbolField ? { $eq: { [symbolFieldName]: symbol } } : {} @@ -257,8 +269,10 @@ class SyncUserStepManager { : {} const dataFilter = merge( {}, - userIdFilter, - subUserIdFilter, + this._mergeUserFilters( + hasUserIdFieldInModel ? userIdFilter : {}, + hasSubUserIdFieldInModel ? subUserIdFilter : {} + ), symbolFilter, timeframeFilter ) @@ -267,7 +281,10 @@ class SyncUserStepManager { this.TABLES_NAMES.SYNC_USER_STEPS, { collName, - ...userIdFilter + ...this._mergeUserFilters( + userIdFilter, + subUserIdFilter + ) }, [['syncedAt', -1]] ) @@ -326,7 +343,8 @@ class SyncUserStepManager { currStart, currEnd, isBaseStepReady = false, - isCurrStepReady = false + isCurrStepReady = false, + syncedAt } = syncUserStepInfo ?? {} const baseStart = this._getMinStart(_baseStart) @@ -348,7 +366,8 @@ class SyncUserStepManager { baseEnd: baseEnd ?? currMts, isBaseStepReady, symbol, - timeframe + timeframe, + syncedAt }) return { @@ -365,7 +384,8 @@ class SyncUserStepManager { isBaseStepReady, isCurrStepReady, symbol, - timeframe + timeframe, + syncedAt }) if (!isCurrStepReady) { @@ -460,6 +480,22 @@ class SyncUserStepManager { ? MIN_START_MTS : start } + + _mergeUserFilters (userIdFilter, subUserIdFilter) { + const $isNull = [ + ...userIdFilter?.$isNull ?? [], + ...subUserIdFilter?.$isNull ?? [] + ] + const $isNullObj = $isNull.length > 0 + ? { $isNull } + : {} + + return merge( + userIdFilter, + subUserIdFilter, + $isNullObj + ) + } } decorateInjectable(SyncUserStepManager, depsTypes) diff --git a/workers/loc.api/sync/data.inserter/sync.user.step.manager/sync.user.step.data.js b/workers/loc.api/sync/data.inserter/sync.user.step.manager/sync.user.step.data.js index c3cea5630..91ffafa58 100644 --- a/workers/loc.api/sync/data.inserter/sync.user.step.manager/sync.user.step.data.js +++ b/workers/loc.api/sync/data.inserter/sync.user.step.manager/sync.user.step.data.js @@ -9,6 +9,7 @@ const { decorateInjectable } = require('../../../di/utils') */ class SyncUserStepData { constructor () { + this.syncedAt = null this.symbol = null this.timeframe = null this.baseStart = null @@ -23,6 +24,7 @@ class SyncUserStepData { } /** + * @param {?number} [syncedAt] - Used to specify synced mts point * @param {?string} [symbol] - Used to specify synced symbol, can be `_ALL` if all ones are syncing * @param {?string} [timeframe] - Used to specify synced timeframe, eg. for candles collection * @param {?number} [baseStart] - Used to specify base start mts point to continue first sync @@ -34,6 +36,7 @@ class SyncUserStepData { */ setParams (params = {}) { const { + syncedAt = this.syncedAt, symbol = this.symbol, timeframe = this.timeframe, baseStart = this.baseStart, @@ -44,6 +47,7 @@ class SyncUserStepData { isCurrStepReady = this.isCurrStepReady } = params ?? {} + this.syncedAt = syncedAt this.symbol = symbol this.timeframe = timeframe this.baseStart = baseStart @@ -117,6 +121,10 @@ class SyncUserStepData { get hasCurrStart () { return Number.isInteger(this.currStart) } + + get hasSyncedAt () { + return Number.isInteger(this.syncedAt) + } } decorateInjectable(SyncUserStepData) diff --git a/workers/loc.api/sync/progress/index.js b/workers/loc.api/sync/progress/index.js index 16f8c7321..91b17b90b 100644 --- a/workers/loc.api/sync/progress/index.js +++ b/workers/loc.api/sync/progress/index.js @@ -41,6 +41,7 @@ class Progress extends EventEmitter { this._state = null this._hasNotProgressChanged = true this._leftTime = null + this._candlesLeftTime = null this._prevEstimatedLeftTime = Date.now() } @@ -145,7 +146,7 @@ class Progress extends EventEmitter { return this } - async _estimateSyncTime (params) { + _estimateSyncTime (params) { const { error, progress, @@ -218,6 +219,7 @@ class Progress extends EventEmitter { } = params ?? {} if (!hasNotProgressChanged) { + this._candlesLeftTime = null this._prevEstimatedLeftTime = nowMts this._leftTime = Math.floor((spentTime / progress) * (100 - progress)) @@ -233,7 +235,10 @@ class Progress extends EventEmitter { this._prevEstimatedLeftTime = nowMts this._leftTime = leftTime > 0 ? leftTime - : null + : this._candlesLeftTime ?? this._calcLeftTime({ + ...params, + hasNotProgressChanged: false + }) return this._leftTime } @@ -304,6 +309,10 @@ class Progress extends EventEmitter { ) ) } + + setCandlesLeftTime (leftTime) { + this._candlesLeftTime = leftTime + } } decorateInjectable(Progress, depsTypes) diff --git a/workers/loc.api/sync/sync.colls.manager/index.js b/workers/loc.api/sync/sync.colls.manager/index.js index 33816ae4c..57099b3b3 100644 --- a/workers/loc.api/sync/sync.colls.manager/index.js +++ b/workers/loc.api/sync/sync.colls.manager/index.js @@ -91,6 +91,7 @@ class SyncCollsManager { for (const [method, schema] of this._methodCollMap) { const { + name, type, isSyncRequiredAtLeastOnce } = schema @@ -101,7 +102,10 @@ class SyncCollsManager { ) { continue } - if (isPublic(type)) { + if ( + isPublic(type) && + name !== this.TABLES_NAMES.CANDLES + ) { const isDone = completedColls.some((completedColl) => ( completedColl?.isBaseStepReady && completedColl?.collName === method @@ -260,11 +264,21 @@ class SyncCollsManager { ) } - _getCompletedCollsBy (filter = {}) { - return this.dao.getElemsInCollBy( + async _getCompletedCollsBy (filter = {}) { + const completedColls = await this.dao.getElemsInCollBy( this.TABLES_NAMES.SYNC_USER_STEPS, { filter } ) + + if (!Array.isArray(completedColls)) { + return [] + } + + // Considers candles referenced to specific user + return completedColls.filter((completedColl) => ( + completedColl.collName !== this.SYNC_API_METHODS.CANDLES || + Number.isInteger(completedColl.user_id) + )) } }