Skip to content

Commit

Permalink
Merge 713f213 into 684c2e3
Browse files Browse the repository at this point in the history
  • Loading branch information
dadepo authored Jul 19, 2022
2 parents 684c2e3 + 713f213 commit d1864c0
Show file tree
Hide file tree
Showing 11 changed files with 472 additions and 31 deletions.
52 changes: 42 additions & 10 deletions packages/beacon-node/src/eth1/provider/jsonRpcHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Note: isomorphic-fetch is not well mantained and does not support abort signals
import fetch from "cross-fetch";

import {ErrorAborted, TimeoutError} from "@lodestar/utils";
import {ErrorAborted, TimeoutError, retry} from "@lodestar/utils";
import {IGauge, IHistogram} from "../../metrics/interface.js";
import {IJson, IRpcPayload} from "../interface.js";
import {encodeJwtToken} from "./jwt.js";
Expand All @@ -29,6 +29,10 @@ export type ReqOpts = {
timeout?: number;
// To label request metrics
routeId?: string;
// retry opts
retryAttempts?: number;
retryDelay?: number;
shouldRetry?: (lastError: Error) => boolean;
};

export type JsonRpcHttpClientMetrics = {
Expand All @@ -37,10 +41,12 @@ export type JsonRpcHttpClientMetrics = {
requestUsedFallbackUrl: IGauge;
activeRequests: IGauge;
configUrlsCount: IGauge;
retryCount: IGauge;
};

export interface IJsonRpcHttpClient {
fetch<R, P = IJson[]>(payload: IRpcPayload<P>, opts?: ReqOpts): Promise<R>;
fetchWithRetries<R, P = IJson[]>(payload: IRpcPayload<P>, opts?: ReqOpts): Promise<R>;
fetchBatch<R>(rpcPayloadArr: IRpcPayload[], opts?: ReqOpts): Promise<R[]>;
}

Expand All @@ -64,11 +70,20 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
/** If returns true, do not fallback to other urls and throw early */
shouldNotFallback?: (error: Error) => boolean;
/**
* If provided, the requests to the RPC server will be bundled with a HS256 encoded
* token using this secret. Otherwise the requests to the RPC server will be unauthorized
* Optional: If provided, use this jwt secret to HS256 encode and add a jwt token in the
* request header which can be authenticated by the RPC server to provide access.
* A fresh token is generated on each requests as EL spec mandates the ELs to check
* the token freshness +-5 seconds (via `iat` property of the token claim)
*
* Otherwise the requests to the RPC server will be unauthorized
* and it might deny responses to the RPC requests.
*/
jwtSecret?: Uint8Array;
/** Retry attempts */
retryAttempts?: number;
/** Retry delay, only relevant with retry attempts */
retryDelay?: number;
/** Metrics for retry, could be expanded later */
metrics?: JsonRpcHttpClientMetrics | null;
}
) {
Expand All @@ -85,13 +100,8 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
this.jwtSecret = opts?.jwtSecret;
this.metrics = opts?.metrics ?? null;

// Set config metric gauges once

const metrics = this.metrics;
if (metrics) {
metrics.configUrlsCount.set(urls.length);
metrics.activeRequests.addCollect(() => metrics.activeRequests.set(this.activeRequests));
}
this.metrics?.configUrlsCount.set(urls.length);
this.metrics?.activeRequests.addCollect(() => this.metrics?.activeRequests.set(this.activeRequests));
}

/**
Expand All @@ -102,6 +112,28 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
return parseRpcResponse(res, payload);
}

/**
* Perform RPC request with retry
*/
async fetchWithRetries<R, P = IJson[]>(payload: IRpcPayload<P>, opts?: ReqOpts): Promise<R> {
const routeId = opts?.routeId ?? "unknown";
const res = await retry<IRpcResponse<R>>(
async (attempt) => {
/** If this is a retry, increment the retry counter for this method */
if (attempt > 1) {
this.opts?.metrics?.retryCount.inc({routeId});
}
return await this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts);
},
{
retries: opts?.retryAttempts ?? this.opts?.retryAttempts ?? 1,
retryDelay: opts?.retryDelay ?? this.opts?.retryAttempts ?? 0,
shouldRetry: opts?.shouldRetry,
}
);
return parseRpcResponse(res, payload);
}

/**
* Perform RPC batched request
* Type-wise assumes all requests results have the same type
Expand Down
50 changes: 36 additions & 14 deletions packages/beacon-node/src/execution/engine/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,15 @@ import {
} from "./interface.js";
import {PayloadIdCache} from "./payloadIdCache.js";

export type ExecutionEngineModules = {
signal: AbortSignal;
metrics?: IMetrics | null;
};

export type ExecutionEngineHttpOpts = {
urls: string[];
retryAttempts: number;
retryDelay: number;
timeout?: number;
/**
* 256 bit jwt secret in hex format without the leading 0x. If provided, the execution engine
Expand All @@ -44,6 +51,8 @@ export const defaultExecutionEngineHttpOpts: ExecutionEngineHttpOpts = {
* port/url, one can override this and skip providing a jwt secret.
*/
urls: ["http://localhost:8551"],
retryAttempts: 3,
retryDelay: 2000,
timeout: 12000,
};

Expand All @@ -65,12 +74,12 @@ export class ExecutionEngineHttp implements IExecutionEngine {
readonly payloadIdCache = new PayloadIdCache();
private readonly rpc: IJsonRpcHttpClient;

constructor(opts: ExecutionEngineHttpOpts, signal: AbortSignal, metrics?: IMetrics | null) {
constructor(opts: ExecutionEngineHttpOpts, {metrics, signal}: ExecutionEngineModules) {
this.rpc = new JsonRpcHttpClient(opts.urls, {
...opts,
signal,
timeout: opts.timeout,
jwtSecret: opts.jwtSecretHex ? fromHex(opts.jwtSecretHex) : undefined,
metrics: metrics?.executionEnginerHttpClient,
jwtSecret: opts.jwtSecretHex ? fromHex(opts.jwtSecretHex) : undefined,
});
}

Expand Down Expand Up @@ -103,12 +112,15 @@ export class ExecutionEngineHttp implements IExecutionEngine {
const method = "engine_newPayloadV1";
const serializedExecutionPayload = serializeExecutionPayload(executionPayload);
const {status, latestValidHash, validationError} = await this.rpc
.fetch<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
{method, params: [serializedExecutionPayload]},
.fetchWithRetries<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
{
method,
params: [serializedExecutionPayload],
},
notifyNewPayloadOpts
)
// If there are errors by EL like connection refused, internal error, they need to be
// treated seperate from being INVALID. For now, just pass the error upstream.
// treated separate from being INVALID. For now, just pass the error upstream.
.catch((e: Error): EngineApiRpcReturnTypes[typeof method] => {
if (e instanceof HttpRpcError || e instanceof ErrorJsonRpcResponse) {
return {status: ExecutePayloadStatus.ELERROR, latestValidHash: null, validationError: e.message};
Expand Down Expand Up @@ -201,14 +213,19 @@ export class ExecutionEngineHttp implements IExecutionEngine {
}
: undefined;

// TODO: propogate latestValidHash to the forkchoice, for now ignore it as we
// currently do not propogate the validation status up the forkchoice
// If we are just fcUing and not asking execution for payload, retry is not required
// and we can move on, as the next fcU will be issued soon on the new slot
const fcUReqOpts =
payloadAttributes !== undefined ? forkchoiceUpdatedV1Opts : {...forkchoiceUpdatedV1Opts, retryAttempts: 1};
const {
payloadStatus: {status, latestValidHash: _latestValidHash, validationError},
payloadId,
} = await this.rpc.fetch<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
{method, params: [{headBlockHash, safeBlockHash, finalizedBlockHash}, apiPayloadAttributes]},
forkchoiceUpdatedV1Opts
} = await this.rpc.fetchWithRetries<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
{
method,
params: [{headBlockHash, safeBlockHash, finalizedBlockHash}, apiPayloadAttributes],
},
fcUReqOpts
);

switch (status) {
Expand Down Expand Up @@ -253,11 +270,16 @@ export class ExecutionEngineHttp implements IExecutionEngine {
*/
async getPayload(payloadId: PayloadId): Promise<bellatrix.ExecutionPayload> {
const method = "engine_getPayloadV1";
const executionPayloadRpc = await this.rpc.fetch<
const executionPayloadRpc = await this.rpc.fetchWithRetries<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>({method, params: [payloadId]}, getPayloadOpts);

>(
{
method,
params: [payloadId],
},
getPayloadOpts
);
return parseExecutionPayload(executionPayloadRpc);
}

Expand Down
15 changes: 11 additions & 4 deletions packages/beacon-node/src/execution/engine/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import {IExecutionEngine} from "./interface.js";
import {ExecutionEngineDisabled} from "./disabled.js";
import {ExecutionEngineHttp, ExecutionEngineHttpOpts, defaultExecutionEngineHttpOpts} from "./http.js";
import {
ExecutionEngineHttp,
ExecutionEngineModules,
ExecutionEngineHttpOpts,
defaultExecutionEngineHttpOpts,
} from "./http.js";
import {ExecutionEngineMock, ExecutionEngineMockOpts} from "./mock.js";

export {
Expand All @@ -15,17 +20,19 @@ export type ExecutionEngineOpts =
| ({mode?: "http"} & ExecutionEngineHttpOpts)
| ({mode: "mock"} & ExecutionEngineMockOpts)
| {mode: "disabled"};

export const defaultExecutionEngineOpts: ExecutionEngineOpts = defaultExecutionEngineHttpOpts;

export function initializeExecutionEngine(opts: ExecutionEngineOpts, signal: AbortSignal): IExecutionEngine {
export function initializeExecutionEngine(
opts: ExecutionEngineOpts,
modules: ExecutionEngineModules
): IExecutionEngine {
switch (opts.mode) {
case "mock":
return new ExecutionEngineMock(opts);
case "disabled":
return new ExecutionEngineDisabled();
case "http":
default:
return new ExecutionEngineHttp(opts, signal);
return new ExecutionEngineHttp(opts, modules);
}
}
10 changes: 10 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,11 @@ export function createLodestarMetrics(
help: "eth1 JsonHttpClient - total count of request errors",
labelNames: ["routeId"],
}),
retryCount: register.gauge<"routeId">({
name: "lodestar_eth1_http_client_request_retries_total",
help: "eth1 JsonHttpClient - total count of request retries",
labelNames: ["routeId"],
}),
requestUsedFallbackUrl: register.gauge({
name: "lodestar_eth1_http_client_request_used_fallback_url_total",
help: "eth1 JsonHttpClient - total count of requests on fallback url(s)",
Expand Down Expand Up @@ -1094,6 +1099,11 @@ export function createLodestarMetrics(
help: "ExecutionEngineHttp client - total count of request errors",
labelNames: ["routeId"],
}),
retryCount: register.gauge<"routeId">({
name: "lodestar_execution_engine_http_client_request_retries_total",
help: "ExecutionEngineHttp client - total count of request retries",
labelNames: ["routeId"],
}),
requestUsedFallbackUrl: register.gauge({
name: "lodestar_execution_engine_http_client_request_used_fallback_url_total",
help: "ExecutionEngineHttp client - total count of requests on fallback url(s)",
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/node/nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ export class BeaconNode {
{config, db, metrics, logger: logger.child(opts.logger.eth1), signal},
anchorState
),
executionEngine: initializeExecutionEngine(opts.executionEngine, signal),
executionEngine: initializeExecutionEngine(opts.executionEngine, {metrics, signal}),
executionBuilder: opts.executionBuilder.enabled
? initializeExecutionBuilder(opts.executionBuilder, config)
: undefined,
Expand Down
Loading

0 comments on commit d1864c0

Please sign in to comment.