Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: connection recovery #2108

Merged
merged 37 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
57e1e11
fix: implements connection recovery and moves all `.request` to be ma…
ganchoradkov Mar 15, 2023
f0470a1
refactor: removes redundant await
ganchoradkov Mar 15, 2023
a2ef387
chore: enables debug logs
ganchoradkov Mar 15, 2023
16e9e64
Merge branch 'v2.0' into fix/connection-recovery
ganchoradkov Mar 16, 2023
6f8b9f1
feat: reinitialize clients on failed connect
ganchoradkov Mar 16, 2023
6c95cd8
chore: updates vitest
ganchoradkov Mar 16, 2023
7ad4f37
chore: removes `--no-threads`
ganchoradkov Mar 16, 2023
10d5551
chore: removes `--no-threads`
ganchoradkov Mar 16, 2023
384c1e6
refactor: rm session extend test from integration
ganchoradkov Mar 16, 2023
4d69d10
refactor: restructure integration tests to reuse clients
ganchoradkov Mar 16, 2023
74314cd
refactor: implements `initTwoPairedClients` and uses it throughout te…
ganchoradkov Mar 16, 2023
7d659a5
refactor: lifecycle tests to use `initTwoPairedClients`
ganchoradkov Mar 16, 2023
38dbafe
fix: catch `socket hang up`
ganchoradkov Mar 16, 2023
51022c1
chore: log connection status
ganchoradkov Mar 16, 2023
05ad53f
chore: set isOnline timeout
ganchoradkov Mar 16, 2023
4f8c6ca
refactor: replaces isOnline with isReachable
ganchoradkov Mar 16, 2023
a0228ea
chore: log connection status on init
ganchoradkov Mar 17, 2023
d6c895e
chore: rm isReachable
ganchoradkov Mar 17, 2023
72405be
refactor: adds expiring promise to connect
ganchoradkov Mar 17, 2023
2a8b1cc
chore: prettier
ganchoradkov Mar 17, 2023
97a5713
feat: updates json-rpc provider
ganchoradkov Mar 17, 2023
8b27da4
refactor: adds initTwoClients to expiringPromise
ganchoradkov Mar 17, 2023
6ef31fc
chore: disables debug logs
ganchoradkov Mar 17, 2023
726cc2c
chore: updates json-rpc provider
ganchoradkov Mar 17, 2023
9174719
refactor: retry on flaky segfaults
ganchoradkov Mar 17, 2023
342e1f9
chore: set --segfault-retry=3 on all relay tests
ganchoradkov Mar 17, 2023
df8e091
refactor: --dangerouslyIgnoreUnhandledErrors on xregion & lifecycle t…
ganchoradkov Mar 17, 2023
294f2df
chore: rm invalid argument
ganchoradkov Mar 17, 2023
bb72c93
Merge branch 'v2.0' into fix/connection-recovery
ganchoradkov Mar 20, 2023
e605f93
chore: rm console logs
Mar 20, 2023
76859de
chore: updates json rpc provider package
Mar 20, 2023
f401117
Merge branch 'fix/connection-recovery' of github.com:WalletConnect/wa…
Mar 20, 2023
fbb89af
Merge branch 'v2.0' into fix/connection-recovery
ganchoradkov Mar 20, 2023
3f39143
Merge branch 'v2.0' into fix/connection-recovery
ganchoradkov Mar 20, 2023
eb6c4e1
Merge branch 'v2.0' into fix/connection-recovery
Mar 21, 2023
5c11984
refactor: uses `isValidArray` for batchSubscribe result
Mar 21, 2023
c71a6cf
feat: adds additional stalled connection errors
Mar 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 19 additions & 20 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
"rollup-plugin-esbuild": "4.9.3",
"sinon": "14.0.0",
"typescript": "4.7.4",
"vitest": "0.21.1",
"vitest": "^0.22.1",
"lerna": "5.4.2"
}
}
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
},
"dependencies": {
"@walletconnect/heartbeat": "1.2.0",
"@walletconnect/jsonrpc-provider": "1.0.9",
"@walletconnect/jsonrpc-provider": "1.0.10",
"@walletconnect/jsonrpc-utils": "^1.0.4",
"@walletconnect/jsonrpc-ws-connection": "1.0.10",
"@walletconnect/keyvaluestorage": "^1.0.2",
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/controllers/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
createExpiringPromise,
} from "@walletconnect/utils";
import { EventEmitter } from "events";

import { PUBLISHER_CONTEXT, PUBLISHER_DEFAULT_TTL, RELAYER_EVENTS } from "../constants";

export class Publisher extends IPublisher {
Expand Down Expand Up @@ -104,7 +105,7 @@ export class Publisher extends IPublisher {
if (isUndefined(request.params?.tag)) delete request.params?.tag;
this.logger.debug(`Outgoing Relay Payload`);
this.logger.trace({ type: "message", direction: "outgoing", request });
return this.relayer.provider.request(request);
return this.relayer.request(request);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Saw your bullet on this:

all relay requests piped through relayer

I'm assuming this is simply removing unnecessary layering/indirection for .request by taking provider out of the equation or is there more to it I'm not seeing here at first glance?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having all requests go through the relayer gives more control such as awaiting the connection to open before submitting the request

}

private onPublish(hash: string, _params: PublisherTypes.Params) {
Expand Down
67 changes: 49 additions & 18 deletions packages/core/src/controllers/relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
isJsonRpcRequest,
JsonRpcPayload,
JsonRpcRequest,
RequestArguments,
} from "@walletconnect/jsonrpc-utils";
import WsConnection from "@walletconnect/jsonrpc-ws-connection";
import {
Expand All @@ -27,7 +28,7 @@ import {
RelayerTypes,
SubscriberTypes,
} from "@walletconnect/types";
import { formatRelayRpcUrl, getInternalError } from "@walletconnect/utils";
import { createExpiringPromise, formatRelayRpcUrl, getInternalError } from "@walletconnect/utils";

import {
RELAYER_SDK_VERSION,
Expand All @@ -43,7 +44,6 @@ import {
import { MessageTracker } from "./messages";
import { Publisher } from "./publisher";
import { Subscriber } from "./subscriber";

export class Relayer extends IRelayer {
public protocol = "wc";
public version = 2;
Expand All @@ -62,6 +62,7 @@ export class Relayer extends IRelayer {
private reconnecting = false;
private relayUrl: string;
private projectId: string | undefined;
private connectionStatusPollingInterval = 20;
constructor(opts: RelayerOptions) {
super(opts);
this.core = opts.core;
Expand All @@ -82,7 +83,7 @@ export class Relayer extends IRelayer {

public async init() {
this.logger.trace(`Initialized`);
this.provider = await this.createProvider();
await this.createProvider();
await Promise.all([this.messages.init(), this.transportOpen(), this.subscriber.init()]);
this.registerEventListeners();
this.initialized = true;
Expand Down Expand Up @@ -130,6 +131,18 @@ export class Relayer extends IRelayer {
return id;
}

public request = async (request: RequestArguments<RelayJsonRpc.SubscribeParams>) => {
this.logger.debug(`Publishing Request Payload`);
try {
await this.toEstablishConnection();
return await this.provider.request(request);
} catch (e) {
this.logger.debug(`Failed to Publish Request`);
this.logger.error(e as any);
throw e;
}
};

public async unsubscribe(topic: string, opts?: RelayerTypes.UnsubscribeOptions) {
this.isInitialized();
await this.subscriber.unsubscribe(topic, opts);
Expand Down Expand Up @@ -168,15 +181,14 @@ export class Relayer extends IRelayer {
await Promise.all([
new Promise<void>((resolve) => {
if (!this.initialized) resolve();

// wait for the subscriber to finish resubscribing to its topics
this.subscriber.once(SUBSCRIBER_EVENTS.resubscribed, () => {
resolve();
});
}),
await Promise.race([
new Promise<void>(async (resolve) => {
await this.provider.connect();
await createExpiringPromise(this.provider.connect(), 5_000, "socket hang up");
ganchoradkov marked this conversation as resolved.
Show resolved Hide resolved
this.removeListener(RELAYER_EVENTS.transport_closed, this.rejectTransportOpen);
resolve();
}),
Expand All @@ -188,25 +200,23 @@ export class Relayer extends IRelayer {
]),
]);
} catch (e: unknown | Error) {
this.logger.error(e);
const error = e as Error;
if (!/socket hang up/i.test(error.message)) {
throw e;
}
this.logger.error(e);
this.events.emit(RELAYER_EVENTS.transport_closed);
} finally {
this.reconnecting = false;
}
}

public async restartTransport(relayUrl?: string) {
if (this.transportExplicitlyClosed) {
return;
}

if (this.transportExplicitlyClosed) return;
this.relayUrl = relayUrl || this.relayUrl;
await this.transportClose();
await new Promise<void>((resolve) => setTimeout(resolve, RELAYER_RECONNECT_TIMEOUT));
await this.transportOpen(relayUrl);
await this.createProvider();
await this.transportOpen();
}

// ---------- Private ----------------------------------------------- //
Expand All @@ -217,7 +227,7 @@ export class Relayer extends IRelayer {

private async createProvider() {
const auth = await this.core.crypto.signJWT(this.relayUrl);
return new JsonRpcProvider(
this.provider = new JsonRpcProvider(
new WsConnection(
formatRelayRpcUrl({
sdkVersion: RELAYER_SDK_VERSION,
Expand All @@ -230,6 +240,7 @@ export class Relayer extends IRelayer {
}),
),
);
this.registerProviderListeners();
}

private async recordMessageEvent(messageEvent: RelayerTypes.MessageEvent) {
Expand Down Expand Up @@ -271,36 +282,41 @@ export class Relayer extends IRelayer {
await this.provider.connection.send(response);
}

private registerEventListeners() {
private registerProviderListeners() {
this.provider.on(RELAYER_PROVIDER_EVENTS.payload, (payload: JsonRpcPayload) =>
this.onProviderPayload(payload),
);
this.provider.on(RELAYER_PROVIDER_EVENTS.connect, () => {
this.events.emit(RELAYER_EVENTS.connect);
});
this.provider.on(RELAYER_PROVIDER_EVENTS.disconnect, () => {
this.events.emit(RELAYER_EVENTS.disconnect);

this.attemptToReconnect();
this.onProviderDisconnect();
});
this.provider.on(RELAYER_PROVIDER_EVENTS.error, (err: unknown) => {
this.logger.error(err);
this.events.emit(RELAYER_EVENTS.error, err);
});
}

private registerEventListeners() {
this.events.on(RELAYER_EVENTS.connection_stalled, async () => {
await this.restartTransport();
});
}

private onProviderDisconnect() {
this.events.emit(RELAYER_EVENTS.disconnect);
this.attemptToReconnect();
}

private attemptToReconnect() {
if (this.transportExplicitlyClosed) {
return;
}

// Attempt reconnection after one second.
setTimeout(async () => {
await this.transportOpen();
await this.restartTransport();
}, toMiliseconds(RELAYER_RECONNECT_TIMEOUT));
}

Expand All @@ -310,4 +326,19 @@ export class Relayer extends IRelayer {
throw new Error(message);
}
}

private async toEstablishConnection() {
if (this.connected) return;
if (this.connecting) {
return await new Promise<void>((resolve) => {
const interval = setInterval(() => {
if (this.connected) {
clearInterval(interval);
resolve();
}
}, this.connectionStatusPollingInterval);
});
}
await this.restartTransport();
}
}
9 changes: 5 additions & 4 deletions packages/core/src/controllers/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,12 @@ export class Subscriber extends ISubscriber {
this.logger.trace({ type: "payload", direction: "outgoing", request });
try {
const subscribe = await createExpiringPromise(
this.relayer.provider.request(request),
this.relayer.request(request),
this.subscribeTimeout,
);
await subscribe;
} catch (err) {
this.logger.debug(`Outgoing Relay Payload stalled`);
this.logger.debug(`Outgoing Relay Subscribe Payload stalled`);
this.relayer.events.emit(RELAYER_EVENTS.connection_stalled);
}
return hashMessage(topic + this.clientId);
Expand All @@ -241,7 +241,7 @@ export class Subscriber extends ISubscriber {
this.logger.trace({ type: "payload", direction: "outgoing", request });
try {
const subscribe = await createExpiringPromise(
this.relayer.provider.request(request),
this.relayer.request(request),
this.subscribeTimeout,
);
return await subscribe;
Expand All @@ -262,7 +262,7 @@ export class Subscriber extends ISubscriber {
};
this.logger.debug(`Outgoing Relay Payload`);
this.logger.trace({ type: "payload", direction: "outgoing", request });
return this.relayer.provider.request(request);
return this.relayer.request(request);
}

private onSubscribe(id: string, params: SubscriberTypes.Params) {
Expand Down Expand Up @@ -384,6 +384,7 @@ export class Subscriber extends ISubscriber {
private async batchSubscribe(subscriptions: SubscriberTypes.Params[]) {
if (!subscriptions.length) return;
const result = (await this.rpcBatchSubscribe(subscriptions)) as string[];
if (!result) return;
ganchoradkov marked this conversation as resolved.
Show resolved Hide resolved
this.onBatchSubscribe(result.map((id, i) => ({ ...subscriptions[i], id })));
}

Expand Down
Loading