From e579f7a20102eaf94dc8354afcb16b924944339f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Mon, 28 Jun 2021 19:44:27 -0300 Subject: [PATCH 1/6] sdk: fix outdated unconfirmed udcDeposit.success actions When depositing to UDC, after the tx just got mined, it emits a udcDeposit.success. So far, it was being emitted with the value it just got from a new effectiveBalance call, which may be outdated because the tx didn't get confirmed yet. Instead, we now calculate the actual "final" effectiveBalance to be used there since we know it from previous balance. --- raiden-ts/src/services/epics/udc.ts | 63 ++++++++++--------------- raiden-ts/tests/integration/udc.spec.ts | 1 - 2 files changed, 24 insertions(+), 40 deletions(-) diff --git a/raiden-ts/src/services/epics/udc.ts b/raiden-ts/src/services/epics/udc.ts index 77ebfaa3ef..0ba7c87578 100644 --- a/raiden-ts/src/services/epics/udc.ts +++ b/raiden-ts/src/services/epics/udc.ts @@ -90,13 +90,13 @@ function makeUdcDeposit$( deps: Pick, ) { const { log, provider, config$, latest$ } = deps; + let finalBalance: UInt<32>; return defer(async () => Promise.all([ tokenContract.callStatic.balanceOf(sender) as Promise>, tokenContract.callStatic.allowance(sender, userDepositContract.address) as Promise>, ]), ).pipe( - retryWhile(intervalFromConfig(config$), { onErrors: networkErrors }), withLatestFrom(config$, latest$), mergeWith(([[balance, allowance], { minimumAllowance }, { gasPrice }]) => approveIfNeeded$( @@ -108,23 +108,30 @@ function makeUdcDeposit$( { minimumAllowance, gasPrice }, ), ), - mergeMap(([[, , { gasPrice }]]) => - defer(async () => userDepositContract.callStatic.total_deposit(address)).pipe( - retryWhile(intervalFromConfig(config$), { onErrors: networkErrors }), - mergeMap(async (deposited) => { - assert(deposited.add(deposit).eq(totalDeposit), [ - ErrorCodes.UDC_DEPOSIT_OUTDATED, - { requested: totalDeposit.toString(), current: deposited.toString() }, - ]); - // send setTotalDeposit transaction - return userDepositContract.deposit(address, totalDeposit, { gasPrice }); - }), - ), - ), + mergeMap(([[, , { gasPrice, udcDeposit: prev }]]) => { + assert(prev.totalDeposit.add(deposit).eq(totalDeposit), [ + ErrorCodes.UDC_DEPOSIT_OUTDATED, + { requested: totalDeposit, current: prev.totalDeposit }, + ]); + finalBalance = prev.balance.add(deposit) as UInt<32>; + // send setTotalDeposit transaction + return userDepositContract.deposit(address, totalDeposit, { gasPrice }); + }), assertTx('deposit', ErrorCodes.RDN_DEPOSIT_TRANSACTION_FAILED, { log, provider }), // retry also txFail errors, since estimateGas can lag behind just-opened channel or // just-approved allowance retryWhile(intervalFromConfig(config$), { onErrors: commonTxErrors, log: log.debug }), + map(([, receipt]) => + udcDeposit.success( + { + balance: finalBalance, + txHash: receipt.transactionHash, + txBlock: receipt.blockNumber, + confirmed: undefined, // let confirmationEpic confirm this action + }, + { totalDeposit }, + ), + ), ); } @@ -141,15 +148,12 @@ export function udcDepositEpic( {}: Observable, deps: RaidenEpicDeps, ): Observable { - const { userDepositContract, getTokenContract, address, signer, main, config$, provider } = deps; + const { userDepositContract, getTokenContract, address, signer, main, config$, log } = deps; return action$.pipe( filter(udcDeposit.request.is), concatMap((action) => - retryAsync$( - () => userDepositContract.callStatic.token() as Promise
, - provider.pollingInterval, - { onErrors: networkErrors }, - ).pipe( + defer(async () => userDepositContract.callStatic.token() as Promise
).pipe( + retryWhile(intervalFromConfig(config$), { onErrors: networkErrors, log: log.debug }), withLatestFrom(config$), mergeMap(([token, config]) => { const { signer: onchainSigner, address: onchainAddress } = chooseOnchainAccount( @@ -166,25 +170,6 @@ export function udcDepositEpic( deps, ); }), - mergeMap(([, receipt]) => - retryAsync$( - () => userDepositContract.callStatic.effectiveBalance(address) as Promise>, - provider.pollingInterval, - { onErrors: networkErrors }, - ).pipe( - map((balance) => - udcDeposit.success( - { - balance, - txHash: receipt.transactionHash, - txBlock: receipt.blockNumber, - confirmed: undefined, // let confirmationEpic confirm this action - }, - action.meta, - ), - ), - ), - ), catchError((error) => of(udcDeposit.failure(error, action.meta))), ), ), diff --git a/raiden-ts/tests/integration/udc.spec.ts b/raiden-ts/tests/integration/udc.spec.ts index bcc9d5dd1e..55d49c9558 100644 --- a/raiden-ts/tests/integration/udc.spec.ts +++ b/raiden-ts/tests/integration/udc.spec.ts @@ -170,7 +170,6 @@ describe('udcDepositEpic', () => { const depositTx = makeTransaction(undefined, { to: userDepositContract.address }); userDepositContract.deposit.mockResolvedValue(depositTx); - userDepositContract.effectiveBalance.mockResolvedValue(balance); await raiden.start(); raiden.store.dispatch(udcDeposit.request({ deposit }, { totalDeposit: balance })); From 0279123647606024668eb5bce95bb614adb50ba6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Mon, 28 Jun 2021 20:52:28 -0300 Subject: [PATCH 2/6] sdk: update capabilities only after matrix is ready ensures there's some time after raiden start before updating capabilities on matrix for the first time; also, matrix gets started only after raiden is synced, to avoid receiving messages while we're not ready yet. --- raiden-ts/src/transport/epics/init.ts | 60 +++++++------------ raiden-ts/src/transport/epics/presence.ts | 4 +- raiden-ts/tests/integration/transport.spec.ts | 10 +--- 3 files changed, 23 insertions(+), 51 deletions(-) diff --git a/raiden-ts/src/transport/epics/init.ts b/raiden-ts/src/transport/epics/init.ts index 6b0553e8ca..095a7deb6d 100644 --- a/raiden-ts/src/transport/epics/init.ts +++ b/raiden-ts/src/transport/epics/init.ts @@ -1,17 +1,17 @@ import * as t from 'io-ts'; import constant from 'lodash/constant'; -import isEmpty from 'lodash/isEmpty'; import sortBy from 'lodash/sortBy'; import type { Filter, LoginPayload, MatrixClient } from 'matrix-js-sdk'; import { createClient } from 'matrix-js-sdk'; import { logger as matrixLogger } from 'matrix-js-sdk/lib/logger'; -import type { AsyncSubject, Observable } from 'rxjs'; -import { combineLatest, defer, EMPTY, from, merge, of, throwError, timer } from 'rxjs'; +import type { Observable } from 'rxjs'; +import { combineLatest, defer, EMPTY, from, merge, of, throwError } from 'rxjs'; import { fromFetch } from 'rxjs/fetch'; import { catchError, concatMap, delayWhen, + endWith, filter, first, ignoreElements, @@ -28,7 +28,6 @@ import { } from 'rxjs/operators'; import type { RaidenAction } from '../../actions'; -import type { RaidenConfig } from '../../config'; import { intervalFromConfig } from '../../config'; import { RAIDEN_DEVICE_ID } from '../../constants'; import type { RaidenState } from '../../state'; @@ -40,8 +39,6 @@ import { completeWith, lastMap, pluckDistinct, retryWhile } from '../../utils/rx import { decode } from '../../utils/types'; import { matrixSetup } from '../actions'; import type { RaidenMatrixSetup } from '../state'; -import type { Caps } from '../types'; -import { stringifyCaps } from '../utils'; /** * Creates and returns a matrix filter. The filter reduces the size of the initial sync by @@ -71,8 +68,7 @@ async function createMatrixFilter(matrix: MatrixClient, notRooms: string[] = []) function startMatrixSync( action$: Observable, matrix: MatrixClient, - matrix$: AsyncSubject, - config$: Observable, + { matrix$, config$, init$ }: Pick, ) { return action$.pipe( filter(matrixSetup.is), @@ -81,25 +77,17 @@ function startMatrixSync( matrix$.next(matrix); matrix$.complete(); }), - withLatestFrom(config$), - // wait 1s before starting matrix, so event listeners can be registered - delayWhen(([, { pollingInterval }]) => timer(Math.ceil(pollingInterval / 5))), - mergeMap(([, config]) => - defer(async () => createMatrixFilter(matrix)).pipe( - mergeMap(async (filter) => { - await matrix.setPushRuleEnabled('global', 'override', '.m.rule.master', true); - return filter; - }), - mergeMap(async (filter) => matrix.startClient({ filter })), - mergeMap(() => { - // after [15-45]s (default) random delay after starting, update/reload presence - return timer(Math.round((Math.random() + 0.5) * config.httpTimeout)).pipe( - mergeMap(async () => - matrix.setPresence({ presence: 'online', status_msg: Date.now().toString() }), - ), - ); - }), - retryWhile(intervalFromConfig(config$), { onErrors: networkErrors }), + mergeMap(() => + defer(async () => + Promise.all([ + createMatrixFilter(matrix), + matrix.setPushRuleEnabled('global', 'override', '.m.rule.master', true), + ]), + ).pipe( + // delay startClient (going online) to after raiden is synced + delayWhen(() => init$.pipe(ignoreElements(), endWith(true))), + mergeMap(async ([filter]) => matrix.startClient({ filter })), + retryWhile(intervalFromConfig(config$), { onErrors: networkErrors, maxRetries: 3 }), ), ), ignoreElements(), @@ -171,14 +159,12 @@ function fetchSortedMatrixServers$(matrixServerLookup: string, httpTimeout: numb * @param deps.address - Our address (to compose matrix user) * @param deps.signer - Signer to be used to sign password and displayName * @param deps.config$ - Config observable - * @param caps - Transport capabilities to set in user's avatar_url * @returns Observable of one { matrix, server, setup } object */ function setupMatrixClient$( server: string, setup: RaidenMatrixSetup | undefined, { address, signer, config$ }: Pick, - caps: Caps | null, ) { const homeserver = getServerName(server); assert(homeserver, [ErrorCodes.TRNS_NO_SERVERNAME, { server }]); @@ -253,12 +239,7 @@ function setupMatrixClient$( // the APIs below are authenticated, and therefore also act as validator mergeMap(({ matrix, server, setup }) => // set these properties before starting sync - defer(() => - Promise.all([ - matrix.setDisplayName(setup.displayName), - matrix.setAvatarUrl(caps && !isEmpty(caps) ? stringifyCaps(caps) : ''), - ]), - ).pipe( + defer(async () => matrix.setDisplayName(setup.displayName)).pipe( retryWhile(intervalFromConfig(config$), { onErrors: networkErrors }), mapTo({ matrix, server, setup }), // return triplet again ), @@ -285,11 +266,12 @@ function setupMatrixClient$( export function initMatrixEpic( action$: Observable, {}: Observable, - { address, signer, matrix$, latest$, config$, init$ }: RaidenEpicDeps, + deps: RaidenEpicDeps, ): Observable { + const { matrix$, latest$, config$, init$ } = deps; return combineLatest([latest$, config$]).pipe( first(), // at startup - mergeMap(([{ state }, { matrixServer, matrixServerLookup, httpTimeout, caps }]) => { + mergeMap(([{ state }, { matrixServer, matrixServerLookup, httpTimeout }]) => { const server = state.transport.server, setup = state.transport.setup; // when matrix$ async subject completes, transport init task is completed @@ -321,7 +303,7 @@ export function initMatrixEpic( catchError(andSuppress), // servers$ may error, so store lastError concatMap(({ server, setup }) => // serially, try setting up client and validate its credential - setupMatrixClient$(server, setup, { address, signer, config$ }, caps).pipe( + setupMatrixClient$(server, setup, deps).pipe( // store and suppress any 'setupMatrixClient$' error catchError(andSuppress), ), @@ -341,7 +323,7 @@ export function initMatrixEpic( mergeMap(({ matrix, server, setup }) => merge( // wait for matrixSetup through reducer, then resolves matrix$ with client and starts it - startMatrixSync(action$, matrix, matrix$, config$), + startMatrixSync(action$, matrix, deps), // emit matrixSetup in parallel to be persisted in state of(matrixSetup({ server, setup })), // monitor config.logger & disable or re-enable matrix's logger accordingly diff --git a/raiden-ts/src/transport/epics/presence.ts b/raiden-ts/src/transport/epics/presence.ts index d27474cb35..20d32cf414 100644 --- a/raiden-ts/src/transport/epics/presence.ts +++ b/raiden-ts/src/transport/epics/presence.ts @@ -15,7 +15,6 @@ import { map, mergeMap, pluck, - skip, switchMap, tap, timeout, @@ -156,7 +155,7 @@ export function matrixMonitorChannelPresenceEpic( } /** - * Update our matrix's avatarUrl on config.caps changes + * Update our matrix's avatarUrl on config.caps on startup and changes * * @param action$ - Observable of RaidenActions * @param state$ - Observable of RaidenStates @@ -174,7 +173,6 @@ export function matrixUpdateCapsEpic( completeWith(action$), pluck('caps'), distinctUntilChanged(isEqual), - skip(1), // skip replay(1) and act only on changes switchMap((caps) => matrix$.pipe( mergeMap((matrix) => diff --git a/raiden-ts/tests/integration/transport.spec.ts b/raiden-ts/tests/integration/transport.spec.ts index 1bff94fbe9..cc96ea06c8 100644 --- a/raiden-ts/tests/integration/transport.spec.ts +++ b/raiden-ts/tests/integration/transport.spec.ts @@ -108,7 +108,7 @@ describe('initMatrixEpic', () => { afterEach(() => jest.restoreAllMocks()); test('matrix stored setup', async () => { - expect.assertions(4); + expect.assertions(3); const userId = `@${raiden.address.toLowerCase()}:${matrixServer}`; const displayName = await raiden.deps.signer.signMessage(userId); @@ -138,14 +138,6 @@ describe('initMatrixEpic', () => { ); // ensure if stored setup works, servers list don't need to be fetched expect(fetch).not.toHaveBeenCalled(); - - // test presence got set again after some time, to overcome presence bug - await sleep(2 * raiden.config.httpTimeout); - // test presence got set again after some time, to overcome presence bug - expect(matrix.setPresence).toHaveBeenCalledWith({ - presence: 'online', - status_msg: expect.any(String), - }); }); test('matrix server config set without stored setup', async () => { From abe62fbf4bfba7c20558a4d75661ea3c074f3ad5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Mon, 28 Jun 2021 21:41:04 -0300 Subject: [PATCH 3/6] sdk: quickfix re-throw login error if register also fails --- raiden-ts/src/transport/epics/init.ts | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/raiden-ts/src/transport/epics/init.ts b/raiden-ts/src/transport/epics/init.ts index 095a7deb6d..465e6e943e 100644 --- a/raiden-ts/src/transport/epics/init.ts +++ b/raiden-ts/src/transport/epics/init.ts @@ -1,7 +1,6 @@ import * as t from 'io-ts'; -import constant from 'lodash/constant'; import sortBy from 'lodash/sortBy'; -import type { Filter, LoginPayload, MatrixClient } from 'matrix-js-sdk'; +import type { Filter, MatrixClient } from 'matrix-js-sdk'; import { createClient } from 'matrix-js-sdk'; import { logger as matrixLogger } from 'matrix-js-sdk/lib/logger'; import type { Observable } from 'rxjs'; @@ -197,16 +196,18 @@ function setupMatrixClient$( }), ).pipe( catchError(async (err) => { - return matrix - .registerRequest({ + try { + return await matrix.registerRequest({ username, password, device_id: RAIDEN_DEVICE_ID, - }) - .catch(constant(err)) as Promise; - // if register fails, throws login error as it's more informative + }); + } catch (e) { + // if register fails, throws login error as it's more informative + throw err; + } }), - retryWhile(intervalFromConfig(config$), { onErrors: networkErrors }), + retryWhile(intervalFromConfig(config$), { onErrors: networkErrors, maxRetries: 3 }), ), ), mergeMap(({ access_token, device_id, user_id }) => { From 90334f388ba17409d87671efdcd77f66320fe0e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Mon, 28 Jun 2021 22:02:56 -0300 Subject: [PATCH 4/6] sdk: force PFS presence update when capabilities change Do this only after we're synced and matrix is started. To force this update, we quickly appear offline then back online, although syncing isn't interrupted. --- raiden-ts/src/transport/epics/presence.ts | 30 ++++++++++++----------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/raiden-ts/src/transport/epics/presence.ts b/raiden-ts/src/transport/epics/presence.ts index 20d32cf414..88d3585f26 100644 --- a/raiden-ts/src/transport/epics/presence.ts +++ b/raiden-ts/src/transport/epics/presence.ts @@ -2,11 +2,12 @@ import isEmpty from 'lodash/isEmpty'; import isEqual from 'lodash/isEqual'; import uniq from 'lodash/uniq'; import type { Observable } from 'rxjs'; -import { combineLatest, defer, from, merge, of } from 'rxjs'; +import { combineLatest, from, merge, of } from 'rxjs'; import { catchError, concatMap, distinctUntilChanged, + endWith, exhaustMap, filter, first, @@ -15,6 +16,7 @@ import { map, mergeMap, pluck, + startWith, switchMap, tap, timeout, @@ -30,7 +32,7 @@ import type { RaidenState } from '../../state'; import type { RaidenEpicDeps } from '../../types'; import { isActionOf } from '../../utils/actions'; import { networkErrors } from '../../utils/error'; -import { catchAndLog, completeWith, retryWhile } from '../../utils/rx'; +import { catchAndLog, completeWith, mergeWith, retryWhile } from '../../utils/rx'; import type { Address } from '../../utils/types'; import { matrixPresence } from '../actions'; import { stringifyCaps } from '../utils'; @@ -162,32 +164,32 @@ export function matrixMonitorChannelPresenceEpic( * @param deps - Epics dependencies * @param deps.matrix$ - MatrixClient async subject * @param deps.config$ - Config object + * @param deps.init$ - Init$ subject * @returns Observable which never emits */ export function matrixUpdateCapsEpic( action$: Observable, {}: Observable, - { matrix$, config$ }: RaidenEpicDeps, + { matrix$, config$, init$ }: RaidenEpicDeps, ): Observable { return config$.pipe( completeWith(action$), pluck('caps'), distinctUntilChanged(isEqual), - switchMap((caps) => + withLatestFrom(init$.pipe(ignoreElements(), startWith(false), endWith(true))), + switchMap(([caps, synced]) => matrix$.pipe( - mergeMap((matrix) => - defer(async () => - matrix.setAvatarUrl(caps && !isEmpty(caps) ? stringifyCaps(caps) : ''), - ).pipe( - // trigger immediate presence updates on peers - mergeMap(async () => - matrix.setPresence({ presence: 'online', status_msg: Date.now().toString() }), - ), - ), + mergeWith(async (matrix) => + matrix.setAvatarUrl(caps && !isEmpty(caps) ? stringifyCaps(caps) : ''), + ), + filter(() => synced), + mergeWith(async ([matrix]) => matrix.setPresence({ presence: 'offline', status_msg: '' })), + mergeWith(async ([[matrix]]) => + matrix.setPresence({ presence: 'online', status_msg: Date.now().toString() }), ), retryWhile(intervalFromConfig(config$)), - ignoreElements(), ), ), + ignoreElements(), ); } From a775107189a4b0b303cf2289a811bbea333d1ee9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Tue, 29 Jun 2021 12:09:29 -0300 Subject: [PATCH 5/6] sdk: add changelog to capabilities update fix --- raiden-ts/CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/raiden-ts/CHANGELOG.md b/raiden-ts/CHANGELOG.md index 4a55645754..cf6c46fbf9 100644 --- a/raiden-ts/CHANGELOG.md +++ b/raiden-ts/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## [Unreleased] +### Fixed +- [#2831] Force PFS to acknowledge our capabilities updates + +[#2831]: https://github.com/raiden-network/light-client/issues/2831 + ## [1.0.0] - 2021-06-16 ### Removed - [#2571] **BREAKING** Remove ability to join and send messages to global service rooms From d6d74518764d05f44ebfdea5302c069cfb0b96a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Wed, 30 Jun 2021 10:46:46 -0300 Subject: [PATCH 6/6] sdk: update udcDeposit only from confirmed balances --- raiden-ts/src/epics.ts | 1 + raiden-ts/src/services/epics/udc.ts | 35 +++++++++++++---------------- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/raiden-ts/src/epics.ts b/raiden-ts/src/epics.ts index 87e38cbf37..6625dca83e 100644 --- a/raiden-ts/src/epics.ts +++ b/raiden-ts/src/epics.ts @@ -156,6 +156,7 @@ export function getLatest$( const initialStale = false; const udcDeposit$ = action$.pipe( filter(udcDeposit.success.is), + filter((action) => !('confirmed' in action.payload) || !!action.payload.confirmed), map((action) => ({ balance: action.payload.balance, totalDeposit: action.meta.totalDeposit })), // starts with max, to prevent receiving starting as disabled before actual balance is fetched startWith(initialUdcDeposit), diff --git a/raiden-ts/src/services/epics/udc.ts b/raiden-ts/src/services/epics/udc.ts index 0ba7c87578..8deb72dcbb 100644 --- a/raiden-ts/src/services/epics/udc.ts +++ b/raiden-ts/src/services/epics/udc.ts @@ -29,14 +29,7 @@ import { dispatchAndWait$ } from '../../transfers/epics/utils'; import type { RaidenEpicDeps } from '../../types'; import { isConfirmationResponseOf } from '../../utils/actions'; import { assert, commonTxErrors, ErrorCodes, networkErrors } from '../../utils/error'; -import { - catchAndLog, - completeWith, - mergeWith, - retryAsync$, - retryWhile, - takeIf, -} from '../../utils/rx'; +import { catchAndLog, completeWith, mergeWith, retryWhile, takeIf } from '../../utils/rx'; import type { Address, UInt } from '../../utils/types'; import { udcDeposit, udcWithdraw, udcWithdrawPlan } from '../actions'; @@ -49,32 +42,34 @@ import { udcDeposit, udcWithdraw, udcWithdrawPlan } from '../actions'; * @param deps.address - Our address * @param deps.latest$ - Latest observable * @param deps.userDepositContract - UserDeposit contract instance - * @param deps.provider - Eth provider + * @param deps.config$ - Config observable * @returns Observable of udcDeposited actions */ export function monitorUdcBalanceEpic( action$: Observable, {}: Observable, - { address, latest$, provider, userDepositContract }: RaidenEpicDeps, + { address, latest$, config$, userDepositContract }: RaidenEpicDeps, ): Observable { return action$.pipe( filter(newBlock.is), - startWith(null), + pluck('payload', 'blockNumber'), + withLatestFrom(config$), // it's seems ugly to call on each block, but UserDepositContract doesn't expose deposits as // events, and ethers actually do that to monitor token balances, so it's equivalent - exhaustMap(() => + exhaustMap(([blockNumber, { confirmationBlocks }]) => /* This contract's function is pure (doesn't depend on user's confirmation, gas availability, * etc), but merged on the top-level observable, therefore connectivity issues can cause * exceptions which would shutdown the SDK. Let's swallow the error here, since this will be * retried on next block, which should only be emitted after connectivity is reestablished */ - retryAsync$( - () => - Promise.all([ - userDepositContract.callStatic.effectiveBalance(address) as Promise>, - userDepositContract.callStatic.total_deposit(address) as Promise>, - ]), - provider.pollingInterval, - { onErrors: networkErrors }, + defer(async () => + Promise.all([ + userDepositContract.callStatic.effectiveBalance(address, { + blockTag: blockNumber - confirmationBlocks, + }) as Promise>, + userDepositContract.callStatic.total_deposit(address, { + blockTag: blockNumber - confirmationBlocks, + }) as Promise>, + ]), ).pipe(catchError(constant(EMPTY))), ), withLatestFrom(latest$),