Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate hot blocks if using blinded storage #5940

Merged
merged 9 commits into from
Jul 18, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public interface Database extends AutoCloseable {
@MustBeClosed
Stream<SignedBeaconBlock> streamHotBlocks();

long countFinalizedBlocks();
rolfyone marked this conversation as resolved.
Show resolved Hide resolved

@MustBeClosed
Stream<Map.Entry<Bytes32, BlockCheckpoints>> streamBlockCheckpoints();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ public Stream<SignedBeaconBlock> streamHotBlocks() {
return dao.streamBlindedHotBlocks().map(this::getUnblindedBlock);
}

@Override
public long countFinalizedBlocks() {
return 0;
}

@Override
@MustBeClosed
public Stream<SignedBeaconBlock> streamFinalizedBlocks(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright ConsenSys Software Inc., 2022
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.storage.server.kvstore;

import java.util.Iterator;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.storage.server.kvstore.dataaccess.KvStoreCombinedDaoBlinded;
import tech.pegasys.teku.storage.server.kvstore.dataaccess.KvStoreCombinedDaoUnblinded;

public class BlindedHotBlockMigration<
T extends KvStoreCombinedDaoBlinded & KvStoreCombinedDaoUnblinded> {
private static final Logger LOG = LogManager.getLogger();
private static final int BATCH_SIZE = 10_000;
private final Spec spec;

private final T dao;

private BlindedHotBlockMigration(final Spec spec, final T dao) {
this.spec = spec;
this.dao = dao;
}

static <T extends KvStoreCombinedDaoBlinded & KvStoreCombinedDaoUnblinded> void migrateBlocks(
final T dao, final Spec spec) {
final BlindedHotBlockMigration<T> blindedHotBlockMigration =
new BlindedHotBlockMigration<>(spec, dao);
blindedHotBlockMigration.performBatchMigration();
}

private void performBatchMigration() {
moveHotBlocksToBlindedStorage();
moveFirstFinalizedBlockToBlindedStorage();
rolfyone marked this conversation as resolved.
Show resolved Hide resolved
}

private void moveFirstFinalizedBlockToBlindedStorage() {
rolfyone marked this conversation as resolved.
Show resolved Hide resolved
Optional<SignedBeaconBlock> maybeBlock = dao.getEarliestFinalizedBlock();
maybeBlock.ifPresent(
block -> {
final Bytes32 root = block.getRoot();
LOG.info("Setting Lowest finalized Block at {}({})", root, block.getSlot());
try (KvStoreCombinedDaoBlinded.FinalizedUpdaterBlinded blindedUpdater =
dao.finalizedUpdaterBlinded();
KvStoreCombinedDaoUnblinded.FinalizedUpdaterUnblinded unblindedUpdater =
dao.finalizedUpdaterUnblinded()) {

blindedUpdater.addFinalizedBlindedBlock(block, spec);
unblindedUpdater.deleteFinalizedBlock(block.getSlot(), root);
blindedUpdater.commit();
unblindedUpdater.commit();
}
});
}

private void moveHotBlocksToBlindedStorage() {
final long countBlocks = dao.countHotBlocks();
if (countBlocks == 0) {
return;
}
LOG.info("Migrating blocks to blinded storage, {} hot blocks to migrate", countBlocks);

long counter = 0;
try (final Stream<SignedBeaconBlock> blocks = dao.streamHotBlocks()) {
for (Iterator<SignedBeaconBlock> it = blocks.iterator(); it.hasNext(); ) {
try (KvStoreCombinedDaoBlinded.FinalizedUpdaterBlinded blindedUpdater =
dao.finalizedUpdaterBlinded();
KvStoreCombinedDaoUnblinded.HotUpdaterUnblinded unblindedUpdater =
dao.hotUpdaterUnblinded()) {
for (int i = 0; i < BATCH_SIZE && it.hasNext(); i++) {
final SignedBeaconBlock block = it.next();
blindedUpdater.addBlindedBlock(block, spec);
unblindedUpdater.deleteHotBlockOnly(block.getRoot());
counter++;
rolfyone marked this conversation as resolved.
Show resolved Hide resolved
}
blindedUpdater.commit();
unblindedUpdater.commit();
}
}
}
LOG.info("Hot blocks all moved ({} blocks)", counter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public static Database createV4(
hotDao,
new V4FinalizedKvStoreDao(finalizedDb, schemaFinalized, finalizedStateStorageLogic));
if (storeBlockExecutionPayloadSeparately) {
BlindedHotBlockMigration.migrateBlocks(dao, spec);
return new BlindedBlockKvStoreDatabase(dao, stateStorageMode, storeNonCanonicalBlocks, spec);
}
return new UnblindedBlockKvStoreDatabase(dao, stateStorageMode, storeNonCanonicalBlocks, spec);
Expand Down Expand Up @@ -173,6 +174,7 @@ public static Database createWithStateTree(
final CombinedKvStoreDao<S> dao =
new CombinedKvStoreDao<>(db, schema, finalizedStateStorageLogic);
if (storeBlockExecutionPayloadSeparately) {
BlindedHotBlockMigration.migrateBlocks(dao, spec);
return new BlindedBlockKvStoreDatabase(dao, stateStorageMode, storeNonCanonicalBlocks, spec);
}
return new UnblindedBlockKvStoreDatabase(dao, stateStorageMode, storeNonCanonicalBlocks, spec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ public Stream<SignedBeaconBlock> streamHotBlocks() {
return dao.streamHotBlocks();
}

@Override
public long countFinalizedBlocks() {
return dao.countFinalizedUnblindedBlocks();
}

@Override
@MustBeClosed
public Stream<SignedBeaconBlock> streamFinalizedBlocks(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ public Stream<SignedBeaconBlock> streamHotBlocks() {
return db.stream(schema.getColumnHotBlocksByRoot()).map(ColumnEntry::getValue);
}

@Override
public long countHotBlocks() {
try (final Stream<ColumnEntry<Bytes, Bytes>> rawEntries =
db.streamRaw(schema.getColumnHotBlocksByRoot())) {
return rawEntries.count();
}
}

@Override
public Optional<BeaconState> getLatestFinalizedState() {
return db.get(schema.getVariableLatestFinalizedState());
Expand Down Expand Up @@ -335,6 +343,14 @@ public Stream<SignedBeaconBlock> streamFinalizedBlocks(
.map(ColumnEntry::getValue);
}

@Override
public long countFinalizedUnblindedBlocks() {
try (Stream<ColumnEntry<Bytes, Bytes>> entries =
db.streamRaw(schema.getColumnSlotsByFinalizedRoot())) {
return entries.count();
}
}

@Override
public Optional<UInt64> getEarliestBlindedBlockSlot() {
return db.getFirstEntry(schema.getColumnFinalizedBlockRootBySlot()).map(ColumnEntry::getKey);
Expand Down Expand Up @@ -573,6 +589,11 @@ public void deleteHotBlock(final Bytes32 blockRoot) {
deleteHotState(blockRoot);
}

@Override
public void deleteHotBlockOnly(final Bytes32 blockRoot) {
transaction.delete(schema.getColumnHotBlocksByRoot(), blockRoot);
}

@Override
public void deleteHotState(final Bytes32 blockRoot) {
transaction.delete(schema.getColumnHotStatesByRoot(), blockRoot);
Expand Down Expand Up @@ -620,9 +641,21 @@ public void addFinalizedBlockRootBySlot(final SignedBeaconBlock block) {

@Override
public void addBlindedBlock(final SignedBeaconBlock block, final Spec spec) {
addBlindedBlock(block, block.getRoot(), spec);
}

@Override
public void addFinalizedBlindedBlock(final SignedBeaconBlock block, final Spec spec) {
addBlindedBlock(block, block.getRoot(), spec);
addFinalizedBlockRootBySlot(block);
}

@Override
public void addBlindedBlock(
final SignedBeaconBlock block, final Bytes32 blockRoot, final Spec spec) {
transaction.put(
schema.getColumnBlindedBlocksByRoot(),
block.getRoot(),
blockRoot,
block.blind(spec.atSlot(block.getSlot()).getSchemaDefinitions()));
final Optional<ExecutionPayload> maybePayload =
block.getMessage().getBody().getOptionalExecutionPayload();
Expand Down Expand Up @@ -660,6 +693,12 @@ public void addNonCanonicalBlock(final SignedBeaconBlock block) {
transaction.put(schema.getColumnNonCanonicalBlocksByRoot(), block.getRoot(), block);
}

@Override
public void deleteFinalizedBlock(final UInt64 slot, final Bytes32 blockRoot) {
transaction.delete(schema.getColumnFinalizedBlocksBySlot(), slot);
transaction.delete(schema.getColumnSlotsByFinalizedRoot(), blockRoot);
}

@Override
public void addNonCanonicalRootAtSlot(final UInt64 slot, final Set<Bytes32> blockRoots) {
Optional<Set<Bytes32>> maybeRoots = db.get(schema.getColumnNonCanonicalRootsBySlot(), slot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ public Stream<SignedBeaconBlock> streamHotBlocks() {
return hotDao.streamHotBlocks();
}

@Override
public long countHotBlocks() {
return hotDao.countHotBlocks();
}

@Override
public Map<UInt64, VoteTracker> getVotes() {
return hotDao.getVotes();
Expand Down Expand Up @@ -209,6 +214,11 @@ public Stream<SignedBeaconBlock> streamFinalizedBlocks(
return finalizedDao.streamFinalizedBlocks(startSlot, endSlot);
}

@Override
public long countFinalizedUnblindedBlocks() {
return finalizedDao.countFinalizedBlocks();
}

@Override
@MustBeClosed
public Stream<Bytes> streamExecutionPayloads() {
Expand Down Expand Up @@ -475,6 +485,11 @@ public void deleteHotBlock(final Bytes32 blockRoot) {
hotUpdater.deleteHotBlock(blockRoot);
}

@Override
public void deleteHotBlockOnly(final Bytes32 blockRoot) {
hotUpdater.deleteHotBlockOnly(blockRoot);
}

@Override
public void deleteHotState(final Bytes32 blockRoot) {
hotUpdater.deleteHotState(blockRoot);
Expand All @@ -495,6 +510,17 @@ public void addBlindedBlock(final SignedBeaconBlock block, final Spec spec) {
finalizedUpdater.addBlindedBlock(block, spec);
}

@Override
public void addFinalizedBlindedBlock(final SignedBeaconBlock block, final Spec spec) {
finalizedUpdater.addFinalizedBlindedBlock(block, spec);
}

@Override
public void addBlindedBlock(
final SignedBeaconBlock block, final Bytes32 blockRoot, final Spec spec) {
finalizedUpdater.addBlindedBlock(block, blockRoot, spec);
}

@Override
public void addExecutionPayload(final ExecutionPayload payload) {
finalizedUpdater.addExecutionPayload(payload);
Expand All @@ -515,6 +541,11 @@ public void addNonCanonicalBlock(final SignedBeaconBlock block) {
finalizedUpdater.addNonCanonicalBlock(block);
}

@Override
public void deleteFinalizedBlock(final UInt64 slot, final Bytes32 blockRoot) {
finalizedUpdater.deleteFinalizedBlock(slot, blockRoot);
}

@Override
public void addNonCanonicalRootAtSlot(final UInt64 slot, final Set<Bytes32> blockRoots) {
finalizedUpdater.addNonCanonicalRootAtSlot(slot, blockRoots);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ interface FinalizedUpdaterBlinded extends FinalizedUpdaterCommon {

void addBlindedBlock(final SignedBeaconBlock block, final Spec spec);

void addFinalizedBlindedBlock(final SignedBeaconBlock block, final Spec spec);

void addBlindedBlock(final SignedBeaconBlock block, final Bytes32 blockRoot, final Spec spec);

void addExecutionPayload(final ExecutionPayload payload);

void deleteBlindedBlock(final Bytes32 root);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public interface KvStoreCombinedDaoUnblinded extends KvStoreCombinedDaoCommon {
@MustBeClosed
Stream<SignedBeaconBlock> streamHotBlocks();

long countHotBlocks();

Optional<SignedBeaconBlock> getFinalizedBlock(final Bytes32 root);

Optional<SignedBeaconBlock> getFinalizedBlockAtSlot(UInt64 slot);
Expand All @@ -54,6 +56,8 @@ public interface KvStoreCombinedDaoUnblinded extends KvStoreCombinedDaoCommon {
@MustBeClosed
Stream<SignedBeaconBlock> streamFinalizedBlocks(UInt64 startSlot, UInt64 endSlot);

long countFinalizedUnblindedBlocks();

Optional<UInt64> getSlotForFinalizedBlockRoot(Bytes32 blockRoot);

Optional<UInt64> getSlotForFinalizedStateRoot(Bytes32 stateRoot);
Expand All @@ -71,12 +75,16 @@ default void addHotBlocks(final Map<Bytes32, BlockAndCheckpoints> blocks) {
}

void deleteHotBlock(Bytes32 blockRoot);

void deleteHotBlockOnly(Bytes32 blockRoot);
}

interface FinalizedUpdaterUnblinded extends FinalizedUpdaterCommon {

void addFinalizedBlock(final SignedBeaconBlock block);

void addNonCanonicalBlock(final SignedBeaconBlock block);

void deleteFinalizedBlock(final UInt64 slot, final Bytes32 blockRoot);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ public Stream<SignedBeaconBlock> streamFinalizedBlocks(
.map(ColumnEntry::getValue);
}

public long countFinalizedBlocks() {
try (Stream<ColumnEntry<Bytes, Bytes>> entries =
db.streamRaw(schema.getColumnFinalizedBlocksBySlot())) {
return entries.count();
}
}

@MustBeClosed
public Stream<Bytes> streamExecutionPayloads() {
return db.stream(schema.getColumnExecutionPayloadByPayloadHash()).map(ColumnEntry::getValue);
Expand Down Expand Up @@ -241,9 +248,21 @@ public void addFinalizedBlockRootBySlot(final SignedBeaconBlock block) {

@Override
public void addBlindedBlock(final SignedBeaconBlock block, final Spec spec) {
addBlindedBlock(block, block.getRoot(), spec);
}

@Override
public void addFinalizedBlindedBlock(final SignedBeaconBlock block, final Spec spec) {
addBlindedBlock(block, spec);
addFinalizedBlockRootBySlot(block);
}

@Override
public void addBlindedBlock(
final SignedBeaconBlock block, final Bytes32 blockRoot, final Spec spec) {
transaction.put(
schema.getColumnBlindedBlocksByRoot(),
block.getRoot(),
blockRoot,
block.blind(spec.atSlot(block.getSlot()).getSchemaDefinitions()));
final Optional<ExecutionPayload> maybePayload =
block.getMessage().getBody().getOptionalExecutionPayload();
Expand Down Expand Up @@ -281,6 +300,12 @@ public void addNonCanonicalBlock(final SignedBeaconBlock block) {
transaction.put(schema.getColumnNonCanonicalBlocksByRoot(), block.getRoot(), block);
}

@Override
public void deleteFinalizedBlock(final UInt64 slot, final Bytes32 blockRoot) {
transaction.delete(schema.getColumnFinalizedBlocksBySlot(), slot);
transaction.delete(schema.getColumnSlotsByFinalizedRoot(), blockRoot);
}

@Override
public void addNonCanonicalRootAtSlot(final UInt64 slot, final Set<Bytes32> blockRoots) {
Optional<Set<Bytes32>> maybeRoots = db.get(schema.getColumnNonCanonicalRootsBySlot(), slot);
Expand Down
Loading