Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arbitrum: chose matrix server from PFS #3069

Merged
merged 3 commits into from
Mar 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 8 additions & 62 deletions raiden-ts/src/services/epics/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import {
first,
map,
mergeMap,
pluck,
tap,
timeout,
toArray,
Expand All @@ -35,13 +34,13 @@ import { getCap, stringifyCaps } from '../../transport/utils';
import type { Latest, RaidenEpicDeps } from '../../types';
import { jsonParse, jsonStringify } from '../../utils/data';
import { assert, ErrorCodes, networkErrors, RaidenError } from '../../utils/error';
import { lastMap, retryWhile, withMergeFrom } from '../../utils/rx';
import { retryWhile, withMergeFrom } from '../../utils/rx';
import type { Address, Signature } from '../../utils/types';
import { decode, Signed, UInt } from '../../utils/types';
import { iouClear, iouPersist, pathFind } from '../actions';
import type { AddressMetadataMap, InputPaths, Paths, PFS } from '../types';
import { Fee, IOU, LastIOUResults, PfsError, PfsMode, PfsResult } from '../types';
import { packIOU, pfsInfo, pfsListInfo, signIOU } from '../utils';
import { choosePfs$, packIOU, signIOU } from '../utils';

type RouteResult = { iou: Signed<IOU> | undefined } & ({ paths: Paths } | { error: PfsError });

Expand Down Expand Up @@ -208,14 +207,14 @@ function getRouteFromPfs$(
action: pathFind.request,
deps: RaidenEpicDeps,
): Observable<RouteResult> {
return deps.config$.pipe(
first(),
withMergeFrom((config) => getPfsInfo$(action.payload.pfs, config, deps)),
withMergeFrom(([, pfs]) => prepareNextIOU$(pfs, action.meta.tokenNetwork, deps)),
withMergeFrom(([[config, pfs], iou]) =>
return choosePfs$(action.payload.pfs, deps).pipe(
first(), // pop first/best PFS
withMergeFrom((pfs) => prepareNextIOU$(pfs, action.meta.tokenNetwork, deps)),
withLatestFrom(deps.config$),
withMergeFrom(([[pfs, iou], config]) =>
requestPfs$(pfs, iou, action.meta, { address: deps.address, config }),
),
map(([[[config], iou], { response, text }]) => {
map(([[[, iou], config], { response, text }]) => {
// any decode error here will throw early and end up in catchError
const data = jsonParse(text);

Expand Down Expand Up @@ -289,59 +288,6 @@ function filterPaths$(
);
}

function getPfsInfo$(
pfsByAction: pathFind.request['payload']['pfs'],
config: Pick<RaidenConfig, 'pfsMode' | 'additionalServices'>,
deps: RaidenEpicDeps,
): Observable<PFS> {
const { log, latest$, init$ } = deps;
let pfs$: Observable<PFS> = EMPTY;
if (pfsByAction) pfs$ = of(pfsByAction);
else if (config.pfsMode === PfsMode.onlyAdditional)
pfs$ = defer(async () => {
let firstErr;
for (const pfsUrlOrAddr of config.additionalServices) {
try {
return await pfsInfo(pfsUrlOrAddr, deps);
} catch (e) {
firstErr ??= e;
}
}
throw firstErr;
});
else {
pfs$ = init$.pipe(
lastMap(() => latest$.pipe(first(), pluck('state', 'services'))),
// fetch pfsInfo from whole list & sort it
mergeMap((services) =>
pfsListInfo(config.additionalServices.concat(Object.keys(services)), deps).pipe(
map((pfsInfos) => {
log.info('Auto-selecting best PFS from:', pfsInfos);
assert(pfsInfos.length, [
ErrorCodes.PFS_INVALID_INFO,
{
services: Object.keys(services).join(','),
additionalServices: config.additionalServices.join(','),
},
]);
return pfsInfos[0]!; // pop best ranked
}),
),
),
);
}
return pfs$.pipe(
tap((pfs) => {
if (pfs.validTill < Date.now()) {
log.warn(
'WARNING: PFS registration not valid! This service deposit may have expired and it may not receive network updates anymore.',
pfs,
);
}
}),
);
}

function requestPfs$(
pfs: PFS,
iou: Signed<IOU> | undefined,
Expand Down
1 change: 1 addition & 0 deletions raiden-ts/src/services/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ export const PFS = t.readonly(
t.type({
address: Address,
url: t.string,
matrixServer: t.string,
rtt: t.number,
price: UInt(32),
token: Address,
Expand Down
78 changes: 73 additions & 5 deletions raiden-ts/src/services/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,22 @@ import * as t from 'io-ts';
import memoize from 'lodash/memoize';
import uniqBy from 'lodash/uniqBy';
import type { Observable } from 'rxjs';
import { defer, EMPTY, firstValueFrom, from } from 'rxjs';
import { defer, EMPTY, firstValueFrom, from, of } from 'rxjs';
import { fromFetch } from 'rxjs/fetch';
import { catchError, map, mergeMap, toArray } from 'rxjs/operators';
import {
catchError,
concatMap,
delay,
first,
last,
map,
mergeAll,
mergeMap,
takeUntil,
tap,
throwIfEmpty,
toArray,
} from 'rxjs/operators';

import type { ServiceRegistry } from '../contracts';
import { MessageTypeId } from '../messages/utils';
Expand All @@ -19,11 +32,12 @@ import type { RaidenEpicDeps } from '../types';
import { encode, jsonParse } from '../utils/data';
import { assert, ErrorCodes, networkErrors, RaidenError } from '../utils/error';
import { LruCache } from '../utils/lru';
import { retryAsync$ } from '../utils/rx';
import { pluckDistinct, retryAsync$ } from '../utils/rx';
import type { PublicKey, Signature, Signed } from '../utils/types';
import { Address, decode, UInt } from '../utils/types';
import type { pathFind } from './actions';
import type { IOU, PFS } from './types';
import { AddressMetadata, PfsError } from './types';
import { AddressMetadata, PfsError, PfsMode } from './types';

const serviceRegistryToken = memoize(
async (serviceRegistryContract: ServiceRegistry, pollingInterval: number) =>
Expand Down Expand Up @@ -68,7 +82,7 @@ function validatePfsUrl(url: string) {
const pfsAddressCache_ = new LruCache<string, Promise<Address>>(32);

/**
* Returns a cold observable which fetch PFS info & validate for a given server address or URL
* Fetch PFS info & validate for a given server address or URL
*
* This is a memoized function which caches by url or address, network and registry used.
*
Expand Down Expand Up @@ -112,6 +126,7 @@ export async function pfsInfo(
*/
const PathInfo = t.type({
message: t.string,
matrix_server: t.string,
network_info: t.type({
// literals will fail if trying to decode anything different from these constants
chain_id: t.literal(network.chainId),
Expand Down Expand Up @@ -144,6 +159,7 @@ export async function pfsInfo(
return {
address,
url,
matrixServer: info.matrix_server,
rtt,
price,
token: await serviceRegistryToken(serviceRegistryContract, provider.pollingInterval),
Expand Down Expand Up @@ -351,3 +367,55 @@ export async function signIOU(signer: Signer, iou: IOU): Promise<Signed<IOU>> {
.signMessage(packIOU(iou))
.then((signature) => ({ ...iou, signature: signature as Signature }));
}

/**
* Choose best PFS and fetch info from it
*
* @param pfsByAction - Override config for this call: explicit PFS, disabled or undefined
* @param deps - Epics dependencies
* @returns Observable to choosen PFS
*/
export function choosePfs$(
pfsByAction: pathFind.request['payload']['pfs'],
deps: RaidenEpicDeps,
): Observable<PFS> {
const { log, config$, latest$, init$ } = deps;
return config$.pipe(
first(),
mergeMap(({ pfsMode, additionalServices }) => {
if (pfsByAction) return of(pfsByAction);
assert(pfsByAction !== null && pfsMode !== PfsMode.disabled, ErrorCodes.PFS_DISABLED);
if (pfsMode === PfsMode.onlyAdditional) {
let firstError: Error;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for using full variable names without abbreviations. 🤗

return from(additionalServices).pipe(
concatMap((service) =>
defer(async () => pfsInfo(service, deps)).pipe(
catchError((e) => ((firstError ??= e), EMPTY)),
),
),
throwIfEmpty(() => firstError),
);
} else {
return latest$.pipe(
pluckDistinct('state', 'services'),
map((services) => [...additionalServices, ...Object.keys(services)]),
// takeUntil above first will error if, after init$ and concatenating additionalServices,
// we still could not find a valid service
takeUntil(init$.pipe(last(), delay(10))),
first((services) => services.length > 0),
// fetch pfsInfo from whole list & sort it
mergeMap((services) => pfsListInfo(services, deps)),
mergeAll(),
);
}
}),
tap((pfs) => {
if (pfs.validTill < Date.now()) {
log.warn(
'WARNING: PFS registration not valid! This service deposit may have expired and it may not receive network updates anymore.',
pfs,
);
}
}),
);
}
6 changes: 6 additions & 0 deletions raiden-ts/src/transport/epics/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
import type { RaidenAction } from '../../actions';
import { intervalFromConfig } from '../../config';
import { RAIDEN_DEVICE_ID } from '../../constants';
import { choosePfs$ } from '../../services/utils';
import type { RaidenState } from '../../state';
import type { RaidenEpicDeps } from '../../types';
import { assert } from '../../utils';
Expand Down Expand Up @@ -291,6 +292,11 @@ export function initMatrixEpic(
// previously used server
if (server) servers$Array.push(of({ server, setup }));

// server from PFSs, will prefer/pick matrixServer compatible with explicit PFS
servers$Array.push(
choosePfs$(undefined, deps).pipe(map(({ matrixServer: server }) => ({ server }))),
);

// fetched servers list
// notice it may include stored server again, but no stored setup, which could be the
// cause of the first failure, so we allow it to try again (not necessarily first)
Expand Down
2 changes: 2 additions & 0 deletions raiden-ts/tests/integration/path.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ describe('PFS: pfsRequestEpic', () => {
function makePfsInfoResponse() {
return {
message: 'pfs message',
matrix_server: 'http://transport.pfs.raiden.test',
network_info: {
chain_id: raiden.deps.network.chainId,
token_network_registry_address: raiden.deps.contractsInfo.TokenNetworkRegistry.address,
Expand Down Expand Up @@ -393,6 +394,7 @@ describe('PFS: pfsRequestEpic', () => {
pfs: {
address: pfsAddress,
url: pfsUrl,
matrixServer: makePfsInfoResponse().matrix_server,
rtt: 3,
price: One as UInt<32>,
token: (await raiden.deps.serviceRegistryContract.token()) as Address,
Expand Down
22 changes: 8 additions & 14 deletions raiden-ts/tests/integration/send.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -608,27 +608,21 @@ describe('transferRetryMessageEpic', () => {

describe('transferAutoExpireEpic', () => {
test('success!', async () => {
expect.assertions(1);
expect.assertions(2);

const [raiden, partner] = await makeRaidens(2);
const sentState = await ensureTransferPending([raiden, partner]);

// expiration confirmed, enough blocks after
await sleep(sentState.expiration * 1e3 - Date.now() + 10e3);
expect(raiden.output).toContainEqual(transferExpire.request(undefined, meta));
}, 10e3);

test("don't emit if transfer didn't expire", async () => {
expect.assertions(1);

const [raiden, partner] = await makeRaidens(2);
const sentState = await ensureTransferPending([raiden, partner]);
// not yet expired
await sleep(sentState.expiration * 1e3 - Date.now() - raiden.config.httpTimeout);
// don't emit if transfer didn't expire
await sleep(sentState.expiration * 1e3 - Date.now() - 2e3);
expect(raiden.output).not.toContainEqual(
transferExpire.request(expect.anything(), expect.anything()),
);
});

// expiration confirmed
await sleep(4e3);
expect(raiden.output).toContainEqual(transferExpire.request(undefined, meta));
}, 10e3);

test("don't expire if secret registered before expiration", async () => {
expect.assertions(1);
Expand Down
54 changes: 50 additions & 4 deletions raiden-ts/tests/integration/transport.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import type { Delivered, Processed } from '@/messages/types';
import { MessageType } from '@/messages/types';
import { encodeJsonMessage, signMessage } from '@/messages/utils';
import { servicesValid } from '@/services/actions';
import { Service, ServiceDeviceId } from '@/services/types';
import { PfsMode, Service, ServiceDeviceId } from '@/services/types';
import { makeMessageId } from '@/transfers/utils';
import { matrixPresence, matrixSetup, rtcChannel } from '@/transport/actions';
import { getSortedAddresses } from '@/transport/utils';
Expand Down Expand Up @@ -195,12 +195,58 @@ describe('initMatrixEpic', () => {
expect(fetch).not.toHaveBeenCalled();
});

test('matrix fetch server from PFS', async () => {
expect.assertions(3);

// set PfsMode.onlyAdditional and ensure it uses matrixServer from PFS
raiden.store.dispatch(
raidenConfigUpdate({ matrixServer: '', pfsMode: PfsMode.onlyAdditional }),
);
const resp = {
message: '',
matrix_server: 'http://transport.raiden.test',
network_info: {
chain_id: raiden.deps.network.chainId,
token_network_registry_address: raiden.deps.contractsInfo.TokenNetworkRegistry.address,
},
operator: 'TestOp',
payment_address: makeAddress(),
price_info: '100',
version: '0.1.2',
};
fetch.mockResolvedValueOnce({
ok: true,
status: 200,
text: jest.fn(async () => jsonStringify(resp)),
json: jest.fn(async () => resp),
});
await raiden.start();
await sleep();

expect(raiden.output).toContainEqual(
matrixSetup({
server: resp.matrix_server,
setup: {
userId: `@${raiden.address.toLowerCase()}:${getServerName(resp.matrix_server)}`,
accessToken: expect.any(String),
deviceId: expect.any(String),
displayName: expect.any(String),
},
}),
);
expect(fetch).toHaveBeenCalledTimes(1);
expect(fetch).toHaveBeenCalledWith(
expect.stringMatching(/\/api\/v1\/info$/),
expect.anything(),
);
});

test('matrix fetch servers list', async () => {
expect.assertions(2);

// make the matrixServer falsy otherwise fetchSortedMatrixServers$
// inside initMatrixEpic is not called. This will force fetching server list
raiden.store.dispatch(raidenConfigUpdate({ matrixServer: '' }));
raiden.store.dispatch(raidenConfigUpdate({ matrixServer: '', pfsMode: PfsMode.disabled }));
await raiden.start();
await sleep();

Expand All @@ -227,7 +273,7 @@ describe('initMatrixEpic', () => {
status: 404,
json: jest.fn(async () => ({})),
});
raiden.store.dispatch(raidenConfigUpdate({ matrixServer: '' }));
raiden.store.dispatch(raidenConfigUpdate({ matrixServer: '', pfsMode: PfsMode.disabled }));

await raiden.start();
await lastValueFrom(raiden.action$);
Expand Down Expand Up @@ -263,7 +309,7 @@ describe('initMatrixEpic', () => {
});

// set fetch list from matrixServerLookup
raiden.store.dispatch(raidenConfigUpdate({ matrixServer: '' }));
raiden.store.dispatch(raidenConfigUpdate({ matrixServer: '', pfsMode: PfsMode.disabled }));
await raiden.start();
await lastValueFrom(raiden.action$);

Expand Down