Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate hot blocks if using blinded storage #5940

Merged
merged 9 commits into from
Jul 18, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public interface Database extends AutoCloseable {
@MustBeClosed
Stream<SignedBeaconBlock> streamHotBlocks();

long countUnblindedFinalizedBlocks();

@MustBeClosed
Stream<Map.Entry<Bytes32, BlockCheckpoints>> streamBlockCheckpoints();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ public Stream<SignedBeaconBlock> streamHotBlocks() {
return dao.streamBlindedHotBlocks().map(this::getUnblindedBlock);
}

@Override
public long countUnblindedFinalizedBlocks() {
return 0;
}

@Override
@MustBeClosed
public Stream<SignedBeaconBlock> streamFinalizedBlocks(
Expand All @@ -148,29 +153,29 @@ protected void storeAnchorStateAndBlock(
final CombinedUpdaterBlinded updater,
final BeaconState anchorState,
final SignedBeaconBlock block) {
updater.addBlindedBlock(block, spec);
updater.addBlindedBlock(block, block.getRoot(), spec);
updater.addHotBlockCheckpointEpochs(
block.getRoot(),
new BlockCheckpoints(
anchorState.getCurrentJustifiedCheckpoint(),
anchorState.getFinalizedCheckpoint(),
anchorState.getCurrentJustifiedCheckpoint(),
anchorState.getFinalizedCheckpoint()));
updater.addFinalizedBlockRootBySlot(block);
updater.addFinalizedBlockRootBySlot(block.getSlot(), block.getRoot());
}

@Override
protected void storeFinalizedBlocksToDao(final Collection<SignedBeaconBlock> blocks) {
try (final FinalizedUpdaterBlinded updater = finalizedUpdater()) {
blocks.forEach(
block -> {
updater.addBlindedBlock(block, spec);
updater.addBlindedBlock(block, block.getRoot(), spec);
block
.getMessage()
.getBody()
.getOptionalExecutionPayload()
.ifPresent(updater::addExecutionPayload);
updater.addFinalizedBlockRootBySlot(block);
updater.addFinalizedBlockRootBySlot(block.getSlot(), block.getRoot());
});
updater.commit();
}
Expand Down Expand Up @@ -243,9 +248,9 @@ protected void addFinalizedBlock(
final boolean isRemovedFromHotBlocks,
final FinalizedUpdaterBlinded updater) {
if (isRemovedFromHotBlocks) {
updater.addBlindedBlock(block, spec);
updater.addBlindedBlock(block, block.getRoot(), spec);
}
updater.addFinalizedBlockRootBySlot(block);
updater.addFinalizedBlockRootBySlot(block.getSlot(), block.getRoot());
}

@Override
Expand All @@ -257,7 +262,8 @@ protected void updateHotBlocks(
try (final FinalizedUpdaterBlinded finalizedUpdater = dao.finalizedUpdaterBlinded()) {
addedBlocks
.values()
.forEach(block -> finalizedUpdater.addBlindedBlock(block.getBlock(), spec));
.forEach(
block -> finalizedUpdater.addBlindedBlock(block.getBlock(), block.getRoot(), spec));
if (!storeNonCanonicalBlocks) {
deletedHotBlockRoots.stream()
.filter(blockRoot -> !finalizedBlockRoots.contains(blockRoot))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright ConsenSys Software Inc., 2022
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.storage.server.kvstore;

import java.util.Iterator;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.storage.server.kvstore.dataaccess.KvStoreCombinedDaoBlinded;
import tech.pegasys.teku.storage.server.kvstore.dataaccess.KvStoreCombinedDaoUnblinded;

public class BlindedHotBlockMigration<
T extends KvStoreCombinedDaoBlinded & KvStoreCombinedDaoUnblinded> {
private static final Logger LOG = LogManager.getLogger();
private static final int BATCH_SIZE = 1_000;
private final Spec spec;

private final T dao;

private BlindedHotBlockMigration(final Spec spec, final T dao) {
this.spec = spec;
this.dao = dao;
}

static <T extends KvStoreCombinedDaoBlinded & KvStoreCombinedDaoUnblinded> void migrateBlocks(
final T dao, final Spec spec) {
final BlindedHotBlockMigration<T> blindedHotBlockMigration =
new BlindedHotBlockMigration<>(spec, dao);
blindedHotBlockMigration.performBatchMigration();
}

private void performBatchMigration() {
moveHotBlocksToBlindedStorage();
}

private void moveFirstFinalizedBlockToBlindedStorage() {
rolfyone marked this conversation as resolved.
Show resolved Hide resolved
Optional<SignedBeaconBlock> maybeBlock = dao.getEarliestFinalizedBlock();
maybeBlock.ifPresent(
block -> {
final Bytes32 root = block.getRoot();
LOG.info("Setting Lowest finalized Block at {}({})", root, block.getSlot());
try (KvStoreCombinedDaoBlinded.FinalizedUpdaterBlinded blindedUpdater =
dao.finalizedUpdaterBlinded();
KvStoreCombinedDaoUnblinded.FinalizedUpdaterUnblinded unblindedUpdater =
dao.finalizedUpdaterUnblinded()) {

blindedUpdater.addBlindedFinalizedBlock(block, root, spec);
unblindedUpdater.deleteUnblindedFinalizedBlock(block.getSlot(), root);
blindedUpdater.commit();
unblindedUpdater.commit();
}
});
}

private void moveHotBlocksToBlindedStorage() {
final long countBlocks = dao.countUnblindedHotBlocks();
if (countBlocks == 0) {
return;
}
LOG.info("Migrating blocks to blinded storage, {} hot blocks to migrate", countBlocks);

long counter = 0;
try (final Stream<SignedBeaconBlock> blocks = dao.streamHotBlocks()) {
for (Iterator<SignedBeaconBlock> it = blocks.iterator(); it.hasNext(); ) {
try (KvStoreCombinedDaoBlinded.FinalizedUpdaterBlinded blindedUpdater =
dao.finalizedUpdaterBlinded();
KvStoreCombinedDaoUnblinded.HotUpdaterUnblinded unblindedUpdater =
dao.hotUpdaterUnblinded()) {
for (int i = 0; i < BATCH_SIZE && it.hasNext(); i++) {
final SignedBeaconBlock block = it.next();
blindedUpdater.addBlindedBlock(block, block.getRoot(), spec);
unblindedUpdater.deleteUnblindedHotBlockOnly(block.getRoot());
counter++;
rolfyone marked this conversation as resolved.
Show resolved Hide resolved
}
blindedUpdater.commit();
unblindedUpdater.commit();
}
}
}
LOG.info("Hot blocks all moved ({} blocks)", counter);
moveFirstFinalizedBlockToBlindedStorage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public static Database createV4(
hotDao,
new V4FinalizedKvStoreDao(finalizedDb, schemaFinalized, finalizedStateStorageLogic));
if (storeBlockExecutionPayloadSeparately) {
BlindedHotBlockMigration.migrateBlocks(dao, spec);
return new BlindedBlockKvStoreDatabase(dao, stateStorageMode, storeNonCanonicalBlocks, spec);
}
return new UnblindedBlockKvStoreDatabase(dao, stateStorageMode, storeNonCanonicalBlocks, spec);
Expand Down Expand Up @@ -173,6 +174,7 @@ public static Database createWithStateTree(
final CombinedKvStoreDao<S> dao =
new CombinedKvStoreDao<>(db, schema, finalizedStateStorageLogic);
if (storeBlockExecutionPayloadSeparately) {
BlindedHotBlockMigration.migrateBlocks(dao, spec);
return new BlindedBlockKvStoreDatabase(dao, stateStorageMode, storeNonCanonicalBlocks, spec);
}
return new UnblindedBlockKvStoreDatabase(dao, stateStorageMode, storeNonCanonicalBlocks, spec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ public Stream<SignedBeaconBlock> streamHotBlocks() {
return dao.streamHotBlocks();
}

@Override
public long countUnblindedFinalizedBlocks() {
return dao.countUnblindedFinalizedBlocks();
}

@Override
@MustBeClosed
public Stream<SignedBeaconBlock> streamFinalizedBlocks(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ public Stream<SignedBeaconBlock> streamHotBlocks() {
return db.stream(schema.getColumnHotBlocksByRoot()).map(ColumnEntry::getValue);
}

@Override
public long countUnblindedHotBlocks() {
try (final Stream<ColumnEntry<Bytes, Bytes>> rawEntries =
db.streamRaw(schema.getColumnHotBlocksByRoot())) {
return rawEntries.count();
}
}

@Override
public Optional<BeaconState> getLatestFinalizedState() {
return db.get(schema.getVariableLatestFinalizedState());
Expand Down Expand Up @@ -335,6 +343,14 @@ public Stream<SignedBeaconBlock> streamFinalizedBlocks(
.map(ColumnEntry::getValue);
}

@Override
public long countUnblindedFinalizedBlocks() {
try (Stream<ColumnEntry<Bytes, Bytes>> entries =
db.streamRaw(schema.getColumnSlotsByFinalizedRoot())) {
return entries.count();
}
}

@Override
public Optional<UInt64> getEarliestBlindedBlockSlot() {
return db.getFirstEntry(schema.getColumnFinalizedBlockRootBySlot()).map(ColumnEntry::getKey);
Expand Down Expand Up @@ -573,6 +589,11 @@ public void deleteHotBlock(final Bytes32 blockRoot) {
deleteHotState(blockRoot);
}

@Override
public void deleteUnblindedHotBlockOnly(final Bytes32 blockRoot) {
transaction.delete(schema.getColumnHotBlocksByRoot(), blockRoot);
}

@Override
public void deleteHotState(final Bytes32 blockRoot) {
transaction.delete(schema.getColumnHotStatesByRoot(), blockRoot);
Expand Down Expand Up @@ -614,15 +635,23 @@ public void addFinalizedBlock(final SignedBeaconBlock block) {
}

@Override
public void addFinalizedBlockRootBySlot(final SignedBeaconBlock block) {
transaction.put(schema.getColumnFinalizedBlockRootBySlot(), block.getSlot(), block.getRoot());
public void addFinalizedBlockRootBySlot(final UInt64 slot, final Bytes32 root) {
transaction.put(schema.getColumnFinalizedBlockRootBySlot(), slot, root);
}

@Override
public void addBlindedBlock(final SignedBeaconBlock block, final Spec spec) {
public void addBlindedFinalizedBlock(
final SignedBeaconBlock block, final Bytes32 root, final Spec spec) {
addBlindedBlock(block, root, spec);
addFinalizedBlockRootBySlot(block.getSlot(), root);
}

@Override
public void addBlindedBlock(
final SignedBeaconBlock block, final Bytes32 blockRoot, final Spec spec) {
transaction.put(
schema.getColumnBlindedBlocksByRoot(),
block.getRoot(),
blockRoot,
block.blind(spec.atSlot(block.getSlot()).getSchemaDefinitions()));
final Optional<ExecutionPayload> maybePayload =
block.getMessage().getBody().getOptionalExecutionPayload();
Expand Down Expand Up @@ -660,6 +689,12 @@ public void addNonCanonicalBlock(final SignedBeaconBlock block) {
transaction.put(schema.getColumnNonCanonicalBlocksByRoot(), block.getRoot(), block);
}

@Override
public void deleteUnblindedFinalizedBlock(final UInt64 slot, final Bytes32 blockRoot) {
transaction.delete(schema.getColumnFinalizedBlocksBySlot(), slot);
transaction.delete(schema.getColumnSlotsByFinalizedRoot(), blockRoot);
}

@Override
public void addNonCanonicalRootAtSlot(final UInt64 slot, final Set<Bytes32> blockRoots) {
Optional<Set<Bytes32>> maybeRoots = db.get(schema.getColumnNonCanonicalRootsBySlot(), slot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public Stream<SignedBeaconBlock> streamHotBlocks() {
return hotDao.streamHotBlocks();
}

@Override
public long countUnblindedHotBlocks() {
return hotDao.countUnblindedHotBlocks();
}

@Override
public Map<UInt64, VoteTracker> getVotes() {
return hotDao.getVotes();
Expand Down Expand Up @@ -209,6 +214,11 @@ public Stream<SignedBeaconBlock> streamFinalizedBlocks(
return finalizedDao.streamFinalizedBlocks(startSlot, endSlot);
}

@Override
public long countUnblindedFinalizedBlocks() {
return finalizedDao.countUnblindedFinalizedBlocks();
}

@Override
@MustBeClosed
public Stream<Bytes> streamExecutionPayloads() {
Expand Down Expand Up @@ -475,6 +485,11 @@ public void deleteHotBlock(final Bytes32 blockRoot) {
hotUpdater.deleteHotBlock(blockRoot);
}

@Override
public void deleteUnblindedHotBlockOnly(final Bytes32 blockRoot) {
hotUpdater.deleteUnblindedHotBlockOnly(blockRoot);
}

@Override
public void deleteHotState(final Bytes32 blockRoot) {
hotUpdater.deleteHotState(blockRoot);
Expand All @@ -486,13 +501,20 @@ public void addFinalizedBlock(final SignedBeaconBlock block) {
}

@Override
public void addFinalizedBlockRootBySlot(final SignedBeaconBlock block) {
finalizedUpdater.addFinalizedBlockRootBySlot(block);
public void addFinalizedBlockRootBySlot(final UInt64 slot, final Bytes32 root) {
finalizedUpdater.addFinalizedBlockRootBySlot(slot, root);
}

@Override
public void addBlindedFinalizedBlock(
final SignedBeaconBlock block, final Bytes32 root, final Spec spec) {
finalizedUpdater.addBlindedFinalizedBlock(block, root, spec);
}

@Override
public void addBlindedBlock(final SignedBeaconBlock block, final Spec spec) {
finalizedUpdater.addBlindedBlock(block, spec);
public void addBlindedBlock(
final SignedBeaconBlock block, final Bytes32 blockRoot, final Spec spec) {
finalizedUpdater.addBlindedBlock(block, blockRoot, spec);
}

@Override
Expand All @@ -515,6 +537,11 @@ public void addNonCanonicalBlock(final SignedBeaconBlock block) {
finalizedUpdater.addNonCanonicalBlock(block);
}

@Override
public void deleteUnblindedFinalizedBlock(final UInt64 slot, final Bytes32 blockRoot) {
finalizedUpdater.deleteUnblindedFinalizedBlock(slot, blockRoot);
}

@Override
public void addNonCanonicalRootAtSlot(final UInt64 slot, final Set<Bytes32> blockRoots) {
finalizedUpdater.addNonCanonicalRootAtSlot(slot, blockRoots);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,12 @@ default void addCheckpointEpochs(final Map<Bytes32, BlockAndCheckpoints> blocks)

interface FinalizedUpdaterBlinded extends FinalizedUpdaterCommon {

void addFinalizedBlockRootBySlot(final SignedBeaconBlock block);
void addFinalizedBlockRootBySlot(final UInt64 slot, final Bytes32 root);

void addBlindedBlock(final SignedBeaconBlock block, final Spec spec);
void addBlindedFinalizedBlock(
final SignedBeaconBlock block, final Bytes32 root, final Spec spec);

void addBlindedBlock(final SignedBeaconBlock block, final Bytes32 blockRoot, final Spec spec);

void addExecutionPayload(final ExecutionPayload payload);

Expand Down
Loading