Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kerem/eth subscribe #113

Merged
merged 26 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9e337e6
reflect-metadata
mechanical-turk Jul 28, 2024
1a4b286
rpc-methods wip
mechanical-turk Jul 28, 2024
3c5f4da
rpc-methods wip
mechanical-turk Jul 28, 2024
e98730c
wspair -> wscontext
mechanical-turk Jul 28, 2024
d804336
remove @inject when unnecessary
mechanical-turk Jul 30, 2024
e996266
using autoBindInjectable
mechanical-turk Jul 30, 2024
4c00854
defaultScope=singleton + skipBaseClassChecks
mechanical-turk Jul 30, 2024
17f420f
config factory logging
mechanical-turk Jul 30, 2024
9eb3e88
zodifying subscription rpc methods
mechanical-turk Jul 31, 2024
b211ce3
wdatatojson to util
mechanical-turk Jul 31, 2024
24dd059
EthSubscribeRpcParamsType
mechanical-turk Jul 31, 2024
59b6ce2
subscriptions/index.ts
mechanical-turk Jul 31, 2024
51304e2
outbound subscription factory
mechanical-turk Jul 31, 2024
4b5cf78
todo added
mechanical-turk Jul 31, 2024
63f24fe
created an eth_unsubscribe method
mechanical-turk Aug 4, 2024
badc7a1
created 2 util ws functions
mechanical-turk Aug 4, 2024
deaeafa
introducing context.abort
mechanical-turk Aug 4, 2024
c3953cb
better logging for the outbound subs factory
mechanical-turk Aug 4, 2024
6d60e8b
outbound subscription
mechanical-turk Aug 4, 2024
4f57ff4
ws-context handles subscription sharing + unsubscribes
mechanical-turk Aug 4, 2024
4ad6244
docs(changeset): Introducing subscription sharing
mechanical-turk Aug 4, 2024
b15d78d
removed todo
mechanical-turk Aug 4, 2024
de3e792
removed @inject
mechanical-turk Aug 4, 2024
88eb242
linting fix
mechanical-turk Aug 4, 2024
d578959
todo converted
mechanical-turk Aug 4, 2024
e8aa952
more todos
mechanical-turk Aug 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/metal-tables-knock.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@whatsgood/nexus": minor
---

Introducing subscription sharing
4 changes: 2 additions & 2 deletions packages/nexus/src/auth/authorization-service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { inject, injectable } from "inversify";
import { injectable } from "inversify";
import { NexusConfig } from "@src/nexus-config";

const AUTH_KEY_QUERY_PARAM_NAME = "key";
Expand All @@ -7,7 +7,7 @@ const AUTH_KEY_QUERY_PARAM_NAME = "key";
export class AuthorizationService {
private readonly authKey?: string;

constructor(@inject(NexusConfig) config: NexusConfig) {
constructor(config: NexusConfig) {
this.authKey = config.authKey;
}

Expand Down
6 changes: 5 additions & 1 deletion packages/nexus/src/dependency-injection/container.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import { Container } from "inversify";

export const container = new Container();
export const container = new Container({
autoBindInjectable: true,
skipBaseClassChecks: true,
defaultScope: "Singleton",
});
5 changes: 1 addition & 4 deletions packages/nexus/src/example/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import * as http from "node:http";
import { Nexus } from "@src/nexus";
import { NodeProvider } from "@src/node-provider";
import { CHAIN } from "@src/default-chains";
import { Chain } from "@src/chain";
import { Nexus, NodeProvider, CHAIN, Chain } from "@src/index";

// const llamaRpcNodeProvider = new NodeProvider({
// name: "llama-rpc",
Expand Down Expand Up @@ -34,7 +31,7 @@
const alchemyWsNodeProvider = new NodeProvider({
name: "alchemy-ws",
chain: CHAIN.ETHEREUM_MAINNET,
url: process.env.ALCHEMY_WS_URL!,

Check warning on line 34 in packages/nexus/src/example/index.ts

View workflow job for this annotation

GitHub Actions / Continuous Integration

Forbidden non-null assertion
weight: 1,
});

Expand Down
10 changes: 4 additions & 6 deletions packages/nexus/src/http/http-controller.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Logger } from "pino";
import { inject, injectable } from "inversify";
import { injectable } from "inversify";
import { NexusConfig } from "@src/nexus-config";
import { RpcRequestPayloadSchema } from "@src/rpc-schema";
import type { RpcResponse } from "@src/rpc-response";
Expand All @@ -25,13 +25,11 @@ export class HttpController {
private readonly logger: Logger;

constructor(
@inject(NexusConfig) private readonly config: NexusConfig,
@inject(LoggerFactory) private readonly loggerFactory: LoggerFactory,
@inject(HttpRelayHandler)
private readonly config: NexusConfig,
private readonly loggerFactory: LoggerFactory,
private readonly httpRelayHandler: HttpRelayHandler,
@inject(NexusMiddlewareHandler)
private readonly middlewareHandler: NexusMiddlewareHandler,
@inject(EventBus) private readonly eventBus: EventBus
private readonly eventBus: EventBus
) {
this.logger = this.loggerFactory.get(HttpController.name);
}
Expand Down
3 changes: 1 addition & 2 deletions packages/nexus/src/http/http-relay-handler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { inject, injectable } from "inversify";
import { injectable } from "inversify";
import type { NexusRpcContext } from "@src/nexus-rpc-context";
import { NodeEndpointPoolFactory } from "@src/node-endpoint";
import type { NodeRpcResponseFailure } from "@src/node-endpoint/node-rpc-response";
Expand All @@ -15,7 +15,6 @@ import {
@injectable()
export class HttpRelayHandler {
constructor(
@inject(NodeEndpointPoolFactory)
private readonly nodeEndpointPoolFactory: NodeEndpointPoolFactory
) {}

Expand Down
4 changes: 2 additions & 2 deletions packages/nexus/src/logging/logger-factory.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import type { Logger } from "pino";
import { inject, injectable } from "inversify";
import { injectable } from "inversify";
import { NexusConfig } from "@src/nexus-config";

@injectable()
export class LoggerFactory {
constructor(@inject(NexusConfig) private readonly config: NexusConfig) {}
constructor(private readonly config: NexusConfig) {}

public get(name: string, options: Record<string, any> = {}): Logger {
// TODO: redact node provider url, and start logging providers directly
Expand Down
7 changes: 2 additions & 5 deletions packages/nexus/src/middleware/nexus-middleware-handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Logger } from "pino";
import { inject, injectable } from "inversify";
import { injectable } from "inversify";
import { errSerialize } from "@src/utils";
import type { NexusRpcContext } from "@src/nexus-rpc-context";
import { NexusConfig } from "@src/nexus-config";
Expand All @@ -12,10 +12,7 @@ export class NexusMiddlewareHandler {
private readonly middleware: NexusMiddleware[];
private readonly logger: Logger;

constructor(
@inject(NexusConfig) config: NexusConfig,
@inject(LoggerFactory) loggerFactory: LoggerFactory
) {
constructor(config: NexusConfig, loggerFactory: LoggerFactory) {
this.middleware = [...config.middleware, authMiddleware];
this.logger = loggerFactory.get(NexusMiddlewareHandler.name);
}
Expand Down
6 changes: 4 additions & 2 deletions packages/nexus/src/nexus-config/nexus-config-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ export interface NexusConfigOptions {
export class NexusConfigFactory {
private readonly options: NexusConfigOptions;
private readonly envConfig: EnvConfig;
private readonly baseLogger: Logger;
private readonly logger: Logger;

constructor(options?: NexusConfigOptions) {
this.options = options || {};
this.envConfig = getEnvConfig(this.getEnv());
this.logger = this.getLogger();
this.baseLogger = this.getLogger();
this.logger = this.baseLogger.child({ name: NexusConfigFactory.name });
}

private getEnv() {
Expand All @@ -63,7 +65,7 @@ export class NexusConfigFactory {
chains: new Map(uniqueChains.map((chain) => [chain.chainId, chain])),
relay: this.getRelayConfig(),
port: this.getPort(),
logger: this.logger,
logger: this.baseLogger,
middleware: this.getMiddleware(),
authKey: this.getAuthKey(),
});
Expand Down
36 changes: 9 additions & 27 deletions packages/nexus/src/nexus/nexus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,17 @@ import type {
} from "@whatwg-node/server";
import { createServerAdapter } from "@whatwg-node/server";
import type { Logger } from "pino";
import { decorate, inject, injectable } from "inversify";
import { EventEmitter } from "eventemitter3";
import { injectable } from "inversify";
import {
NexusConfig,
NexusConfigFactory,
type NexusConfigOptions,
} from "@src/nexus-config";
import { HttpController } from "@src/http";
import { WsPairHandler, WsRpcServer } from "@src/websockets";
import { WsContextHandler, WsRpcServer } from "@src/websockets";
import { LoggerFactory } from "@src/logging";
import { EventBus } from "@src/events";
import { container } from "@src/dependency-injection";
import { HttpRelayHandler } from "@src/http/http-relay-handler";
import { NodeEndpointPoolFactory } from "@src/node-endpoint";
import { NexusMiddlewareHandler } from "@src/middleware";
import { AuthorizationService } from "@src/auth";

decorate(injectable(), EventEmitter); // TODO: put this somewhere else

export type NexusServerInstance = ServerAdapter<unknown, Nexus>;

Expand All @@ -33,11 +26,11 @@ export class Nexus implements ServerAdapterBaseObject<unknown> {
public readonly on: EventBus["on"];

constructor(
@inject(HttpController) private readonly controller: HttpController,
@inject(WsPairHandler) private readonly wsPairHandler: WsPairHandler,
@inject(LoggerFactory) loggerFactory: LoggerFactory,
@inject(EventBus) eventBus: EventBus,
@inject(NexusConfig) config: NexusConfig
private readonly controller: HttpController,
private readonly wsContextHandler: WsContextHandler,
loggerFactory: LoggerFactory,
eventBus: EventBus,
config: NexusConfig
) {
this.logger = loggerFactory.get(Nexus.name);
this.on = eventBus.on.bind(eventBus);
Expand All @@ -52,8 +45,8 @@ export class Nexus implements ServerAdapterBaseObject<unknown> {
public ws(httpServer: NodeHttpServer) {
const wsServer = container.resolve(WsRpcServer);

wsServer.on("connection", (pair) => {
this.wsPairHandler.handleConnection(pair);
wsServer.on("connection", (context) => {
this.wsContextHandler.handleConnection(context);
});

httpServer.on("upgrade", wsServer.handleUpgrade.bind(wsServer));
Expand All @@ -64,17 +57,6 @@ export class Nexus implements ServerAdapterBaseObject<unknown> {
const config = nexusConfigFactory.getNexusConfig();

container.bind(NexusConfig).toConstantValue(config);
container.bind(HttpController).toSelf().inSingletonScope();
container.bind(Nexus).toSelf().inSingletonScope();
container.bind(LoggerFactory).toSelf().inSingletonScope();
container.bind(WsPairHandler).toSelf().inSingletonScope();
container.bind(WsRpcServer).toSelf().inSingletonScope();
container.bind(EventBus).toSelf().inSingletonScope();
container.bind(HttpRelayHandler).toSelf().inSingletonScope();
container.bind(NodeEndpointPoolFactory).toSelf().inSingletonScope();
container.bind(NexusMiddlewareHandler).toSelf().inSingletonScope();
container.bind(EventEmitter).toSelf().inSingletonScope();
container.bind(AuthorizationService).toSelf().inSingletonScope();

const nexus = container.resolve(Nexus);

Expand Down
10 changes: 6 additions & 4 deletions packages/nexus/src/node-endpoint/node-endpoint-pool-factory.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { inject, injectable } from "inversify";
import { injectable } from "inversify";
import type { Chain } from "@src/chain";
import type { NodeProvider } from "@src/node-provider";
import { LoggerFactory } from "@src/logging";
Expand All @@ -15,8 +15,8 @@ export class NodeEndpointPoolFactory {
public readonly ws: Map<Chain, NodeEndpointPool>;

constructor(
@inject(NexusConfig) private readonly config: NexusConfig,
@inject(LoggerFactory) private readonly loggerFactory: LoggerFactory // TODO: this should not be a property.
private readonly config: NexusConfig,
private readonly loggerFactory: LoggerFactory
) {
this.nodeProviders = config.nodeProviders;
this.http = this.createChainToEndpointPoolMap("http");
Expand Down Expand Up @@ -63,7 +63,9 @@ export class NodeEndpointPoolFactory {
nodeProvider,
})
),
loggerFactory: this.loggerFactory,
logger: this.loggerFactory.get(NodeEndpointPool.name, {
chain,
}),
})
);
}
Expand Down
5 changes: 2 additions & 3 deletions packages/nexus/src/node-endpoint/node-endpoint-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import type { Logger } from "pino";
import type { Chain } from "@src/chain";
import type { RpcRequestPayloadType } from "@src/rpc-schema";
import { generatorOf, take, weightedShuffleGenerator } from "@src/utils";
import type { LoggerFactory } from "@src/logging";
import type { NexusConfig } from "@src/nexus-config";
import type { RelayConfig } from "./relay-config";
import {
Expand All @@ -28,12 +27,12 @@ export class NodeEndpointPool {
chain: Chain;
config: NexusConfig;
nodeEndpoints: NodeEndpoint[];
loggerFactory: LoggerFactory;
logger: Logger;
}) {
this.chain = params.chain;
this.nodeEndpoints = params.nodeEndpoints;
this.config = params.config.relay;
this.logger = params.loggerFactory.get(NodeEndpointPool.name);
this.logger = params.logger;

if (this.config.failure.kind === "cycle-requests") {
this.maxRelayAttempts = this.config.failure.maxAttempts;
Expand Down
57 changes: 57 additions & 0 deletions packages/nexus/src/rpc-methods/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { z } from "zod";
import {
RpcRequestPayloadSchema,
RpcResponseSuccessPayloadSchema,
} from "@src/rpc-schema";

export const eth_subscribe_newHeads = RpcRequestPayloadSchema.extend({

Check warning on line 7 in packages/nexus/src/rpc-methods/index.ts

View workflow job for this annotation

GitHub Actions / Continuous Integration

Identifier 'eth_subscribe_newHeads' is not in camel case
method: z.literal("eth_subscribe"),
params: z.tuple([
z.literal("newHeads"),
// TODO: can this method have more params?
]),
});

export const eth_subscribe_newPendingTransactions =

Check warning on line 15 in packages/nexus/src/rpc-methods/index.ts

View workflow job for this annotation

GitHub Actions / Continuous Integration

Identifier 'eth_subscribe_newPendingTransactions' is not in camel case
RpcRequestPayloadSchema.extend({
method: z.literal("eth_subscribe"),
params: z.tuple([
z.literal("newPendingTransactions"),
// TODO: can this method have more params?
]),
});

export const eth_subscribe = z.union([

Check warning on line 24 in packages/nexus/src/rpc-methods/index.ts

View workflow job for this annotation

GitHub Actions / Continuous Integration

Identifier 'eth_subscribe' is not in camel case
eth_subscribe_newHeads,

Check warning on line 25 in packages/nexus/src/rpc-methods/index.ts

View workflow job for this annotation

GitHub Actions / Continuous Integration

Identifier 'eth_subscribe_newHeads' is not in camel case
eth_subscribe_newPendingTransactions,

Check warning on line 26 in packages/nexus/src/rpc-methods/index.ts

View workflow job for this annotation

GitHub Actions / Continuous Integration

Identifier 'eth_subscribe_newPendingTransactions' is not in camel case
]);

export type EthSubscribeRpcPayloadType = z.infer<typeof eth_subscribe>;

Check warning on line 29 in packages/nexus/src/rpc-methods/index.ts

View workflow job for this annotation

GitHub Actions / Continuous Integration

Identifier 'eth_subscribe' is not in camel case
export type EthSubscribeRpcParamsType = EthSubscribeRpcPayloadType["params"];

export const eth_unsubscribe = RpcRequestPayloadSchema.extend({

Check warning on line 32 in packages/nexus/src/rpc-methods/index.ts

View workflow job for this annotation

GitHub Actions / Continuous Integration

Identifier 'eth_unsubscribe' is not in camel case
method: z.literal("eth_unsubscribe"),
params: z.tuple([z.string()]),
});

export const eth_subscribeSuccessResponsePayloadSchema =

Check warning on line 37 in packages/nexus/src/rpc-methods/index.ts

View workflow job for this annotation

GitHub Actions / Continuous Integration

Identifier 'eth_subscribeSuccessResponsePayloadSchema' is not in camel case
RpcResponseSuccessPayloadSchema.extend({
result: z.string(),
});

export type EthSubscribeSuccessResponsePayloadType = z.infer<
typeof eth_subscribeSuccessResponsePayloadSchema

Check warning on line 43 in packages/nexus/src/rpc-methods/index.ts

View workflow job for this annotation

GitHub Actions / Continuous Integration

Identifier 'eth_subscribeSuccessResponsePayloadSchema' is not in camel case
>;

export const eth_subscriptionPayloadSchema = z.object({
jsonrpc: z.literal("2.0"),
method: z.literal("eth_subscription"),
params: z.object({
subscription: z.string(),
result: z.unknown(), // TODO: do better narrowing here
}),
});

export type EthSubscriptionPayloadType = z.infer<
typeof eth_subscriptionPayloadSchema
>;
Loading
Loading