Skip to content

Commit

Permalink
fix: stalled connection in CI (#1758)
Browse files Browse the repository at this point in the history
* chore: updates ws

* fix: rm slashes

* feat: adds request relay log

* fix: events

* feat: disable ack on pairing delete

* feat: add small delay before switching off sockets

* chore: packagelog

* feat: rework tests

* feat: delete clients

* chore: rm client variable

* feat: single pairing for validation tests

* chore: updates ws dep

* chore: skip validation tests

* chore: adds logs for tests

* feat: adds a delay before releasing connect + before deleting clients

* chore: skip extend test

* chore: validation tests logs

* feat: adds clientId logs & reverts await removal

* feat: adds throttle after clients are created

* fix: add await

* chore: skip validation tests

* chore: delay

* feat: adds closing socket for test & published payload id

* chore: updates ws to log socket sending

* chore: more logs

* chore: log disconnect

* feat: adds small delay on connect

* feat: reconnect stalled socket

* chore: lower throttle

* chore: logs clientId to stalled sockets

* chore: rm delay in disconnect sockets

* feat: isolate canary

* chore: stop ws logs

* feat: implement reconnection in subscriber

* chore: rm connect throttle

* chore: rev while loop

* chore: better logs

* feat: restart connection

* fix: unused property

* chore: updates ws package

* chore: update ws to reconnect on socket hang up

* feat: increases connect timeout

* feat: adds logs on num retries + logs timeout test name

* feat: compelete restart of provider & socket ws

* refactor: rework subscribe reconnect & increase timeouts

* fix: remove listeners before recreating provider

* feat: implements publisher requests as subscribe

* feat: small delay

* feat: adds some delay to allow tranposrt to close

* chore: reduce noise

* feat: restore provider.connect()

* chore: skip push test

* chore: reenable push

* fix: restore transport close open

* fix: remove listeners on heartbeat

* refactor: reenable validation tests

* chore: updates ws

* feat: crank timeouts

* chore: update ws

* refactor: lower lifecycle timeout

* chore: update ws

* chore: updates ws

* chore: update ws

* feat: uses transportOpen to start connection in order to catch exceptions

* chore: updates ws

* feat: single retry

* feat: emit event `connection_stalled`

* chore: logs & resubscribe timeout

* chore: adds logs for subscribe heartbeat

* chore: more logs

* feat: ignore stalled requests

* chore: add logs on publish heartbeat

* feat: adds reject on reconnection

* feat: adds subscribeInProgress

* chore: rm no threads rule

* fix: typo

* fix: make sure publish is resolved

* chore: log test name

* chore: logger trace

* chore: update logs

* fix: uses .pushish

* chore: reenable no-threads

* chore: rm logger

* feat: skip reopening connection if its already active

* feat: restartTransport()

* feat: avoid restarting transport while restarting already in progress

* fix: delete clients

* feat: observer pattern for subscriptions

* chore: add logs

* feat: close pending transportOpen

* chore: log reject

* feat: adds growing timeout before reconnecting again

* fix: don't attempt transport restart if already running

* feat: exponential delay

* Revert "feat: exponential delay"

This reverts commit 839fb64.

* refactor: delay to 5s

* fix: 0ms

* chore: log received payload

* feat: growing delay between reconnections

* fix: sets transportRestartInProgress to false

* chore: log attemptToReconnect

* feat: add delay before restarts

* refator: moves this.transportExplicitlyClosed = false; after connection is opened

* feat: additional await after deleting sockets

* chore: log connected status

* feat: add a delay for replication

* feat: split tests into multiple files

* chore: logs to follow process

* feat: restart on transport error

* chore: larger delay between reconnecting

* feat: restructures tests

* refactor: events in integration

* feat: client tests

* feat: emit transport close

* fix: rm non id subscriptions for tests

* chore: cleanup

* fix: regex

* chore: rm test names

* refactor: debug lgos

* chore: rm eslint rules

* chore: restore default client seed

* fix: fake timers

* feat:  expiring promise

* feat: adds a retry on integration tests

* chore: updates ws

* fix: add ^ to ws version

* chore: revert name readonly

* refactor: magic numbers to const

* refactor: rm .only modifier

* refactor: logs to logger

* chore: fixes typo -> persistence

* refactor: logs to logger

* refactor: reenables expiry test

* refactor: move `resubscribed` listener with connect call to avoid possible race conditions

* refactor: reenables expiry test in client too
  • Loading branch information
ganchoradkov authored Dec 20, 2022
1 parent ed782ce commit 0cdf2a2
Show file tree
Hide file tree
Showing 26 changed files with 663 additions and 523 deletions.
47 changes: 31 additions & 16 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"@walletconnect/heartbeat": "^1.0.1",
"@walletconnect/jsonrpc-provider": "^1.0.6",
"@walletconnect/jsonrpc-utils": "^1.0.4",
"@walletconnect/jsonrpc-ws-connection": "^1.0.5",
"@walletconnect/jsonrpc-ws-connection": "^1.0.6",
"@walletconnect/keyvaluestorage": "^1.0.2",
"@walletconnect/logger": "^2.0.1",
"@walletconnect/relay-api": "^1.0.7",
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/constants/relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ export const RELAYER_EVENTS = {
connect: "relayer_connect",
disconnect: "relayer_disconnect",
error: "relayer_error",
connection_stalled: "relayer_connection_stalled",
transport_closed: "relayer_transport_closed",
};

export const RELAYER_SUBSCRIBER_SUFFIX = "_subscription";
Expand Down
26 changes: 16 additions & 10 deletions packages/core/src/controllers/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ import {
getRelayProtocolName,
hashMessage,
isUndefined,
createExpiringPromise,
} from "@walletconnect/utils";
import { EventEmitter } from "events";
import { PUBLISHER_CONTEXT, PUBLISHER_DEFAULT_TTL } from "../constants";
import { PUBLISHER_CONTEXT, PUBLISHER_DEFAULT_TTL, RELAYER_EVENTS } from "../constants";

export class Publisher extends IPublisher {
public events = new EventEmitter();
public name = PUBLISHER_CONTEXT;
public queue = new Map<string, PublisherTypes.Params>();
private publishTimeout = 10_000;

constructor(public relayer: IRelayer, public logger: Logger) {
super(relayer, logger);
Expand All @@ -39,7 +41,17 @@ export class Publisher extends IPublisher {
const params = { topic, message, opts: { ttl, relay, prompt, tag } };
const hash = hashMessage(message);
this.queue.set(hash, params);
await this.rpcPublish(topic, message, ttl, relay, prompt, tag);
try {
const publish = await createExpiringPromise(
this.rpcPublish(topic, message, ttl, relay, prompt, tag),
this.publishTimeout,
);
await publish;
} catch (err) {
this.logger.debug(`Publishing Payload stalled`);
this.relayer.events.emit(RELAYER_EVENTS.connection_stalled);
return;
}
this.onPublish(hash, params);
this.logger.debug(`Successfully Published Payload`);
this.logger.trace({ type: "method", method: "publish", params: { topic, message, opts } });
Expand Down Expand Up @@ -100,14 +112,8 @@ export class Publisher extends IPublisher {

private checkQueue() {
this.queue.forEach(async (params) => {
const {
topic,
message,
opts: { ttl, relay, prompt, tag },
} = params;
const hash = hashMessage(message);
await this.rpcPublish(topic, message, ttl, relay, prompt, tag);
this.onPublish(hash, params);
const { topic, message, opts } = params;
await this.publish(topic, message, opts);
});
}

Expand Down
74 changes: 61 additions & 13 deletions packages/core/src/controllers/relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
ISubscriber,
RelayerOptions,
RelayerTypes,
SubscriberTypes,
} from "@walletconnect/types";
import { formatRelayRpcUrl, getInternalError } from "@walletconnect/utils";

Expand Down Expand Up @@ -61,7 +62,6 @@ export class Relayer extends IRelayer {

private relayUrl: string;
private projectId: string | undefined;

constructor(opts: RelayerOptions) {
super(opts);
this.core = opts.core;
Expand All @@ -83,7 +83,7 @@ export class Relayer extends IRelayer {
public async init() {
this.logger.trace(`Initialized`);
this.provider = await this.createProvider();
await Promise.all([this.messages.init(), this.provider.connect(), this.subscriber.init()]);
await Promise.all([this.messages.init(), this.transportOpen(), this.subscriber.init()]);
this.registerEventListeners();
this.initialized = true;
}
Expand All @@ -108,7 +108,20 @@ export class Relayer extends IRelayer {

public async subscribe(topic: string, opts?: RelayerTypes.SubscribeOptions) {
this.isInitialized();
const id = await this.subscriber.subscribe(topic, opts);
let id = "";
await Promise.all([
new Promise<void>((resolve) => {
this.subscriber.once(SUBSCRIBER_EVENTS.created, (subscription: SubscriberTypes.Active) => {
if (subscription.topic === topic) {
resolve();
}
});
}),
new Promise<void>(async (resolve) => {
id = await this.subscriber.subscribe(topic, opts);
resolve();
}),
]);
return id;
}

Expand All @@ -135,20 +148,50 @@ export class Relayer extends IRelayer {

public async transportClose() {
this.transportExplicitlyClosed = true;
await this.provider.disconnect();
if (this.connected) await this.provider.disconnect();
this.events.emit(RELAYER_EVENTS.transport_closed);
}

public async transportOpen(relayUrl?: string) {
this.relayUrl = relayUrl || this.relayUrl;
this.transportExplicitlyClosed = false;
await this.provider.connect();
// wait for the subscriber to finish resubscribing to its topics
await new Promise<void>((resolve) => {
this.subscriber.once(SUBSCRIBER_EVENTS.resubscribed, () => {
resolve();
});
});
try {
await Promise.all([
new Promise<void>((resolve) => {
if (!this.initialized) resolve();

// wait for the subscriber to finish resubscribing to its topics
this.subscriber.once(SUBSCRIBER_EVENTS.resubscribed, () => {
resolve();
});
}),
await Promise.race([
this.provider.connect(),
new Promise<void>((_res, reject) =>
// rejects pending promise if transport is closed before connection is established
// useful when .connect() gets stuck resolving
this.once(RELAYER_EVENTS.transport_closed, () => {
reject();
}),
),
]),
]);
} catch (e: unknown | Error) {
const error = e as Error;
if (!/socket hang up/i.test(error.message)) {
throw new Error(error.message);
}
this.logger.error(error);
this.events.emit(RELAYER_EVENTS.transport_closed);
}
}

public async restartTransport(relayUrl?: string) {
await this.transportClose();
await new Promise<void>((resolve) => setTimeout(resolve, RELAYER_RECONNECT_TIMEOUT));
await this.transportOpen(relayUrl);
}

// ---------- Private ----------------------------------------------- //

private async createProvider() {
Expand Down Expand Up @@ -221,15 +264,20 @@ export class Relayer extends IRelayer {
this.provider.on(RELAYER_PROVIDER_EVENTS.error, (err: unknown) =>
this.events.emit(RELAYER_EVENTS.error, err),
);

this.events.on(RELAYER_EVENTS.connection_stalled, async () => {
await this.restartTransport();
});
}

private attemptToReconnect() {
if (this.transportExplicitlyClosed) {
return;
}

// Attempt reconnection after one second.
setTimeout(() => {
this.provider.connect();
setTimeout(async () => {
await this.transportOpen();
}, toMiliseconds(RELAYER_RECONNECT_TIMEOUT));
}

Expand Down
22 changes: 17 additions & 5 deletions packages/core/src/controllers/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import {
getInternalError,
getRelayProtocolApi,
getRelayProtocolName,
createExpiringPromise,
} from "@walletconnect/utils";
import {
CORE_STORAGE_PREFIX,
SUBSCRIBER_CONTEXT,
SUBSCRIBER_EVENTS,
SUBSCRIBER_STORAGE_VERSION,
PENDING_SUB_RESOLUTION_TIMEOUT,
RELAYER_PROVIDER_EVENTS,
RELAYER_EVENTS,
} from "../constants";
import { SubscriberTopicMap } from "./topicmap";

Expand All @@ -40,7 +41,7 @@ export class Subscriber extends ISubscriber {
private pendingSubscriptionWatchLabel = "pending_sub_watch_label";
private pendingSubInterval = 20;
private storagePrefix = CORE_STORAGE_PREFIX;

private subscribeTimeout = 10_000;
constructor(public relayer: IRelayer, public logger: Logger) {
super(relayer, logger);
this.relayer = relayer;
Expand Down Expand Up @@ -206,7 +207,18 @@ export class Subscriber extends ISubscriber {
};
this.logger.debug(`Outgoing Relay Payload`);
this.logger.trace({ type: "payload", direction: "outgoing", request });
return await this.relayer.provider.request(request);
let result: any;
try {
const subscribe = await createExpiringPromise(
this.relayer.provider.request(request),
this.subscribeTimeout,
);
result = await subscribe;
} catch (err) {
this.logger.debug(`Outgoing Relay Payload stalled`);
this.relayer.events.emit(RELAYER_EVENTS.connection_stalled);
}
return result;
}

private rpcUnsubscribe(topic: string, id: string, relay: RelayerTypes.ProtocolOptions) {
Expand Down Expand Up @@ -362,10 +374,10 @@ export class Subscriber extends ISubscriber {
this.relayer.core.heartbeat.on(HEARTBEAT_EVENTS.pulse, () => {
this.checkPending();
});
this.relayer.provider.on(RELAYER_PROVIDER_EVENTS.connect, async () => {
this.relayer.on(RELAYER_EVENTS.connect, async () => {
await this.onConnect();
});
this.relayer.provider.on(RELAYER_PROVIDER_EVENTS.disconnect, () => {
this.relayer.on(RELAYER_EVENTS.disconnect, () => {
this.onDisconnect();
});
this.events.on(SUBSCRIBER_EVENTS.created, async (createdEvent: SubscriberEvents.Created) => {
Expand Down
Loading

0 comments on commit 0cdf2a2

Please sign in to comment.