diff --git a/packages/validator/src/services/attestation.ts b/packages/validator/src/services/attestation.ts index 733c36ac7c13..e25b59da6d10 100644 --- a/packages/validator/src/services/attestation.ts +++ b/packages/validator/src/services/attestation.ts @@ -1,5 +1,5 @@ import {AbortSignal} from "@chainsafe/abort-controller"; -import {phase0, Slot, CommitteeIndex, ssz} from "@chainsafe/lodestar-types"; +import {phase0, Slot, ssz} from "@chainsafe/lodestar-types"; import {computeEpochAtSlot} from "@chainsafe/lodestar-beacon-state-transition"; import {sleep} from "@chainsafe/lodestar-utils"; import {Api} from "@chainsafe/lodestar-api"; @@ -50,8 +50,8 @@ export class AttestationService { private runAttestationTasks = async (slot: Slot, signal: AbortSignal): Promise => { // Fetch info first so a potential delay is absorved by the sleep() below - const dutiesByCommitteeIndex = groupAttDutiesByCommitteeIndex(this.dutiesService.getDutiesAtSlot(slot)); - if (dutiesByCommitteeIndex.size === 0) { + const duties = this.dutiesService.getDutiesAtSlot(slot); + if (duties.length === 0) { return; } @@ -65,16 +65,23 @@ export class AttestationService { // Produce a single attestation for all committees, and clone mutate before signing const attestationNoCommittee = await this.produceAttestation(slot); - // await for all so if the Beacon node is overloaded it auto-throttles - // TODO: This approach is convervative to reduce the node's load, review + // Step 1. Mutate, and sign `Attestation` for each validator. Then publish all `Attestations` in one go + await this.signAndPublishAttestations(slot, attestationNoCommittee, duties); + + // Step 2. after all attestations are submitted, make an aggregate. + // First, wait until the `aggregation_production_instant` (2/3rds of the way though the slot) + await sleep(this.clock.msToSlot(slot + 2 / 3), signal); + this.metrics?.attesterStepCallProduceAggregate.observe(this.clock.secFromSlot(slot + 2 / 3)); + + const dutiesByCommitteeIndex = groupAttDutiesByCommitteeIndex(this.dutiesService.getDutiesAtSlot(slot)); + + // Then download, sign and publish a `SignedAggregateAndProof` for each + // validator that is elected to aggregate for this `slot` and + // `committeeIndex`. await Promise.all( - Array.from(dutiesByCommitteeIndex.entries()).map(async ([committeeIndex, duties]) => { - if (duties.length === 0) return; - await this.publishAttestationsAndAggregates(slot, committeeIndex, duties, attestationNoCommittee, signal).catch( - (e: Error) => { - this.logger.error("Error on attestations routine", {slot, committeeIndex}, e); - } - ); + Array.from(dutiesByCommitteeIndex.entries()).map(([index, duties]) => { + const attestationData: phase0.AttestationData = {...attestationNoCommittee, index}; + return this.produceAndPublishAggregates(attestationData, duties); }) ); }; @@ -97,27 +104,6 @@ export class AttestationService { }); } - private async publishAttestationsAndAggregates( - slot: Slot, - committeeIndex: CommitteeIndex, - duties: AttDutyAndProof[], - attestationNoCommittee: phase0.AttestationData, - signal: AbortSignal - ): Promise { - // Step 1. Mutate, sign and publish `Attestation` for each validator. - const attestation = await this.publishAttestations(slot, committeeIndex, attestationNoCommittee, duties); - - // Step 2. If an attestation was produced, make an aggregate. - // First, wait until the `aggregation_production_instant` (2/3rds of the way though the slot) - await sleep(this.clock.msToSlot(slot + 2 / 3), signal); - this.metrics?.attesterStepCallProduceAggregate.observe(this.clock.secFromSlot(slot + 2 / 3)); - - // Then download, sign and publish a `SignedAggregateAndProof` for each - // validator that is elected to aggregate for this `slot` and - // `committeeIndex`. - await this.produceAndPublishAggregates(attestation, duties); - } - /** * Performs the first step of the attesting process: downloading one `Attestation` object. * Beacon node's endpoint produceAttestationData return data is not dependant on committeeIndex. @@ -134,56 +120,48 @@ export class AttestationService { } /** - * Performs the first step of the attesting process: downloading `Attestation` objects, - * signing them and returning them to the validator. - * - * https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#attesting - * * Only one `Attestation` is downloaded from the BN. It is then signed by each * validator and the list of individually-signed `Attestation` objects is returned to the BN. */ - private async publishAttestations( + private async signAndPublishAttestations( slot: Slot, - committeeIndex: CommitteeIndex, attestationNoCommittee: phase0.AttestationData, duties: AttDutyAndProof[] - ): Promise { - const logCtx = {slot, index: committeeIndex}; - - const currentEpoch = computeEpochAtSlot(slot); + ): Promise { const signedAttestations: phase0.Attestation[] = []; + const headRootHex = toHexString(attestationNoCommittee.beaconBlockRoot); + const currentEpoch = computeEpochAtSlot(slot); - const attestation: phase0.AttestationData = {...attestationNoCommittee, index: committeeIndex}; - - for (const {duty} of duties) { - const logCtxValidator = { - ...logCtx, - head: toHexString(attestation.beaconBlockRoot), - validatorIndex: duty.validatorIndex, - }; - try { - signedAttestations.push(await this.validatorStore.signAttestation(duty, attestation, currentEpoch)); - this.logger.debug("Signed attestation", logCtxValidator); - } catch (e) { - this.metrics?.attestaterError.inc({error: "sign"}); - this.logger.error("Error signing attestation", logCtxValidator, e as Error); - } - } + await Promise.all( + duties.map(async ({duty}) => { + const index = duty.committeeIndex; + const attestationData: phase0.AttestationData = {...attestationNoCommittee, index}; + const logCtxValidator = {slot, index, head: headRootHex, validatorIndex: duty.validatorIndex}; + + try { + signedAttestations.push(await this.validatorStore.signAttestation(duty, attestationData, currentEpoch)); + this.logger.debug("Signed attestation", logCtxValidator); + } catch (e) { + this.metrics?.attestaterError.inc({error: "sign"}); + this.logger.error("Error signing attestation", logCtxValidator, e as Error); + } + }) + ); - this.metrics?.attesterStepCallPublishAttestation.observe(this.clock.secFromSlot(attestation.slot + 1 / 3)); + this.metrics?.attesterStepCallPublishAttestation.observe(this.clock.secFromSlot(slot + 1 / 3)); + // Step 2. Publish all `Attestations` in one go if (signedAttestations.length > 0) { try { await this.api.beacon.submitPoolAttestations(signedAttestations); - this.logger.info("Published attestations", {...logCtx, count: signedAttestations.length}); + this.logger.info("Published attestations", {slot, count: signedAttestations.length}); this.metrics?.publishedAttestations.inc(signedAttestations.length); } catch (e) { + // Note: metric counts only 1 since we don't know how many signedAttestations are invalid this.metrics?.attestaterError.inc({error: "publish"}); - this.logger.error("Error publishing attestations", logCtx, e as Error); + this.logger.error("Error publishing attestations", {slot}, e as Error); } } - - return attestation; } /**