Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move startup logic to per-indexer container #327

Merged
merged 1 commit into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 29 additions & 163 deletions src/checkpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { GqlEntityController } from './graphql/controller';
import { CheckpointRecord, CheckpointsStore, MetadataId } from './stores/checkpoints';
import { BaseIndexer, BlockNotFoundError, ReorgDetectedError } from './providers';
import { createLogger, Logger, LogLevel } from './utils/logger';
import { getConfigChecksum, getContractsFromConfig } from './utils/checkpoint';
import { getContractsFromConfig } from './utils/checkpoint';
import { extendSchema } from './utils/graphql';
import { createKnex } from './knex';
import { createPgPool } from './pg';
Expand All @@ -16,9 +16,9 @@ import { register } from './register';
import { sleep } from './utils/helpers';
import { ContractSourceConfig, CheckpointConfig, CheckpointOptions, TemplateSource } from './types';
import { getTableName } from './utils/database';
import { Container } from './container';

const INDEXER_NAME = 'default';
const SCHEMA_VERSION = 1;

const BLOCK_PRELOAD_START_RANGE = 1000;
const BLOCK_RELOAD_MIN_RANGE = 10;
Expand All @@ -36,6 +36,8 @@ export default class Checkpoint {
private readonly log: Logger;
private readonly indexer: BaseIndexer;

private container: Container;

private dbConnection: string;
private knex: Knex;
private pgPool?: PgPool;
Expand All @@ -44,7 +46,7 @@ export default class Checkpoint {
private preloadStep: number = BLOCK_PRELOAD_START_RANGE;
private preloadedBlocks: number[] = [];
private preloadEndBlock = 0;
private cpBlocksCache: number[] | null;
private cpBlocksCache: number[] | null = [];
private blockHashCache: { blockNumber: number; hash: string } | null = null;

constructor(
Expand Down Expand Up @@ -76,17 +78,6 @@ export default class Checkpoint {
: {})
});

this.indexer = indexer;
this.indexer.init({
instance: this,
log: this.log,
abis: opts?.abis
});

this.validateConfig();

this.cpBlocksCache = [];

const dbConnection = opts?.dbConnection || process.env.DATABASE_URL;
if (!dbConnection) {
throw new Error(
Expand All @@ -97,6 +88,26 @@ export default class Checkpoint {
this.knex = createKnex(dbConnection);
this.dbConnection = dbConnection;

this.indexer = indexer;
this.indexer.init({
instance: this,
log: this.log,
abis: opts?.abis
});

this.container = new Container(
INDEXER_NAME,
this.log,
this.knex,
this.store,
this.config,
this.indexer,
this.schema,
this.opts
);

this.container.validateConfig();

register.setKnex(this.knex);
}

Expand Down Expand Up @@ -160,7 +171,7 @@ export default class Checkpoint {
public async start() {
this.log.debug('starting');

await this.validateStore();
await this.container.validateStore();
await this.indexer.getProvider().init();

const templateSources = await this.store.getTemplateSources(INDEXER_NAME);
Expand All @@ -177,7 +188,7 @@ export default class Checkpoint {
)
);

const blockNum = await this.getStartBlockNum();
const blockNum = await this.container.getStartBlockNum();
this.preloadEndBlock =
(await this.indexer.getProvider().getLatestBlockNumber()) - BLOCK_PRELOAD_OFFSET;

Expand All @@ -200,12 +211,7 @@ export default class Checkpoint {
public async reset() {
this.log.debug('reset');

await this.store.createStore();
await this.store.setMetadata(INDEXER_NAME, MetadataId.LastIndexedBlock, 0);
await this.store.setMetadata(INDEXER_NAME, MetadataId.SchemaVersion, SCHEMA_VERSION);
await this.store.removeBlocks(INDEXER_NAME);

await this.entityController.createEntityStores(this.knex);
await this.container.reset();
}

/**
Expand All @@ -216,8 +222,7 @@ export default class Checkpoint {
*
*/
public async resetMetadata() {
await this.store.resetStore();
await this.store.setMetadata(INDEXER_NAME, MetadataId.SchemaVersion, SCHEMA_VERSION);
await this.container.resetMetadata();
}

private addSource(source: ContractSourceConfig) {
Expand Down Expand Up @@ -313,24 +318,6 @@ export default class Checkpoint {
};
}

private getConfigStartBlock() {
if (this.config.start && (this.config.tx_fn || this.config.global_events)) {
return this.config.start;
}

return Math.min(...(this.config.sources?.map(source => source.start) || []));
}

private async getStartBlockNum() {
const start = this.getConfigStartBlock();
const lastBlock =
(await this.store.getMetadataNumber(INDEXER_NAME, MetadataId.LastIndexedBlock)) ?? 0;

const nextBlock = lastBlock + 1;

return nextBlock > start ? nextBlock : start;
}

private async preload(blockNum: number) {
if (this.preloadedBlocks.length > 0) return this.preloadedBlocks.shift() as number;

Expand Down Expand Up @@ -507,125 +494,4 @@ export default class Checkpoint {
this.pgPool = createPgPool(this.dbConnection);
return this.pgPool;
}

private validateConfig() {
const sources = this.config.sources ?? [];
const templates = Object.values(this.config.templates ?? {});

const usedAbis = [
...sources.map(source => source.abi),
...templates.map(template => template.abi)
].filter(abi => abi) as string[];
const usedWriters = [
...sources.flatMap(source => source.events),
...templates.flatMap(template => template.events)
];

const missingAbis = usedAbis.filter(abi => !this.opts?.abis?.[abi]);
const missingWriters = usedWriters.filter(
writer => !this.indexer.getHandlers().includes(writer.fn)
);

if (missingAbis.length > 0) {
throw new Error(
`Following ABIs are used (${missingAbis.join(', ')}), but they are missing in opts.abis`
);
}

if (missingWriters.length > 0) {
throw new Error(
`Following writers are used (${missingWriters
.map(writer => writer.fn)
.join(', ')}), but they are not defined`
);
}
}

private async validateStore() {
const networkIdentifier = await this.indexer.getProvider().getNetworkIdentifier();
const configChecksum = getConfigChecksum(this.config);

const storedNetworkIdentifier = await this.store.getMetadata(
INDEXER_NAME,
MetadataId.NetworkIdentifier
);
const storedStartBlock = await this.store.getMetadataNumber(
INDEXER_NAME,
MetadataId.StartBlock
);
const storedConfigChecksum = await this.store.getMetadata(
INDEXER_NAME,
MetadataId.ConfigChecksum
);
const storedSchemaVersion = await this.store.getMetadataNumber(
INDEXER_NAME,
MetadataId.SchemaVersion
);
const hasNetworkChanged =
storedNetworkIdentifier && storedNetworkIdentifier !== networkIdentifier;
const hasStartBlockChanged =
storedStartBlock && storedStartBlock !== this.getConfigStartBlock();
const hasConfigChanged = storedConfigChecksum && storedConfigChecksum !== configChecksum;
const hasSchemaChanged = storedSchemaVersion !== SCHEMA_VERSION;

if (
(hasNetworkChanged || hasStartBlockChanged || hasConfigChanged || hasSchemaChanged) &&
this.opts?.resetOnConfigChange
) {
await this.resetMetadata();
await this.reset();

await this.store.setMetadata(INDEXER_NAME, MetadataId.NetworkIdentifier, networkIdentifier);
await this.store.setMetadata(INDEXER_NAME, MetadataId.StartBlock, this.getConfigStartBlock());
await this.store.setMetadata(INDEXER_NAME, MetadataId.ConfigChecksum, configChecksum);
} else if (hasNetworkChanged) {
this.log.error(
`network identifier changed from ${storedNetworkIdentifier} to ${networkIdentifier}.
You probably should reset the database by calling .reset() and resetMetadata().
You can also set resetOnConfigChange to true in Checkpoint options to do this automatically.`
);

throw new Error('network identifier changed');
} else if (hasStartBlockChanged) {
this.log.error(
`start block changed from ${storedStartBlock} to ${this.getConfigStartBlock()}.
You probably should reset the database by calling .reset() and resetMetadata().
You can also set resetOnConfigChange to true in Checkpoint options to do this automatically.`
);

throw new Error('start block changed');
} else if (hasConfigChanged) {
this.log.error(
`config checksum changed from ${storedConfigChecksum} to ${configChecksum} to due to a change in the config.
You probably should reset the database by calling .reset() and resetMetadata().
You can also set resetOnConfigChange to true in Checkpoint options to do this automatically.`
);

throw new Error('config changed');
} else if (hasSchemaChanged) {
this.log.error(
`schema version changed from ${storedSchemaVersion} to ${SCHEMA_VERSION}.
You probably should reset the database by calling .reset() and resetMetadata().
You can also set resetOnConfigChange to true in Checkpoint options to do this automatically.`
);

throw new Error('schema changed');
} else {
if (!storedNetworkIdentifier) {
await this.store.setMetadata(INDEXER_NAME, MetadataId.NetworkIdentifier, networkIdentifier);
}

if (!storedStartBlock) {
await this.store.setMetadata(
INDEXER_NAME,
MetadataId.StartBlock,
this.getConfigStartBlock()
);
}

if (!storedConfigChecksum) {
await this.store.setMetadata(INDEXER_NAME, MetadataId.ConfigChecksum, configChecksum);
}
}
}
}
Loading