Skip to content

Commit

Permalink
sdk: split channelOpenEpic for better code readability
Browse files Browse the repository at this point in the history
  • Loading branch information
andrevmatos committed Sep 4, 2021
1 parent b26803d commit acac79e
Showing 1 changed file with 91 additions and 63 deletions.
154 changes: 91 additions & 63 deletions raiden-ts/src/channels/epics/open.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import constant from 'lodash/constant';
import findKey from 'lodash/findKey';
import type { Observable } from 'rxjs';
import { defer, EMPTY, merge, of } from 'rxjs';
import { EMPTY, merge, of } from 'rxjs';
import {
catchError,
filter,
first,
ignoreElements,
mapTo,
mergeMap,
mergeMapTo,
raceWith,
take,
takeUntil,
Expand All @@ -16,17 +17,78 @@ import {

import type { RaidenAction } from '../../actions';
import { intervalFromConfig } from '../../config';
import type { HumanStandardToken, TokenNetwork } from '../../contracts';
import { chooseOnchainAccount, getContractWithSigner } from '../../helpers';
import type { RaidenState } from '../../state';
import type { RaidenEpicDeps } from '../../types';
import { isActionOf } from '../../utils/actions';
import { commonAndFailTxErrors, ErrorCodes, RaidenError } from '../../utils/error';
import { checkContractHasMethod$ } from '../../utils/ethers';
import { retryWhile } from '../../utils/rx';
import type { Address } from '../../utils/types';
import type { Address, UInt } from '../../utils/types';
import { channelDeposit, channelOpen } from '../actions';
import { assertTx, channelKey, ensureApprovedBalance$ } from '../utils';

function openWithDeposit$(
[tokenNetworkContract, tokenContract]: readonly [TokenNetwork, HumanStandardToken],
[partner, deposit, settleTimeout, raced$]: readonly [
Address,
UInt<32>,
number,
Observable<unknown>,
],
deps: RaidenEpicDeps,
): Observable<boolean> {
const { address, latest$, config$, log } = deps;
const tokenNetwork = tokenNetworkContract.address as Address;
// if we need to deposit and contract supports 'openChannelWithDeposit' (0.39+),
// we must ensureApprovedBalance$ ourselves and then call the method to open+deposit
return ensureApprovedBalance$(tokenContract, tokenNetwork, deposit, deps).pipe(
withLatestFrom(latest$),
mergeMap(async ([, { gasPrice }]) =>
tokenNetworkContract.openChannelWithDeposit(address, partner, settleTimeout, deposit, {
gasPrice,
}),
),
assertTx('openChannelWithDeposit', ErrorCodes.CNL_OPENCHANNEL_FAILED, deps),
retryWhile(intervalFromConfig(config$), {
onErrors: commonAndFailTxErrors,
log: log.info,
}),
mapTo(false), // should not deposit, as tx succeeded
// raceWith acts like takeUntil, but besides unsubscribing from retryWhile if channel
// gets opened by partner, also requests channelDeposit then
raceWith(raced$.pipe(take(1), mapTo(true))), // should deposit, as open raced
);
}

function openAndThenDeposit$(
tokenNetworkContract: TokenNetwork,
[partner, settleTimeout, raced$]: readonly [Address, number, Observable<unknown>],
deps: RaidenEpicDeps,
) {
const { address, log, latest$, config$ } = deps;
return merge(
of(true), // should deposit in parallel (approve + deposit[waitOpen])
latest$.pipe(
first(),
mergeMap(async ({ gasPrice }) =>
tokenNetworkContract.openChannel(address, partner, settleTimeout, { gasPrice }),
),
assertTx('openChannel', ErrorCodes.CNL_OPENCHANNEL_FAILED, deps),
// also retry txFailErrors on open$ only; deposit$ (if not EMPTY) is handled by
// channelDepositEpic
retryWhile(intervalFromConfig(config$), {
onErrors: commonAndFailTxErrors,
log: log.info,
}),
ignoreElements(), // ignore success so it's picked by channelEventsEpic
// if channel gets opened while retrying (e.g. by partner), give up retry
takeUntil(raced$),
),
);
}

/**
* A channelOpen action requested by user
* Needs to be called on a previously monitored tokenNetwork. Calls TokenNetwork.openChannel
Expand All @@ -53,11 +115,11 @@ export function channelOpenEpic(
state$: Observable<RaidenState>,
deps: RaidenEpicDeps,
): Observable<channelOpen.failure | channelDeposit.request> {
const { log, address, getTokenContract, getTokenNetworkContract, config$, latest$ } = deps;
const { getTokenContract, getTokenNetworkContract, config$ } = deps;
return action$.pipe(
filter(isActionOf(channelOpen.request)),
withLatestFrom(state$, config$, latest$),
mergeMap(([action, state, { settleTimeout, subkey: configSubkey }, { gasPrice }]) => {
withLatestFrom(state$, config$),
mergeMap(([action, state, { settleTimeout: configSettleTimeout, subkey: configSubkey }]) => {
const { tokenNetwork, partner } = action.meta;
const channelState = state.channels[channelKey(action.meta)]?.state;
// fails if channel already exist
Expand All @@ -76,6 +138,7 @@ export function channelOpenEpic(
getTokenNetworkContract(tokenNetwork),
onchainSigner,
);
const settleTimeout = action.payload.settleTimeout ?? configSettleTimeout;

return checkContractHasMethod$(tokenNetworkContract, 'openChannelWithDeposit').pipe(
catchError(constant(of(false))),
Expand All @@ -86,70 +149,35 @@ export function channelOpenEpic(
filter((a) => a.meta.tokenNetwork === tokenNetwork && a.meta.partner === partner),
);

let deposit$: Observable<channelDeposit.request> = EMPTY;
// in case we need to deposit and contract doesn't support 'openChannelWithDeposit'
// method (legacy 0.37), emit channelDeposit.request with waitOpen=true, which will
// ensureApprovedBalance$ (in parallel with open) and deposit once open tx is confirmed
if (deposit?.gt(0))
deposit$ = of(
channelDeposit.request(
{ deposit, subkey: action.payload.subkey, waitOpen: true },
action.meta,
),
);

let open$: Observable<boolean>;
if (deposit?.gt(0) && hasMethod) {
const token = findKey(state.tokens, (tn) => tn === tokenNetwork)! as Address;
const tokenContract = getContractWithSigner(getTokenContract(token), onchainSigner);
// if we need to deposit and contract supports 'openChannelWithDeposit' (0.39+),
// we must ensureApprovedBalance$ ourselves and then call the method to open+deposit
return ensureApprovedBalance$(tokenContract, tokenNetwork, deposit, deps).pipe(
mergeMap(async () =>
tokenNetworkContract.openChannelWithDeposit(
address,
partner,
action.payload.settleTimeout ?? settleTimeout,
deposit,
{ gasPrice },
),
),
assertTx('openChannelWithDeposit', ErrorCodes.CNL_OPENCHANNEL_FAILED, deps),
retryWhile(intervalFromConfig(config$), {
onErrors: commonAndFailTxErrors,
log: log.info,
}),
ignoreElements(), // ignore success so it's picked by channelEventsEpic
// raceWith acts like takeUntil, but besides unsubscribing from retryWhile if channel
// gets opened by partner, also requests channelDeposit then
raceWith(openedByPartner$.pipe(take(1), mergeMapTo(deposit$))),
catchError((error) => of(channelOpen.failure(error, action.meta))),
open$ = openWithDeposit$(
[tokenNetworkContract, tokenContract],
[partner, deposit, settleTimeout, openedByPartner$],
deps,
);
} else {
return merge(
deposit$,
defer(async () =>
tokenNetworkContract.openChannel(
address,
partner,
action.payload.settleTimeout ?? settleTimeout,
{ gasPrice },
),
).pipe(
assertTx('openChannel', ErrorCodes.CNL_OPENCHANNEL_FAILED, deps),
// also retry txFailErrors on open$ only; deposit$ (if not EMPTY) is handled by
// channelDepositEpic
retryWhile(intervalFromConfig(config$), {
onErrors: commonAndFailTxErrors,
log: log.info,
}),
// ignore success so it's picked by channelEventsEpic
ignoreElements(),
// if channel gets opened while retrying (e.g. by partner), give up to avoid erroring
takeUntil(openedByPartner$),
catchError((error) => of(channelOpen.failure(error, action.meta))),
),
open$ = openAndThenDeposit$(
tokenNetworkContract,
[partner, settleTimeout, openedByPartner$],
deps,
);
}
return open$.pipe(
mergeMap((shouldDeposit) =>
shouldDeposit && deposit?.gt(0)
? of(
channelDeposit.request(
{ deposit, subkey: action.payload.subkey, waitOpen: true },
action.meta,
),
)
: EMPTY,
),
catchError((error) => of(channelOpen.failure(error, action.meta))),
);
}),
);
}),
Expand Down

0 comments on commit acac79e

Please sign in to comment.