Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve capabilities updates and transport syncing order #2838

Merged
merged 6 commits into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions raiden-ts/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## [Unreleased]
### Fixed
- [#2831] Force PFS to acknowledge our capabilities updates

[#2831]: https://github.com/raiden-network/light-client/issues/2831

## [1.0.0] - 2021-06-16
### Removed
- [#2571] **BREAKING** Remove ability to join and send messages to global service rooms
Expand Down
1 change: 1 addition & 0 deletions raiden-ts/src/epics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ export function getLatest$(
const initialStale = false;
const udcDeposit$ = action$.pipe(
filter(udcDeposit.success.is),
filter((action) => !('confirmed' in action.payload) || !!action.payload.confirmed),
map((action) => ({ balance: action.payload.balance, totalDeposit: action.meta.totalDeposit })),
// starts with max, to prevent receiving starting as disabled before actual balance is fetched
startWith(initialUdcDeposit),
Expand Down
98 changes: 39 additions & 59 deletions raiden-ts/src/services/epics/udc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,7 @@ import { dispatchAndWait$ } from '../../transfers/epics/utils';
import type { RaidenEpicDeps } from '../../types';
import { isConfirmationResponseOf } from '../../utils/actions';
import { assert, commonTxErrors, ErrorCodes, networkErrors } from '../../utils/error';
import {
catchAndLog,
completeWith,
mergeWith,
retryAsync$,
retryWhile,
takeIf,
} from '../../utils/rx';
import { catchAndLog, completeWith, mergeWith, retryWhile, takeIf } from '../../utils/rx';
import type { Address, UInt } from '../../utils/types';
import { udcDeposit, udcWithdraw, udcWithdrawPlan } from '../actions';

Expand All @@ -49,32 +42,34 @@ import { udcDeposit, udcWithdraw, udcWithdrawPlan } from '../actions';
* @param deps.address - Our address
* @param deps.latest$ - Latest observable
* @param deps.userDepositContract - UserDeposit contract instance
* @param deps.provider - Eth provider
* @param deps.config$ - Config observable
* @returns Observable of udcDeposited actions
*/
export function monitorUdcBalanceEpic(
action$: Observable<RaidenAction>,
{}: Observable<RaidenState>,
{ address, latest$, provider, userDepositContract }: RaidenEpicDeps,
{ address, latest$, config$, userDepositContract }: RaidenEpicDeps,
): Observable<udcDeposit.success> {
return action$.pipe(
filter(newBlock.is),
startWith(null),
pluck('payload', 'blockNumber'),
withLatestFrom(config$),
// it's seems ugly to call on each block, but UserDepositContract doesn't expose deposits as
// events, and ethers actually do that to monitor token balances, so it's equivalent
exhaustMap(() =>
exhaustMap(([blockNumber, { confirmationBlocks }]) =>
/* This contract's function is pure (doesn't depend on user's confirmation, gas availability,
* etc), but merged on the top-level observable, therefore connectivity issues can cause
* exceptions which would shutdown the SDK. Let's swallow the error here, since this will be
* retried on next block, which should only be emitted after connectivity is reestablished */
retryAsync$(
() =>
Promise.all([
userDepositContract.callStatic.effectiveBalance(address) as Promise<UInt<32>>,
userDepositContract.callStatic.total_deposit(address) as Promise<UInt<32>>,
]),
provider.pollingInterval,
{ onErrors: networkErrors },
defer(async () =>
Promise.all([
userDepositContract.callStatic.effectiveBalance(address, {
blockTag: blockNumber - confirmationBlocks,
}) as Promise<UInt<32>>,
userDepositContract.callStatic.total_deposit(address, {
blockTag: blockNumber - confirmationBlocks,
}) as Promise<UInt<32>>,
]),
).pipe(catchError(constant(EMPTY))),
),
withLatestFrom(latest$),
Expand All @@ -90,13 +85,13 @@ function makeUdcDeposit$(
deps: Pick<RaidenEpicDeps, 'log' | 'provider' | 'config$' | 'latest$'>,
) {
const { log, provider, config$, latest$ } = deps;
let finalBalance: UInt<32>;
return defer(async () =>
Promise.all([
tokenContract.callStatic.balanceOf(sender) as Promise<UInt<32>>,
tokenContract.callStatic.allowance(sender, userDepositContract.address) as Promise<UInt<32>>,
]),
).pipe(
retryWhile(intervalFromConfig(config$), { onErrors: networkErrors }),
weilbith marked this conversation as resolved.
Show resolved Hide resolved
withLatestFrom(config$, latest$),
mergeWith(([[balance, allowance], { minimumAllowance }, { gasPrice }]) =>
approveIfNeeded$(
Expand All @@ -108,23 +103,30 @@ function makeUdcDeposit$(
{ minimumAllowance, gasPrice },
),
),
mergeMap(([[, , { gasPrice }]]) =>
defer(async () => userDepositContract.callStatic.total_deposit(address)).pipe(
retryWhile(intervalFromConfig(config$), { onErrors: networkErrors }),
mergeMap(async (deposited) => {
assert(deposited.add(deposit).eq(totalDeposit), [
ErrorCodes.UDC_DEPOSIT_OUTDATED,
{ requested: totalDeposit.toString(), current: deposited.toString() },
]);
// send setTotalDeposit transaction
return userDepositContract.deposit(address, totalDeposit, { gasPrice });
}),
),
),
mergeMap(([[, , { gasPrice, udcDeposit: prev }]]) => {
weilbith marked this conversation as resolved.
Show resolved Hide resolved
assert(prev.totalDeposit.add(deposit).eq(totalDeposit), [
ErrorCodes.UDC_DEPOSIT_OUTDATED,
{ requested: totalDeposit, current: prev.totalDeposit },
]);
finalBalance = prev.balance.add(deposit) as UInt<32>;
// send setTotalDeposit transaction
return userDepositContract.deposit(address, totalDeposit, { gasPrice });
}),
assertTx('deposit', ErrorCodes.RDN_DEPOSIT_TRANSACTION_FAILED, { log, provider }),
// retry also txFail errors, since estimateGas can lag behind just-opened channel or
// just-approved allowance
retryWhile(intervalFromConfig(config$), { onErrors: commonTxErrors, log: log.debug }),
map(([, receipt]) =>
udcDeposit.success(
weilbith marked this conversation as resolved.
Show resolved Hide resolved
{
balance: finalBalance,
txHash: receipt.transactionHash,
txBlock: receipt.blockNumber,
confirmed: undefined, // let confirmationEpic confirm this action
},
{ totalDeposit },
),
),
);
}

Expand All @@ -141,15 +143,12 @@ export function udcDepositEpic(
{}: Observable<RaidenState>,
deps: RaidenEpicDeps,
): Observable<udcDeposit.failure | udcDeposit.success> {
const { userDepositContract, getTokenContract, address, signer, main, config$, provider } = deps;
const { userDepositContract, getTokenContract, address, signer, main, config$, log } = deps;
return action$.pipe(
filter(udcDeposit.request.is),
concatMap((action) =>
retryAsync$(
() => userDepositContract.callStatic.token() as Promise<Address>,
provider.pollingInterval,
{ onErrors: networkErrors },
).pipe(
defer(async () => userDepositContract.callStatic.token() as Promise<Address>).pipe(
retryWhile(intervalFromConfig(config$), { onErrors: networkErrors, log: log.debug }),
withLatestFrom(config$),
mergeMap(([token, config]) => {
const { signer: onchainSigner, address: onchainAddress } = chooseOnchainAccount(
Expand All @@ -166,25 +165,6 @@ export function udcDepositEpic(
deps,
);
}),
mergeMap(([, receipt]) =>
retryAsync$(
() => userDepositContract.callStatic.effectiveBalance(address) as Promise<UInt<32>>,
provider.pollingInterval,
{ onErrors: networkErrors },
).pipe(
map((balance) =>
udcDeposit.success(
{
balance,
txHash: receipt.transactionHash,
txBlock: receipt.blockNumber,
confirmed: undefined, // let confirmationEpic confirm this action
},
action.meta,
),
),
),
),
catchError((error) => of(udcDeposit.failure(error, action.meta))),
),
),
Expand Down
77 changes: 30 additions & 47 deletions raiden-ts/src/transport/epics/init.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import * as t from 'io-ts';
import constant from 'lodash/constant';
import isEmpty from 'lodash/isEmpty';
import sortBy from 'lodash/sortBy';
import type { Filter, LoginPayload, MatrixClient } from 'matrix-js-sdk';
import type { Filter, MatrixClient } from 'matrix-js-sdk';
import { createClient } from 'matrix-js-sdk';
import { logger as matrixLogger } from 'matrix-js-sdk/lib/logger';
import type { AsyncSubject, Observable } from 'rxjs';
import { combineLatest, defer, EMPTY, from, merge, of, throwError, timer } from 'rxjs';
import type { Observable } from 'rxjs';
import { combineLatest, defer, EMPTY, from, merge, of, throwError } from 'rxjs';
import { fromFetch } from 'rxjs/fetch';
import {
catchError,
concatMap,
delayWhen,
endWith,
filter,
first,
ignoreElements,
Expand All @@ -28,7 +27,6 @@ import {
} from 'rxjs/operators';

import type { RaidenAction } from '../../actions';
import type { RaidenConfig } from '../../config';
import { intervalFromConfig } from '../../config';
import { RAIDEN_DEVICE_ID } from '../../constants';
import type { RaidenState } from '../../state';
Expand All @@ -40,8 +38,6 @@ import { completeWith, lastMap, pluckDistinct, retryWhile } from '../../utils/rx
import { decode } from '../../utils/types';
import { matrixSetup } from '../actions';
import type { RaidenMatrixSetup } from '../state';
import type { Caps } from '../types';
import { stringifyCaps } from '../utils';

/**
* Creates and returns a matrix filter. The filter reduces the size of the initial sync by
Expand Down Expand Up @@ -71,8 +67,7 @@ async function createMatrixFilter(matrix: MatrixClient, notRooms: string[] = [])
function startMatrixSync(
action$: Observable<RaidenAction>,
matrix: MatrixClient,
matrix$: AsyncSubject<MatrixClient>,
config$: Observable<RaidenConfig>,
{ matrix$, config$, init$ }: Pick<RaidenEpicDeps, 'matrix$' | 'config$' | 'init$'>,
) {
return action$.pipe(
filter(matrixSetup.is),
Expand All @@ -81,25 +76,17 @@ function startMatrixSync(
matrix$.next(matrix);
matrix$.complete();
}),
withLatestFrom(config$),
// wait 1s before starting matrix, so event listeners can be registered
delayWhen(([, { pollingInterval }]) => timer(Math.ceil(pollingInterval / 5))),
mergeMap(([, config]) =>
defer(async () => createMatrixFilter(matrix)).pipe(
mergeMap(async (filter) => {
await matrix.setPushRuleEnabled('global', 'override', '.m.rule.master', true);
return filter;
}),
mergeMap(async (filter) => matrix.startClient({ filter })),
mergeMap(() => {
// after [15-45]s (default) random delay after starting, update/reload presence
return timer(Math.round((Math.random() + 0.5) * config.httpTimeout)).pipe(
mergeMap(async () =>
matrix.setPresence({ presence: 'online', status_msg: Date.now().toString() }),
),
);
}),
retryWhile(intervalFromConfig(config$), { onErrors: networkErrors }),
mergeMap(() =>
defer(async () =>
Promise.all([
createMatrixFilter(matrix),
matrix.setPushRuleEnabled('global', 'override', '.m.rule.master', true),
]),
).pipe(
// delay startClient (going online) to after raiden is synced
delayWhen(() => init$.pipe(ignoreElements(), endWith(true))),
mergeMap(async ([filter]) => matrix.startClient({ filter })),
retryWhile(intervalFromConfig(config$), { onErrors: networkErrors, maxRetries: 3 }),
),
),
ignoreElements(),
Expand Down Expand Up @@ -171,14 +158,12 @@ function fetchSortedMatrixServers$(matrixServerLookup: string, httpTimeout: numb
* @param deps.address - Our address (to compose matrix user)
* @param deps.signer - Signer to be used to sign password and displayName
* @param deps.config$ - Config observable
* @param caps - Transport capabilities to set in user's avatar_url
* @returns Observable of one { matrix, server, setup } object
*/
function setupMatrixClient$(
server: string,
setup: RaidenMatrixSetup | undefined,
{ address, signer, config$ }: Pick<RaidenEpicDeps, 'address' | 'signer' | 'config$'>,
caps: Caps | null,
) {
const homeserver = getServerName(server);
assert(homeserver, [ErrorCodes.TRNS_NO_SERVERNAME, { server }]);
Expand Down Expand Up @@ -211,16 +196,18 @@ function setupMatrixClient$(
}),
).pipe(
catchError(async (err) => {
return matrix
.registerRequest({
try {
return await matrix.registerRequest({
username,
password,
device_id: RAIDEN_DEVICE_ID,
})
.catch(constant(err)) as Promise<LoginPayload>;
// if register fails, throws login error as it's more informative
});
} catch (e) {
// if register fails, throws login error as it's more informative
throw err;
}
}),
retryWhile(intervalFromConfig(config$), { onErrors: networkErrors }),
retryWhile(intervalFromConfig(config$), { onErrors: networkErrors, maxRetries: 3 }),
),
),
mergeMap(({ access_token, device_id, user_id }) => {
Expand Down Expand Up @@ -253,12 +240,7 @@ function setupMatrixClient$(
// the APIs below are authenticated, and therefore also act as validator
mergeMap(({ matrix, server, setup }) =>
// set these properties before starting sync
defer(() =>
Promise.all([
matrix.setDisplayName(setup.displayName),
matrix.setAvatarUrl(caps && !isEmpty(caps) ? stringifyCaps(caps) : ''),
]),
).pipe(
defer(async () => matrix.setDisplayName(setup.displayName)).pipe(
retryWhile(intervalFromConfig(config$), { onErrors: networkErrors }),
mapTo({ matrix, server, setup }), // return triplet again
),
Expand All @@ -285,11 +267,12 @@ function setupMatrixClient$(
export function initMatrixEpic(
action$: Observable<RaidenAction>,
{}: Observable<RaidenState>,
{ address, signer, matrix$, latest$, config$, init$ }: RaidenEpicDeps,
deps: RaidenEpicDeps,
): Observable<matrixSetup> {
const { matrix$, latest$, config$, init$ } = deps;
return combineLatest([latest$, config$]).pipe(
first(), // at startup
mergeMap(([{ state }, { matrixServer, matrixServerLookup, httpTimeout, caps }]) => {
mergeMap(([{ state }, { matrixServer, matrixServerLookup, httpTimeout }]) => {
const server = state.transport.server,
setup = state.transport.setup;
// when matrix$ async subject completes, transport init task is completed
Expand Down Expand Up @@ -321,7 +304,7 @@ export function initMatrixEpic(
catchError(andSuppress), // servers$ may error, so store lastError
concatMap(({ server, setup }) =>
// serially, try setting up client and validate its credential
setupMatrixClient$(server, setup, { address, signer, config$ }, caps).pipe(
setupMatrixClient$(server, setup, deps).pipe(
// store and suppress any 'setupMatrixClient$' error
catchError(andSuppress),
),
Expand All @@ -341,7 +324,7 @@ export function initMatrixEpic(
mergeMap(({ matrix, server, setup }) =>
merge(
// wait for matrixSetup through reducer, then resolves matrix$ with client and starts it
startMatrixSync(action$, matrix, matrix$, config$),
startMatrixSync(action$, matrix, deps),
// emit matrixSetup in parallel to be persisted in state
of(matrixSetup({ server, setup })),
// monitor config.logger & disable or re-enable matrix's logger accordingly
Expand Down
Loading