Skip to content

Commit

Permalink
feat: save blocks in batches during sync (#2500)
Browse files Browse the repository at this point in the history
  • Loading branch information
air1one authored and faustbrian committed Apr 29, 2019
1 parent eeab766 commit ca74c46
Show file tree
Hide file tree
Showing 14 changed files with 128 additions and 27 deletions.
8 changes: 4 additions & 4 deletions __tests__/integration/core-blockchain/blockchain.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ describe("Blockchain", () => {
.getStruct();

const transferBlock = createBlock(forgerKeys, [transfer]);
await blockchain.processBlock(transferBlock, mockCallback);
await blockchain.processBlocks([transferBlock], mockCallback);

const wallet = blockchain.database.walletManager.findByPublicKey(keyPair.publicKey);
const walletForger = blockchain.database.walletManager.findByPublicKey(forgerKeys.publicKey);
Expand All @@ -215,7 +215,7 @@ describe("Blockchain", () => {
let nextForgerWallet = delegates.find(wallet => wallet.publicKey === nextForger.publicKey);

const voteBlock = createBlock(nextForgerWallet, [vote]);
await blockchain.processBlock(voteBlock, mockCallback);
await blockchain.processBlocks([voteBlock], mockCallback);

// Wallet paid a fee of 1 and the vote has been placed.
expect(wallet.balance).toEqual(Utils.BigNumber.make(124));
Expand All @@ -236,7 +236,7 @@ describe("Blockchain", () => {
nextForgerWallet = delegates.find(wallet => wallet.publicKey === nextForger.publicKey);

const unvoteBlock = createBlock(nextForgerWallet, [unvote]);
await blockchain.processBlock(unvoteBlock, mockCallback);
await blockchain.processBlocks([unvoteBlock], mockCallback);

// Wallet paid a fee of 1 and no longer voted a delegate
expect(wallet.balance).toEqual(Utils.BigNumber.make(123));
Expand Down Expand Up @@ -329,6 +329,6 @@ async function __addBlocks(untilHeight) {

for (let height = lastHeight + 1; height < untilHeight && height < 155; height++) {
const blockToProcess = Blocks.BlockFactory.fromData(allBlocks[height - 2]);
await blockchain.processBlock(blockToProcess, () => null);
await blockchain.processBlocks([blockToProcess], () => null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ describe("Transaction Guard", () => {
const blockVerified = Blocks.BlockFactory.fromData(block);
blockVerified.verification.verified = true;

await blockchain.processBlock(blockVerified, () => null);
await blockchain.processBlocks([blockVerified], () => null);
};

const forgedErrorMessage = id => ({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ describe("Apply transactions and block rewards to wallets on new block", () => {
const blockWithRewardVerified = Blocks.BlockFactory.fromData(blockWithReward);
blockWithRewardVerified.verification.verified = true;

await blockchain.processBlock(blockWithRewardVerified, () => null);
await blockchain.processBlocks([blockWithRewardVerified], () => null);

const delegateWallet = poolWalletManager.findByPublicKey(generatorPublicKey);

Expand Down
36 changes: 31 additions & 5 deletions __tests__/unit/core-blockchain/blockchain.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { genesisBlock as GB } from "../../utils/config/testnet/genesisBlock";
import { blocks101to155 } from "../../utils/fixtures/testnet/blocks101to155";
import { blocks2to100 } from "../../utils/fixtures/testnet/blocks2to100";
import { config } from "./mocks/config";
import { database } from "./mocks/database";
import { logger } from "./mocks/logger";
import { getMonitor } from "./mocks/p2p/network-monitor";
import { getStorage } from "./mocks/p2p/peer-storage";
Expand Down Expand Up @@ -88,11 +89,11 @@ describe("Blockchain", () => {

const blocksToEnqueue = [blocks101to155[54]];
blockchain.enqueueBlocks(blocksToEnqueue);
expect(processQueuePush).toHaveBeenCalledWith(blocksToEnqueue);
expect(processQueuePush).toHaveBeenCalledWith({ blocks: blocksToEnqueue });
});
});

describe("processBlock", () => {
describe("processBlocks", () => {
const block3 = BlockFactory.fromData(blocks2to100[1]);
let getLastBlock;
let setLastBlock;
Expand All @@ -112,7 +113,7 @@ describe("Blockchain", () => {
const mockCallback = jest.fn(() => true);
blockchain.state.blockchain = {};

await blockchain.processBlock(BlockFactory.fromData(blocks2to100[2]), mockCallback);
await blockchain.processBlocks([BlockFactory.fromData(blocks2to100[2])], mockCallback);
await delay(200);

expect(mockCallback.mock.calls.length).toBe(1);
Expand All @@ -122,13 +123,38 @@ describe("Blockchain", () => {
const mockCallback = jest.fn(() => true);
const lastBlock = blockchain.getLastBlock();

await blockchain.processBlock(lastBlock, mockCallback);
await blockchain.processBlocks([lastBlock], mockCallback);
await delay(200);

expect(mockCallback.mock.calls.length).toBe(1);
expect(blockchain.getLastBlock()).toEqual(lastBlock);
});

it("should process a new block with database saveBlocks failing once", async () => {
const mockCallback = jest.fn(() => true);
blockchain.state.blockchain = {};
database.saveBlocks = jest.fn().mockRejectedValueOnce(new Error("oops"));
jest.spyOn(blockchain, "removeTopBlocks").mockReturnValueOnce(null);

await blockchain.processBlocks([BlockFactory.fromData(blocks2to100[2])], mockCallback);
await delay(200);

expect(mockCallback.mock.calls.length).toBe(1);
});

it("should process a new block with database saveBlocks + getLastBlock failing once", async () => {
const mockCallback = jest.fn(() => true);
blockchain.state.blockchain = {};
jest.spyOn(database, "saveBlocks").mockRejectedValueOnce(new Error("oops saveBlocks"));
jest.spyOn(database, "getLastBlock").mockRejectedValueOnce(new Error("oops getLastBlock"));
jest.spyOn(blockchain, "removeTopBlocks").mockReturnValueOnce(null);

await blockchain.processBlocks([BlockFactory.fromData(blocks2to100[2])], mockCallback);
await delay(200);

expect(mockCallback.mock.calls.length).toBe(1);
});

it("should broadcast a block if (Crypto.Slots.getSlotNumber() * blocktime <= block.data.timestamp)", async () => {
blockchain.state.started = true;

Expand All @@ -138,7 +164,7 @@ describe("Blockchain", () => {

const broadcastBlock = jest.spyOn(getMonitor, "broadcastBlock");

await blockchain.processBlock(lastBlock, mockCallback);
await blockchain.processBlocks([lastBlock], mockCallback);
await delay(200);

expect(mockCallback.mock.calls.length).toBe(1);
Expand Down
3 changes: 2 additions & 1 deletion __tests__/unit/core-blockchain/mocks/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ export const database = {
commitQueuedQueries: () => null,
buildWallets: () => null,
saveWallets: () => null,
getLastBlock: () => null,
getLastBlock: async () => ({ data: { height: 1 } }),
saveBlock: () => null,
saveBlocks: () => null,
verifyBlockchain: () => true,
deleteRound: () => null,
applyRound: () => null,
Expand Down
2 changes: 1 addition & 1 deletion __tests__/unit/core-blockchain/state-machine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ describe("State Machine", () => {
databaseMocks = {
getLastBlock: jest
.spyOn(blockchain.database, "getLastBlock")
.mockReturnValue(BlockFactory.fromData(genesisBlock)),
.mockResolvedValue(BlockFactory.fromData(genesisBlock)),
// @ts-ignore
saveBlock: jest.spyOn(blockchain.database, "saveBlock").mockReturnValue(true),
verifyBlockchain: jest.spyOn(blockchain.database, "verifyBlockchain").mockReturnValue(true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,8 @@ export class DatabaseConnectionStub implements Database.IConnection {
public saveBlock(block: Blocks.Block): Promise<any> {
return undefined;
}

public saveBlocks(blocks: Blocks.Block[]): Promise<any> {
return undefined;
}
}
58 changes: 48 additions & 10 deletions packages/core-blockchain/src/blockchain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,15 @@ export class Blockchain implements blockchain.IBlockchain {
this.actions = stateMachine.actionMap(this);
this.blockProcessor = new BlockProcessor(this);

this.queue = async.queue((block: Interfaces.IBlockData, cb) => {
this.queue = async.queue((blockList: { blocks: Interfaces.IBlockData[] }, cb) => {
try {
return this.processBlock(Blocks.BlockFactory.fromData(block), cb);
return this.processBlocks(blockList.blocks.map(b => Blocks.BlockFactory.fromData(b)), cb);
} catch (error) {
logger.error(`Failed to process block in queue: ${block.height.toLocaleString()}`);
logger.error(
`Failed to process ${blockList.blocks.length} blocks from height ${
blockList.blocks[0].height
} in queue.`,
);
logger.error(error.stack);
return cb();
}
Expand Down Expand Up @@ -264,7 +268,7 @@ export class Blockchain implements blockchain.IBlockchain {
return;
}

this.queue.push(blocks);
this.queue.push({ blocks });
this.state.lastDownloadedBlock = BlockFactory.fromData(blocks.slice(-1)[0]);
}

Expand Down Expand Up @@ -361,18 +365,52 @@ export class Blockchain implements blockchain.IBlockchain {
/**
* Process the given block.
*/
public async processBlock(block: Interfaces.IBlock, callback): Promise<any> {
const result: BlockProcessorResult = await this.blockProcessor.process(block);
public async processBlocks(blocks: Interfaces.IBlock[], callback): Promise<any> {
const acceptedBlocks: Interfaces.IBlock[] = [];
let lastProcessResult: BlockProcessorResult;
for (const block of blocks) {
lastProcessResult = await this.blockProcessor.process(block);

if (lastProcessResult === BlockProcessorResult.Accepted) {
acceptedBlocks.push(block);
}
}

if (result === BlockProcessorResult.Accepted || result === BlockProcessorResult.DiscardedButCanBeBroadcasted) {
if (
lastProcessResult === BlockProcessorResult.Accepted ||
lastProcessResult === BlockProcessorResult.DiscardedButCanBeBroadcasted
) {
// broadcast only current block
const blocktime: number = config.getMilestone(block.data.height).blocktime;
const currentBlock = blocks[blocks.length - 1];
const blocktime: number = config.getMilestone(currentBlock.data.height).blocktime;

if (this.state.started && Crypto.Slots.getSlotNumber() * blocktime <= block.data.timestamp) {
this.p2p.getMonitor().broadcastBlock(block);
if (this.state.started && Crypto.Slots.getSlotNumber() * blocktime <= currentBlock.data.timestamp) {
this.p2p.getMonitor().broadcastBlock(currentBlock);
}
}

if (acceptedBlocks.length === 0) {
return callback();
}

try {
await this.database.saveBlocks(acceptedBlocks);
} catch (exceptionSaveBlocks) {
logger.error(`Could not save ${acceptedBlocks.length} blocks to database : ${exceptionSaveBlocks.stack}`);

const resetToHeight = async height => {
try {
return await this.removeTopBlocks((await this.database.getLastBlock()).data.height - height);
} catch (e) {
logger.error(`Could not remove top blocks from database : ${e.stack}`);
return resetToHeight(height); // keep trying, we can't do anything while this fails
}
};
await resetToHeight(acceptedBlocks[0].data.height - 1);

return this.processBlocks(blocks, callback); // keep trying, we can't do anything while this fails
}

return callback();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ export class AcceptBlockHandler extends BlockHandler {

try {
await database.applyBlock(this.block);
await database.saveBlock(this.block);

// Check if we recovered from a fork
if (state.forkedBlock && state.forkedBlock.data.height === this.block.data.height) {
Expand Down
25 changes: 25 additions & 0 deletions packages/core-database-postgres/src/postgres-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,31 @@ export class PostgresConnection implements Database.IConnection {
}
}

public async saveBlocks(blocks: Interfaces.IBlock[]): Promise<void> {
try {
const queries = [this.blocksRepository.insert(blocks.map(block => block.data))];

for (const block of blocks) {
if (block.transactions.length > 0) {
queries.push(
this.transactionsRepository.insert(
block.transactions.map(tx => ({
...tx.data,
timestamp: tx.timestamp,
serialized: tx.serialized,
})),
),
);
}
}

await this.db.tx(t => t.batch(queries));
} catch (err) {
this.logger.error(err.message);
throw err;
}
}

/**
* Run all migrations.
* @return {void}
Expand Down
4 changes: 4 additions & 0 deletions packages/core-database/src/database-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,10 @@ export class DatabaseService implements Database.IDatabaseService {
await this.connection.saveBlock(block);
}

public async saveBlocks(blocks: Interfaces.IBlock[]): Promise<void> {
await this.connection.saveBlocks(blocks);
}

public async saveRound(activeDelegates: Database.IDelegateWallet[]): Promise<void> {
this.logger.info(`Saving round ${activeDelegates[0].round.toLocaleString()}`);

Expand Down
6 changes: 3 additions & 3 deletions packages/core-interfaces/src/core-blockchain/blockchain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ export interface IBlockchain {
removeTopBlocks(count: any): Promise<void>;

/**
* Process the given block.
* Process the given blocks.
* NOTE: We should be sure this is fail safe (ie callback() is being called only ONCE)
* @param {Block} block
* @param {Block[]} block
* @param {Function} callback
* @return {(Function|void)}
*/
processBlock(block: Interfaces.IBlock, callback: any): Promise<any>;
processBlocks(blocks: Interfaces.IBlock[], callback: any): Promise<any>;

/**
* Called by forger to wake up and sync with the network.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ export interface IConnection {

saveBlock(block: Interfaces.IBlock): Promise<void>;

saveBlocks(blocks: Interfaces.IBlock[]): Promise<void>;

deleteBlock(block: Interfaces.IBlock): Promise<void>;

enqueueDeleteBlock(block: Interfaces.IBlock): void;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ export interface IDatabaseService {

saveBlock(block: Interfaces.IBlock): Promise<void>;

saveBlocks(blocks: Interfaces.IBlock[]): Promise<void>;

// TODO: These methods are exposing database terminology on the business layer, not a fan...

enqueueDeleteBlock(block: Interfaces.IBlock): void;
Expand Down

0 comments on commit ca74c46

Please sign in to comment.