Skip to content

Commit

Permalink
feat: N proposals from one validator in epoch
Browse files Browse the repository at this point in the history
  • Loading branch information
vgorkavenko committed Jul 6, 2023
1 parent ed95a4a commit 13a8ce5
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 35 deletions.
14 changes: 8 additions & 6 deletions src/duty/propose/propose.rewards.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ export class ProposeRewards {
let propose_earned_reward = 0n;
let propose_missed_reward = 0n;
const propose_penalty = 0n;
if (v.block_proposed) {
const attRewardSum = blocksAttRewards.get(v.block_to_propose);
const syncRewardSum = blocksSyncRewards.get(v.block_to_propose);
propose_earned_reward = attRewardSum + syncRewardSum;
} else {
propose_missed_reward = attestationsAvg + syncAvg;
for (const [block, is_proposed] of v.block_proposals) {
if (is_proposed) {
const attRewardSum = blocksAttRewards.get(block);
const syncRewardSum = blocksSyncRewards.get(block);
propose_earned_reward += attRewardSum + syncRewardSum;
} else {
propose_missed_reward += attestationsAvg + syncAvg;
}
}
this.summary.epoch(epoch).set({
epoch,
Expand Down
24 changes: 17 additions & 7 deletions src/duty/propose/propose.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,26 @@ export class ProposeService {
this.logger.log(`Start getting proposers duties info`);
const proposersDutyInfo = await this.clClient.getCanonicalProposerDuties(epoch);
this.logger.log(`Processing proposers duties info`);
for (const prop of proposersDutyInfo) {
const index = Number(prop.validator_index);
const slot = Number(prop.slot);
const blockHeader = await this.clClient.getBlockHeader(prop.slot);
const proposersDutyInfoByIndex = proposersDutyInfo.reduce((acc, cur) => {
const index = cur.validator_index;
if (!acc[index]) {
acc[index] = [];
}
acc[index].push(cur);
return acc;
}, {});
for (const [index, props] of Object.entries(proposersDutyInfoByIndex)) {
const propsResults: [number, boolean][] = await Promise.all(
(<any[]>props).map(async (p) => {
const blockHeader = await this.clClient.getBlockHeader(p.slot);
return [Number(p.slot), !!blockHeader];
}),
);
this.summary.epoch(epoch).set({
epoch,
val_id: index,
val_id: Number(index),
is_proposer: true,
block_to_propose: slot,
block_proposed: !!blockHeader,
block_proposals: propsResults,
});
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/duty/summary/summary.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ export interface ValidatorDutySummary {
val_effective_balance?: bigint;
///
is_proposer?: boolean;
block_to_propose?: number;
block_proposed?: boolean;
block_proposals?: [number, boolean][];
///
is_sync?: boolean;
sync_percent?: number;
Expand Down
24 changes: 17 additions & 7 deletions src/storage/clickhouse/clickhouse.constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -534,29 +534,39 @@ export const otherValidatorsSummaryStatsQuery = (epoch: Epoch): string => `
)
`;

export const userNodeOperatorsProposesStatsLastNEpochQuery = (epoch: Epoch, epochInterval = 120): string => `
export const userNodeOperatorsProposesStatsLastNEpochQuery = (epoch: Epoch, epochInterval = 120, validatorIndexes = []): string => {
let strFilterValIndexes = '';
if (validatorIndexes.length > 0) {
strFilterValIndexes = `AND val_id in [${validatorIndexes.map((i) => `'${i}'`).join(',')}]`;
}
return `
SELECT
val_nos_module_id,
val_nos_id,
SUM(a) as all,
SUM(p + m) as all,
SUM(p) as proposed,
SUM(m) as missed
FROM (
SELECT
val_nos_module_id,
val_nos_id,
COUNT(block_proposed) as a,
IF(block_proposed = 0, count(block_proposed), 0) as m
arrayCount(proposal -> tupleElement(proposal, 2) == 1, block_proposals) as p,
arrayCount(proposal -> tupleElement(proposal, 2) == 0, block_proposals) as m
FROM (
SELECT val_nos_module_id, val_nos_id, block_proposed
SELECT val_nos_module_id, val_nos_id, block_proposals
FROM validators_summary
WHERE
is_proposer = 1 AND val_stuck = 0 AND (epoch <= ${epoch} AND epoch > (${epoch} - ${epochInterval}))
is_proposer = 1 AND
val_stuck = 0 AND
(epoch <= ${epoch} AND epoch > (${epoch} - ${epochInterval}))
${strFilterValIndexes}
LIMIT 1 BY epoch, val_id
)
GROUP BY val_nos_module_id, val_nos_id, block_proposed
GROUP BY val_nos_module_id, val_nos_id, block_proposals
)
GROUP BY val_nos_module_id, val_nos_id
`;
};

export const epochMetadata = (epoch: Epoch): string => `
SELECT *
Expand Down
43 changes: 30 additions & 13 deletions src/storage/clickhouse/clickhouse.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import {
validatorCountByConditionAttestationLastNEpochQuery,
validatorCountHighAvgIncDelayAttestationOfNEpochQuery,
validatorQuantile0001BalanceDeltasQuery,
validatorsCountByConditionMissProposeQuery,
validatorsCountWithNegativeDeltaQuery,
validatorsCountWithSyncParticipationByConditionLastNEpochQuery,
} from './clickhouse.constants';
Expand Down Expand Up @@ -63,6 +62,7 @@ import migration_000004_epoch_processing from './migrations/migration_000004_epo
import migration_000005_withdrawals from './migrations/migration_000005_withdrawals';
import migration_000006_stuck_validators from './migrations/migration_000006_stuck_validators';
import migration_000007_module_id from './migrations/migration_000007_module_id';
import migration_000007_n_proposals from './migrations/migration_000007_n_proposals';

@Injectable()
export class ClickhouseService implements OnModuleInit {
Expand Down Expand Up @@ -232,11 +232,35 @@ export class ClickhouseService implements OnModuleInit {
migration_000004_epoch_processing,
migration_000005_withdrawals,
migration_000006_stuck_validators,
migration_000007_n_proposals,
migration_000007_module_id,
];
for (const query of migrations) {
await this.db.exec({ query });
}

const { blockToProposeIsExist, blockProposedIsExist } = (
await this.select(`SELECT
hasColumnInTable(currentDatabase(), 'validators_summary', 'block_to_propose') as blockToProposeIsExist,
hasColumnInTable(currentDatabase(), 'validators_summary', 'block_proposed') as blockProposedIsExist
`)
)[0];
if (blockToProposeIsExist != blockProposedIsExist) {
throw Error(
'Corrupted `validators_summary` table. ' +
"There shouldn't be `block_to_propose` and `block_proposed` columns after migration. " +
'Please, resolve this manually.',
);
}
if (blockToProposeIsExist && blockProposedIsExist) {
await this.db.exec({
query: `ALTER TABLE validators_summary
UPDATE block_proposals = [(block_to_propose, block_proposed)]
WHERE block_to_propose IS NOT NULL AND block_proposals = []`,
});
await this.db.exec({ query: `ALTER TABLE validators_summary DROP COLUMN block_to_propose` });
await this.db.exec({ query: `ALTER TABLE validators_summary DROP COLUMN block_proposed` });
}
}

public async getAvgValidatorBalanceDelta(epoch: Epoch): Promise<NOsDelta[]> {
Expand Down Expand Up @@ -474,13 +498,9 @@ export class ClickhouseService implements OnModuleInit {
epoch: Epoch,
validatorIndexes: string[] = [],
): Promise<NOsValidatorsByConditionProposeCount[]> {
return (
await this.select<NOsValidatorsByConditionProposeCount[]>(
validatorsCountByConditionMissProposeQuery(epoch, validatorIndexes, 'block_proposed = 1'),
)
).map((v) => ({
return (await this.select<NOsProposesStats[]>(userNodeOperatorsProposesStatsLastNEpochQuery(epoch, 1, validatorIndexes))).map((v) => ({
...v,
amount: Number(v.amount),
amount: Number(v.proposed),
}));
}

Expand All @@ -492,13 +512,9 @@ export class ClickhouseService implements OnModuleInit {
epoch: Epoch,
validatorIndexes: string[] = [],
): Promise<NOsValidatorsByConditionProposeCount[]> {
return (
await this.select<NOsValidatorsByConditionProposeCount[]>(
validatorsCountByConditionMissProposeQuery(epoch, validatorIndexes, 'block_proposed = 0'),
)
).map((v) => ({
return (await this.select<NOsProposesStats[]>(userNodeOperatorsProposesStatsLastNEpochQuery(epoch, 1, validatorIndexes))).map((v) => ({
...v,
amount: Number(v.amount),
amount: Number(v.missed),
}));
}

Expand Down Expand Up @@ -572,6 +588,7 @@ export class ClickhouseService implements OnModuleInit {
return (await this.select<NOsProposesStats[]>(userNodeOperatorsProposesStatsLastNEpochQuery(epoch, epochInterval))).map((v) => ({
...v,
all: Number(v.all),
proposed: Number(v.proposed),
missed: Number(v.missed),
}));
}
Expand Down
1 change: 1 addition & 0 deletions src/storage/clickhouse/clickhouse.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ export interface NOsProposesStats {
val_nos_module_id: string;
val_nos_id: string;
all: number;
proposed: number;
missed: number;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
const sql = `
ALTER TABLE validators_summary
ADD COLUMN IF NOT EXISTS block_proposals Array(Tuple(Int64, UInt8)) DEFAULT [] AFTER block_proposed
`;

export default sql;

0 comments on commit 13a8ce5

Please sign in to comment.