Skip to content

Commit

Permalink
feat: add indexer to metadata (#325)
Browse files Browse the repository at this point in the history
With support for multiple indexers in single Checkpoint instance
we have to accomodate for those in our database.
This commit makes necessary changes in our metadata store.

Currently checkpoint still supports single indexer, but now metadata can accomodate it.
  • Loading branch information
Sekhmet committed Dec 4, 2024
1 parent 508395b commit f72cb58
Show file tree
Hide file tree
Showing 4 changed files with 442 additions and 104 deletions.
67 changes: 43 additions & 24 deletions src/checkpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { sleep } from './utils/helpers';
import { ContractSourceConfig, CheckpointConfig, CheckpointOptions, TemplateSource } from './types';
import { getTableName } from './utils/database';

const INDEXER_NAME = 'default';
const SCHEMA_VERSION = 1;

const BLOCK_PRELOAD_START_RANGE = 1000;
Expand Down Expand Up @@ -162,7 +163,7 @@ export default class Checkpoint {
await this.validateStore();
await this.indexer.getProvider().init();

const templateSources = await this.store.getTemplateSources();
const templateSources = await this.store.getTemplateSources(INDEXER_NAME);
await Promise.all(
templateSources.map(source =>
this.executeTemplate(
Expand Down Expand Up @@ -200,9 +201,9 @@ export default class Checkpoint {
this.log.debug('reset');

await this.store.createStore();
await this.store.setMetadata(MetadataId.LastIndexedBlock, 0);
await this.store.setMetadata(MetadataId.SchemaVersion, SCHEMA_VERSION);
await this.store.removeBlocks();
await this.store.setMetadata(INDEXER_NAME, MetadataId.LastIndexedBlock, 0);
await this.store.setMetadata(INDEXER_NAME, MetadataId.SchemaVersion, SCHEMA_VERSION);
await this.store.removeBlocks(INDEXER_NAME);

await this.entityController.createEntityStores(this.knex);
}
Expand All @@ -216,7 +217,7 @@ export default class Checkpoint {
*/
public async resetMetadata() {
await this.store.resetStore();
await this.store.setMetadata(MetadataId.SchemaVersion, SCHEMA_VERSION);
await this.store.setMetadata(INDEXER_NAME, MetadataId.SchemaVersion, SCHEMA_VERSION);
}

private addSource(source: ContractSourceConfig) {
Expand Down Expand Up @@ -249,7 +250,7 @@ export default class Checkpoint {
this.activeTemplates.push({ template: name, contractAddress: contract, startBlock: start });

if (persist) {
await this.store.insertTemplateSource(contract, start, name);
await this.store.insertTemplateSource(INDEXER_NAME, contract, start, name);
}

this.addSource({
Expand Down Expand Up @@ -287,21 +288,21 @@ export default class Checkpoint {
});
});

await this.store.insertCheckpoints(checkpoints);
await this.store.insertCheckpoints(INDEXER_NAME, checkpoints);
}

public async setBlockHash(blockNum: number, hash: string) {
this.blockHashCache = { blockNumber: blockNum, hash };

return this.store.setBlockHash(blockNum, hash);
return this.store.setBlockHash(INDEXER_NAME, blockNum, hash);
}

public async setLastIndexedBlock(block: number) {
await this.store.setMetadata(MetadataId.LastIndexedBlock, block);
await this.store.setMetadata(INDEXER_NAME, MetadataId.LastIndexedBlock, block);
}

public async insertCheckpoints(checkpoints: CheckpointRecord[]) {
await this.store.insertCheckpoints(checkpoints);
await this.store.insertCheckpoints(INDEXER_NAME, checkpoints);
}

public async getWriterParams(): Promise<{
Expand All @@ -322,7 +323,8 @@ export default class Checkpoint {

private async getStartBlockNum() {
const start = this.getConfigStartBlock();
const lastBlock = (await this.store.getMetadataNumber(MetadataId.LastIndexedBlock)) ?? 0;
const lastBlock =
(await this.store.getMetadataNumber(INDEXER_NAME, MetadataId.LastIndexedBlock)) ?? 0;

const nextBlock = lastBlock + 1;

Expand Down Expand Up @@ -426,7 +428,7 @@ export default class Checkpoint {
let current = blockNumber - 1;
let lastGoodBlock: null | number = null;
while (lastGoodBlock === null) {
const storedBlockHash = await this.store.getBlockHash(current);
const storedBlockHash = await this.store.getBlockHash(INDEXER_NAME, current);
const currentBlockHash = await this.indexer.getProvider().getBlockHash(current);

if (storedBlockHash === null || storedBlockHash === currentBlockHash) {
Expand All @@ -453,7 +455,7 @@ export default class Checkpoint {
});

// TODO: when we have full transaction support, we should include this in the transaction
await this.store.removeFutureData(lastGoodBlock);
await this.store.removeFutureData(INDEXER_NAME, lastGoodBlock);

this.cpBlocksCache = null;
this.blockHashCache = null;
Expand All @@ -469,6 +471,7 @@ export default class Checkpoint {
}

const checkpointBlocks = await this.store.getNextCheckpointBlocks(
INDEXER_NAME,
blockNum,
this.sourceContracts,
15
Expand All @@ -485,7 +488,7 @@ export default class Checkpoint {
return this.blockHashCache.hash;
}

return this.store.getBlockHash(blockNumber);
return this.store.getBlockHash(INDEXER_NAME, blockNumber);
}

private get store(): CheckpointsStore {
Expand Down Expand Up @@ -542,10 +545,22 @@ export default class Checkpoint {
const networkIdentifier = await this.indexer.getProvider().getNetworkIdentifier();
const configChecksum = getConfigChecksum(this.config);

const storedNetworkIdentifier = await this.store.getMetadata(MetadataId.NetworkIdentifier);
const storedStartBlock = await this.store.getMetadataNumber(MetadataId.StartBlock);
const storedConfigChecksum = await this.store.getMetadata(MetadataId.ConfigChecksum);
const storedSchemaVersion = await this.store.getMetadataNumber(MetadataId.SchemaVersion);
const storedNetworkIdentifier = await this.store.getMetadata(
INDEXER_NAME,
MetadataId.NetworkIdentifier
);
const storedStartBlock = await this.store.getMetadataNumber(
INDEXER_NAME,
MetadataId.StartBlock
);
const storedConfigChecksum = await this.store.getMetadata(
INDEXER_NAME,
MetadataId.ConfigChecksum
);
const storedSchemaVersion = await this.store.getMetadataNumber(
INDEXER_NAME,
MetadataId.SchemaVersion
);
const hasNetworkChanged =
storedNetworkIdentifier && storedNetworkIdentifier !== networkIdentifier;
const hasStartBlockChanged =
Expand All @@ -560,9 +575,9 @@ export default class Checkpoint {
await this.resetMetadata();
await this.reset();

await this.store.setMetadata(MetadataId.NetworkIdentifier, networkIdentifier);
await this.store.setMetadata(MetadataId.StartBlock, this.getConfigStartBlock());
await this.store.setMetadata(MetadataId.ConfigChecksum, configChecksum);
await this.store.setMetadata(INDEXER_NAME, MetadataId.NetworkIdentifier, networkIdentifier);
await this.store.setMetadata(INDEXER_NAME, MetadataId.StartBlock, this.getConfigStartBlock());
await this.store.setMetadata(INDEXER_NAME, MetadataId.ConfigChecksum, configChecksum);
} else if (hasNetworkChanged) {
this.log.error(
`network identifier changed from ${storedNetworkIdentifier} to ${networkIdentifier}.
Expand Down Expand Up @@ -597,15 +612,19 @@ export default class Checkpoint {
throw new Error('schema changed');
} else {
if (!storedNetworkIdentifier) {
await this.store.setMetadata(MetadataId.NetworkIdentifier, networkIdentifier);
await this.store.setMetadata(INDEXER_NAME, MetadataId.NetworkIdentifier, networkIdentifier);
}

if (!storedStartBlock) {
await this.store.setMetadata(MetadataId.StartBlock, this.getConfigStartBlock());
await this.store.setMetadata(
INDEXER_NAME,
MetadataId.StartBlock,
this.getConfigStartBlock()
);
}

if (!storedConfigChecksum) {
await this.store.setMetadata(MetadataId.ConfigChecksum, configChecksum);
await this.store.setMetadata(INDEXER_NAME, MetadataId.ConfigChecksum, configChecksum);
}
}
}
Expand Down
Loading

0 comments on commit f72cb58

Please sign in to comment.