Skip to content

Commit

Permalink
Merge pull request #166 from D8-X/track_rpc
Browse files Browse the repository at this point in the history
Track rpc
  • Loading branch information
Mantelijo authored Jul 10, 2024
2 parents e78bbdd + 8b9d26c commit 7430f50
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 4 deletions.
5 changes: 3 additions & 2 deletions packages/api/src/eventListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import {
} from "utils/src/wsTypes";
import SturdyWebSocket from "sturdy-websocket";
import { Logger } from "winston";
import { TrackedWebsocketsProvider } from "./providers";

/**
* Class that listens to blockchain events on
Expand Down Expand Up @@ -201,7 +202,7 @@ export default class EventListener extends IndexPriceInterface {

// Attempt to establish a ws connection to new RPC
this.logger.info("creating new websocket rpc provider");
this.currentWSRpcProvider = new providers.WebSocketProvider(this.wsConn!);
this.currentWSRpcProvider = new TrackedWebsocketsProvider(this.wsConn!);

// On provider error - retry after short cooldown
this.currentWSRpcProvider.on("error", (error: Error) => () => {
Expand Down Expand Up @@ -632,7 +633,7 @@ export default class EventListener extends IndexPriceInterface {
* @param symbol order book symbol
*/
private addOrderBookEventHandlers(symbol: string) {
const provider = new providers.WebSocketProvider(this.wsRPC);
const provider = this.currentWSRpcProvider!;
this.orderBookContracts[symbol] = new Contract(
this.traderInterface.getOrderBookAddress(symbol),
this.traderInterface.getABI("lob")!,
Expand Down
23 changes: 23 additions & 0 deletions packages/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ import * as winston from "winston";
import { RPCConfig } from "utils/dist/wsTypes";
import RPCManager from "./rpcManager";
import fs from "fs";
import {
JsonRpcEthCalls,
NumJsonRpcProviders,
NumWssProviders,
ProvidersEthCallsStartTime,
WssEthCalls,
} from "./providers";

const defaultLogger = () => {
return winston.createLogger({
Expand Down Expand Up @@ -112,6 +119,22 @@ async function start() {
} else {
waitTime = 60_000;
}

// Print out eth calls statistics
const currentTime = new Date();
console.log("statistics of eth_ calls", {
JsonRpcEthCalls: JsonRpcEthCalls,
WssEthCalls: WssEthCalls,
CurrentTime: currentTime.toISOString(),
StartTime: ProvidersEthCallsStartTime.toISOString(),
RunningFor:
(currentTime.getTime() - ProvidersEthCallsStartTime.getTime()) /
1000 /
60 +
" minutes",
NumJsonRpcProviders,
NumWssProviders,
});
}
}
start();
51 changes: 51 additions & 0 deletions packages/api/src/providers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { Networkish } from "@ethersproject/providers";
import { WebSocketLike } from "@ethersproject/providers/lib/websocket-provider";
import { providers } from "ethers";
import { ConnectionInfo } from "ethers/lib/utils";

export const ProvidersEthCallsStartTime = new Date();

/**
* List of eth_ * method calls and their counts for all TrackedJsonRpcProvider
*/
export const JsonRpcEthCalls = new Map<string, number>();

/**
* List of eth_ * method calls and their counts for all TrackedWebsocketsProvider
*/
export const WssEthCalls = new Map<string, number>();

export let NumJsonRpcProviders = 0;
export let NumWssProviders = 0;

export class TrackedWebsocketsProvider extends providers.WebSocketProvider {
constructor(url: string | WebSocketLike, network?: Networkish) {
super(url, network);
NumWssProviders++;
}

send(method: string, params?: Array<any>): Promise<any> {
if (!WssEthCalls.has(method)) {
WssEthCalls.set(method, 0);
}
WssEthCalls.set(method, WssEthCalls.get(method)! + 1);

return super.send(method, params);
}
}

export class TrackedJsonRpcProvider extends providers.StaticJsonRpcProvider {
constructor(url?: ConnectionInfo | string, network?: Networkish) {
super(url, network);
NumJsonRpcProviders++;
}

send(method: string, params: Array<any>): Promise<any> {
if (!JsonRpcEthCalls.has(method)) {
JsonRpcEthCalls.set(method, 0);
}
JsonRpcEthCalls.set(method, JsonRpcEthCalls.get(method)! + 1);

return super.send(method, params);
}
}
3 changes: 2 additions & 1 deletion packages/api/src/rpcManager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { JsonRpcProvider } from "@ethersproject/providers";
import { executeWithTimeout } from "utils";
import { TrackedJsonRpcProvider } from "./providers";

export default class RPCManager {
private healthy: Map<string, boolean> = new Map();
Expand Down Expand Up @@ -54,7 +55,7 @@ export default class RPCManager {
this.healthy.get(rpc) === undefined ||
(this.lastCheck.get(rpc) ?? 0) + this.CHECK_INTERVAL_MS < Date.now()
) {
const provider = new JsonRpcProvider(rpc);
const provider = new TrackedJsonRpcProvider(rpc);
try {
await executeWithTimeout(provider.detectNetwork(), this.NETWORK_READY_MS);
this.healthy.set(rpc, true);
Expand Down
5 changes: 4 additions & 1 deletion packages/api/src/sdkInterface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import Observable from "./observable";
import type { RedisClientType } from "redis";
import { extractErrorMsg, constructRedis } from "utils";
import RPCManager from "./rpcManager";
import { TrackedJsonRpcProvider } from "./providers";

export type OrderWithTraderAndId = Order & { orderId: string; trader: string };

Expand Down Expand Up @@ -48,7 +49,9 @@ export default class SDKInterface extends Observable {
public async initialize(sdkConfig: NodeSDKConfig, rpcManager: RPCManager) {
this.apiInterface = new TraderInterface(sdkConfig);
this.rpcManager = rpcManager;
await this.apiInterface.createProxyInstance();
await this.apiInterface.createProxyInstance(
new TrackedJsonRpcProvider(sdkConfig.nodeURL),
);
await this.broker.initialize(sdkConfig);
const brokerAddress = await this.broker.getBrokerAddress();
if (!this.redisClient.isOpen) {
Expand Down

0 comments on commit 7430f50

Please sign in to comment.