Skip to content

Commit

Permalink
Submit unaggregated attestations in one go (#3923)
Browse files Browse the repository at this point in the history
* Submit unaggregated attestations in one go

* Review PR

Co-authored-by: dapplion <[email protected]>
  • Loading branch information
twoeths and dapplion authored Apr 15, 2022
1 parent 8898b2a commit 9f0c7a6
Showing 1 changed file with 43 additions and 65 deletions.
108 changes: 43 additions & 65 deletions packages/validator/src/services/attestation.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {AbortSignal} from "@chainsafe/abort-controller";
import {phase0, Slot, CommitteeIndex, ssz} from "@chainsafe/lodestar-types";
import {phase0, Slot, ssz} from "@chainsafe/lodestar-types";
import {computeEpochAtSlot} from "@chainsafe/lodestar-beacon-state-transition";
import {sleep} from "@chainsafe/lodestar-utils";
import {Api} from "@chainsafe/lodestar-api";
Expand Down Expand Up @@ -50,8 +50,8 @@ export class AttestationService {

private runAttestationTasks = async (slot: Slot, signal: AbortSignal): Promise<void> => {
// Fetch info first so a potential delay is absorved by the sleep() below
const dutiesByCommitteeIndex = groupAttDutiesByCommitteeIndex(this.dutiesService.getDutiesAtSlot(slot));
if (dutiesByCommitteeIndex.size === 0) {
const duties = this.dutiesService.getDutiesAtSlot(slot);
if (duties.length === 0) {
return;
}

Expand All @@ -65,16 +65,23 @@ export class AttestationService {
// Produce a single attestation for all committees, and clone mutate before signing
const attestationNoCommittee = await this.produceAttestation(slot);

// await for all so if the Beacon node is overloaded it auto-throttles
// TODO: This approach is convervative to reduce the node's load, review
// Step 1. Mutate, and sign `Attestation` for each validator. Then publish all `Attestations` in one go
await this.signAndPublishAttestations(slot, attestationNoCommittee, duties);

// Step 2. after all attestations are submitted, make an aggregate.
// First, wait until the `aggregation_production_instant` (2/3rds of the way though the slot)
await sleep(this.clock.msToSlot(slot + 2 / 3), signal);
this.metrics?.attesterStepCallProduceAggregate.observe(this.clock.secFromSlot(slot + 2 / 3));

const dutiesByCommitteeIndex = groupAttDutiesByCommitteeIndex(this.dutiesService.getDutiesAtSlot(slot));

// Then download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and
// `committeeIndex`.
await Promise.all(
Array.from(dutiesByCommitteeIndex.entries()).map(async ([committeeIndex, duties]) => {
if (duties.length === 0) return;
await this.publishAttestationsAndAggregates(slot, committeeIndex, duties, attestationNoCommittee, signal).catch(
(e: Error) => {
this.logger.error("Error on attestations routine", {slot, committeeIndex}, e);
}
);
Array.from(dutiesByCommitteeIndex.entries()).map(([index, duties]) => {
const attestationData: phase0.AttestationData = {...attestationNoCommittee, index};
return this.produceAndPublishAggregates(attestationData, duties);
})
);
};
Expand All @@ -97,27 +104,6 @@ export class AttestationService {
});
}

private async publishAttestationsAndAggregates(
slot: Slot,
committeeIndex: CommitteeIndex,
duties: AttDutyAndProof[],
attestationNoCommittee: phase0.AttestationData,
signal: AbortSignal
): Promise<void> {
// Step 1. Mutate, sign and publish `Attestation` for each validator.
const attestation = await this.publishAttestations(slot, committeeIndex, attestationNoCommittee, duties);

// Step 2. If an attestation was produced, make an aggregate.
// First, wait until the `aggregation_production_instant` (2/3rds of the way though the slot)
await sleep(this.clock.msToSlot(slot + 2 / 3), signal);
this.metrics?.attesterStepCallProduceAggregate.observe(this.clock.secFromSlot(slot + 2 / 3));

// Then download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and
// `committeeIndex`.
await this.produceAndPublishAggregates(attestation, duties);
}

/**
* Performs the first step of the attesting process: downloading one `Attestation` object.
* Beacon node's endpoint produceAttestationData return data is not dependant on committeeIndex.
Expand All @@ -134,56 +120,48 @@ export class AttestationService {
}

/**
* Performs the first step of the attesting process: downloading `Attestation` objects,
* signing them and returning them to the validator.
*
* https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#attesting
*
* Only one `Attestation` is downloaded from the BN. It is then signed by each
* validator and the list of individually-signed `Attestation` objects is returned to the BN.
*/
private async publishAttestations(
private async signAndPublishAttestations(
slot: Slot,
committeeIndex: CommitteeIndex,
attestationNoCommittee: phase0.AttestationData,
duties: AttDutyAndProof[]
): Promise<phase0.AttestationData> {
const logCtx = {slot, index: committeeIndex};

const currentEpoch = computeEpochAtSlot(slot);
): Promise<void> {
const signedAttestations: phase0.Attestation[] = [];
const headRootHex = toHexString(attestationNoCommittee.beaconBlockRoot);
const currentEpoch = computeEpochAtSlot(slot);

const attestation: phase0.AttestationData = {...attestationNoCommittee, index: committeeIndex};

for (const {duty} of duties) {
const logCtxValidator = {
...logCtx,
head: toHexString(attestation.beaconBlockRoot),
validatorIndex: duty.validatorIndex,
};
try {
signedAttestations.push(await this.validatorStore.signAttestation(duty, attestation, currentEpoch));
this.logger.debug("Signed attestation", logCtxValidator);
} catch (e) {
this.metrics?.attestaterError.inc({error: "sign"});
this.logger.error("Error signing attestation", logCtxValidator, e as Error);
}
}
await Promise.all(
duties.map(async ({duty}) => {
const index = duty.committeeIndex;
const attestationData: phase0.AttestationData = {...attestationNoCommittee, index};
const logCtxValidator = {slot, index, head: headRootHex, validatorIndex: duty.validatorIndex};

try {
signedAttestations.push(await this.validatorStore.signAttestation(duty, attestationData, currentEpoch));
this.logger.debug("Signed attestation", logCtxValidator);
} catch (e) {
this.metrics?.attestaterError.inc({error: "sign"});
this.logger.error("Error signing attestation", logCtxValidator, e as Error);
}
})
);

this.metrics?.attesterStepCallPublishAttestation.observe(this.clock.secFromSlot(attestation.slot + 1 / 3));
this.metrics?.attesterStepCallPublishAttestation.observe(this.clock.secFromSlot(slot + 1 / 3));

// Step 2. Publish all `Attestations` in one go
if (signedAttestations.length > 0) {
try {
await this.api.beacon.submitPoolAttestations(signedAttestations);
this.logger.info("Published attestations", {...logCtx, count: signedAttestations.length});
this.logger.info("Published attestations", {slot, count: signedAttestations.length});
this.metrics?.publishedAttestations.inc(signedAttestations.length);
} catch (e) {
// Note: metric counts only 1 since we don't know how many signedAttestations are invalid
this.metrics?.attestaterError.inc({error: "publish"});
this.logger.error("Error publishing attestations", logCtx, e as Error);
this.logger.error("Error publishing attestations", {slot}, e as Error);
}
}

return attestation;
}

/**
Expand Down

0 comments on commit 9f0c7a6

Please sign in to comment.