Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve resync handling [C] #7181

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import bisq.core.dao.state.model.blockchain.BaseTxOutput;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.model.governance.IssuanceType;
import bisq.core.dao.state.storage.DaoStateStorageService;
import bisq.core.user.Preferences;

import bisq.network.p2p.NodeAddress;
Expand All @@ -41,7 +42,6 @@
import bisq.common.UserThread;
import bisq.common.config.Config;
import bisq.common.crypto.Hash;
import bisq.common.file.FileUtil;
import bisq.common.util.Hex;
import bisq.common.util.Utilities;

Expand Down Expand Up @@ -102,6 +102,7 @@ default void onDaoStateBlockCreated() {
}

private final DaoStateService daoStateService;
private final DaoStateStorageService daoStateStorageService;
private final DaoStateNetworkService daoStateNetworkService;
private final GenesisTxInfo genesisTxInfo;
private final Set<String> seedNodeAddresses;
Expand Down Expand Up @@ -144,13 +145,15 @@ default void onDaoStateBlockCreated() {

@Inject
public DaoStateMonitoringService(DaoStateService daoStateService,
DaoStateStorageService daoStateStorageService,
DaoStateNetworkService daoStateNetworkService,
GenesisTxInfo genesisTxInfo,
SeedNodeRepository seedNodeRepository,
Preferences preferences,
@Named(Config.STORAGE_DIR) File storageDir,
@Named(Config.IGNORE_DEV_MSG) boolean ignoreDevMsg) {
this.daoStateService = daoStateService;
this.daoStateStorageService = daoStateStorageService;
this.daoStateNetworkService = daoStateNetworkService;
this.genesisTxInfo = genesisTxInfo;
this.preferences = preferences;
Expand Down Expand Up @@ -481,35 +484,15 @@ private void verifyCheckpoints() {
Hex.encode(checkpoint.getHash()),
checkpoint);
try {
// Delete state and stop
removeFile("DaoStateStore");
removeFile("BlindVoteStore");
removeFile("ProposalStore");
removeFile("TempProposalStore");

listeners.forEach(Listener::onCheckpointFailed);
daoStateStorageService.removeAndBackupAllDaoData();
} catch (Throwable t) {
log.error("removeAndBackupAllDaoData failed", t);
}
listeners.forEach(Listener::onCheckpointFailed);
}
}));
}

private void removeFile(String storeName) {
long currentTime = System.currentTimeMillis();
String newFileName = storeName + "_" + currentTime;
String backupDirName = "out_of_sync_dao_data";
File corrupted = new File(storageDir, storeName);
try {
if (corrupted.exists()) {
FileUtil.removeAndBackupFile(storageDir, corrupted, newFileName, backupDirName);
}
} catch (Throwable t) {
t.printStackTrace();
log.error(t.toString());
}
}

private boolean isSeedNode(String peersNodeAddress) {
return seedNodeAddresses.contains(peersNodeAddress);
}
Expand Down
12 changes: 3 additions & 9 deletions core/src/main/java/bisq/core/dao/node/BsqNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public abstract class BsqNode implements DaoSetupService {
private final String genesisTxId;
private final int genesisBlockHeight;
private final ExportJsonFilesService exportJsonFilesService;
private final DaoStateSnapshotService daoStateSnapshotService;
protected final DaoStateSnapshotService daoStateSnapshotService;
private final P2PServiceListener p2PServiceListener;
protected boolean parseBlockchainComplete;
protected boolean p2pNetworkReady;
Expand Down Expand Up @@ -169,7 +169,7 @@ public void shutDown() {

@SuppressWarnings("WeakerAccess")
protected void onInitialized() {
daoStateSnapshotService.applySnapshot(false);
daoStateSnapshotService.applyPersistedSnapshot();

if (p2PService.isBootstrapped()) {
log.info("onAllServicesInitialized: isBootstrapped");
Expand All @@ -195,12 +195,6 @@ protected void onParseBlockChainComplete() {
maybeExportToJson();
}

@SuppressWarnings("WeakerAccess")
protected void startReOrgFromLastSnapshot() {
daoStateSnapshotService.applySnapshot(true);
}


protected Optional<Block> doParseBlock(RawBlock rawBlock) throws RequiredReorgFromSnapshotException {
if (shutdownInProgress) {
return Optional.empty();
Expand Down Expand Up @@ -273,7 +267,7 @@ protected Optional<Block> doParseBlock(RawBlock rawBlock) throws RequiredReorgFr
lastBlock.isPresent() ? lastBlock.get().getHash() : "lastBlock not present");

pendingBlocks.clear();
startReOrgFromLastSnapshot();
daoStateSnapshotService.revertToLastSnapshot();
startParseBlocks();
throw new RequiredReorgFromSnapshotException(rawBlock);
}
Expand Down
8 changes: 6 additions & 2 deletions core/src/main/java/bisq/core/dao/node/full/FullNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ private void handleError(Throwable throwable) {
if (numExceptions > 10) {
log.warn("We got {} RPC HttpExceptions at our block handler.", numExceptions);
pendingBlocks.clear();
startReOrgFromLastSnapshot();
revertToLastSnapshot();
startParseBlocks();
numExceptions = 0;
}
Expand Down Expand Up @@ -301,7 +301,7 @@ private void handleError(Throwable throwable) {
return;
} else if (cause instanceof NotificationHandlerException) {
log.error("Error from within block notification daemon: {}", cause.getCause().toString());
startReOrgFromLastSnapshot();
revertToLastSnapshot();
startParseBlocks();
return;
} else if (cause instanceof Error) {
Expand All @@ -314,4 +314,8 @@ private void handleError(Throwable throwable) {
errorMessageHandler.accept(errorMessage);
}
}

private void revertToLastSnapshot() {
daoStateSnapshotService.revertToLastSnapshot();
}
}
118 changes: 72 additions & 46 deletions core/src/main/java/bisq/core/dao/state/DaoStateSnapshotService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import java.io.IOException;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

Expand Down Expand Up @@ -78,7 +79,7 @@ public class DaoStateSnapshotService implements DaoSetupService, DaoStateListene
private int daoRequiresRestartHandlerAttempts = 0;
private boolean readyForPersisting = true;
private boolean isParseBlockChainComplete;

private final List<Integer> heightsOfLastAppliedSnapshots = new ArrayList<>();

///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
Expand Down Expand Up @@ -269,47 +270,72 @@ private void createSnapshot() {
log.info("Cloned new daoStateCandidate at height {} took {} ms.", snapshotHeight, System.currentTimeMillis() - ts);
}

public void applySnapshot(boolean fromReorg) {
DaoState persistedBsqState = daoStateStorageService.getPersistedBsqState();
LinkedList<DaoStateHash> persistedDaoStateHashChain = daoStateStorageService.getPersistedDaoStateHashChain();
if (persistedBsqState != null) {
int chainHeightOfPersisted = persistedBsqState.getChainHeight();
if (!persistedBsqState.getBlocks().isEmpty()) {
int heightOfLastBlock = persistedBsqState.getLastBlock().getHeight();
if (heightOfLastBlock != chainHeightOfPersisted) {
log.warn("chainHeightOfPersisted must be same as heightOfLastBlock. heightOfLastBlock={}, chainHeightOfPersisted={}",
heightOfLastBlock, chainHeightOfPersisted);
resyncDaoStateFromResources();
return;
}
if (isHeightAtLeastGenesisHeight(heightOfLastBlock)) {
if (chainHeightOfLastAppliedSnapshot != chainHeightOfPersisted) {
chainHeightOfLastAppliedSnapshot = chainHeightOfPersisted;
daoStateService.applySnapshot(persistedBsqState);
daoStateMonitoringService.applySnapshot(persistedDaoStateHashChain);
daoStateStorageService.releaseMemory();
} else {
// The reorg might have been caused by the previous parsing which might contains a range of
// blocks.
log.warn("We applied already a snapshot with chainHeight {}. " +
"We remove all dao store files and shutdown. After a restart resource files will " +
"be applied if available.",
chainHeightOfLastAppliedSnapshot);
resyncDaoStateFromResources();
}
}
} else if (fromReorg) {
log.info("We got a reorg and we want to apply the snapshot but it is empty. " +
public void applyPersistedSnapshot() {
applySnapshot(true);
}

public void revertToLastSnapshot() {
applySnapshot(false);
}

private void applySnapshot(boolean fromInitialize) {
DaoState persistedDaoState = daoStateStorageService.getPersistedBsqState();
if (persistedDaoState == null) {
log.info("Try to apply snapshot but no stored snapshot available. That is expected at first blocks.");
return;
}

int chainHeightOfPersistedDaoState = persistedDaoState.getChainHeight();
int numSameAppliedSnapshots = (int) heightsOfLastAppliedSnapshots.stream()
.filter(height -> height == chainHeightOfPersistedDaoState)
.count();
if (numSameAppliedSnapshots >= 3) {
log.warn("We got called applySnapshot the 3rd time with the same snapshot height. " +
"We abort and call resyncDaoStateFromResources.");
resyncDaoStateFromResources();
return;
}
heightsOfLastAppliedSnapshots.add(chainHeightOfPersistedDaoState);

if (persistedDaoState.getBlocks().isEmpty()) {
if (fromInitialize) {
log.info("No Bsq blocks in DaoState. Expected if no data are provided yet from resources or persisted data.");
} else {
log.info("We got a reorg or error and we want to apply the snapshot but it is empty. " +
"That is expected in the first blocks until the first snapshot has been created. " +
"We remove all dao store files and shutdown. " +
"After a restart resource files will be applied if available.");
resyncDaoStateFromResources();
} else {
log.info("No Bsq blocks in DaoState. Expected if no data are provided yet from resources or persisted data.");
}
} else {
log.info("Try to apply snapshot but no stored snapshot available. That is expected at first blocks.");
return;
}

if (!daoStateStorageService.isChainHeighMatchingLastBlockHeight()) {
resyncDaoStateFromResources();
return;
}

if (!isHeightAtLeastGenesisHeight(chainHeightOfPersistedDaoState)) {
log.error("heightOfPersistedLastBlock is below genesis height. This should never happen.");
return;
}

if (chainHeightOfLastAppliedSnapshot == chainHeightOfPersistedDaoState) {
// The reorg might have been caused by the previous parsing which might contains a range of
// blocks.
log.warn("We applied already a snapshot with chainHeight {}. " +
"We remove all dao store files and shutdown. After a restart resource files will " +
"be applied if available.",
chainHeightOfLastAppliedSnapshot);
resyncDaoStateFromResources();
return;
}

chainHeightOfLastAppliedSnapshot = chainHeightOfPersistedDaoState;
daoStateService.applySnapshot(persistedDaoState);
LinkedList<DaoStateHash> persistedDaoStateHashChain = daoStateStorageService.getPersistedDaoStateHashChain();
daoStateMonitoringService.applySnapshot(persistedDaoStateHashChain);
daoStateStorageService.releaseMemory();
}


Expand All @@ -323,20 +349,20 @@ private boolean isHeightAtLeastGenesisHeight(int heightOfLastBlock) {

private void resyncDaoStateFromResources() {
log.info("resyncDaoStateFromResources called");
if (resyncDaoStateFromResourcesHandler == null && ++daoRequiresRestartHandlerAttempts <= 3) {
log.warn("resyncDaoStateFromResourcesHandler has not been initialized yet, will try again in 10 seconds");
UserThread.runAfter(this::resyncDaoStateFromResources, 10); // a delay for the app to init
return;
if (resyncDaoStateFromResourcesHandler == null) {
if (++daoRequiresRestartHandlerAttempts <= 3) {
log.warn("resyncDaoStateFromResourcesHandler has not been initialized yet, will try again in 10 seconds");
UserThread.runAfter(this::resyncDaoStateFromResources, 10); // a delay for the app to init
return;
} else {
log.warn("No resyncDaoStateFromResourcesHandler has not been set. We shutdown non-gracefully with a failure code on exit");
System.exit(1);
}
}
try {
daoStateStorageService.removeAndBackupAllDaoData();
// the restart handler informs the user of the need to restart bisq (in desktop mode)
if (resyncDaoStateFromResourcesHandler == null) {
log.error("resyncDaoStateFromResourcesHandler COULD NOT be called as it has not been initialized yet");
} else {
log.info("calling resyncDaoStateFromResourcesHandler...");
resyncDaoStateFromResourcesHandler.run();
}
resyncDaoStateFromResourcesHandler.run();
} catch (IOException e) {
log.error("Error at resyncDaoStateFromResources: {}", e.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,18 @@ public class BsqBlocksStorageService {
public final static String NAME = "BsqBlocks";

private final int genesisBlockHeight;
private final File storageDir;
private final File blocksDir;
private final BlocksPersistence blocksPersistence;
@Getter
private int chainHeightOfPersistedBlocks;

@Inject
public BsqBlocksStorageService(GenesisTxInfo genesisTxInfo,
PersistenceProtoResolver persistenceProtoResolver,
@Named(Config.STORAGE_DIR) File dbStorageDir) {
@Named(Config.STORAGE_DIR) File storageDir) {
genesisBlockHeight = genesisTxInfo.getGenesisBlockHeight();
storageDir = new File(dbStorageDir.getAbsolutePath() + File.separator + NAME);
blocksPersistence = new BlocksPersistence(storageDir, NAME, persistenceProtoResolver);
blocksDir = new File(storageDir.getAbsolutePath() + File.separator + NAME);
blocksPersistence = new BlocksPersistence(blocksDir, NAME, persistenceProtoResolver);
}

public void persistBlocks(List<Block> blocks) {
Expand Down Expand Up @@ -108,7 +108,7 @@ void copyFromResources(String postFix) {
String dirName = BsqBlocksStorageService.NAME;
String resourceDir = dirName + postFix;
try {
if (storageDir.exists()) {
if (blocksDir.exists()) {
log.info("No resource directory was copied. {} exists already.", dirName);
return;
}
Expand All @@ -118,11 +118,11 @@ void copyFromResources(String postFix) {
log.info("No files in directory. {}", resourceDir);
return;
}
if (!storageDir.exists()) {
storageDir.mkdir();
if (!blocksDir.exists()) {
blocksDir.mkdir();
}
for (String fileName : fileNames) {
File destinationFile = new File(storageDir, fileName);
File destinationFile = new File(blocksDir, fileName);
// File.separator doesn't appear to work on Windows. It has to be "/", not "\".
// See: https://github.com/bisq-network/bisq/pull/5909#pullrequestreview-827992563
FileUtil.resourceToFile(resourceDir + "/" + fileName, destinationFile);
Expand All @@ -144,12 +144,9 @@ public void removeBlocksDirectory() {
blocksPersistence.removeBlocksDirectory();
}

// We recreate the directory so that we don't fill the blocks after restart from resources
// In copyFromResources we only check for the directory not the files inside.
public void removeBlocksInDirectory() {
blocksPersistence.removeBlocksDirectory();
if (!storageDir.exists()) {
storageDir.mkdir();
public void makeBlocksDirectory() {
if (!blocksDir.exists()) {
blocksDir.mkdir();
}
}
}
Loading
Loading