Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: batch block database inserts when syncing #2500

Merged
merged 7 commits into from
Apr 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions __tests__/integration/core-blockchain/blockchain.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ describe("Blockchain", () => {
.getStruct();

const transferBlock = createBlock(forgerKeys, [transfer]);
await blockchain.processBlock(transferBlock, mockCallback);
await blockchain.processBlocks([transferBlock], mockCallback);

const wallet = blockchain.database.walletManager.findByPublicKey(keyPair.publicKey);
const walletForger = blockchain.database.walletManager.findByPublicKey(forgerKeys.publicKey);
Expand All @@ -215,7 +215,7 @@ describe("Blockchain", () => {
let nextForgerWallet = delegates.find(wallet => wallet.publicKey === nextForger.publicKey);

const voteBlock = createBlock(nextForgerWallet, [vote]);
await blockchain.processBlock(voteBlock, mockCallback);
await blockchain.processBlocks([voteBlock], mockCallback);

// Wallet paid a fee of 1 and the vote has been placed.
expect(wallet.balance).toEqual(Utils.BigNumber.make(124));
Expand All @@ -236,7 +236,7 @@ describe("Blockchain", () => {
nextForgerWallet = delegates.find(wallet => wallet.publicKey === nextForger.publicKey);

const unvoteBlock = createBlock(nextForgerWallet, [unvote]);
await blockchain.processBlock(unvoteBlock, mockCallback);
await blockchain.processBlocks([unvoteBlock], mockCallback);

// Wallet paid a fee of 1 and no longer voted a delegate
expect(wallet.balance).toEqual(Utils.BigNumber.make(123));
Expand Down Expand Up @@ -329,6 +329,6 @@ async function __addBlocks(untilHeight) {

for (let height = lastHeight + 1; height < untilHeight && height < 155; height++) {
const blockToProcess = Blocks.BlockFactory.fromData(allBlocks[height - 2]);
await blockchain.processBlock(blockToProcess, () => null);
await blockchain.processBlocks([blockToProcess], () => null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ describe("Transaction Guard", () => {
const blockVerified = Blocks.BlockFactory.fromData(block);
blockVerified.verification.verified = true;

await blockchain.processBlock(blockVerified, () => null);
await blockchain.processBlocks([blockVerified], () => null);
};

const forgedErrorMessage = id => ({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ describe("Apply transactions and block rewards to wallets on new block", () => {
const blockWithRewardVerified = Blocks.BlockFactory.fromData(blockWithReward);
blockWithRewardVerified.verification.verified = true;

await blockchain.processBlock(blockWithRewardVerified, () => null);
await blockchain.processBlocks([blockWithRewardVerified], () => null);

const delegateWallet = poolWalletManager.findByPublicKey(generatorPublicKey);

Expand Down
36 changes: 31 additions & 5 deletions __tests__/unit/core-blockchain/blockchain.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { genesisBlock as GB } from "../../utils/config/testnet/genesisBlock";
import { blocks101to155 } from "../../utils/fixtures/testnet/blocks101to155";
import { blocks2to100 } from "../../utils/fixtures/testnet/blocks2to100";
import { config } from "./mocks/config";
import { database } from "./mocks/database";
import { logger } from "./mocks/logger";
import { getMonitor } from "./mocks/p2p/network-monitor";
import { getStorage } from "./mocks/p2p/peer-storage";
Expand Down Expand Up @@ -88,11 +89,11 @@ describe("Blockchain", () => {

const blocksToEnqueue = [blocks101to155[54]];
blockchain.enqueueBlocks(blocksToEnqueue);
expect(processQueuePush).toHaveBeenCalledWith(blocksToEnqueue);
expect(processQueuePush).toHaveBeenCalledWith({ blocks: blocksToEnqueue });
});
});

describe("processBlock", () => {
describe("processBlocks", () => {
const block3 = BlockFactory.fromData(blocks2to100[1]);
let getLastBlock;
let setLastBlock;
Expand All @@ -112,7 +113,7 @@ describe("Blockchain", () => {
const mockCallback = jest.fn(() => true);
blockchain.state.blockchain = {};

await blockchain.processBlock(BlockFactory.fromData(blocks2to100[2]), mockCallback);
await blockchain.processBlocks([BlockFactory.fromData(blocks2to100[2])], mockCallback);
await delay(200);

expect(mockCallback.mock.calls.length).toBe(1);
Expand All @@ -122,13 +123,38 @@ describe("Blockchain", () => {
const mockCallback = jest.fn(() => true);
const lastBlock = blockchain.getLastBlock();

await blockchain.processBlock(lastBlock, mockCallback);
await blockchain.processBlocks([lastBlock], mockCallback);
await delay(200);

expect(mockCallback.mock.calls.length).toBe(1);
expect(blockchain.getLastBlock()).toEqual(lastBlock);
});

it("should process a new block with database saveBlocks failing once", async () => {
const mockCallback = jest.fn(() => true);
blockchain.state.blockchain = {};
database.saveBlocks = jest.fn().mockRejectedValueOnce(new Error("oops"));
jest.spyOn(blockchain, "removeTopBlocks").mockReturnValueOnce(null);

await blockchain.processBlocks([BlockFactory.fromData(blocks2to100[2])], mockCallback);
await delay(200);

expect(mockCallback.mock.calls.length).toBe(1);
});

it("should process a new block with database saveBlocks + getLastBlock failing once", async () => {
const mockCallback = jest.fn(() => true);
blockchain.state.blockchain = {};
jest.spyOn(database, "saveBlocks").mockRejectedValueOnce(new Error("oops saveBlocks"));
jest.spyOn(database, "getLastBlock").mockRejectedValueOnce(new Error("oops getLastBlock"));
jest.spyOn(blockchain, "removeTopBlocks").mockReturnValueOnce(null);

await blockchain.processBlocks([BlockFactory.fromData(blocks2to100[2])], mockCallback);
await delay(200);

expect(mockCallback.mock.calls.length).toBe(1);
});

it("should broadcast a block if (Crypto.Slots.getSlotNumber() * blocktime <= block.data.timestamp)", async () => {
blockchain.state.started = true;

Expand All @@ -138,7 +164,7 @@ describe("Blockchain", () => {

const broadcastBlock = jest.spyOn(getMonitor, "broadcastBlock");

await blockchain.processBlock(lastBlock, mockCallback);
await blockchain.processBlocks([lastBlock], mockCallback);
await delay(200);

expect(mockCallback.mock.calls.length).toBe(1);
Expand Down
3 changes: 2 additions & 1 deletion __tests__/unit/core-blockchain/mocks/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ export const database = {
commitQueuedQueries: () => null,
buildWallets: () => null,
saveWallets: () => null,
getLastBlock: () => null,
getLastBlock: async () => ({ data: { height: 1 } }),
saveBlock: () => null,
saveBlocks: () => null,
verifyBlockchain: () => true,
deleteRound: () => null,
applyRound: () => null,
Expand Down
2 changes: 1 addition & 1 deletion __tests__/unit/core-blockchain/state-machine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ describe("State Machine", () => {
databaseMocks = {
getLastBlock: jest
.spyOn(blockchain.database, "getLastBlock")
.mockReturnValue(BlockFactory.fromData(genesisBlock)),
.mockResolvedValue(BlockFactory.fromData(genesisBlock)),
// @ts-ignore
saveBlock: jest.spyOn(blockchain.database, "saveBlock").mockReturnValue(true),
verifyBlockchain: jest.spyOn(blockchain.database, "verifyBlockchain").mockReturnValue(true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,8 @@ export class DatabaseConnectionStub implements Database.IConnection {
public saveBlock(block: Blocks.Block): Promise<any> {
return undefined;
}

public saveBlocks(blocks: Blocks.Block[]): Promise<any> {
return undefined;
}
}
58 changes: 48 additions & 10 deletions packages/core-blockchain/src/blockchain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,15 @@ export class Blockchain implements blockchain.IBlockchain {
this.actions = stateMachine.actionMap(this);
this.blockProcessor = new BlockProcessor(this);

this.queue = async.queue((block: Interfaces.IBlockData, cb) => {
this.queue = async.queue((blockList: { blocks: Interfaces.IBlockData[] }, cb) => {
try {
return this.processBlock(Blocks.BlockFactory.fromData(block), cb);
return this.processBlocks(blockList.blocks.map(b => Blocks.BlockFactory.fromData(b)), cb);
} catch (error) {
logger.error(`Failed to process block in queue: ${block.height.toLocaleString()}`);
logger.error(
`Failed to process ${blockList.blocks.length} blocks from height ${
blockList.blocks[0].height
} in queue.`,
);
logger.error(error.stack);
return cb();
}
Expand Down Expand Up @@ -264,7 +268,7 @@ export class Blockchain implements blockchain.IBlockchain {
return;
}

this.queue.push(blocks);
this.queue.push({ blocks });
this.state.lastDownloadedBlock = BlockFactory.fromData(blocks.slice(-1)[0]);
}

Expand Down Expand Up @@ -361,18 +365,52 @@ export class Blockchain implements blockchain.IBlockchain {
/**
* Process the given block.
*/
public async processBlock(block: Interfaces.IBlock, callback): Promise<any> {
const result: BlockProcessorResult = await this.blockProcessor.process(block);
public async processBlocks(blocks: Interfaces.IBlock[], callback): Promise<any> {
const acceptedBlocks: Interfaces.IBlock[] = [];
let lastProcessResult: BlockProcessorResult;
for (const block of blocks) {
lastProcessResult = await this.blockProcessor.process(block);

if (lastProcessResult === BlockProcessorResult.Accepted) {
acceptedBlocks.push(block);
}
}

if (result === BlockProcessorResult.Accepted || result === BlockProcessorResult.DiscardedButCanBeBroadcasted) {
if (
lastProcessResult === BlockProcessorResult.Accepted ||
lastProcessResult === BlockProcessorResult.DiscardedButCanBeBroadcasted
) {
// broadcast only current block
const blocktime: number = config.getMilestone(block.data.height).blocktime;
const currentBlock = blocks[blocks.length - 1];
const blocktime: number = config.getMilestone(currentBlock.data.height).blocktime;

if (this.state.started && Crypto.Slots.getSlotNumber() * blocktime <= block.data.timestamp) {
this.p2p.getMonitor().broadcastBlock(block);
if (this.state.started && Crypto.Slots.getSlotNumber() * blocktime <= currentBlock.data.timestamp) {
this.p2p.getMonitor().broadcastBlock(currentBlock);
}
}

if (acceptedBlocks.length === 0) {
return callback();
}

try {
await this.database.saveBlocks(acceptedBlocks);
} catch (exceptionSaveBlocks) {
logger.error(`Could not save ${acceptedBlocks.length} blocks to database : ${exceptionSaveBlocks.stack}`);

const resetToHeight = async height => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const resetToHeight = async height => {
const resetToHeight = async height => {
try {
return await this.removeTopBlocks(
app
.resolvePlugin<State.IStateService>("state")
.getStore()
.getLastHeight() - height,
);
} catch (e) {
logger.error(`Could not remove top blocks from database : ${e.stack}`);
return resetToHeight(height); // keep trying, we can't do anything while this fails
}
};

try {
return await this.removeTopBlocks((await this.database.getLastBlock()).data.height - height);
} catch (e) {
logger.error(`Could not remove top blocks from database : ${e.stack}`);
return resetToHeight(height); // keep trying, we can't do anything while this fails
}
};
await resetToHeight(acceptedBlocks[0].data.height - 1);

return this.processBlocks(blocks, callback); // keep trying, we can't do anything while this fails
}

return callback();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ export class AcceptBlockHandler extends BlockHandler {

try {
await database.applyBlock(this.block);
await database.saveBlock(this.block);

// Check if we recovered from a fork
if (state.forkedBlock && state.forkedBlock.data.height === this.block.data.height) {
Expand Down
25 changes: 25 additions & 0 deletions packages/core-database-postgres/src/postgres-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,31 @@ export class PostgresConnection implements Database.IConnection {
}
}

public async saveBlocks(blocks: Interfaces.IBlock[]): Promise<void> {
try {
const queries = [this.blocksRepository.insert(blocks.map(block => block.data))];

for (const block of blocks) {
if (block.transactions.length > 0) {
queries.push(
this.transactionsRepository.insert(
block.transactions.map(tx => ({
...tx.data,
timestamp: tx.timestamp,
serialized: tx.serialized,
})),
),
);
}
}

await this.db.tx(t => t.batch(queries));
} catch (err) {
this.logger.error(err.message);
throw err;
}
}

/**
* Run all migrations.
* @return {void}
Expand Down
4 changes: 4 additions & 0 deletions packages/core-database/src/database-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,10 @@ export class DatabaseService implements Database.IDatabaseService {
await this.connection.saveBlock(block);
}

public async saveBlocks(blocks: Interfaces.IBlock[]): Promise<void> {
await this.connection.saveBlocks(blocks);
}

public async saveRound(activeDelegates: Database.IDelegateWallet[]): Promise<void> {
this.logger.info(`Saving round ${activeDelegates[0].round.toLocaleString()}`);

Expand Down
6 changes: 3 additions & 3 deletions packages/core-interfaces/src/core-blockchain/blockchain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ export interface IBlockchain {
removeTopBlocks(count: any): Promise<void>;

/**
* Process the given block.
* Process the given blocks.
* NOTE: We should be sure this is fail safe (ie callback() is being called only ONCE)
* @param {Block} block
* @param {Block[]} block
* @param {Function} callback
* @return {(Function|void)}
*/
processBlock(block: Interfaces.IBlock, callback: any): Promise<any>;
processBlocks(blocks: Interfaces.IBlock[], callback: any): Promise<any>;

/**
* Called by forger to wake up and sync with the network.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ export interface IConnection {

saveBlock(block: Interfaces.IBlock): Promise<void>;

saveBlocks(blocks: Interfaces.IBlock[]): Promise<void>;

deleteBlock(block: Interfaces.IBlock): Promise<void>;

enqueueDeleteBlock(block: Interfaces.IBlock): void;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ export interface IDatabaseService {

saveBlock(block: Interfaces.IBlock): Promise<void>;

saveBlocks(blocks: Interfaces.IBlock[]): Promise<void>;

// TODO: These methods are exposing database terminology on the business layer, not a fan...

enqueueDeleteBlock(block: Interfaces.IBlock): void;
Expand Down