Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #7157 from LiskHQ/7124-update_event_queue_and_def
Browse files Browse the repository at this point in the history
Update event definition, event queue and block header - Closes #7124 and #7125
  • Loading branch information
shuse2 authored May 13, 2022
2 parents 2bc5416 + 336f3b6 commit 987082c
Show file tree
Hide file tree
Showing 48 changed files with 805 additions and 123 deletions.
8 changes: 6 additions & 2 deletions elements/lisk-api-client/test/unit/block.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ describe('block', () => {
let block: Block;
const sampleHeight = 1;
const encodedBlock =
'0ac001080110c4d23d18c4d23d22144a462ea57a8c9f72d866c09770e5ec70cef187272a14be63fb1c0426573352556f18b21efd5b6183c39c3214b27ca21f40d44113c2090ca8f05fb706c54e87dd3a14b27ca21f40d44113c2090ca8f05fb706c54e87dd42147f9d96a09a3fd17f3478eb7bef3a8bda00e1238b489c8c3d509c8c3d5a20e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b8556206080012001a006a146da88e2fd4435e26e02682435f108002ccc3ddd5';
'0ae201080110c4d23d18c4d23d22144a462ea57a8c9f72d866c09770e5ec70cef187272a14be63fb1c0426573352556f18b21efd5b6183c39c3214b27ca21f40d44113c2090ca8f05fb706c54e87dd3a14b27ca21f40d44113c2090ca8f05fb706c54e87dd422030dda4fbc395828e5a9f2f8824771e434fce4945a1e7820012440d09dd1e2b6d4a147f9d96a09a3fd17f3478eb7bef3a8bda00e1238b509c8c3d589c8c3d6220e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b8556a06080012001a0072146da88e2fd4435e26e02682435f108002ccc3ddd5';
const encodedBlockBuffer = Buffer.from(encodedBlock, 'hex');
const sampleBlock = {
header: {
Expand All @@ -35,6 +35,10 @@ describe('block', () => {
stateRoot: Buffer.from('7f9d96a09a3fd17f3478eb7bef3a8bda00e1238b', 'hex'),
transactionRoot: Buffer.from('b27ca21f40d44113c2090ca8f05fb706c54e87dd', 'hex'),
assetsRoot: Buffer.from('b27ca21f40d44113c2090ca8f05fb706c54e87dd', 'hex'),
eventRoot: Buffer.from(
'30dda4fbc395828e5a9f2f8824771e434fce4945a1e7820012440d09dd1e2b6d',
'hex',
),
generatorAddress: Buffer.from('be63fb1c0426573352556f18b21efd5b6183c39c', 'hex'),
maxHeightPrevoted: 1000988,
maxHeightGenerated: 1000988,
Expand All @@ -45,7 +49,7 @@ describe('block', () => {
certificateSignature: Buffer.alloc(0),
},
signature: Buffer.from('6da88e2fd4435e26e02682435f108002ccc3ddd5', 'hex'),
id: Buffer.from('097ce5adc1a34680d6c939287011dec9b70a3bc1f5f896a3f9024fc9bed59992', 'hex'),
id: Buffer.from('f14104e384546adaba487af56e658188eea07bb534a61b4cbb9ccaee54139b8c', 'hex'),
},
assets: [],
transactions: [],
Expand Down
17 changes: 17 additions & 0 deletions elements/lisk-chain/src/block_header.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export interface BlockHeaderAttrs {
readonly stateRoot?: Buffer;
readonly transactionRoot?: Buffer;
readonly assetsRoot?: Buffer;
readonly eventRoot?: Buffer;
signature?: Buffer;
id?: Buffer;
}
Expand All @@ -59,6 +60,7 @@ export class BlockHeader {
private _stateRoot?: Buffer;
private _transactionRoot?: Buffer;
private _assetsRoot?: Buffer;
private _eventRoot?: Buffer;
private _signature?: Buffer;
private _id?: Buffer;

Expand All @@ -73,6 +75,7 @@ export class BlockHeader {
aggregateCommit,
validatorsHash,
stateRoot,
eventRoot,
assetsRoot,
transactionRoot,
signature,
Expand All @@ -87,6 +90,7 @@ export class BlockHeader {
this.maxHeightGenerated = maxHeightGenerated;
this._aggregateCommit = aggregateCommit;
this._validatorsHash = validatorsHash;
this._eventRoot = eventRoot;
this._stateRoot = stateRoot;
this._transactionRoot = transactionRoot;
this._assetsRoot = assetsRoot;
Expand All @@ -112,6 +116,15 @@ export class BlockHeader {
this._resetComputedValues();
}

public get eventRoot() {
return this._eventRoot;
}

public set eventRoot(val) {
this._eventRoot = val;
this._resetComputedValues();
}

public get assetsRoot() {
return this._assetsRoot;
}
Expand Down Expand Up @@ -335,6 +348,9 @@ export class BlockHeader {
if (!this.assetsRoot) {
throw new Error('Asset root is empty.');
}
if (!this.eventRoot) {
throw new Error('Event root is empty.');
}
if (!this.stateRoot) {
throw new Error('State root is empty.');
}
Expand All @@ -351,6 +367,7 @@ export class BlockHeader {
previousBlockID: this.previousBlockID,
stateRoot: this.stateRoot,
assetsRoot: this.assetsRoot,
eventRoot: this.eventRoot,
transactionRoot: this.transactionRoot,
validatorsHash: this.validatorsHash,
aggregateCommit: this.aggregateCommit,
Expand Down
10 changes: 9 additions & 1 deletion elements/lisk-chain/src/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { KVStore, NotFoundError } from '@liskhq/lisk-db';
import * as createDebug from 'debug';
import { regularMerkleTree } from '@liskhq/lisk-tree';
import {
DEFAULT_KEEP_EVENTS_FOR_HEIGHTS,
DEFAULT_MAX_BLOCK_HEADER_CACHE,
DEFAULT_MIN_BLOCK_HEADER_CACHE,
GENESIS_BLOCK_VERSION,
Expand All @@ -29,11 +30,13 @@ import {
stateDiffSchema,
} from './schema';
import { Block } from './block';
import { Event } from './event';
import { BlockHeader } from './block_header';
import { CurrentState } from './state_store/smt_store';

interface ChainConstructor {
// Constants
readonly keepEventsForHeights: number;
readonly maxTransactionsSize: number;
readonly minBlockHeaderCache?: number;
readonly maxBlockHeaderCache?: number;
Expand All @@ -58,6 +61,7 @@ export class Chain {
readonly maxTransactionsSize: number;
readonly minBlockHeaderCache: number;
readonly maxBlockHeaderCache: number;
readonly keepEventsForHeights: number;
};

private _lastBlock?: Block;
Expand All @@ -68,6 +72,7 @@ export class Chain {
public constructor({
// Constants
maxTransactionsSize,
keepEventsForHeights = DEFAULT_KEEP_EVENTS_FOR_HEIGHTS,
minBlockHeaderCache = DEFAULT_MIN_BLOCK_HEADER_CACHE,
maxBlockHeaderCache = DEFAULT_MAX_BLOCK_HEADER_CACHE,
}: ChainConstructor) {
Expand All @@ -81,6 +86,7 @@ export class Chain {
maxTransactionsSize,
maxBlockHeaderCache,
minBlockHeaderCache,
keepEventsForHeights,
};
}

Expand Down Expand Up @@ -112,6 +118,7 @@ export class Chain {
db: args.db,
minBlockHeaderCache: this.constants.minBlockHeaderCache,
maxBlockHeaderCache: this.constants.maxBlockHeaderCache,
keepEventsForHeights: this.constants.keepEventsForHeights,
});
}

Expand Down Expand Up @@ -191,13 +198,14 @@ export class Chain {

public async saveBlock(
block: Block,
events: Event[],
state: CurrentState,
finalizedHeight: number,
{ removeFromTempTable } = {
removeFromTempTable: false,
},
): Promise<void> {
await this.dataAccess.saveBlock(block, state, finalizedHeight, removeFromTempTable);
await this.dataAccess.saveBlock(block, events, state, finalizedHeight, removeFromTempTable);
this.dataAccess.addBlockHeader(block.header);
this._finalizedHeight = finalizedHeight;
this._lastBlock = block;
Expand Down
14 changes: 14 additions & 0 deletions elements/lisk-chain/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import { createMessageTag, hash } from '@liskhq/lisk-cryptography';

export const DEFAULT_KEEP_EVENTS_FOR_HEIGHTS = 300;
export const DEFAULT_MIN_BLOCK_HEADER_CACHE = 309;
export const DEFAULT_MAX_BLOCK_HEADER_CACHE = 515;

Expand All @@ -34,3 +35,16 @@ export const MAX_ASSET_DATA_SIZE_BYTES = 64;
export const SIGNATURE_LENGTH_BYTES = 64;

export const SMT_PREFIX_SIZE = 6;

export const EVENT_TOPIC_HASH_LENGTH_BYTES = 8;
export const EVENT_INDEX_LENGTH_BITS = 30;
export const EVENT_TOPIC_INDEX_LENGTH_BITS = 2;
export const EVENT_MAX_EVENT_SIZE_BYTES = 1024;

export const EVENT_TOTAL_INDEX_LENGTH_BYTES =
(EVENT_INDEX_LENGTH_BITS + EVENT_TOPIC_INDEX_LENGTH_BITS) / 8;

export const EVENT_MAX_TOPICS_PER_EVENT = 2 ** EVENT_TOPIC_INDEX_LENGTH_BITS;

export const MAX_EVENTS_PER_BLOCK = 2 ** EVENT_INDEX_LENGTH_BITS;
export const EVENT_ID_LENGTH_BYTES = 4 + EVENT_TOTAL_INDEX_LENGTH_BYTES;
20 changes: 18 additions & 2 deletions elements/lisk-chain/src/data_access/data_access.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { Transaction } from '../transaction';
import { RawBlock } from '../types';
import { BlockHeader } from '../block_header';
import { Block } from '../block';
import { Event } from '../event';

import { BlockCache } from './cache';
import { Storage as StorageAccess } from './storage';
Expand All @@ -27,14 +28,20 @@ interface DAConstructor {
readonly db: KVStore;
readonly minBlockHeaderCache: number;
readonly maxBlockHeaderCache: number;
readonly keepEventsForHeights: number;
}

export class DataAccess {
private readonly _storage: StorageAccess;
private readonly _blocksCache: BlockCache;

public constructor({ db, minBlockHeaderCache, maxBlockHeaderCache }: DAConstructor) {
this._storage = new StorageAccess(db);
public constructor({
db,
minBlockHeaderCache,
maxBlockHeaderCache,
keepEventsForHeights,
}: DAConstructor) {
this._storage = new StorageAccess(db, { keepEventsForHeights });
this._blocksCache = new BlockCache(minBlockHeaderCache, maxBlockHeaderCache);
}

Expand Down Expand Up @@ -230,6 +237,12 @@ export class DataAccess {
return this._decodeRawBlock(block);
}

public async getEvents(height: number): Promise<Event[]> {
const events = await this._storage.getEvents(height);

return events;
}

public async isBlockPersisted(blockId: Buffer): Promise<boolean> {
const isPersisted = await this._storage.isBlockPersisted(blockId);

Expand Down Expand Up @@ -283,6 +296,7 @@ export class DataAccess {
*/
public async saveBlock(
block: Block,
events: Event[],
state: CurrentState,
finalizedHeight: number,
removeFromTemp = false,
Expand All @@ -296,12 +310,14 @@ export class DataAccess {
const encodedTx = tx.getBytes();
encodedTransactions.push({ id: txID, value: encodedTx });
}
const encodedEvents = events.map(e => e.getBytes());
await this._storage.saveBlock(
blockID,
height,
finalizedHeight,
encodedHeader,
encodedTransactions,
encodedEvents,
block.assets.getBytes(),
state,
removeFromTemp,
Expand Down
51 changes: 45 additions & 6 deletions elements/lisk-chain/src/data_access/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { KVStore, formatInt, getFirstPrefix, getLastPrefix, NotFoundError } from
import { codec } from '@liskhq/lisk-codec';
import { hash } from '@liskhq/lisk-cryptography';
import { RawBlock, StateDiff } from '../types';

import { Event } from '../event';
import {
DB_KEY_BLOCKS_ID,
DB_KEY_BLOCKS_HEIGHT,
Expand All @@ -25,11 +25,13 @@ import {
DB_KEY_DIFF_STATE,
DB_KEY_FINALIZED_HEIGHT,
DB_KEY_BLOCK_ASSETS_BLOCK_ID,
DB_KEY_BLOCK_EVENTS,
} from '../db_keys';
import { concatDBKeys } from '../utils';
import { stateDiffSchema } from '../schema';
import { CurrentState } from '../state_store';
import { toSMTKey } from '../state_store/utils';
import { DEFAULT_KEEP_EVENTS_FOR_HEIGHTS } from '../constants';

const bytesArraySchema = {
$id: 'lisk-chain/bytesarray',
Expand All @@ -55,13 +57,20 @@ const decodeByteArray = (val: Buffer): Buffer[] => {
return decoded.list;
};

const encodeByteArray = (val: Buffer[]): Buffer => codec.encode(bytesArraySchema, { list: val });
export const encodeByteArray = (val: Buffer[]): Buffer =>
codec.encode(bytesArraySchema, { list: val });

interface StorageOption {
keepEventsForHeights?: number;
}

export class Storage {
private readonly _db: KVStore;
private readonly _keepEventsForHeights: number;

public constructor(db: KVStore) {
public constructor(db: KVStore, options?: StorageOption) {
this._db = db;
this._keepEventsForHeights = options?.keepEventsForHeights ?? DEFAULT_KEEP_EVENTS_FOR_HEIGHTS;
}

/*
Expand Down Expand Up @@ -235,6 +244,13 @@ export class Storage {
};
}

public async getEvents(height: number): Promise<Event[]> {
const eventsByte = await this._db.get(concatDBKeys(DB_KEY_BLOCK_EVENTS, formatInt(height)));
const events = decodeByteArray(eventsByte);

return events.map(e => Event.fromBytes(e));
}

public async getTempBlocks(): Promise<Buffer[]> {
const stream = this._db.createReadStream({
gte: getFirstPrefix(DB_KEY_TEMPBLOCKS_HEIGHT),
Expand Down Expand Up @@ -338,6 +354,7 @@ export class Storage {
finalizedHeight: number,
header: Buffer,
transactions: { id: Buffer; value: Buffer }[],
events: Buffer[],
assets: Buffer[],
state: CurrentState,
removeFromTemp = false,
Expand All @@ -354,6 +371,10 @@ export class Storage {
}
batch.put(concatDBKeys(DB_KEY_TRANSACTIONS_BLOCK_ID, id), Buffer.concat(ids));
}
if (events.length > 0) {
const encodedEvents = encodeByteArray(events);
batch.put(concatDBKeys(DB_KEY_BLOCK_EVENTS, heightBuf), encodedEvents);
}
if (assets.length > 0) {
const encodedAsset = encodeByteArray(assets);
batch.put(concatDBKeys(DB_KEY_BLOCK_ASSETS_BLOCK_ID, id), encodedAsset);
Expand All @@ -369,7 +390,7 @@ export class Storage {
batch.put(DB_KEY_FINALIZED_HEIGHT, finalizedHeightBytes);

await batch.write();
await this._cleanUntil(finalizedHeight);
await this._cleanUntil(height, finalizedHeight);
}

public async deleteBlock(
Expand All @@ -391,6 +412,7 @@ export class Storage {
}
batch.del(concatDBKeys(DB_KEY_TRANSACTIONS_BLOCK_ID, id));
}
batch.del(concatDBKeys(DB_KEY_BLOCK_EVENTS, heightBuf));
if (assets.length > 0) {
batch.del(concatDBKeys(DB_KEY_BLOCK_ASSETS_BLOCK_ID, id));
}
Expand Down Expand Up @@ -438,11 +460,28 @@ export class Storage {
}

// This function is out of batch, but even if it fails, it will run again next time
private async _cleanUntil(height: number): Promise<void> {
private async _cleanUntil(currentHeight: number, finalizedHeight: number): Promise<void> {
await this._db.clear({
gte: concatDBKeys(DB_KEY_DIFF_STATE, formatInt(0)),
lt: concatDBKeys(DB_KEY_DIFF_STATE, formatInt(height)),
lt: concatDBKeys(DB_KEY_DIFF_STATE, formatInt(finalizedHeight)),
});

// Delete outdated events if keepEventsForHeights is positive.
if (this._keepEventsForHeights > -1) {
// events are removed only if finalized and below height - keepEventsForHeights
const minEventDeleteHeight = Math.min(
finalizedHeight,
Math.max(0, currentHeight - this._keepEventsForHeights),
);
if (minEventDeleteHeight > 0) {
const endHeight = Buffer.alloc(4);
endHeight.writeUInt32BE(minEventDeleteHeight - 1, 0);
await this._db.clear({
gte: Buffer.concat([DB_KEY_BLOCK_EVENTS, Buffer.alloc(4, 0)]),
lte: Buffer.concat([DB_KEY_BLOCK_EVENTS, endHeight]),
});
}
}
}

private async _getBlockAssets(blockID: Buffer): Promise<Buffer[]> {
Expand Down
1 change: 1 addition & 0 deletions elements/lisk-chain/src/db_keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export const DB_KEY_TRANSACTIONS_BLOCK_ID = Buffer.from([5]);
export const DB_KEY_TRANSACTIONS_ID = Buffer.from([6]);
export const DB_KEY_TEMPBLOCKS_HEIGHT = Buffer.from([7]);
export const DB_KEY_BLOCK_ASSETS_BLOCK_ID = Buffer.from([8]);
export const DB_KEY_BLOCK_EVENTS = Buffer.from([8]);

export const DB_KEY_STATE_STORE = Buffer.from([10]);

Expand Down
Loading

0 comments on commit 987082c

Please sign in to comment.