Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added retry mechanism in executionEngine for executePayload #3854

Merged
merged 6 commits into from
Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 42 additions & 10 deletions packages/beacon-node/src/eth1/provider/jsonRpcHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Note: isomorphic-fetch is not well mantained and does not support abort signals
import fetch from "cross-fetch";

import {ErrorAborted, TimeoutError} from "@lodestar/utils";
import {ErrorAborted, TimeoutError, retry} from "@lodestar/utils";
import {IGauge, IHistogram} from "../../metrics/interface.js";
import {IJson, IRpcPayload} from "../interface.js";
import {encodeJwtToken} from "./jwt.js";
Expand All @@ -29,6 +29,10 @@ export type ReqOpts = {
timeout?: number;
// To label request metrics
routeId?: string;
// retry opts
retryAttempts?: number;
retryDelay?: number;
shouldRetry?: (lastError: Error) => boolean;
};

export type JsonRpcHttpClientMetrics = {
Expand All @@ -37,10 +41,12 @@ export type JsonRpcHttpClientMetrics = {
requestUsedFallbackUrl: IGauge;
activeRequests: IGauge;
configUrlsCount: IGauge;
retryCount: IGauge;
};

export interface IJsonRpcHttpClient {
fetch<R, P = IJson[]>(payload: IRpcPayload<P>, opts?: ReqOpts): Promise<R>;
fetchWithRetries<R, P = IJson[]>(payload: IRpcPayload<P>, opts?: ReqOpts): Promise<R>;
fetchBatch<R>(rpcPayloadArr: IRpcPayload[], opts?: ReqOpts): Promise<R[]>;
}

Expand All @@ -64,11 +70,20 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
/** If returns true, do not fallback to other urls and throw early */
shouldNotFallback?: (error: Error) => boolean;
/**
* If provided, the requests to the RPC server will be bundled with a HS256 encoded
* token using this secret. Otherwise the requests to the RPC server will be unauthorized
* Optional: If provided, use this jwt secret to HS256 encode and add a jwt token in the
* request header which can be authenticated by the RPC server to provide access.
* A fresh token is generated on each requests as EL spec mandates the ELs to check
* the token freshness +-5 seconds (via `iat` property of the token claim)
*
* Otherwise the requests to the RPC server will be unauthorized
* and it might deny responses to the RPC requests.
*/
jwtSecret?: Uint8Array;
/** Retry attempts */
retryAttempts?: number;
/** Retry delay, only relevant with retry attempts */
retryDelay?: number;
/** Metrics for retry, could be expanded later */
metrics?: JsonRpcHttpClientMetrics | null;
}
) {
Expand All @@ -85,13 +100,8 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
this.jwtSecret = opts?.jwtSecret;
this.metrics = opts?.metrics ?? null;

// Set config metric gauges once

const metrics = this.metrics;
if (metrics) {
metrics.configUrlsCount.set(urls.length);
metrics.activeRequests.addCollect(() => metrics.activeRequests.set(this.activeRequests));
}
this.metrics?.configUrlsCount.set(urls.length);
this.metrics?.activeRequests.addCollect(() => this.metrics?.activeRequests.set(this.activeRequests));
}

/**
Expand All @@ -102,6 +112,28 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
return parseRpcResponse(res, payload);
}

/**
* Perform RPC request with retry
*/
async fetchWithRetries<R, P = IJson[]>(payload: IRpcPayload<P>, opts?: ReqOpts): Promise<R> {
const routeId = opts?.routeId ?? "unknown";
const res = await retry<IRpcResponse<R>>(
async (attempt) => {
/** If this is a retry, increment the retry counter for this method */
if (attempt > 1) {
this.opts?.metrics?.retryCount.inc({routeId});
}
return await this.fetchJson({jsonrpc: "2.0", id: this.id++, ...payload}, opts);
},
{
retries: opts?.retryAttempts ?? this.opts?.retryAttempts ?? 1,
retryDelay: opts?.retryDelay ?? this.opts?.retryAttempts ?? 0,
shouldRetry: opts?.shouldRetry,
}
);
return parseRpcResponse(res, payload);
}

/**
* Perform RPC batched request
* Type-wise assumes all requests results have the same type
Expand Down
50 changes: 36 additions & 14 deletions packages/beacon-node/src/execution/engine/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,15 @@ import {
} from "./interface.js";
import {PayloadIdCache} from "./payloadIdCache.js";

export type ExecutionEngineModules = {
signal: AbortSignal;
metrics?: IMetrics | null;
};

export type ExecutionEngineHttpOpts = {
urls: string[];
retryAttempts: number;
retryDelay: number;
timeout?: number;
/**
* 256 bit jwt secret in hex format without the leading 0x. If provided, the execution engine
Expand All @@ -44,6 +51,8 @@ export const defaultExecutionEngineHttpOpts: ExecutionEngineHttpOpts = {
* port/url, one can override this and skip providing a jwt secret.
*/
urls: ["http://localhost:8551"],
retryAttempts: 3,
retryDelay: 2000,
timeout: 12000,
};

Expand All @@ -65,12 +74,12 @@ export class ExecutionEngineHttp implements IExecutionEngine {
readonly payloadIdCache = new PayloadIdCache();
private readonly rpc: IJsonRpcHttpClient;

constructor(opts: ExecutionEngineHttpOpts, signal: AbortSignal, metrics?: IMetrics | null) {
constructor(opts: ExecutionEngineHttpOpts, {metrics, signal}: ExecutionEngineModules) {
this.rpc = new JsonRpcHttpClient(opts.urls, {
...opts,
signal,
timeout: opts.timeout,
jwtSecret: opts.jwtSecretHex ? fromHex(opts.jwtSecretHex) : undefined,
metrics: metrics?.executionEnginerHttpClient,
jwtSecret: opts.jwtSecretHex ? fromHex(opts.jwtSecretHex) : undefined,
});
}

Expand Down Expand Up @@ -103,12 +112,15 @@ export class ExecutionEngineHttp implements IExecutionEngine {
const method = "engine_newPayloadV1";
const serializedExecutionPayload = serializeExecutionPayload(executionPayload);
const {status, latestValidHash, validationError} = await this.rpc
.fetch<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
{method, params: [serializedExecutionPayload]},
.fetchWithRetries<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
{
method,
params: [serializedExecutionPayload],
},
notifyNewPayloadOpts
)
// If there are errors by EL like connection refused, internal error, they need to be
// treated seperate from being INVALID. For now, just pass the error upstream.
// treated separate from being INVALID. For now, just pass the error upstream.
.catch((e: Error): EngineApiRpcReturnTypes[typeof method] => {
if (e instanceof HttpRpcError || e instanceof ErrorJsonRpcResponse) {
return {status: ExecutePayloadStatus.ELERROR, latestValidHash: null, validationError: e.message};
Expand Down Expand Up @@ -201,14 +213,19 @@ export class ExecutionEngineHttp implements IExecutionEngine {
}
: undefined;

// TODO: propogate latestValidHash to the forkchoice, for now ignore it as we
// currently do not propogate the validation status up the forkchoice
// If we are just fcUing and not asking execution for payload, retry is not required
// and we can move on, as the next fcU will be issued soon on the new slot
const fcUReqOpts =
payloadAttributes !== undefined ? forkchoiceUpdatedV1Opts : {...forkchoiceUpdatedV1Opts, retryAttempts: 1};
const {
payloadStatus: {status, latestValidHash: _latestValidHash, validationError},
payloadId,
} = await this.rpc.fetch<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
{method, params: [{headBlockHash, safeBlockHash, finalizedBlockHash}, apiPayloadAttributes]},
forkchoiceUpdatedV1Opts
} = await this.rpc.fetchWithRetries<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
{
method,
params: [{headBlockHash, safeBlockHash, finalizedBlockHash}, apiPayloadAttributes],
},
fcUReqOpts
);

switch (status) {
Expand Down Expand Up @@ -253,11 +270,16 @@ export class ExecutionEngineHttp implements IExecutionEngine {
*/
async getPayload(payloadId: PayloadId): Promise<bellatrix.ExecutionPayload> {
const method = "engine_getPayloadV1";
const executionPayloadRpc = await this.rpc.fetch<
const executionPayloadRpc = await this.rpc.fetchWithRetries<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>({method, params: [payloadId]}, getPayloadOpts);

>(
{
method,
params: [payloadId],
},
getPayloadOpts
);
return parseExecutionPayload(executionPayloadRpc);
}

Expand Down
15 changes: 11 additions & 4 deletions packages/beacon-node/src/execution/engine/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import {IExecutionEngine} from "./interface.js";
import {ExecutionEngineDisabled} from "./disabled.js";
import {ExecutionEngineHttp, ExecutionEngineHttpOpts, defaultExecutionEngineHttpOpts} from "./http.js";
import {
ExecutionEngineHttp,
ExecutionEngineModules,
ExecutionEngineHttpOpts,
defaultExecutionEngineHttpOpts,
} from "./http.js";
import {ExecutionEngineMock, ExecutionEngineMockOpts} from "./mock.js";

export {
Expand All @@ -15,17 +20,19 @@ export type ExecutionEngineOpts =
| ({mode?: "http"} & ExecutionEngineHttpOpts)
| ({mode: "mock"} & ExecutionEngineMockOpts)
| {mode: "disabled"};

export const defaultExecutionEngineOpts: ExecutionEngineOpts = defaultExecutionEngineHttpOpts;

export function initializeExecutionEngine(opts: ExecutionEngineOpts, signal: AbortSignal): IExecutionEngine {
export function initializeExecutionEngine(
opts: ExecutionEngineOpts,
modules: ExecutionEngineModules
): IExecutionEngine {
switch (opts.mode) {
case "mock":
return new ExecutionEngineMock(opts);
case "disabled":
return new ExecutionEngineDisabled();
case "http":
default:
return new ExecutionEngineHttp(opts, signal);
return new ExecutionEngineHttp(opts, modules);
}
}
10 changes: 10 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,11 @@ export function createLodestarMetrics(
help: "eth1 JsonHttpClient - total count of request errors",
labelNames: ["routeId"],
}),
retryCount: register.gauge<"routeId">({
name: "lodestar_eth1_http_client_request_retries_total",
help: "eth1 JsonHttpClient - total count of request retries",
labelNames: ["routeId"],
}),
requestUsedFallbackUrl: register.gauge({
name: "lodestar_eth1_http_client_request_used_fallback_url_total",
help: "eth1 JsonHttpClient - total count of requests on fallback url(s)",
Expand Down Expand Up @@ -1094,6 +1099,11 @@ export function createLodestarMetrics(
help: "ExecutionEngineHttp client - total count of request errors",
labelNames: ["routeId"],
}),
retryCount: register.gauge<"routeId">({
name: "lodestar_execution_engine_http_client_request_retries_total",
help: "ExecutionEngineHttp client - total count of request retries",
labelNames: ["routeId"],
}),
requestUsedFallbackUrl: register.gauge({
name: "lodestar_execution_engine_http_client_request_used_fallback_url_total",
help: "ExecutionEngineHttp client - total count of requests on fallback url(s)",
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/node/nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ export class BeaconNode {
{config, db, metrics, logger: logger.child(opts.logger.eth1), signal},
anchorState
),
executionEngine: initializeExecutionEngine(opts.executionEngine, signal),
executionEngine: initializeExecutionEngine(opts.executionEngine, {metrics, signal}),
executionBuilder: opts.executionBuilder.enabled
? initializeExecutionBuilder(opts.executionBuilder, config)
: undefined,
Expand Down
Loading