Skip to content

Commit

Permalink
refactor: move startup logic to per-indexer container (#327)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sekhmet authored Dec 9, 2024
1 parent f72cb58 commit 16f0aef
Show file tree
Hide file tree
Showing 2 changed files with 260 additions and 163 deletions.
192 changes: 29 additions & 163 deletions src/checkpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { GqlEntityController } from './graphql/controller';
import { CheckpointRecord, CheckpointsStore, MetadataId } from './stores/checkpoints';
import { BaseIndexer, BlockNotFoundError, ReorgDetectedError } from './providers';
import { createLogger, Logger, LogLevel } from './utils/logger';
import { getConfigChecksum, getContractsFromConfig } from './utils/checkpoint';
import { getContractsFromConfig } from './utils/checkpoint';
import { extendSchema } from './utils/graphql';
import { createKnex } from './knex';
import { createPgPool } from './pg';
Expand All @@ -16,9 +16,9 @@ import { register } from './register';
import { sleep } from './utils/helpers';
import { ContractSourceConfig, CheckpointConfig, CheckpointOptions, TemplateSource } from './types';
import { getTableName } from './utils/database';
import { Container } from './container';

const INDEXER_NAME = 'default';
const SCHEMA_VERSION = 1;

const BLOCK_PRELOAD_START_RANGE = 1000;
const BLOCK_RELOAD_MIN_RANGE = 10;
Expand All @@ -36,6 +36,8 @@ export default class Checkpoint {
private readonly log: Logger;
private readonly indexer: BaseIndexer;

private container: Container;

private dbConnection: string;
private knex: Knex;
private pgPool?: PgPool;
Expand All @@ -44,7 +46,7 @@ export default class Checkpoint {
private preloadStep: number = BLOCK_PRELOAD_START_RANGE;
private preloadedBlocks: number[] = [];
private preloadEndBlock = 0;
private cpBlocksCache: number[] | null;
private cpBlocksCache: number[] | null = [];
private blockHashCache: { blockNumber: number; hash: string } | null = null;

constructor(
Expand Down Expand Up @@ -76,17 +78,6 @@ export default class Checkpoint {
: {})
});

this.indexer = indexer;
this.indexer.init({
instance: this,
log: this.log,
abis: opts?.abis
});

this.validateConfig();

this.cpBlocksCache = [];

const dbConnection = opts?.dbConnection || process.env.DATABASE_URL;
if (!dbConnection) {
throw new Error(
Expand All @@ -97,6 +88,26 @@ export default class Checkpoint {
this.knex = createKnex(dbConnection);
this.dbConnection = dbConnection;

this.indexer = indexer;
this.indexer.init({
instance: this,
log: this.log,
abis: opts?.abis
});

this.container = new Container(
INDEXER_NAME,
this.log,
this.knex,
this.store,
this.config,
this.indexer,
this.schema,
this.opts
);

this.container.validateConfig();

register.setKnex(this.knex);
}

Expand Down Expand Up @@ -160,7 +171,7 @@ export default class Checkpoint {
public async start() {
this.log.debug('starting');

await this.validateStore();
await this.container.validateStore();
await this.indexer.getProvider().init();

const templateSources = await this.store.getTemplateSources(INDEXER_NAME);
Expand All @@ -177,7 +188,7 @@ export default class Checkpoint {
)
);

const blockNum = await this.getStartBlockNum();
const blockNum = await this.container.getStartBlockNum();
this.preloadEndBlock =
(await this.indexer.getProvider().getLatestBlockNumber()) - BLOCK_PRELOAD_OFFSET;

Expand All @@ -200,12 +211,7 @@ export default class Checkpoint {
public async reset() {
this.log.debug('reset');

await this.store.createStore();
await this.store.setMetadata(INDEXER_NAME, MetadataId.LastIndexedBlock, 0);
await this.store.setMetadata(INDEXER_NAME, MetadataId.SchemaVersion, SCHEMA_VERSION);
await this.store.removeBlocks(INDEXER_NAME);

await this.entityController.createEntityStores(this.knex);
await this.container.reset();
}

/**
Expand All @@ -216,8 +222,7 @@ export default class Checkpoint {
*
*/
public async resetMetadata() {
await this.store.resetStore();
await this.store.setMetadata(INDEXER_NAME, MetadataId.SchemaVersion, SCHEMA_VERSION);
await this.container.resetMetadata();
}

private addSource(source: ContractSourceConfig) {
Expand Down Expand Up @@ -313,24 +318,6 @@ export default class Checkpoint {
};
}

private getConfigStartBlock() {
if (this.config.start && (this.config.tx_fn || this.config.global_events)) {
return this.config.start;
}

return Math.min(...(this.config.sources?.map(source => source.start) || []));
}

private async getStartBlockNum() {
const start = this.getConfigStartBlock();
const lastBlock =
(await this.store.getMetadataNumber(INDEXER_NAME, MetadataId.LastIndexedBlock)) ?? 0;

const nextBlock = lastBlock + 1;

return nextBlock > start ? nextBlock : start;
}

private async preload(blockNum: number) {
if (this.preloadedBlocks.length > 0) return this.preloadedBlocks.shift() as number;

Expand Down Expand Up @@ -507,125 +494,4 @@ export default class Checkpoint {
this.pgPool = createPgPool(this.dbConnection);
return this.pgPool;
}

private validateConfig() {
const sources = this.config.sources ?? [];
const templates = Object.values(this.config.templates ?? {});

const usedAbis = [
...sources.map(source => source.abi),
...templates.map(template => template.abi)
].filter(abi => abi) as string[];
const usedWriters = [
...sources.flatMap(source => source.events),
...templates.flatMap(template => template.events)
];

const missingAbis = usedAbis.filter(abi => !this.opts?.abis?.[abi]);
const missingWriters = usedWriters.filter(
writer => !this.indexer.getHandlers().includes(writer.fn)
);

if (missingAbis.length > 0) {
throw new Error(
`Following ABIs are used (${missingAbis.join(', ')}), but they are missing in opts.abis`
);
}

if (missingWriters.length > 0) {
throw new Error(
`Following writers are used (${missingWriters
.map(writer => writer.fn)
.join(', ')}), but they are not defined`
);
}
}

private async validateStore() {
const networkIdentifier = await this.indexer.getProvider().getNetworkIdentifier();
const configChecksum = getConfigChecksum(this.config);

const storedNetworkIdentifier = await this.store.getMetadata(
INDEXER_NAME,
MetadataId.NetworkIdentifier
);
const storedStartBlock = await this.store.getMetadataNumber(
INDEXER_NAME,
MetadataId.StartBlock
);
const storedConfigChecksum = await this.store.getMetadata(
INDEXER_NAME,
MetadataId.ConfigChecksum
);
const storedSchemaVersion = await this.store.getMetadataNumber(
INDEXER_NAME,
MetadataId.SchemaVersion
);
const hasNetworkChanged =
storedNetworkIdentifier && storedNetworkIdentifier !== networkIdentifier;
const hasStartBlockChanged =
storedStartBlock && storedStartBlock !== this.getConfigStartBlock();
const hasConfigChanged = storedConfigChecksum && storedConfigChecksum !== configChecksum;
const hasSchemaChanged = storedSchemaVersion !== SCHEMA_VERSION;

if (
(hasNetworkChanged || hasStartBlockChanged || hasConfigChanged || hasSchemaChanged) &&
this.opts?.resetOnConfigChange
) {
await this.resetMetadata();
await this.reset();

await this.store.setMetadata(INDEXER_NAME, MetadataId.NetworkIdentifier, networkIdentifier);
await this.store.setMetadata(INDEXER_NAME, MetadataId.StartBlock, this.getConfigStartBlock());
await this.store.setMetadata(INDEXER_NAME, MetadataId.ConfigChecksum, configChecksum);
} else if (hasNetworkChanged) {
this.log.error(
`network identifier changed from ${storedNetworkIdentifier} to ${networkIdentifier}.
You probably should reset the database by calling .reset() and resetMetadata().
You can also set resetOnConfigChange to true in Checkpoint options to do this automatically.`
);

throw new Error('network identifier changed');
} else if (hasStartBlockChanged) {
this.log.error(
`start block changed from ${storedStartBlock} to ${this.getConfigStartBlock()}.
You probably should reset the database by calling .reset() and resetMetadata().
You can also set resetOnConfigChange to true in Checkpoint options to do this automatically.`
);

throw new Error('start block changed');
} else if (hasConfigChanged) {
this.log.error(
`config checksum changed from ${storedConfigChecksum} to ${configChecksum} to due to a change in the config.
You probably should reset the database by calling .reset() and resetMetadata().
You can also set resetOnConfigChange to true in Checkpoint options to do this automatically.`
);

throw new Error('config changed');
} else if (hasSchemaChanged) {
this.log.error(
`schema version changed from ${storedSchemaVersion} to ${SCHEMA_VERSION}.
You probably should reset the database by calling .reset() and resetMetadata().
You can also set resetOnConfigChange to true in Checkpoint options to do this automatically.`
);

throw new Error('schema changed');
} else {
if (!storedNetworkIdentifier) {
await this.store.setMetadata(INDEXER_NAME, MetadataId.NetworkIdentifier, networkIdentifier);
}

if (!storedStartBlock) {
await this.store.setMetadata(
INDEXER_NAME,
MetadataId.StartBlock,
this.getConfigStartBlock()
);
}

if (!storedConfigChecksum) {
await this.store.setMetadata(INDEXER_NAME, MetadataId.ConfigChecksum, configChecksum);
}
}
}
}
Loading

0 comments on commit 16f0aef

Please sign in to comment.