Skip to content

Commit

Permalink
test: remove chain mocks (#5582)
Browse files Browse the repository at this point in the history
* Remove chain mocks

* Use actual BeaconChain class in network tests

* fix type errors

---------

Co-authored-by: Cayman <[email protected]>
  • Loading branch information
dapplion and wemeetagain authored May 31, 2023
1 parent 26c7a73 commit 3fe9afd
Show file tree
Hide file tree
Showing 19 changed files with 281 additions and 706 deletions.
7 changes: 5 additions & 2 deletions packages/beacon-node/src/network/core/networkCore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,15 +255,18 @@ export class NetworkCore implements INetworkCore {

// Must goodbye and disconnect before stopping libp2p
await this.peerManager.goodbyeAndDisconnectAllPeers();
this.logger.debug("network sent goodbye to all peers");
await this.peerManager.stop();
this.logger.debug("network peerManager closed");
await this.gossip.stop();

this.logger.debug("network gossip closed");
await this.reqResp.stop();
await this.reqResp.unregisterAllProtocols();

this.logger.debug("network reqResp closed");
this.attnetsService.stop();
this.syncnetsService.stop();
await this.libp2p.stop();
this.logger.debug("network lib2p closed");

this.closed = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ export class WorkerNetworkCore implements INetworkCore {

async close(): Promise<void> {
await this.getApi().close();
this.modules.logger.debug("terminating network worker");
await Thread.terminate(this.modules.workerApi as unknown as Thread);
this.modules.logger.debug("terminated network worker");
}

async test(): Promise<void> {
Expand Down
1 change: 1 addition & 0 deletions packages/beacon-node/src/network/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {PeerAction} from "./peers/index.js";
*/

export interface INetwork extends INetworkCorePublic {
readonly closed: boolean;
events: INetworkEventBus;

getConnectedPeers(): PeerIdStr[];
Expand Down
22 changes: 11 additions & 11 deletions packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type NetworkModules = {
config: BeaconConfig;
logger: LoggerNode;
chain: IBeaconChain;
signal: AbortSignal;
networkEventBus: NetworkEventBus;
aggregatorTracker: AggregatorTracker;
networkProcessor: NetworkProcessor;
Expand All @@ -54,7 +53,6 @@ export type NetworkInitModules = {
chain: IBeaconChain;
db: IBeaconDb;
getReqRespHandler: GetReqRespHandlerFn;
signal: AbortSignal;
// Optionally pass custom GossipHandlers, for testing
gossipHandlers?: GossipHandlers;
};
Expand All @@ -76,7 +74,8 @@ export class Network implements INetwork {
private readonly config: BeaconConfig;
private readonly clock: IClock;
private readonly chain: IBeaconChain;
private readonly signal: AbortSignal;
// Used only for sleep() statements
private readonly controller: AbortController;

// TODO: Review
private readonly networkProcessor: NetworkProcessor;
Expand All @@ -86,15 +85,14 @@ export class Network implements INetwork {
private subscribedToCoreTopics = false;
private connectedPeers = new Set<PeerIdStr>();
private regossipBlsChangesPromise: Promise<void> | null = null;
private closed = false;

constructor(modules: NetworkModules) {
this.peerId = modules.peerId;
this.config = modules.config;
this.logger = modules.logger;
this.chain = modules.chain;
this.clock = modules.chain.clock;
this.signal = modules.signal;
this.controller = new AbortController();
this.events = modules.networkEventBus;
this.networkProcessor = modules.networkProcessor;
this.core = modules.core;
Expand All @@ -105,7 +103,6 @@ export class Network implements INetwork {
this.chain.emitter.on(routes.events.EventType.head, this.onHead);
this.chain.emitter.on(routes.events.EventType.lightClientFinalityUpdate, this.onLightClientFinalityUpdate);
this.chain.emitter.on(routes.events.EventType.lightClientOptimisticUpdate, this.onLightClientOptimisticUpdate);
modules.signal.addEventListener("abort", this.close.bind(this), {once: true});
}

static async init({
Expand All @@ -115,7 +112,6 @@ export class Network implements INetwork {
metrics,
chain,
db,
signal,
gossipHandlers,
peerId,
peerStoreDir,
Expand Down Expand Up @@ -176,26 +172,30 @@ export class Network implements INetwork {
config,
logger,
chain,
signal,
networkEventBus: events,
aggregatorTracker,
networkProcessor,
core,
});
}

get closed(): boolean {
return this.controller.signal.aborted;
}

/** Destroy this instance. Can only be called once. */
async close(): Promise<void> {
if (this.closed) return;
// Used only for sleep() statements
this.controller.abort();

this.events.off(NetworkEvent.peerConnected, this.onPeerConnected);
this.events.off(NetworkEvent.peerDisconnected, this.onPeerDisconnected);
this.chain.emitter.off(routes.events.EventType.head, this.onHead);
this.chain.emitter.off(routes.events.EventType.lightClientFinalityUpdate, this.onLightClientFinalityUpdate);
this.chain.emitter.off(routes.events.EventType.lightClientOptimisticUpdate, this.onLightClientOptimisticUpdate);
await this.core.close();

this.closed = true;
this.logger.debug("network core closed");
}

async scrapeMetrics(): Promise<string> {
Expand Down Expand Up @@ -583,7 +583,7 @@ export class Network implements INetwork {
private waitOneThirdOfSlot = async (slot: number): Promise<void> => {
const secAtSlot = computeTimeAtSlot(this.config, slot + 1 / 3, this.chain.genesisTime);
const msToSlot = secAtSlot * 1000 - Date.now();
await sleep(msToSlot, this.signal);
await sleep(msToSlot, this.controller.signal);
};

private onHead = async (): Promise<void> => {
Expand Down
4 changes: 3 additions & 1 deletion packages/beacon-node/src/network/peers/peerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {BitArray} from "@chainsafe/ssz";
import {SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params";
import {BeaconConfig} from "@lodestar/config";
import {allForks, altair, phase0} from "@lodestar/types";
import {withTimeout} from "@lodestar/utils";
import {LoggerNode} from "@lodestar/logger/node";
import {GoodByeReasonCode, GOODBYE_KNOWN_CODES, Libp2pEvent} from "../../constants/index.js";
import {IClock} from "../../util/clock.js";
Expand Down Expand Up @@ -642,7 +643,8 @@ export class PeerManager {
this.metrics?.peerLongConnectionDisconnect.inc({reason});
}

await this.reqResp.sendGoodbye(peer, BigInt(goodbye));
// Wrap with shorter timeout than regular ReqResp requests to speed up shutdown
await withTimeout(() => this.reqResp.sendGoodbye(peer, BigInt(goodbye)), 1_000);
} catch (e) {
this.logger.verbose("Failed to send goodbye", {peer: prettyPrintPeerId(peer)}, e as Error);
} finally {
Expand Down
1 change: 0 additions & 1 deletion packages/beacon-node/src/node/nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ export class BeaconNode {
peerId,
peerStoreDir,
getReqRespHandler: getReqRespHandlers({db, chain}),
signal,
});

const sync = new BeaconSync(opts.sync, {
Expand Down
138 changes: 38 additions & 100 deletions packages/beacon-node/test/e2e/network/gossipsub.test.ts
Original file line number Diff line number Diff line change
@@ -1,46 +1,25 @@
import sinon from "sinon";
import {expect} from "chai";
import {createBeaconConfig, createChainForkConfig, defaultChainConfig} from "@lodestar/config";
import {createChainForkConfig, defaultChainConfig} from "@lodestar/config";
import {sleep} from "@lodestar/utils";

import {computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {ssz} from "@lodestar/types";
import {getReqRespHandlers, Network, NetworkInitModules} from "../../../src/network/index.js";
import {defaultNetworkOptions, NetworkOptions} from "../../../src/network/options.js";
import {Network} from "../../../src/network/index.js";
import {GossipType, GossipHandlers} from "../../../src/network/gossip/index.js";
import {connect, onPeerConnect, getNetworkForTest} from "../../utils/network.js";

describe("gossipsub / main thread", function () {
runTests.bind(this)({useWorker: false});
});

import {MockBeaconChain, zeroProtoBlock} from "../../utils/mocks/chain/chain.js";
import {createNetworkModules, connect, onPeerConnect} from "../../utils/network.js";
import {generateState} from "../../utils/state.js";
import {StubbedBeaconDb} from "../../utils/stub/index.js";
import {testLogger} from "../../utils/logger.js";

const multiaddr = "/ip4/127.0.0.1/tcp/0";

const opts: NetworkOptions = {
...defaultNetworkOptions,
maxPeers: 1,
targetPeers: 1,
bootMultiaddrs: [],
localMultiaddrs: [],
discv5FirstQueryDelayMs: 0,
discv5: null,
skipParamsLog: true,
};

// Schedule all forks at ALTAIR_FORK_EPOCH to avoid generating the pubkeys cache
/* eslint-disable @typescript-eslint/naming-convention */
const config = createChainForkConfig({
...defaultChainConfig,
ALTAIR_FORK_EPOCH: 1,
BELLATRIX_FORK_EPOCH: 1,
CAPELLA_FORK_EPOCH: 1,
describe("gossipsub / worker", function () {
runTests.bind(this)({useWorker: true});
});
const START_SLOT = computeStartSlotAtEpoch(config.ALTAIR_FORK_EPOCH);

describe("gossipsub", function () {
if (this.timeout() < 15 * 1000) this.timeout(15 * 1000);
this.retries(2); // This test fail sometimes, with a 5% rate.
/* eslint-disable mocha/no-top-level-hooks */

function runTests(this: Mocha.Suite, {useWorker}: {useWorker: boolean}): void {
if (this.timeout() < 15 * 1000) this.timeout(150 * 1000);
this.retries(0); // This test fail sometimes, with a 5% rate.

const afterEachCallbacks: (() => Promise<void> | void)[] = [];
afterEach(async () => {
Expand All @@ -50,75 +29,34 @@ describe("gossipsub", function () {
}
});

// Schedule all forks at ALTAIR_FORK_EPOCH to avoid generating the pubkeys cache
/* eslint-disable @typescript-eslint/naming-convention */
const config = createChainForkConfig({
...defaultChainConfig,
ALTAIR_FORK_EPOCH: 1,
BELLATRIX_FORK_EPOCH: 1,
CAPELLA_FORK_EPOCH: 1,
});
const START_SLOT = computeStartSlotAtEpoch(config.ALTAIR_FORK_EPOCH);

// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
async function mockModules(gossipHandlersPartial?: Partial<GossipHandlers>) {
const controller = new AbortController();

const block = ssz.phase0.SignedBeaconBlock.defaultValue();
const state = generateState({
finalizedCheckpoint: {
epoch: 0,
root: ssz.phase0.BeaconBlock.hashTreeRoot(block.message),
},
});

const beaconConfig = createBeaconConfig(config, state.genesisValidatorsRoot);
const chain = new MockBeaconChain({
genesisTime: 0,
chainId: 0,
networkId: BigInt(0),
state,
config: beaconConfig,
});

chain.forkChoice.getHead = () => {
return {
...zeroProtoBlock,
slot: START_SLOT,
};
};

const db = new StubbedBeaconDb(config);
const gossipHandlers = gossipHandlersPartial as GossipHandlers;

const loggerA = testLogger("A");
const loggerB = testLogger("B");

const modules: Omit<NetworkInitModules, "opts" | "peerId" | "logger"> = {
config: beaconConfig,
chain,
db,
getReqRespHandler: getReqRespHandlers({db, chain}),
gossipHandlers,
signal: controller.signal,
metrics: null,
};
const netA = await Network.init({
...modules,
...(await createNetworkModules(multiaddr, undefined, opts)),
logger: loggerA,
});
const netB = await Network.init({
...modules,
...(await createNetworkModules(multiaddr, undefined, opts)),
logger: loggerB,
});
const [netA, closeA] = await getNetworkForTest("A", config, {opts: {useWorker}, gossipHandlersPartial});
const [netB, closeB] = await getNetworkForTest("B", config, {opts: {useWorker}, gossipHandlersPartial});

afterEachCallbacks.push(async () => {
await chain.close();
controller.abort();
await Promise.all([netA.close(), netB.close()]);
sinon.restore();
await closeA();
await closeB();
});

return {netA, netB, chain, controller};
return {netA, netB};
}

it("Publish and receive a voluntaryExit", async function () {
let onVoluntaryExit: (ve: Uint8Array) => void;
const onVoluntaryExitPromise = new Promise<Uint8Array>((resolve) => (onVoluntaryExit = resolve));

const {netA, netB, controller} = await mockModules({
const {netA, netB} = await mockModules({
[GossipType.voluntary_exit]: async ({serializedData}) => {
onVoluntaryExit(serializedData);
},
Expand All @@ -132,7 +70,7 @@ describe("gossipsub", function () {
await netB.subscribeGossipCoreTopics();

// Wait to have a peer connected to a topic
while (!controller.signal.aborted) {
while (!netA.closed) {
await sleep(500);
if (await hasSomeMeshPeer(netA)) {
break;
Expand All @@ -151,7 +89,7 @@ describe("gossipsub", function () {
let onBlsToExecutionChange: (blsToExec: Uint8Array) => void;
const onBlsToExecutionChangePromise = new Promise<Uint8Array>((resolve) => (onBlsToExecutionChange = resolve));

const {netA, netB, controller} = await mockModules({
const {netA, netB} = await mockModules({
[GossipType.bls_to_execution_change]: async ({serializedData}) => {
onBlsToExecutionChange(serializedData);
},
Expand All @@ -165,7 +103,7 @@ describe("gossipsub", function () {
await netB.subscribeGossipCoreTopics();

// Wait to have a peer connected to a topic
while (!controller.signal.aborted) {
while (!netA.closed) {
await sleep(500);
if (await hasSomeMeshPeer(netA)) {
break;
Expand All @@ -185,7 +123,7 @@ describe("gossipsub", function () {
(resolve) => (onLightClientOptimisticUpdate = resolve)
);

const {netA, netB, controller} = await mockModules({
const {netA, netB} = await mockModules({
[GossipType.light_client_optimistic_update]: async ({serializedData}) => {
onLightClientOptimisticUpdate(serializedData);
},
Expand All @@ -199,7 +137,7 @@ describe("gossipsub", function () {
await netB.subscribeGossipCoreTopics();

// Wait to have a peer connected to a topic
while (!controller.signal.aborted) {
while (!netA.closed) {
await sleep(500);
if (await hasSomeMeshPeer(netA)) {
break;
Expand All @@ -222,7 +160,7 @@ describe("gossipsub", function () {
(resolve) => (onLightClientFinalityUpdate = resolve)
);

const {netA, netB, controller} = await mockModules({
const {netA, netB} = await mockModules({
[GossipType.light_client_finality_update]: async ({serializedData}) => {
onLightClientFinalityUpdate(serializedData);
},
Expand All @@ -236,7 +174,7 @@ describe("gossipsub", function () {
await netB.subscribeGossipCoreTopics();

// Wait to have a peer connected to a topic
while (!controller.signal.aborted) {
while (!netA.closed) {
await sleep(500);
if (await hasSomeMeshPeer(netA)) {
break;
Expand All @@ -250,7 +188,7 @@ describe("gossipsub", function () {
const optimisticUpdate = await onLightClientFinalityUpdatePromise;
expect(optimisticUpdate).to.deep.equal(ssz.capella.LightClientFinalityUpdate.serialize(lightClientFinalityUpdate));
});
});
}

async function hasSomeMeshPeer(net: Network): Promise<boolean> {
return Object.values(await net.dumpMeshPeers()).some((peers) => peers.length > 0);
Expand Down
Loading

0 comments on commit 3fe9afd

Please sign in to comment.