diff --git a/chainbase/src/main/java/org/tron/common/storage/leveldb/LevelDbDataSourceImpl.java b/chainbase/src/main/java/org/tron/common/storage/leveldb/LevelDbDataSourceImpl.java index 9cc0561a53c..7ed5a770799 100644 --- a/chainbase/src/main/java/org/tron/common/storage/leveldb/LevelDbDataSourceImpl.java +++ b/chainbase/src/main/java/org/tron/common/storage/leveldb/LevelDbDataSourceImpl.java @@ -135,9 +135,11 @@ private void openDatabase(Options dbOptions) throws IOException { } try { database = factory.open(dbPath.toFile(), dbOptions); - logger.info("DB {} open success with writeBufferSize {} M, cacheSize {} M, maxOpenFiles {}.", - this.getDBName(), dbOptions.writeBufferSize() / 1024 / 1024, - dbOptions.cacheSize() / 1024 / 1024, dbOptions.maxOpenFiles()); + if (!this.getDBName().startsWith("checkpoint")) { + logger.info("DB {} open success with writeBufferSize {} M, cacheSize {} M, maxOpenFiles {}.", + this.getDBName(), dbOptions.writeBufferSize() / 1024 / 1024, + dbOptions.cacheSize() / 1024 / 1024, dbOptions.maxOpenFiles()); + } } catch (IOException e) { if (e.getMessage().contains("Corruption:")) { logger.warn("DB {} corruption detected, try to repair it.", this.getDBName()); diff --git a/chainbase/src/main/java/org/tron/core/db/TronDatabase.java b/chainbase/src/main/java/org/tron/core/db/TronDatabase.java index 9373189941a..04e20af4fd6 100644 --- a/chainbase/src/main/java/org/tron/core/db/TronDatabase.java +++ b/chainbase/src/main/java/org/tron/core/db/TronDatabase.java @@ -2,7 +2,6 @@ import com.google.protobuf.InvalidProtocolBufferException; import java.nio.file.Paths; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; @@ -10,6 +9,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.iq80.leveldb.WriteOptions; +import org.rocksdb.DirectComparator; import org.springframework.beans.factory.annotation.Autowired; import org.tron.common.parameter.CommonParameter; import org.tron.common.storage.WriteOptionsWrapper; @@ -18,11 +18,8 @@ import org.tron.common.storage.rocksdb.RocksDbDataSourceImpl; import org.tron.common.utils.StorageUtils; import org.tron.core.db.common.DbSourceInter; -import org.tron.core.db2.common.LevelDB; -import org.tron.core.db2.common.RocksDB; import org.tron.core.db2.common.WrappedByteArray; import org.tron.core.db2.core.ITronChainBase; -import org.tron.core.db2.core.SnapshotRoot; import org.tron.core.exception.BadItemException; import org.tron.core.exception.ItemNotFoundException; @@ -46,7 +43,7 @@ protected TronDatabase(String dbName) { dbSource = new LevelDbDataSourceImpl(StorageUtils.getOutputDirectoryByDbName(dbName), dbName, - StorageUtils.getOptionsByDbName(dbName), + getOptionsByDbNameForLevelDB(dbName), new WriteOptions().sync(CommonParameter.getInstance() .getStorage().isDbSync())); } else if ("ROCKSDB".equals(CommonParameter.getInstance() @@ -55,7 +52,7 @@ protected TronDatabase(String dbName) { CommonParameter.getInstance().getStorage().getDbDirectory()).toString(); dbSource = new RocksDbDataSourceImpl(parentName, dbName, CommonParameter.getInstance() - .getRocksDBCustomSettings()); + .getRocksDBCustomSettings(), getDirectComparator()); } dbSource.initDB(); @@ -69,6 +66,14 @@ private void init() { protected TronDatabase() { } + protected org.iq80.leveldb.Options getOptionsByDbNameForLevelDB(String dbName) { + return StorageUtils.getOptionsByDbName(dbName); + } + + protected DirectComparator getDirectComparator() { + return null; + } + public DbSourceInter getDbSource() { return dbSource; } diff --git a/chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java b/chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java index 477631ab507..939e45de419 100644 --- a/chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java +++ b/chainbase/src/main/java/org/tron/core/db2/core/SnapshotManager.java @@ -7,6 +7,9 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; + +import java.io.File; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -15,10 +18,13 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.LockSupport; import java.util.stream.Collectors; import javax.annotation.PostConstruct; + import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -26,7 +32,10 @@ import org.tron.common.error.TronDBException; import org.tron.common.parameter.CommonParameter; import org.tron.common.storage.WriteOptionsWrapper; +import org.tron.common.utils.FileUtil; +import org.tron.common.utils.StorageUtils; import org.tron.core.db.RevokingDatabase; +import org.tron.core.db.TronDatabase; import org.tron.core.db2.ISession; import org.tron.core.db2.common.DB; import org.tron.core.db2.common.IRevokingDB; @@ -34,6 +43,7 @@ import org.tron.core.db2.common.Value; import org.tron.core.db2.common.WrappedByteArray; import org.tron.core.exception.RevokingStoreIllegalStateException; +import org.tron.core.store.CheckPointV2Store; import org.tron.core.store.CheckTmpStore; @Slf4j(topic = "DB") @@ -42,6 +52,8 @@ public class SnapshotManager implements RevokingDatabase { public static final int DEFAULT_MAX_FLUSH_COUNT = 500; public static final int DEFAULT_MIN_FLUSH_COUNT = 1; private static final int DEFAULT_STACK_MAX_SIZE = 256; + private static final long ONE_MINUTE_MILLS = 60*1000L; + private static final String CHECKPOINT_V2_DIR = "checkpoint"; @Getter private List dbs = new ArrayList<>(); @Getter @@ -63,6 +75,8 @@ public class SnapshotManager implements RevokingDatabase { private Map flushServices = new HashMap<>(); + private ScheduledExecutorService pruneCheckpointThread = Executors.newSingleThreadScheduledExecutor(); + @Autowired @Setter @Getter @@ -71,11 +85,27 @@ public class SnapshotManager implements RevokingDatabase { @Setter private volatile int maxFlushCount = DEFAULT_MIN_FLUSH_COUNT; + private int checkpointVersion = 1; // default v1 + public SnapshotManager(String checkpointPath) { } @PostConstruct public void init() { + checkpointVersion = CommonParameter.getInstance().getStorage().getCheckpointVersion(); + // prune checkpoint + if (isV2Open()) { + pruneCheckpointThread = Executors.newSingleThreadScheduledExecutor(); + pruneCheckpointThread.scheduleWithFixedDelay(() -> { + try { + if (!unChecked) { + pruneCheckpoint(); + } + } catch (Throwable t) { + logger.error("Exception in prune checkpoint", t); + } + }, 10000, 3600, TimeUnit.MILLISECONDS); + } exitThread = new Thread(() -> { LockSupport.park(); // to Guarantee Some other thread invokes unpark with the current thread as the target @@ -246,6 +276,9 @@ public void shutdown() { logger.info("******** Before revokingDb size: {}.", size); checkTmpStore.close(); logger.info("******** End to pop revokingDb. ********"); + if (pruneCheckpointThread != null) { + pruneCheckpointThread.shutdown(); + } } public void updateSolidity(int hops) { @@ -309,8 +342,11 @@ public void flush() { if (shouldBeRefreshed()) { try { long start = System.currentTimeMillis(); - deleteCheckpoint(); + if (!isV2Open()) { + deleteCheckpoint(); + } createCheckpoint(); + long checkPointEnd = System.currentTimeMillis(); refresh(); flushCount = 0; @@ -328,6 +364,8 @@ public void flush() { } private void createCheckpoint() { + TronDatabase checkPointStore = null; + boolean syncFlag; try { Map batch = new HashMap<>(); for (Chainbase db : dbs) { @@ -350,72 +388,154 @@ private void createCheckpoint() { } } } + if (isV2Open()) { + String dbName = String.valueOf(System.currentTimeMillis()); + checkPointStore = getCheckpointDB(dbName); + syncFlag = CommonParameter.getInstance().getStorage().isCheckpointSync(); + } else { + checkPointStore = checkTmpStore; + syncFlag = CommonParameter.getInstance().getStorage().isDbSync(); + } - checkTmpStore.getDbSource().updateByBatch(batch.entrySet().stream() + checkPointStore.getDbSource().updateByBatch(batch.entrySet().stream() .map(e -> Maps.immutableEntry(e.getKey().getBytes(), e.getValue().getBytes())) .collect(HashMap::new, (m, k) -> m.put(k.getKey(), k.getValue()), HashMap::putAll), - WriteOptionsWrapper.getInstance().sync(CommonParameter - .getInstance().getStorage().isDbSync())); + WriteOptionsWrapper.getInstance().sync(syncFlag)); - } catch ( Exception e) { + } catch (Exception e) { throw new TronDBException(e); + } finally { + if (isV2Open() && checkPointStore != null) { + checkPointStore.close(); + } } } - private void deleteCheckpoint() { - try { - Map hmap = new HashMap<>(); - if (!checkTmpStore.getDbSource().allKeys().isEmpty()) { - for (Map.Entry e : checkTmpStore.getDbSource()) { - hmap.put(e.getKey(), null); - } + private TronDatabase getCheckpointDB(String dbName) { + return new CheckPointV2Store(CHECKPOINT_V2_DIR+"/"+dbName); + } + + private List getCheckpointList() { + String dbPath = Paths.get(StorageUtils.getOutputDirectoryByDbName(CHECKPOINT_V2_DIR), + CommonParameter.getInstance().getStorage().getDbDirectory()).toString(); + File file = new File(Paths.get(dbPath, CHECKPOINT_V2_DIR).toString()); + if (file.exists() && file.isDirectory()) { + String[] subDirs = file.list(); + if (subDirs != null) { + return Arrays.stream(subDirs).sorted().collect(Collectors.toList()); } + } + return null; + } - checkTmpStore.getDbSource().updateByBatch(hmap); - } catch (Exception e) { - throw new TronDBException(e); + private void deleteCheckpoint() { + checkTmpStore.reset(); + } + + private void pruneCheckpoint() { + if (unChecked) { + return; + } + List cpList = getCheckpointList(); + if (cpList == null) { + return; + } + if (cpList.size() < 3) { + return; + } + for (String cp: cpList.subList(0, cpList.size()-3)) { + long timestamp = Long.parseLong(cp); + if (System.currentTimeMillis() - timestamp < ONE_MINUTE_MILLS*2) { + break; + } + String checkpointPath = Paths.get(StorageUtils.getOutputDirectoryByDbName(CHECKPOINT_V2_DIR), + CommonParameter.getInstance().getStorage().getDbDirectory(), CHECKPOINT_V2_DIR).toString(); + if (!FileUtil.recursiveDelete(Paths.get(checkpointPath, cp).toString())) { + logger.error("checkpoint prune failed, timestamp: {}", timestamp); + return; + } + logger.debug("checkpoint prune success, timestamp: {}", timestamp); } } // ensure run this method first after process start. @Override public void check() { - for (Chainbase db : dbs) { + if (!isV2Open()) { + List cpList = getCheckpointList(); + if (cpList != null && cpList.size() != 0) { + logger.error("checkpoint check failed, can't convert checkpoint from v2 to v1"); + System.exit(-1); + } + checkV1(); + } else { + checkV2(); + } + } + + private void checkV1() { + for (Chainbase db: dbs) { if (!Snapshot.isRoot(db.getHead())) { throw new IllegalStateException("First check."); } } if (!checkTmpStore.getDbSource().allKeys().isEmpty()) { - Map dbMap = dbs.stream() - .map(db -> Maps.immutableEntry(db.getDbName(), db)) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - advance(); - for (Map.Entry e : checkTmpStore.getDbSource()) { - byte[] key = e.getKey(); - byte[] value = e.getValue(); - String db = simpleDecode(key); - if (dbMap.get(db) == null) { - continue; - } - byte[] realKey = Arrays.copyOfRange(key, db.getBytes().length + 4, key.length); - - byte[] realValue = value.length == 1 ? null : Arrays.copyOfRange(value, 1, value.length); - if (realValue != null) { - dbMap.get(db).getHead().put(realKey, realValue); - } else { - dbMap.get(db).getHead().remove(realKey); - } + recover(checkTmpStore); + } - } + unChecked = false; + } - dbs.forEach(db -> db.getHead().getRoot().merge(db.getHead())); - retreat(); + private void checkV2() { + logger.info("checkpoint version: {}", CommonParameter.getInstance().getStorage().getCheckpointVersion()); + logger.info("checkpoint sync: {}", CommonParameter.getInstance().getStorage().isCheckpointSync()); + List cpList = getCheckpointList(); + if (cpList == null || cpList.size() == 0) { + logger.info("checkpoint size is 0, using v1 recover"); + checkV1(); + deleteCheckpoint(); + return; } + for (String cp: cpList) { + TronDatabase checkPointV2Store = getCheckpointDB(cp); + recover(checkPointV2Store); + checkPointV2Store.close(); + } + logger.info("checkpoint v2 recover success"); unChecked = false; } + private void recover(TronDatabase tronDatabase) { + Map dbMap = dbs.stream() + .map(db -> Maps.immutableEntry(db.getDbName(), db)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + advance(); + for (Map.Entry e: tronDatabase.getDbSource()) { + byte[] key = e.getKey(); + byte[] value = e.getValue(); + String db = simpleDecode(key); + if (dbMap.get(db) == null) { + continue; + } + byte[] realKey = Arrays.copyOfRange(key, db.getBytes().length + 4, key.length); + byte[] realValue = value.length == 1 ? null : Arrays.copyOfRange(value, 1, value.length); + if (realValue != null) { + dbMap.get(db).getHead().put(realKey, realValue); + } else { + dbMap.get(db).getHead().remove(realKey); + } + } + + dbs.forEach(db -> db.getHead().getRoot().merge(db.getHead())); + retreat(); + } + + private boolean isV2Open() { + return checkpointVersion == 2; + } + private byte[] simpleEncode(String s) { byte[] bytes = s.getBytes(); byte[] length = Ints.toByteArray(bytes.length); diff --git a/chainbase/src/main/java/org/tron/core/store/CheckPointV2Store.java b/chainbase/src/main/java/org/tron/core/store/CheckPointV2Store.java new file mode 100644 index 00000000000..d5ffaaa9143 --- /dev/null +++ b/chainbase/src/main/java/org/tron/core/store/CheckPointV2Store.java @@ -0,0 +1,48 @@ +package org.tron.core.store; + +import com.google.protobuf.InvalidProtocolBufferException; +import org.springframework.beans.factory.annotation.Autowired; +import org.tron.core.db.TronDatabase; +import org.tron.core.exception.BadItemException; +import org.tron.core.exception.ItemNotFoundException; + +import java.util.Spliterator; +import java.util.function.Consumer; + +public class CheckPointV2Store extends TronDatabase { + + @Autowired + public CheckPointV2Store(String dbPath) { + super(dbPath); + } + + @Override + public void put(byte[] key, byte[] item) { + } + + @Override + public void delete(byte[] key) { + getDbSource().deleteData(key); + } + + @Override + public byte[] get(byte[] key) + throws InvalidProtocolBufferException, ItemNotFoundException, BadItemException { + return null; + } + + @Override + public boolean has(byte[] key) { + return false; + } + + @Override + public void forEach(Consumer action) { + + } + + @Override + public Spliterator spliterator() { + return null; + } +} \ No newline at end of file diff --git a/chainbase/src/main/java/org/tron/core/store/CheckTmpStore.java b/chainbase/src/main/java/org/tron/core/store/CheckTmpStore.java index 870aa9aa1b8..09f60c83898 100644 --- a/chainbase/src/main/java/org/tron/core/store/CheckTmpStore.java +++ b/chainbase/src/main/java/org/tron/core/store/CheckTmpStore.java @@ -24,7 +24,6 @@ public void put(byte[] key, byte[] item) { @Override public void delete(byte[] key) { - } @Override diff --git a/common/src/main/java/org/tron/core/config/args/Storage.java b/common/src/main/java/org/tron/core/config/args/Storage.java index 1b8bd595be0..6d911f3b0ac 100644 --- a/common/src/main/java/org/tron/core/config/args/Storage.java +++ b/common/src/main/java/org/tron/core/config/args/Storage.java @@ -69,6 +69,9 @@ public class Storage { private static final String MAX_OPEN_FILES_CONFIG_KEY = "maxOpenFiles"; private static final String EVENT_SUBSCRIBE_CONTRACT_PARSE = "event.subscribe.contractParse"; + private static final String CHECKPOINT_VERSION_KEY = "storage.checkpoint.version"; + private static final String CHECKPOINT_SYNC_KEY = "storage.checkpoint.sync"; + /** * Default values of directory */ @@ -79,6 +82,8 @@ public class Storage { private static final String DEFAULT_DB_DIRECTORY = "database"; private static final String DEFAULT_INDEX_DIRECTORY = "index"; private static final String DEFAULT_INDEX_SWITCH = "on"; + private static final int DEFAULT_CHECKPOINT_VERSION = 1; + private static final boolean DEFAULT_CHECKPOINT_SYNC = true; private static final int DEFAULT_ESTIMATED_TRANSACTIONS = 1000; private Config storage; @@ -120,6 +125,14 @@ public class Storage { @Setter private String transactionHistorySwitch; + @Getter + @Setter + private int checkpointVersion; + + @Getter + @Setter + private boolean checkpointSync; + private Options defaultDbOptions; @Getter @@ -175,6 +188,18 @@ public static String getTransactionHistorySwitchFromConfig(final Config config) : DEFAULT_TRANSACTIONHISTORY_SWITCH; } + public static int getCheckpointVersionFromConfig(final Config config) { + return config.hasPath(CHECKPOINT_VERSION_KEY) + ? config.getInt(CHECKPOINT_VERSION_KEY) + : DEFAULT_CHECKPOINT_VERSION; + } + + public static boolean getCheckpointSyncFromConfig(final Config config) { + return config.hasPath(CHECKPOINT_SYNC_KEY) + ? config.getBoolean(CHECKPOINT_SYNC_KEY) + : DEFAULT_CHECKPOINT_SYNC; + } + public static int getEstimatedTransactionsFromConfig(final Config config) { if (!config.hasPath(ESTIMATED_TRANSACTIONS_CONFIG_KEY)) { return DEFAULT_ESTIMATED_TRANSACTIONS; diff --git a/framework/src/main/java/org/tron/common/application/ApplicationImpl.java b/framework/src/main/java/org/tron/common/application/ApplicationImpl.java index 1aa158d4be2..1be119f1be3 100644 --- a/framework/src/main/java/org/tron/common/application/ApplicationImpl.java +++ b/framework/src/main/java/org/tron/common/application/ApplicationImpl.java @@ -67,6 +67,7 @@ public void shutdown() { tronNetService.stop(); consensusService.stop(); synchronized (dbManager.getRevokingStore()) { + dbManager.getSession().reset(); closeRevokingStore(); closeAllStore(); } diff --git a/framework/src/main/java/org/tron/core/config/args/Args.java b/framework/src/main/java/org/tron/core/config/args/Args.java index 029e89d6cd6..5fd20d2f8e5 100644 --- a/framework/src/main/java/org/tron/core/config/args/Args.java +++ b/framework/src/main/java/org/tron/core/config/args/Args.java @@ -501,6 +501,11 @@ public static void setParam(final String[] args, final String confFileName) { .filter(StringUtils::isNotEmpty) .orElse(Storage.getTransactionHistorySwitchFromConfig(config))); + PARAMETER.storage + .setCheckpointVersion(Storage.getCheckpointVersionFromConfig(config)); + PARAMETER.storage + .setCheckpointSync(Storage.getCheckpointSyncFromConfig(config)); + PARAMETER.storage.setEstimatedBlockTransactions( Storage.getEstimatedTransactionsFromConfig(config)); diff --git a/framework/src/main/java/org/tron/tool/litefullnode/LiteFullNodeTool.java b/framework/src/main/java/org/tron/tool/litefullnode/LiteFullNodeTool.java index e66fafaa477..93927d52d23 100644 --- a/framework/src/main/java/org/tron/tool/litefullnode/LiteFullNodeTool.java +++ b/framework/src/main/java/org/tron/tool/litefullnode/LiteFullNodeTool.java @@ -20,6 +20,7 @@ import java.util.NoSuchElementException; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.rocksdb.RocksDBException; @@ -43,19 +44,21 @@ public class LiteFullNodeTool { private static final long START_TIME = System.currentTimeMillis() / 1000; + private static long RECENT_BLKS = 65536; + private static final String SNAPSHOT_DIR_NAME = "snapshot"; private static final String HISTORY_DIR_NAME = "history"; private static final String INFO_FILE_NAME = "info.properties"; private static final String BACKUP_DIR_PREFIX = ".bak_"; private static final String CHECKPOINT_DB = "tmp"; - private static long RECENT_BLKS = 65536; - + private static final String CHECKPOINT_DB_V2 = "checkpoint"; private static final String BLOCK_DB_NAME = "block"; private static final String BLOCK_INDEX_DB_NAME = "block-index"; private static final String TRANS_DB_NAME = "trans"; private static final String COMMON_DB_NAME = "common"; private static final String TRANSACTION_RET_DB_NAME = "transactionRetStore"; private static final String TRANSACTION_HISTORY_DB_NAME = "transactionHistoryStore"; + private static final String PROPERTIES_DB_NAME = "properties"; private static final String DIR_FORMAT_STRING = "%s%s%s"; @@ -198,29 +201,43 @@ private void split(String sourceDir, String destDir, List dbs) throws IO private void mergeCheckpoint(String sourceDir, String destDir, List destDbs) { logger.info("Begin to merge checkpoint to dataset."); try { - DBInterface tmpDb = DbTool.getDB(sourceDir, CHECKPOINT_DB); - try (DBIterator iterator = tmpDb.iterator()) { - for (iterator.seekToFirst(); iterator.hasNext(); iterator.next()) { - byte[] key = iterator.getKey(); - byte[] value = iterator.getValue(); - String dbName = SnapshotManager.simpleDecode(key); - byte[] realKey = Arrays.copyOfRange(key, dbName.getBytes().length + 4, key.length); - byte[] realValue = value.length == 1 ? null : Arrays.copyOfRange(value, 1, value.length); - if (destDbs != null && destDbs.contains(dbName)) { - DBInterface destDb = DbTool.getDB(destDir, dbName); - if (realValue != null) { - destDb.put(realKey, realValue); - } else { - destDb.delete(realKey); - } - } + List cpList = getCheckpointV2List(sourceDir); + if (cpList.size() > 0) { + for (String cp: cpList) { + DBInterface checkpointDb = DbTool.getDB(sourceDir + "/" + CHECKPOINT_DB_V2, cp); + recover(checkpointDb, destDir, destDbs); } + } else { + DBInterface tmpDb = DbTool.getDB(sourceDir, CHECKPOINT_DB); + recover(tmpDb, destDir, destDbs); } } catch (IOException | RocksDBException e) { throw new RuntimeException(e); } } + private void recover(DBInterface db, String destDir, List destDbs) + throws IOException, RocksDBException { + try (DBIterator iterator = db.iterator()) { + for (iterator.seekToFirst(); iterator.hasNext(); iterator.next()) { + byte[] key = iterator.getKey(); + byte[] value = iterator.getValue(); + String dbName = SnapshotManager.simpleDecode(key); + byte[] realKey = Arrays.copyOfRange(key, dbName.getBytes().length + 4, key.length); + byte[] realValue = + value.length == 1 ? null : Arrays.copyOfRange(value, 1, value.length); + if (destDbs != null && destDbs.contains(dbName)) { + DBInterface destDb = DbTool.getDB(destDir, dbName); + if (realValue != null) { + destDb.put(realKey, realValue); + } else { + destDb.delete(realKey); + } + } + } + } + } + private void generateInfoProperties(String propertyfile, String databaseDir) throws IOException, RocksDBException { logger.info("Create {} for dataset.", INFO_FILE_NAME); @@ -236,19 +253,34 @@ private void generateInfoProperties(String propertyfile, String databaseDir) private long getLatestBlockHeaderNum(String databaseDir) throws IOException, RocksDBException { // query latest_block_header_number from checkpoint first final String latestBlockHeaderNumber = "latest_block_header_number"; - byte[] value = DbTool.getDB(databaseDir, CHECKPOINT_DB).get( - Bytes.concat(simpleEncode(CHECKPOINT_DB), latestBlockHeaderNumber.getBytes())); - if (value != null && value.length > 1) { - return ByteArray.toLong(Arrays.copyOfRange(value, 1, value.length)); + List cpList = getCheckpointV2List(databaseDir); + DBInterface checkpointDb = null; + if (cpList.size() > 0) { + String lastestCp = cpList.get(cpList.size() - 1); + checkpointDb = DbTool.getDB(databaseDir + "/" + CHECKPOINT_DB_V2, lastestCp); + } else { + checkpointDb = DbTool.getDB(databaseDir, CHECKPOINT_DB); + } + Long blockNumber = getLatestBlockHeaderNumFromCP(checkpointDb, latestBlockHeaderNumber.getBytes()); + if (blockNumber != null) { + return blockNumber; } // query from propertiesDb if checkpoint not contains latest_block_header_number - DBInterface propertiesDb = DbTool.getDB(databaseDir, "properties"); + DBInterface propertiesDb = DbTool.getDB(databaseDir, PROPERTIES_DB_NAME); return Optional.ofNullable(propertiesDb.get(ByteArray.fromString(latestBlockHeaderNumber))) .map(ByteArray::toLong) .orElseThrow( () -> new IllegalArgumentException("not found latest block header number")); } + private Long getLatestBlockHeaderNumFromCP(DBInterface db, byte[] key) { + byte[] value = db.get(Bytes.concat(simpleEncode(PROPERTIES_DB_NAME), key)); + if (value != null && value.length > 1) { + return ByteArray.toLong(Arrays.copyOfRange(value, 1, value.length)); + } + return null; + } + /** * recent blocks, trans and genesis block. */ @@ -493,6 +525,14 @@ public static void reSetRecentBlks() { RECENT_BLKS = 65536; } + private List getCheckpointV2List(String sourceDir) { + File file = new File(Paths.get(sourceDir, CHECKPOINT_DB_V2).toString()); + if (file.exists() && file.isDirectory() && file.list() != null) { + return Arrays.stream(file.list()).sorted().collect(Collectors.toList()); + } + return Lists.newArrayList(); + } + private void run(Args argv) { if (StringUtils.isBlank(argv.fnDataPath) || StringUtils.isBlank(argv.datasetPath)) { throw new ParameterException("fnDataPath or datasetPath can't be null"); diff --git a/framework/src/main/resources/config-localtest.conf b/framework/src/main/resources/config-localtest.conf index 239b55d7b4f..15488c53743 100644 --- a/framework/src/main/resources/config-localtest.conf +++ b/framework/src/main/resources/config-localtest.conf @@ -51,7 +51,8 @@ storage { // maxOpenFiles = 100 // }, ] - + checkpoint.version = 2 + checkpoint.sync = true } node.discovery = { diff --git a/framework/src/main/resources/config.conf b/framework/src/main/resources/config.conf index aa1f51507bf..6a06f4ed0fe 100644 --- a/framework/src/main/resources/config.conf +++ b/framework/src/main/resources/config.conf @@ -90,10 +90,12 @@ storage { balance.history.lookup = false + # checkpoint.version = 2 + # checkpoint.sync = true + # the estimated number of block transactions (default 1000, min 100, max 10000). # so the total number of cached transactions is 65536 * txCache.estimatedTransactions # txCache.estimatedTransactions = 1000 - } node.discovery = { diff --git a/framework/src/test/java/org/tron/common/utils/PublicMethod.java b/framework/src/test/java/org/tron/common/utils/PublicMethod.java new file mode 100644 index 00000000000..b61ea6c8c4d --- /dev/null +++ b/framework/src/test/java/org/tron/common/utils/PublicMethod.java @@ -0,0 +1,105 @@ +package org.tron.common.utils; + +import com.google.protobuf.ByteString; +import java.math.BigInteger; +import org.tron.api.GrpcAPI; +import org.tron.api.WalletGrpc; +import org.tron.common.crypto.ECKey; +import org.tron.core.Wallet; +import org.tron.protos.Protocol; +import org.tron.protos.contract.BalanceContract; +import stest.tron.wallet.common.client.utils.TransactionUtils; + +public class PublicMethod { + + /** + * Convert to pub. + * @param priKey private key + * @return public addr + */ + public static byte[] getFinalAddress(String priKey) { + Wallet.setAddressPreFixByte((byte) 0x41); + ECKey key = ECKey.fromPrivate(new BigInteger(priKey, 16)); + return key.getAddress(); + } + + /** + * Transfer TRX. + * @param to addr receives the asset + * @param amount asset amount + * @param owner sender + * @param priKey private key of the sender + * @param blockingStubFull Grpc interface + * @return true or false + */ + public static Boolean sendcoin(byte[] to, long amount, byte[] owner, String priKey, + WalletGrpc.WalletBlockingStub blockingStubFull) { + Wallet.setAddressPreFixByte((byte) 0x41); + ECKey temKey = null; + try { + BigInteger priK = new BigInteger(priKey, 16); + temKey = ECKey.fromPrivate(priK); + } catch (Exception ex) { + ex.printStackTrace(); + } + final ECKey ecKey = temKey; + + int times = 0; + while (times++ <= 2) { + + BalanceContract.TransferContract.Builder builder = + BalanceContract.TransferContract.newBuilder(); + com.google.protobuf.ByteString bsTo = com.google.protobuf.ByteString.copyFrom(to); + com.google.protobuf.ByteString bsOwner = ByteString.copyFrom(owner); + builder.setToAddress(bsTo); + builder.setOwnerAddress(bsOwner); + builder.setAmount(amount); + + BalanceContract.TransferContract contract = builder.build(); + Protocol.Transaction transaction = blockingStubFull.createTransaction(contract); + if (transaction == null || transaction.getRawData().getContractCount() == 0) { + continue; + } + transaction = signTransaction(ecKey, transaction); + GrpcAPI.Return response = broadcastTransaction(transaction, blockingStubFull); + return response.getResult(); + } + return false; + } + + /** + * Sign TX. + * @param ecKey ecKey of the private key + * @param transaction transaction object + */ + public static Protocol.Transaction signTransaction(ECKey ecKey, + Protocol.Transaction transaction) { + if (ecKey == null || ecKey.getPrivKey() == null) { + return null; + } + transaction = TransactionUtils.setTimestamp(transaction); + return TransactionUtils.sign(transaction, ecKey); + } + + /** + * Broadcast TX. + * @param transaction transaction object + * @param blockingStubFull Grpc interface + */ + public static GrpcAPI.Return broadcastTransaction( + Protocol.Transaction transaction, WalletGrpc.WalletBlockingStub blockingStubFull) { + int i = 10; + GrpcAPI.Return response = blockingStubFull.broadcastTransaction(transaction); + while (!response.getResult() && response.getCode() == GrpcAPI.Return.response_code.SERVER_BUSY + && i > 0) { + try { + Thread.sleep(300); + } catch (InterruptedException e) { + e.printStackTrace(); + } + i--; + response = blockingStubFull.broadcastTransaction(transaction); + } + return response; + } +} diff --git a/framework/src/test/java/org/tron/core/db2/CheckpointV2Test.java b/framework/src/test/java/org/tron/core/db2/CheckpointV2Test.java new file mode 100644 index 00000000000..dff2d376fd5 --- /dev/null +++ b/framework/src/test/java/org/tron/core/db2/CheckpointV2Test.java @@ -0,0 +1,106 @@ +package org.tron.core.db2; + +import com.google.common.collect.Maps; +import com.google.common.primitives.Bytes; +import com.google.common.primitives.Longs; +import com.google.protobuf.ByteString; +import java.io.File; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.tron.common.application.Application; +import org.tron.common.application.ApplicationFactory; +import org.tron.common.application.TronApplicationContext; +import org.tron.common.utils.FileUtil; +import org.tron.common.utils.Sha256Hash; +import org.tron.core.Constant; +import org.tron.core.capsule.BlockCapsule; +import org.tron.core.config.DefaultConfig; +import org.tron.core.config.args.Args; +import org.tron.core.db2.RevokingDbWithCacheNewValueTest.TestRevokingTronStore; +import org.tron.core.db2.core.Chainbase; +import org.tron.core.db2.core.SnapshotManager; + +@Slf4j +public class CheckpointV2Test { + + private SnapshotManager revokingDatabase; + private TronApplicationContext context; + private Application appT; + private TestRevokingTronStore tronDatabase; + + @Before + public void init() { + Args.setParam(new String[]{"-d", "output_SnapshotManager_test"}, + Constant.TEST_CONF); + Args.getInstance().getStorage().setCheckpointVersion(2); + Args.getInstance().getStorage().setCheckpointSync(true); + context = new TronApplicationContext(DefaultConfig.class); + appT = ApplicationFactory.create(context); + revokingDatabase = context.getBean(SnapshotManager.class); + revokingDatabase.enable(); + tronDatabase = new TestRevokingTronStore("testSnapshotManager-test"); + revokingDatabase.add(tronDatabase.getRevokingDB()); + } + + @After + public void removeDb() { + Args.clearParam(); + context.destroy(); + tronDatabase.close(); + FileUtil.deleteDir(new File("output_SnapshotManager_test")); + revokingDatabase.getCheckTmpStore().close(); + tronDatabase.close(); + } + + @Test + public void testCheckpointV2() { + while (revokingDatabase.size() != 0) { + revokingDatabase.pop(); + } + + revokingDatabase.setMaxFlushCount(0); + revokingDatabase.setUnChecked(false); + revokingDatabase.setMaxSize(0); + List dbList = revokingDatabase.getDbs(); + Map dbMap = dbList.stream() + .map(db -> Maps.immutableEntry(db.getDbName(), db)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + for (int i = 1; i <= 5; i++) { + BlockCapsule blockCapsule = new BlockCapsule(i, Sha256Hash.ZERO_HASH, + System.currentTimeMillis(), ByteString.EMPTY); + try (ISession tmpSession = revokingDatabase.buildSession()) { + dbMap.get("block").put(Longs.toByteArray(i), blockCapsule.getData()); + tmpSession.commit(); + } + } + revokingDatabase.buildSession(); + + Iterator> iterator = dbMap.get("block").iterator(); + Sha256Hash preDbHash = Sha256Hash.ZERO_HASH; + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + byte[] hashBytes = Bytes.concat(entry.getKey(), entry.getValue()); + preDbHash = Sha256Hash.of(true, Bytes.concat(preDbHash.getBytes(), hashBytes)); + } + + revokingDatabase.check(); + revokingDatabase.buildSession(); + + Iterator> iterator2 = dbMap.get("block").iterator(); + Sha256Hash afterDbHash = Sha256Hash.ZERO_HASH; + while (iterator2.hasNext()) { + Map.Entry entry = iterator2.next(); + byte[] hashBytes = Bytes.concat(entry.getKey(), entry.getValue()); + afterDbHash = Sha256Hash.of(true, Bytes.concat(afterDbHash.getBytes(), hashBytes)); + } + + Assert.assertEquals(0, preDbHash.compareTo(afterDbHash)); + } +} diff --git a/framework/src/test/java/org/tron/core/db2/SnapshotManagerTest.java b/framework/src/test/java/org/tron/core/db2/SnapshotManagerTest.java index 81681346799..966e947d828 100644 --- a/framework/src/test/java/org/tron/core/db2/SnapshotManagerTest.java +++ b/framework/src/test/java/org/tron/core/db2/SnapshotManagerTest.java @@ -1,6 +1,14 @@ package org.tron.core.db2; +import com.google.common.collect.Maps; +import com.google.common.primitives.Bytes; +import com.google.common.primitives.Longs; +import com.google.protobuf.ByteString; import java.io.File; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.junit.After; import org.junit.Assert; @@ -10,11 +18,14 @@ import org.tron.common.application.ApplicationFactory; import org.tron.common.application.TronApplicationContext; import org.tron.common.utils.FileUtil; +import org.tron.common.utils.Sha256Hash; import org.tron.core.Constant; +import org.tron.core.capsule.BlockCapsule; import org.tron.core.config.DefaultConfig; import org.tron.core.config.args.Args; import org.tron.core.db2.RevokingDbWithCacheNewValueTest.TestRevokingTronStore; import org.tron.core.db2.SnapshotRootTest.ProtoCapsuleTest; +import org.tron.core.db2.core.Chainbase; import org.tron.core.db2.core.SnapshotManager; import org.tron.core.exception.BadItemException; import org.tron.core.exception.ItemNotFoundException; @@ -59,11 +70,18 @@ public synchronized void testRefresh() revokingDatabase.setMaxFlushCount(0); revokingDatabase.setUnChecked(false); revokingDatabase.setMaxSize(5); + List dbList = revokingDatabase.getDbs(); + Map dbMap = dbList.stream() + .map(db -> Maps.immutableEntry(db.getDbName(), db)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); ProtoCapsuleTest protoCapsule = new ProtoCapsuleTest("refresh".getBytes()); for (int i = 1; i < 11; i++) { ProtoCapsuleTest testProtoCapsule = new ProtoCapsuleTest(("refresh" + i).getBytes()); try (ISession tmpSession = revokingDatabase.buildSession()) { tronDatabase.put(protoCapsule.getData(), testProtoCapsule); + BlockCapsule blockCapsule = new BlockCapsule(i, Sha256Hash.ZERO_HASH, + System.currentTimeMillis(), ByteString.EMPTY); + dbMap.get("block").put(Longs.toByteArray(i), blockCapsule.getData()); tmpSession.commit(); } } diff --git a/framework/src/test/java/org/tron/program/LiteFullNodeToolTest.java b/framework/src/test/java/org/tron/program/LiteFullNodeToolTest.java index f3dcd448ed2..4549b2cde4d 100644 --- a/framework/src/test/java/org/tron/program/LiteFullNodeToolTest.java +++ b/framework/src/test/java/org/tron/program/LiteFullNodeToolTest.java @@ -21,6 +21,7 @@ import org.tron.common.config.DbBackupConfig; import org.tron.common.crypto.ECKey; import org.tron.common.utils.FileUtil; +import org.tron.common.utils.PublicMethod; import org.tron.common.utils.Utils; import org.tron.core.Wallet; import org.tron.core.config.DefaultConfig; @@ -111,17 +112,22 @@ public void clear() { @Test public void testToolsWithLevelDB() { logger.info("testToolsWithLevelDB start"); - testTools("LEVELDB"); + testTools("LEVELDB", 1); + } + @Test + public void testToolsWithLevelDBV2() { + logger.info("testToolsWithLevelDB start"); + testTools("LEVELDB", 2); } @Test public void testToolsWithRocksDB() { logger.info("testToolsWithRocksDB start"); - testTools("ROCKSDB"); + testTools("ROCKSDB", 1); } - private void testTools(String dbType) { + private void testTools(String dbType, int checkpointVersion) { final String[] argsForSnapshot = new String[]{"-o", "split", "-t", "snapshot", "--fn-data-path", DB_PATH + File.separator + databaseDir, "--dataset-path", @@ -134,6 +140,7 @@ private void testTools(String dbType) { new String[]{"-o", "merge", "--fn-data-path", DB_PATH + File.separator + databaseDir, "--dataset-path", DB_PATH + File.separator + "history"}; Args.getInstance().getStorage().setDbEngine(dbType); + Args.getInstance().getStorage().setCheckpointVersion(checkpointVersion); LiteFullNodeTool.setRecentBlks(3); // start fullnode startApp(); @@ -190,8 +197,8 @@ private void generateSomeTransactions(int during) { byte[] address = ecKey2.getAddress(); String sunPri = "cba92a516ea09f620a16ff7ee95ce0df1d56550a8babe9964981a7144c8a784a"; - byte[] sunAddress = getFinalAddress(sunPri); - sendcoin(address, 1L, + byte[] sunAddress = PublicMethod.getFinalAddress(sunPri); + PublicMethod.sendcoin(address, 1L, sunAddress, sunPri, blockingStubFull); try { Thread.sleep(sleepOnce); @@ -203,97 +210,4 @@ private void generateSomeTransactions(int during) { } } } - - /** - * Set public for future use. - * @param priKey private key - * @return public addr - */ - public static byte[] getFinalAddress(String priKey) { - Wallet.setAddressPreFixByte((byte) 0x41); - ECKey key = ECKey.fromPrivate(new BigInteger(priKey, 16)); - return key.getAddress(); - } - - /** - * Set public for future use. - * @param to addr receives the asset - * @param amount asset amount - * @param owner sender - * @param priKey private key of the sender - * @param blockingStubFull Grpc interface - * @return true or false - */ - public static Boolean sendcoin(byte[] to, long amount, byte[] owner, String priKey, - WalletGrpc.WalletBlockingStub blockingStubFull) { - Wallet.setAddressPreFixByte((byte) 0x41); - ECKey temKey = null; - try { - BigInteger priK = new BigInteger(priKey, 16); - temKey = ECKey.fromPrivate(priK); - } catch (Exception ex) { - ex.printStackTrace(); - } - final ECKey ecKey = temKey; - - int times = 0; - while (times++ <= 2) { - - BalanceContract.TransferContract.Builder builder = - BalanceContract.TransferContract.newBuilder(); - ByteString bsTo = ByteString.copyFrom(to); - ByteString bsOwner = ByteString.copyFrom(owner); - builder.setToAddress(bsTo); - builder.setOwnerAddress(bsOwner); - builder.setAmount(amount); - - BalanceContract.TransferContract contract = builder.build(); - Protocol.Transaction transaction = blockingStubFull.createTransaction(contract); - if (transaction == null || transaction.getRawData().getContractCount() == 0) { - continue; - } - transaction = signTransaction(ecKey, transaction); - GrpcAPI.Return response = broadcastTransaction(transaction, blockingStubFull); - return response.getResult(); - } - return false; - } - - /** - * Set public for future use. - * @param ecKey ecKey of the private key - * @param transaction transaction object - */ - public static Protocol.Transaction signTransaction(ECKey ecKey, - Protocol.Transaction transaction) { - if (ecKey == null || ecKey.getPrivKey() == null) { - logger.warn("Warning: Can't sign,there is no private key !!"); - return null; - } - transaction = TransactionUtils.setTimestamp(transaction); - return TransactionUtils.sign(transaction, ecKey); - } - - /** - * Set public for future use. - * @param transaction transaction object - * @param blockingStubFull Grpc interface - */ - public static GrpcAPI.Return broadcastTransaction( - Protocol.Transaction transaction, WalletGrpc.WalletBlockingStub blockingStubFull) { - int i = 10; - GrpcAPI.Return response = blockingStubFull.broadcastTransaction(transaction); - while (!response.getResult() && response.getCode() == GrpcAPI.Return.response_code.SERVER_BUSY - && i > 0) { - try { - Thread.sleep(300); - } catch (InterruptedException e) { - e.printStackTrace(); - } - i--; - response = blockingStubFull.broadcastTransaction(transaction); - } - return response; - } - }