Skip to content

Commit

Permalink
Implement distributed sync committee aggregation selection
Browse files Browse the repository at this point in the history
  • Loading branch information
nflaig committed Apr 6, 2023
1 parent 35c0243 commit 2ad316e
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 11 deletions.
101 changes: 97 additions & 4 deletions packages/validator/src/services/syncCommittee.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {ChainForkConfig} from "@lodestar/config";
import {Slot, CommitteeIndex, altair, Root} from "@lodestar/types";
import {Slot, CommitteeIndex, altair, Root, BLSSignature} from "@lodestar/types";
import {sleep} from "@lodestar/utils";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {Api, ApiError} from "@lodestar/api";
import {computeEpochAtSlot, isSyncCommitteeAggregator} from "@lodestar/state-transition";
import {Api, ApiError, routes} from "@lodestar/api";
import {IClock, LoggerVc} from "../util/index.js";
import {PubkeyHex} from "../types.js";
import {Metrics} from "../metrics.js";
Expand All @@ -14,6 +14,7 @@ import {ValidatorEventEmitter} from "./emitter.js";

type SyncCommitteeServiceOpts = {
scAfterBlockDelaySlotFraction?: number;
distributedAggregationSelection?: boolean;
};

/**
Expand All @@ -33,7 +34,9 @@ export class SyncCommitteeService {
private readonly metrics: Metrics | null,
private readonly opts?: SyncCommitteeServiceOpts
) {
this.dutiesService = new SyncCommitteeDutiesService(config, logger, api, clock, validatorStore, metrics);
this.dutiesService = new SyncCommitteeDutiesService(config, logger, api, clock, validatorStore, metrics, {
distributedAggregationSelection: opts?.distributedAggregationSelection,
});

// At most every slot, check existing duties from SyncCommitteeDutiesService and run tasks
clock.runEverySlot(this.runSyncCommitteeTasks);
Expand All @@ -56,6 +59,18 @@ export class SyncCommitteeService {
return;
}

if (this.opts?.distributedAggregationSelection) {
// Validator in distributed cluster only has a key share, not the full private key.
// The partial selection proofs must be exchanged for combined selection proofs by
// calling submitSyncCommitteeSelections on the distributed validator middleware client.
// This will run in parallel to other sync committee tasks but must be finished before starting
// sync committee contributions as it is required to correctly determine if validator is aggregator
// and to produce a ContributionAndProof that can be threshold aggregated by the middleware client.
this.runDistributedAggregationSelectionTasks(dutiesAtSlot, slot, signal).catch((e) =>
this.logger.error("Error on sync committee aggregation selection", {slot}, e)
);
}

// unlike Attestation, SyncCommitteeSignature could be published asap
// especially with lodestar, it's very busy at 1/3 of slot
// see https://github.com/ChainSafe/lodestar/issues/4608
Expand Down Expand Up @@ -214,4 +229,82 @@ export class SyncCommitteeService {
}
}
}

/**
* Performs additional sync committee contribution tasks required if validator is part of distributed cluster
*
* 1. Exchange partial for combined selection proofs
* 2. Determine validators that should produce sync committee contribution
* 3. Mutate duty objects to set selection proofs for aggregators
*
* See https://docs.google.com/document/d/1q9jOTPcYQa-3L8luRvQJ-M0eegtba4Nmon3dpO79TMk/mobilebasic
*/
private async runDistributedAggregationSelectionTasks(
duties: SyncDutyAndProofs[],
slot: number,
signal: AbortSignal
): Promise<void> {
const partialSelections: routes.validator.SyncCommitteeSelection[] = [];

for (const {duty, selectionProofs} of duties) {
const validatorSelections: routes.validator.SyncCommitteeSelection[] = selectionProofs.map(
({subcommitteeIndex, partialSelectionProof}) => ({
validatorIndex: duty.validatorIndex,
slot,
subcommitteeIndex,
selectionProof: partialSelectionProof as BLSSignature,
})
);
partialSelections.push(...validatorSelections);
}

this.logger.debug("Submitting partial sync committee selection proofs", {slot, count: partialSelections.length});

const res = await Promise.race([
this.api.validator.submitSyncCommitteeSelections(partialSelections),
// Exit sync committee contributions flow if there is no response after 2/3 of slot.
// This is in contrast to attestations aggregations flow which is already exited at 1/3 of the slot
// because for sync committee is not required to resubscribe to subnets as beacon node will assume
// validator always aggregates. This allows us to wait until we have to produce sync committee contributions.
// Note that the sync committee contributions flow is not explicitly exited but rather will be skipped
// due to the fact that calculation of `is_sync_committee_aggregator` in duties service is not done
// and selectionProof is set to null, meaning no validator will be considered an aggregator.
sleep(this.clock.msToSlot(slot + 2 / 3), signal),
]);

if (!res) {
throw new Error("No response after 2/3 of slot");
}
ApiError.assert(res);

const combinedSelections = res.response.data;
this.logger.debug("Received combined sync committee selection proofs", {slot, count: combinedSelections.length});

for (const dutyAndProofs of duties) {
const {validatorIndex, subnets} = dutyAndProofs.duty;

for (const subnet of subnets) {
const logCtxValidator = {slot, index: subnet, validatorIndex};

const combinedSelection = combinedSelections.find(
(s) => s.validatorIndex === validatorIndex && s.slot === slot && s.subcommitteeIndex === subnet
);

if (!combinedSelection) {
this.logger.warn("Did not receive combined sync committee selection proof", logCtxValidator);
continue;
}

const isAggregator = isSyncCommitteeAggregator(combinedSelection.selectionProof);

if (isAggregator) {
const selectionProofObject = dutyAndProofs.selectionProofs.find((p) => p.subcommitteeIndex === subnet);
if (selectionProofObject) {
// Update selection proof by mutating proof objects in duty object
selectionProofObject.selectionProof = combinedSelection.selectionProof;
}
}
}
}
}
}
31 changes: 25 additions & 6 deletions packages/validator/src/services/syncCommitteeDuties.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ export type SyncDutySubnet = {
export type SyncSelectionProof = {
/** This value is only set to not null if the proof indicates that the validator is an aggregator. */
selectionProof: BLSSignature | null;
/** This value will only be set if validator is part of distributed cluster and only has a key share */
partialSelectionProof?: BLSSignature;
subcommitteeIndex: number;
};

Expand All @@ -60,6 +62,10 @@ export type SyncDutyAndProofs = {
// To assist with readability
type DutyAtPeriod = {duty: SyncDutySubnet};

type SyncCommitteeDutiesServiceOpts = {
distributedAggregationSelection?: boolean;
};

/**
* Validators are part of a static long (~27h) sync committee, and part of static subnets.
* However, the isAggregator role changes per slot.
Expand All @@ -74,7 +80,8 @@ export class SyncCommitteeDutiesService {
private readonly api: Api,
clock: IClock,
private readonly validatorStore: ValidatorStore,
metrics: Metrics | null
metrics: Metrics | null,
private readonly opts?: SyncCommitteeDutiesServiceOpts
) {
// Running this task every epoch is safe since a re-org of many epochs is very unlikely
// TODO: If the re-org event is reliable consider re-running then
Expand Down Expand Up @@ -285,11 +292,23 @@ export class SyncCommitteeDutiesService {
const dutiesAndProofs: SyncSelectionProof[] = [];
for (const subnet of duty.subnets) {
const selectionProof = await this.validatorStore.signSyncCommitteeSelectionProof(duty.pubkey, slot, subnet);
dutiesAndProofs.push({
// selectionProof === null is used to check if is aggregator
selectionProof: isSyncCommitteeAggregator(selectionProof) ? selectionProof : null,
subcommitteeIndex: subnet,
});
if (this.opts?.distributedAggregationSelection) {
// Validator in distributed cluster only has a key share, not the full private key.
// Passing a partial selection proof to `is_sync_committee_aggregator` would produce incorrect result.
// Sync committee service will exchange partial for combined selection proofs retrieved from
// distributed validator middleware client and determine aggregators at beginning of every slot.
dutiesAndProofs.push({
selectionProof: null,
partialSelectionProof: selectionProof,
subcommitteeIndex: subnet,
});
} else {
dutiesAndProofs.push({
// selectionProof === null is used to check if is aggregator
selectionProof: isSyncCommitteeAggregator(selectionProof) ? selectionProof : null,
subcommitteeIndex: subnet,
});
}
}
return dutiesAndProofs;
}
Expand Down
5 changes: 4 additions & 1 deletion packages/validator/src/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,10 @@ export class Validator {
emitter,
chainHeaderTracker,
metrics,
{scAfterBlockDelaySlotFraction: opts.scAfterBlockDelaySlotFraction}
{
scAfterBlockDelaySlotFraction: opts.scAfterBlockDelaySlotFraction,
distributedAggregationSelection: opts.distributed,
}
);

this.config = config;
Expand Down

0 comments on commit 2ad316e

Please sign in to comment.