Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core-p2p): do not suspend peer for AppNotReady #2590

Merged
merged 10 commits into from
May 21, 2019
Merged
2 changes: 1 addition & 1 deletion __tests__/unit/core-blockchain/mocks/blockchain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { p2p } from "./p2p";
import { transactionPool } from "./transactionPool";

export const blockchain = {
queue: { length: () => 0, push: () => undefined, clear: () => undefined },
queue: { length: () => 0, push: () => undefined, clear: () => undefined, idle: () => undefined },
isStopped: false,

setWakeUp: () => undefined,
Expand Down
6 changes: 3 additions & 3 deletions __tests__/unit/core-blockchain/state-machine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ describe("State Machine", () => {
describe("checkLastDownloadedBlockSynced", () => {
it('should dispatch the event "NOTSYNCED" by default', async () => {
blockchain.isSynced = jest.fn(() => false);
blockchain.queue.length = jest.fn(() => 1);
blockchain.queue.idle = jest.fn(() => false);
await expect(actionMap.checkLastDownloadedBlockSynced).toDispatch(blockchain, "NOTSYNCED");
});

Expand All @@ -68,7 +68,7 @@ describe("State Machine", () => {

it('should dispatch the event "NETWORKHALTED" if stateStorage.noBlockCounter > 5 and process queue is empty', async () => {
blockchain.isSynced = jest.fn(() => false);
blockchain.queue.length = jest.fn(() => 0);
blockchain.queue.idle = jest.fn(() => true);
stateStorage.noBlockCounter = 6;
await expect(actionMap.checkLastDownloadedBlockSynced).toDispatch(blockchain, "NETWORKHALTED");
});
Expand All @@ -78,7 +78,7 @@ describe("State Machine", () => {
- stateStorage.p2pUpdateCounter + 1 > 3 (network keeps missing blocks)
- blockchain.p2p.checkNetworkHealth() returns a forked network status`, async () => {
blockchain.isSynced = jest.fn(() => false);
blockchain.queue.length = jest.fn(() => 0);
blockchain.queue.idle = jest.fn(() => true);
stateStorage.noBlockCounter = 6;
stateStorage.p2pUpdateCounter = 3;

Expand Down
10 changes: 0 additions & 10 deletions __tests__/unit/core-p2p/peer-guard.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import "./mocks/core-container";

import { P2P } from "@arkecosystem/core-interfaces";
import { dato } from "@faustbrian/dato";
import { SocketErrors } from "../../../packages/core-p2p/src/enums";
import { PeerConnector } from "../../../packages/core-p2p/src/peer-connector";
import { PeerGuard } from "../../../packages/core-p2p/src/peer-guard";
import { createStubPeer } from "../../helpers/peers";
Expand Down Expand Up @@ -68,14 +67,5 @@ describe("PeerGuard", () => {
expect(reason).toBe("High Latency");
expect(convertToMinutes(until)).toBe(1);
});

it('should return a 30 seconds suspension for "Application not ready"', () => {
connector.getError = jest.fn(() => SocketErrors.AppNotReady);

const { until, reason } = guard.analyze(dummy);

expect(reason).toBe("Application is not ready");
expect(convertToMinutes(until)).toBe(0.5);
});
});
});
2 changes: 1 addition & 1 deletion __tests__/unit/core-state/mocks/blockchain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { p2p } from "./p2p";
import { transactionPool } from "./transactionPool";

export const blockchain = {
queue: { length: () => 0, push: () => undefined, clear: () => undefined },
queue: { length: () => 0, push: () => undefined, clear: () => undefined, idle: () => undefined },
isStopped: false,

setWakeUp: () => undefined,
Expand Down
47 changes: 23 additions & 24 deletions packages/core-blockchain/src/blockchain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -393,11 +393,33 @@ export class Blockchain implements blockchain.IBlockchain {
}
}

if (acceptedBlocks.length > 0) {
try {
await this.database.saveBlocks(acceptedBlocks);
} catch (exceptionSaveBlocks) {
logger.error(
`Could not save ${acceptedBlocks.length} blocks to database : ${exceptionSaveBlocks.stack}`,
);

const resetToHeight = async height => {
try {
return await this.removeTopBlocks((await this.database.getLastBlock()).data.height - height);
} catch (e) {
logger.error(`Could not remove top blocks from database : ${e.stack}`);

return resetToHeight(height); // keep trying, we can't do anything while this fails
}
};
await resetToHeight(acceptedBlocks[0].data.height - 1);

return this.processBlocks(blocks, callback); // keep trying, we can't do anything while this fails
}
}

if (
lastProcessResult === BlockProcessorResult.Accepted ||
lastProcessResult === BlockProcessorResult.DiscardedButCanBeBroadcasted
) {
// broadcast only current block
const currentBlock: Interfaces.IBlock = blocks[blocks.length - 1];
const blocktime: number = config.getMilestone(currentBlock.data.height).blocktime;

Expand All @@ -406,29 +428,6 @@ export class Blockchain implements blockchain.IBlockchain {
}
}

if (acceptedBlocks.length === 0) {
return callback([]);
}

try {
await this.database.saveBlocks(acceptedBlocks);
} catch (exceptionSaveBlocks) {
logger.error(`Could not save ${acceptedBlocks.length} blocks to database : ${exceptionSaveBlocks.stack}`);

const resetToHeight = async height => {
try {
return await this.removeTopBlocks((await this.database.getLastBlock()).data.height - height);
} catch (e) {
logger.error(`Could not remove top blocks from database : ${e.stack}`);

return resetToHeight(height); // keep trying, we can't do anything while this fails
}
};
await resetToHeight(acceptedBlocks[0].data.height - 1);

return this.processBlocks(blocks, callback); // keep trying, we can't do anything while this fails
}

return callback(acceptedBlocks);
}

Expand Down
4 changes: 2 additions & 2 deletions packages/core-blockchain/src/state-machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ blockchainMachine.actionMap = (blockchain: Blockchain) => ({
}

// tried to download but no luck after 5 tries (looks like network missing blocks)
if (stateStorage.noBlockCounter > 5 && blockchain.queue.length() === 0) {
if (stateStorage.noBlockCounter > 5 && blockchain.queue.idle()) {
logger.info("Tried to sync 5 times to different nodes, looks like the network is missing blocks");

stateStorage.noBlockCounter = 0;
Expand Down Expand Up @@ -101,7 +101,7 @@ blockchainMachine.actionMap = (blockchain: Blockchain) => ({
stateStorage.networkStart = false;

blockchain.dispatch("SYNCFINISHED");
} else if (blockchain.queue.length() === 0) {
} else if (blockchain.queue.idle()) {
blockchain.dispatch("PROCESSFINISHED");
}
},
Expand Down
2 changes: 1 addition & 1 deletion packages/core-p2p/src/peer-communicator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export class PeerCommunicator implements P2P.IPeerCommunicator {

public async downloadBlocks(peer: P2P.IPeer, fromBlockHeight: number): Promise<Interfaces.IBlockData[]> {
try {
this.logger.info(`Downloading blocks from height ${fromBlockHeight.toLocaleString()} via ${peer.ip}`);
this.logger.debug(`Downloading blocks from height ${fromBlockHeight.toLocaleString()} via ${peer.ip}`);

return await this.getPeerBlocks(peer, fromBlockHeight);
} catch (error) {
Expand Down
2 changes: 1 addition & 1 deletion packages/core-p2p/src/peer-guard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export class PeerGuard implements P2P.IPeerGuard {
}

if (this.connector.hasError(peer, SocketErrors.AppNotReady)) {
return this.createPunishment(this.offences.applicationNotReady);
return undefined; // no punishment when app is not ready
}

if (peer.latency === -1) {
Expand Down