From acac79ef11d3c444ebe608656ed5871cd2c653d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Vitor=20de=20Lima=20Matos?= Date: Sat, 4 Sep 2021 15:57:01 -0300 Subject: [PATCH] sdk: split channelOpenEpic for better code readability --- raiden-ts/src/channels/epics/open.ts | 154 ++++++++++++++++----------- 1 file changed, 91 insertions(+), 63 deletions(-) diff --git a/raiden-ts/src/channels/epics/open.ts b/raiden-ts/src/channels/epics/open.ts index 502501e62e..2654e559af 100644 --- a/raiden-ts/src/channels/epics/open.ts +++ b/raiden-ts/src/channels/epics/open.ts @@ -1,13 +1,14 @@ import constant from 'lodash/constant'; import findKey from 'lodash/findKey'; import type { Observable } from 'rxjs'; -import { defer, EMPTY, merge, of } from 'rxjs'; +import { EMPTY, merge, of } from 'rxjs'; import { catchError, filter, + first, ignoreElements, + mapTo, mergeMap, - mergeMapTo, raceWith, take, takeUntil, @@ -16,6 +17,7 @@ import { import type { RaidenAction } from '../../actions'; import { intervalFromConfig } from '../../config'; +import type { HumanStandardToken, TokenNetwork } from '../../contracts'; import { chooseOnchainAccount, getContractWithSigner } from '../../helpers'; import type { RaidenState } from '../../state'; import type { RaidenEpicDeps } from '../../types'; @@ -23,10 +25,70 @@ import { isActionOf } from '../../utils/actions'; import { commonAndFailTxErrors, ErrorCodes, RaidenError } from '../../utils/error'; import { checkContractHasMethod$ } from '../../utils/ethers'; import { retryWhile } from '../../utils/rx'; -import type { Address } from '../../utils/types'; +import type { Address, UInt } from '../../utils/types'; import { channelDeposit, channelOpen } from '../actions'; import { assertTx, channelKey, ensureApprovedBalance$ } from '../utils'; +function openWithDeposit$( + [tokenNetworkContract, tokenContract]: readonly [TokenNetwork, HumanStandardToken], + [partner, deposit, settleTimeout, raced$]: readonly [ + Address, + UInt<32>, + number, + Observable, + ], + deps: RaidenEpicDeps, +): Observable { + const { address, latest$, config$, log } = deps; + const tokenNetwork = tokenNetworkContract.address as Address; + // if we need to deposit and contract supports 'openChannelWithDeposit' (0.39+), + // we must ensureApprovedBalance$ ourselves and then call the method to open+deposit + return ensureApprovedBalance$(tokenContract, tokenNetwork, deposit, deps).pipe( + withLatestFrom(latest$), + mergeMap(async ([, { gasPrice }]) => + tokenNetworkContract.openChannelWithDeposit(address, partner, settleTimeout, deposit, { + gasPrice, + }), + ), + assertTx('openChannelWithDeposit', ErrorCodes.CNL_OPENCHANNEL_FAILED, deps), + retryWhile(intervalFromConfig(config$), { + onErrors: commonAndFailTxErrors, + log: log.info, + }), + mapTo(false), // should not deposit, as tx succeeded + // raceWith acts like takeUntil, but besides unsubscribing from retryWhile if channel + // gets opened by partner, also requests channelDeposit then + raceWith(raced$.pipe(take(1), mapTo(true))), // should deposit, as open raced + ); +} + +function openAndThenDeposit$( + tokenNetworkContract: TokenNetwork, + [partner, settleTimeout, raced$]: readonly [Address, number, Observable], + deps: RaidenEpicDeps, +) { + const { address, log, latest$, config$ } = deps; + return merge( + of(true), // should deposit in parallel (approve + deposit[waitOpen]) + latest$.pipe( + first(), + mergeMap(async ({ gasPrice }) => + tokenNetworkContract.openChannel(address, partner, settleTimeout, { gasPrice }), + ), + assertTx('openChannel', ErrorCodes.CNL_OPENCHANNEL_FAILED, deps), + // also retry txFailErrors on open$ only; deposit$ (if not EMPTY) is handled by + // channelDepositEpic + retryWhile(intervalFromConfig(config$), { + onErrors: commonAndFailTxErrors, + log: log.info, + }), + ignoreElements(), // ignore success so it's picked by channelEventsEpic + // if channel gets opened while retrying (e.g. by partner), give up retry + takeUntil(raced$), + ), + ); +} + /** * A channelOpen action requested by user * Needs to be called on a previously monitored tokenNetwork. Calls TokenNetwork.openChannel @@ -53,11 +115,11 @@ export function channelOpenEpic( state$: Observable, deps: RaidenEpicDeps, ): Observable { - const { log, address, getTokenContract, getTokenNetworkContract, config$, latest$ } = deps; + const { getTokenContract, getTokenNetworkContract, config$ } = deps; return action$.pipe( filter(isActionOf(channelOpen.request)), - withLatestFrom(state$, config$, latest$), - mergeMap(([action, state, { settleTimeout, subkey: configSubkey }, { gasPrice }]) => { + withLatestFrom(state$, config$), + mergeMap(([action, state, { settleTimeout: configSettleTimeout, subkey: configSubkey }]) => { const { tokenNetwork, partner } = action.meta; const channelState = state.channels[channelKey(action.meta)]?.state; // fails if channel already exist @@ -76,6 +138,7 @@ export function channelOpenEpic( getTokenNetworkContract(tokenNetwork), onchainSigner, ); + const settleTimeout = action.payload.settleTimeout ?? configSettleTimeout; return checkContractHasMethod$(tokenNetworkContract, 'openChannelWithDeposit').pipe( catchError(constant(of(false))), @@ -86,70 +149,35 @@ export function channelOpenEpic( filter((a) => a.meta.tokenNetwork === tokenNetwork && a.meta.partner === partner), ); - let deposit$: Observable = EMPTY; - // in case we need to deposit and contract doesn't support 'openChannelWithDeposit' - // method (legacy 0.37), emit channelDeposit.request with waitOpen=true, which will - // ensureApprovedBalance$ (in parallel with open) and deposit once open tx is confirmed - if (deposit?.gt(0)) - deposit$ = of( - channelDeposit.request( - { deposit, subkey: action.payload.subkey, waitOpen: true }, - action.meta, - ), - ); - + let open$: Observable; if (deposit?.gt(0) && hasMethod) { const token = findKey(state.tokens, (tn) => tn === tokenNetwork)! as Address; const tokenContract = getContractWithSigner(getTokenContract(token), onchainSigner); - // if we need to deposit and contract supports 'openChannelWithDeposit' (0.39+), - // we must ensureApprovedBalance$ ourselves and then call the method to open+deposit - return ensureApprovedBalance$(tokenContract, tokenNetwork, deposit, deps).pipe( - mergeMap(async () => - tokenNetworkContract.openChannelWithDeposit( - address, - partner, - action.payload.settleTimeout ?? settleTimeout, - deposit, - { gasPrice }, - ), - ), - assertTx('openChannelWithDeposit', ErrorCodes.CNL_OPENCHANNEL_FAILED, deps), - retryWhile(intervalFromConfig(config$), { - onErrors: commonAndFailTxErrors, - log: log.info, - }), - ignoreElements(), // ignore success so it's picked by channelEventsEpic - // raceWith acts like takeUntil, but besides unsubscribing from retryWhile if channel - // gets opened by partner, also requests channelDeposit then - raceWith(openedByPartner$.pipe(take(1), mergeMapTo(deposit$))), - catchError((error) => of(channelOpen.failure(error, action.meta))), + open$ = openWithDeposit$( + [tokenNetworkContract, tokenContract], + [partner, deposit, settleTimeout, openedByPartner$], + deps, ); } else { - return merge( - deposit$, - defer(async () => - tokenNetworkContract.openChannel( - address, - partner, - action.payload.settleTimeout ?? settleTimeout, - { gasPrice }, - ), - ).pipe( - assertTx('openChannel', ErrorCodes.CNL_OPENCHANNEL_FAILED, deps), - // also retry txFailErrors on open$ only; deposit$ (if not EMPTY) is handled by - // channelDepositEpic - retryWhile(intervalFromConfig(config$), { - onErrors: commonAndFailTxErrors, - log: log.info, - }), - // ignore success so it's picked by channelEventsEpic - ignoreElements(), - // if channel gets opened while retrying (e.g. by partner), give up to avoid erroring - takeUntil(openedByPartner$), - catchError((error) => of(channelOpen.failure(error, action.meta))), - ), + open$ = openAndThenDeposit$( + tokenNetworkContract, + [partner, settleTimeout, openedByPartner$], + deps, ); } + return open$.pipe( + mergeMap((shouldDeposit) => + shouldDeposit && deposit?.gt(0) + ? of( + channelDeposit.request( + { deposit, subkey: action.payload.subkey, waitOpen: true }, + action.meta, + ), + ) + : EMPTY, + ), + catchError((error) => of(channelOpen.failure(error, action.meta))), + ); }), ); }),