diff --git a/raiden-ts/CHANGELOG.md b/raiden-ts/CHANGELOG.md index 4a55645754..cf6c46fbf9 100644 --- a/raiden-ts/CHANGELOG.md +++ b/raiden-ts/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## [Unreleased] +### Fixed +- [#2831] Force PFS to acknowledge our capabilities updates + +[#2831]: https://github.com/raiden-network/light-client/issues/2831 + ## [1.0.0] - 2021-06-16 ### Removed - [#2571] **BREAKING** Remove ability to join and send messages to global service rooms diff --git a/raiden-ts/src/epics.ts b/raiden-ts/src/epics.ts index 87e38cbf37..6625dca83e 100644 --- a/raiden-ts/src/epics.ts +++ b/raiden-ts/src/epics.ts @@ -156,6 +156,7 @@ export function getLatest$( const initialStale = false; const udcDeposit$ = action$.pipe( filter(udcDeposit.success.is), + filter((action) => !('confirmed' in action.payload) || !!action.payload.confirmed), map((action) => ({ balance: action.payload.balance, totalDeposit: action.meta.totalDeposit })), // starts with max, to prevent receiving starting as disabled before actual balance is fetched startWith(initialUdcDeposit), diff --git a/raiden-ts/src/services/epics/udc.ts b/raiden-ts/src/services/epics/udc.ts index 77ebfaa3ef..8deb72dcbb 100644 --- a/raiden-ts/src/services/epics/udc.ts +++ b/raiden-ts/src/services/epics/udc.ts @@ -29,14 +29,7 @@ import { dispatchAndWait$ } from '../../transfers/epics/utils'; import type { RaidenEpicDeps } from '../../types'; import { isConfirmationResponseOf } from '../../utils/actions'; import { assert, commonTxErrors, ErrorCodes, networkErrors } from '../../utils/error'; -import { - catchAndLog, - completeWith, - mergeWith, - retryAsync$, - retryWhile, - takeIf, -} from '../../utils/rx'; +import { catchAndLog, completeWith, mergeWith, retryWhile, takeIf } from '../../utils/rx'; import type { Address, UInt } from '../../utils/types'; import { udcDeposit, udcWithdraw, udcWithdrawPlan } from '../actions'; @@ -49,32 +42,34 @@ import { udcDeposit, udcWithdraw, udcWithdrawPlan } from '../actions'; * @param deps.address - Our address * @param deps.latest$ - Latest observable * @param deps.userDepositContract - UserDeposit contract instance - * @param deps.provider - Eth provider + * @param deps.config$ - Config observable * @returns Observable of udcDeposited actions */ export function monitorUdcBalanceEpic( action$: Observable, {}: Observable, - { address, latest$, provider, userDepositContract }: RaidenEpicDeps, + { address, latest$, config$, userDepositContract }: RaidenEpicDeps, ): Observable { return action$.pipe( filter(newBlock.is), - startWith(null), + pluck('payload', 'blockNumber'), + withLatestFrom(config$), // it's seems ugly to call on each block, but UserDepositContract doesn't expose deposits as // events, and ethers actually do that to monitor token balances, so it's equivalent - exhaustMap(() => + exhaustMap(([blockNumber, { confirmationBlocks }]) => /* This contract's function is pure (doesn't depend on user's confirmation, gas availability, * etc), but merged on the top-level observable, therefore connectivity issues can cause * exceptions which would shutdown the SDK. Let's swallow the error here, since this will be * retried on next block, which should only be emitted after connectivity is reestablished */ - retryAsync$( - () => - Promise.all([ - userDepositContract.callStatic.effectiveBalance(address) as Promise>, - userDepositContract.callStatic.total_deposit(address) as Promise>, - ]), - provider.pollingInterval, - { onErrors: networkErrors }, + defer(async () => + Promise.all([ + userDepositContract.callStatic.effectiveBalance(address, { + blockTag: blockNumber - confirmationBlocks, + }) as Promise>, + userDepositContract.callStatic.total_deposit(address, { + blockTag: blockNumber - confirmationBlocks, + }) as Promise>, + ]), ).pipe(catchError(constant(EMPTY))), ), withLatestFrom(latest$), @@ -90,13 +85,13 @@ function makeUdcDeposit$( deps: Pick, ) { const { log, provider, config$, latest$ } = deps; + let finalBalance: UInt<32>; return defer(async () => Promise.all([ tokenContract.callStatic.balanceOf(sender) as Promise>, tokenContract.callStatic.allowance(sender, userDepositContract.address) as Promise>, ]), ).pipe( - retryWhile(intervalFromConfig(config$), { onErrors: networkErrors }), withLatestFrom(config$, latest$), mergeWith(([[balance, allowance], { minimumAllowance }, { gasPrice }]) => approveIfNeeded$( @@ -108,23 +103,30 @@ function makeUdcDeposit$( { minimumAllowance, gasPrice }, ), ), - mergeMap(([[, , { gasPrice }]]) => - defer(async () => userDepositContract.callStatic.total_deposit(address)).pipe( - retryWhile(intervalFromConfig(config$), { onErrors: networkErrors }), - mergeMap(async (deposited) => { - assert(deposited.add(deposit).eq(totalDeposit), [ - ErrorCodes.UDC_DEPOSIT_OUTDATED, - { requested: totalDeposit.toString(), current: deposited.toString() }, - ]); - // send setTotalDeposit transaction - return userDepositContract.deposit(address, totalDeposit, { gasPrice }); - }), - ), - ), + mergeMap(([[, , { gasPrice, udcDeposit: prev }]]) => { + assert(prev.totalDeposit.add(deposit).eq(totalDeposit), [ + ErrorCodes.UDC_DEPOSIT_OUTDATED, + { requested: totalDeposit, current: prev.totalDeposit }, + ]); + finalBalance = prev.balance.add(deposit) as UInt<32>; + // send setTotalDeposit transaction + return userDepositContract.deposit(address, totalDeposit, { gasPrice }); + }), assertTx('deposit', ErrorCodes.RDN_DEPOSIT_TRANSACTION_FAILED, { log, provider }), // retry also txFail errors, since estimateGas can lag behind just-opened channel or // just-approved allowance retryWhile(intervalFromConfig(config$), { onErrors: commonTxErrors, log: log.debug }), + map(([, receipt]) => + udcDeposit.success( + { + balance: finalBalance, + txHash: receipt.transactionHash, + txBlock: receipt.blockNumber, + confirmed: undefined, // let confirmationEpic confirm this action + }, + { totalDeposit }, + ), + ), ); } @@ -141,15 +143,12 @@ export function udcDepositEpic( {}: Observable, deps: RaidenEpicDeps, ): Observable { - const { userDepositContract, getTokenContract, address, signer, main, config$, provider } = deps; + const { userDepositContract, getTokenContract, address, signer, main, config$, log } = deps; return action$.pipe( filter(udcDeposit.request.is), concatMap((action) => - retryAsync$( - () => userDepositContract.callStatic.token() as Promise
, - provider.pollingInterval, - { onErrors: networkErrors }, - ).pipe( + defer(async () => userDepositContract.callStatic.token() as Promise
).pipe( + retryWhile(intervalFromConfig(config$), { onErrors: networkErrors, log: log.debug }), withLatestFrom(config$), mergeMap(([token, config]) => { const { signer: onchainSigner, address: onchainAddress } = chooseOnchainAccount( @@ -166,25 +165,6 @@ export function udcDepositEpic( deps, ); }), - mergeMap(([, receipt]) => - retryAsync$( - () => userDepositContract.callStatic.effectiveBalance(address) as Promise>, - provider.pollingInterval, - { onErrors: networkErrors }, - ).pipe( - map((balance) => - udcDeposit.success( - { - balance, - txHash: receipt.transactionHash, - txBlock: receipt.blockNumber, - confirmed: undefined, // let confirmationEpic confirm this action - }, - action.meta, - ), - ), - ), - ), catchError((error) => of(udcDeposit.failure(error, action.meta))), ), ), diff --git a/raiden-ts/src/transport/epics/init.ts b/raiden-ts/src/transport/epics/init.ts index 6b0553e8ca..465e6e943e 100644 --- a/raiden-ts/src/transport/epics/init.ts +++ b/raiden-ts/src/transport/epics/init.ts @@ -1,17 +1,16 @@ import * as t from 'io-ts'; -import constant from 'lodash/constant'; -import isEmpty from 'lodash/isEmpty'; import sortBy from 'lodash/sortBy'; -import type { Filter, LoginPayload, MatrixClient } from 'matrix-js-sdk'; +import type { Filter, MatrixClient } from 'matrix-js-sdk'; import { createClient } from 'matrix-js-sdk'; import { logger as matrixLogger } from 'matrix-js-sdk/lib/logger'; -import type { AsyncSubject, Observable } from 'rxjs'; -import { combineLatest, defer, EMPTY, from, merge, of, throwError, timer } from 'rxjs'; +import type { Observable } from 'rxjs'; +import { combineLatest, defer, EMPTY, from, merge, of, throwError } from 'rxjs'; import { fromFetch } from 'rxjs/fetch'; import { catchError, concatMap, delayWhen, + endWith, filter, first, ignoreElements, @@ -28,7 +27,6 @@ import { } from 'rxjs/operators'; import type { RaidenAction } from '../../actions'; -import type { RaidenConfig } from '../../config'; import { intervalFromConfig } from '../../config'; import { RAIDEN_DEVICE_ID } from '../../constants'; import type { RaidenState } from '../../state'; @@ -40,8 +38,6 @@ import { completeWith, lastMap, pluckDistinct, retryWhile } from '../../utils/rx import { decode } from '../../utils/types'; import { matrixSetup } from '../actions'; import type { RaidenMatrixSetup } from '../state'; -import type { Caps } from '../types'; -import { stringifyCaps } from '../utils'; /** * Creates and returns a matrix filter. The filter reduces the size of the initial sync by @@ -71,8 +67,7 @@ async function createMatrixFilter(matrix: MatrixClient, notRooms: string[] = []) function startMatrixSync( action$: Observable, matrix: MatrixClient, - matrix$: AsyncSubject, - config$: Observable, + { matrix$, config$, init$ }: Pick, ) { return action$.pipe( filter(matrixSetup.is), @@ -81,25 +76,17 @@ function startMatrixSync( matrix$.next(matrix); matrix$.complete(); }), - withLatestFrom(config$), - // wait 1s before starting matrix, so event listeners can be registered - delayWhen(([, { pollingInterval }]) => timer(Math.ceil(pollingInterval / 5))), - mergeMap(([, config]) => - defer(async () => createMatrixFilter(matrix)).pipe( - mergeMap(async (filter) => { - await matrix.setPushRuleEnabled('global', 'override', '.m.rule.master', true); - return filter; - }), - mergeMap(async (filter) => matrix.startClient({ filter })), - mergeMap(() => { - // after [15-45]s (default) random delay after starting, update/reload presence - return timer(Math.round((Math.random() + 0.5) * config.httpTimeout)).pipe( - mergeMap(async () => - matrix.setPresence({ presence: 'online', status_msg: Date.now().toString() }), - ), - ); - }), - retryWhile(intervalFromConfig(config$), { onErrors: networkErrors }), + mergeMap(() => + defer(async () => + Promise.all([ + createMatrixFilter(matrix), + matrix.setPushRuleEnabled('global', 'override', '.m.rule.master', true), + ]), + ).pipe( + // delay startClient (going online) to after raiden is synced + delayWhen(() => init$.pipe(ignoreElements(), endWith(true))), + mergeMap(async ([filter]) => matrix.startClient({ filter })), + retryWhile(intervalFromConfig(config$), { onErrors: networkErrors, maxRetries: 3 }), ), ), ignoreElements(), @@ -171,14 +158,12 @@ function fetchSortedMatrixServers$(matrixServerLookup: string, httpTimeout: numb * @param deps.address - Our address (to compose matrix user) * @param deps.signer - Signer to be used to sign password and displayName * @param deps.config$ - Config observable - * @param caps - Transport capabilities to set in user's avatar_url * @returns Observable of one { matrix, server, setup } object */ function setupMatrixClient$( server: string, setup: RaidenMatrixSetup | undefined, { address, signer, config$ }: Pick, - caps: Caps | null, ) { const homeserver = getServerName(server); assert(homeserver, [ErrorCodes.TRNS_NO_SERVERNAME, { server }]); @@ -211,16 +196,18 @@ function setupMatrixClient$( }), ).pipe( catchError(async (err) => { - return matrix - .registerRequest({ + try { + return await matrix.registerRequest({ username, password, device_id: RAIDEN_DEVICE_ID, - }) - .catch(constant(err)) as Promise; - // if register fails, throws login error as it's more informative + }); + } catch (e) { + // if register fails, throws login error as it's more informative + throw err; + } }), - retryWhile(intervalFromConfig(config$), { onErrors: networkErrors }), + retryWhile(intervalFromConfig(config$), { onErrors: networkErrors, maxRetries: 3 }), ), ), mergeMap(({ access_token, device_id, user_id }) => { @@ -253,12 +240,7 @@ function setupMatrixClient$( // the APIs below are authenticated, and therefore also act as validator mergeMap(({ matrix, server, setup }) => // set these properties before starting sync - defer(() => - Promise.all([ - matrix.setDisplayName(setup.displayName), - matrix.setAvatarUrl(caps && !isEmpty(caps) ? stringifyCaps(caps) : ''), - ]), - ).pipe( + defer(async () => matrix.setDisplayName(setup.displayName)).pipe( retryWhile(intervalFromConfig(config$), { onErrors: networkErrors }), mapTo({ matrix, server, setup }), // return triplet again ), @@ -285,11 +267,12 @@ function setupMatrixClient$( export function initMatrixEpic( action$: Observable, {}: Observable, - { address, signer, matrix$, latest$, config$, init$ }: RaidenEpicDeps, + deps: RaidenEpicDeps, ): Observable { + const { matrix$, latest$, config$, init$ } = deps; return combineLatest([latest$, config$]).pipe( first(), // at startup - mergeMap(([{ state }, { matrixServer, matrixServerLookup, httpTimeout, caps }]) => { + mergeMap(([{ state }, { matrixServer, matrixServerLookup, httpTimeout }]) => { const server = state.transport.server, setup = state.transport.setup; // when matrix$ async subject completes, transport init task is completed @@ -321,7 +304,7 @@ export function initMatrixEpic( catchError(andSuppress), // servers$ may error, so store lastError concatMap(({ server, setup }) => // serially, try setting up client and validate its credential - setupMatrixClient$(server, setup, { address, signer, config$ }, caps).pipe( + setupMatrixClient$(server, setup, deps).pipe( // store and suppress any 'setupMatrixClient$' error catchError(andSuppress), ), @@ -341,7 +324,7 @@ export function initMatrixEpic( mergeMap(({ matrix, server, setup }) => merge( // wait for matrixSetup through reducer, then resolves matrix$ with client and starts it - startMatrixSync(action$, matrix, matrix$, config$), + startMatrixSync(action$, matrix, deps), // emit matrixSetup in parallel to be persisted in state of(matrixSetup({ server, setup })), // monitor config.logger & disable or re-enable matrix's logger accordingly diff --git a/raiden-ts/src/transport/epics/presence.ts b/raiden-ts/src/transport/epics/presence.ts index d27474cb35..88d3585f26 100644 --- a/raiden-ts/src/transport/epics/presence.ts +++ b/raiden-ts/src/transport/epics/presence.ts @@ -2,11 +2,12 @@ import isEmpty from 'lodash/isEmpty'; import isEqual from 'lodash/isEqual'; import uniq from 'lodash/uniq'; import type { Observable } from 'rxjs'; -import { combineLatest, defer, from, merge, of } from 'rxjs'; +import { combineLatest, from, merge, of } from 'rxjs'; import { catchError, concatMap, distinctUntilChanged, + endWith, exhaustMap, filter, first, @@ -15,7 +16,7 @@ import { map, mergeMap, pluck, - skip, + startWith, switchMap, tap, timeout, @@ -31,7 +32,7 @@ import type { RaidenState } from '../../state'; import type { RaidenEpicDeps } from '../../types'; import { isActionOf } from '../../utils/actions'; import { networkErrors } from '../../utils/error'; -import { catchAndLog, completeWith, retryWhile } from '../../utils/rx'; +import { catchAndLog, completeWith, mergeWith, retryWhile } from '../../utils/rx'; import type { Address } from '../../utils/types'; import { matrixPresence } from '../actions'; import { stringifyCaps } from '../utils'; @@ -156,40 +157,39 @@ export function matrixMonitorChannelPresenceEpic( } /** - * Update our matrix's avatarUrl on config.caps changes + * Update our matrix's avatarUrl on config.caps on startup and changes * * @param action$ - Observable of RaidenActions * @param state$ - Observable of RaidenStates * @param deps - Epics dependencies * @param deps.matrix$ - MatrixClient async subject * @param deps.config$ - Config object + * @param deps.init$ - Init$ subject * @returns Observable which never emits */ export function matrixUpdateCapsEpic( action$: Observable, {}: Observable, - { matrix$, config$ }: RaidenEpicDeps, + { matrix$, config$, init$ }: RaidenEpicDeps, ): Observable { return config$.pipe( completeWith(action$), pluck('caps'), distinctUntilChanged(isEqual), - skip(1), // skip replay(1) and act only on changes - switchMap((caps) => + withLatestFrom(init$.pipe(ignoreElements(), startWith(false), endWith(true))), + switchMap(([caps, synced]) => matrix$.pipe( - mergeMap((matrix) => - defer(async () => - matrix.setAvatarUrl(caps && !isEmpty(caps) ? stringifyCaps(caps) : ''), - ).pipe( - // trigger immediate presence updates on peers - mergeMap(async () => - matrix.setPresence({ presence: 'online', status_msg: Date.now().toString() }), - ), - ), + mergeWith(async (matrix) => + matrix.setAvatarUrl(caps && !isEmpty(caps) ? stringifyCaps(caps) : ''), + ), + filter(() => synced), + mergeWith(async ([matrix]) => matrix.setPresence({ presence: 'offline', status_msg: '' })), + mergeWith(async ([[matrix]]) => + matrix.setPresence({ presence: 'online', status_msg: Date.now().toString() }), ), retryWhile(intervalFromConfig(config$)), - ignoreElements(), ), ), + ignoreElements(), ); } diff --git a/raiden-ts/tests/integration/transport.spec.ts b/raiden-ts/tests/integration/transport.spec.ts index 1bff94fbe9..cc96ea06c8 100644 --- a/raiden-ts/tests/integration/transport.spec.ts +++ b/raiden-ts/tests/integration/transport.spec.ts @@ -108,7 +108,7 @@ describe('initMatrixEpic', () => { afterEach(() => jest.restoreAllMocks()); test('matrix stored setup', async () => { - expect.assertions(4); + expect.assertions(3); const userId = `@${raiden.address.toLowerCase()}:${matrixServer}`; const displayName = await raiden.deps.signer.signMessage(userId); @@ -138,14 +138,6 @@ describe('initMatrixEpic', () => { ); // ensure if stored setup works, servers list don't need to be fetched expect(fetch).not.toHaveBeenCalled(); - - // test presence got set again after some time, to overcome presence bug - await sleep(2 * raiden.config.httpTimeout); - // test presence got set again after some time, to overcome presence bug - expect(matrix.setPresence).toHaveBeenCalledWith({ - presence: 'online', - status_msg: expect.any(String), - }); }); test('matrix server config set without stored setup', async () => { diff --git a/raiden-ts/tests/integration/udc.spec.ts b/raiden-ts/tests/integration/udc.spec.ts index bcc9d5dd1e..55d49c9558 100644 --- a/raiden-ts/tests/integration/udc.spec.ts +++ b/raiden-ts/tests/integration/udc.spec.ts @@ -170,7 +170,6 @@ describe('udcDepositEpic', () => { const depositTx = makeTransaction(undefined, { to: userDepositContract.address }); userDepositContract.deposit.mockResolvedValue(depositTx); - userDepositContract.effectiveBalance.mockResolvedValue(balance); await raiden.start(); raiden.store.dispatch(udcDeposit.request({ deposit }, { totalDeposit: balance }));