From 44bc2ebd6e4afa187419757f2c8df6c9ef77cb93 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Fri, 9 Aug 2024 14:12:14 +1200 Subject: [PATCH] feat: Surface provisioning failures to user (#1002) This PR updates provisioning such that user-related errors are written to the user-owned logs table, allowing users to debug issues with their schema. Logging to the user table is tricky, since this table is created _during_ the provisioning step itself. Therefore, I have split provisioning into two phases: 1. System Resources - Sets up system-related entities: database, schema, logs table/jobs etc. 2. User Resources - Applies user schema, configures Hasura etc. This separation allows us to isolate the tasks which are likely to fail due to user error, and therefore only surface errors which are relevant. The creation of the logs table _should always succeed_; if it doesn't, there is something wrong with the system, i.e. some form of bug has been introduced. Errors thrown during the System portion of provisioning will be logged as errors on the machine, and I will tune the existing alert so that we are notified of these errors. Additionally, I have converted all non-critical error logs to warnings, so that we don't get alerted on non-issues. 
closes: #901 --- runner/src/indexer-config/indexer-config.ts | 5 +- runner/src/indexer-meta/indexer-meta.ts | 6 +- runner/src/provisioner/provisioner.test.ts | 8 +- runner/src/provisioner/provisioner.ts | 103 +++++++++++++----- .../services/data-layer/data-layer-service.ts | 8 +- .../server/services/runner/runner-service.ts | 2 +- runner/src/stream-handler/stream-handler.ts | 4 +- runner/src/stream-handler/worker.ts | 2 +- runner/tests/integration.test.ts | 17 +++ 9 files changed, 111 insertions(+), 44 deletions(-) diff --git a/runner/src/indexer-config/indexer-config.ts b/runner/src/indexer-config/indexer-config.ts index f975f484..04b4b136 100644 --- a/runner/src/indexer-config/indexer-config.ts +++ b/runner/src/indexer-config/indexer-config.ts @@ -17,7 +17,8 @@ export class ProvisioningConfig extends BaseConfig { constructor ( public readonly accountId: string, public readonly functionName: string, - public readonly schema: string + public readonly schema: string, + public readonly logLevel: LogLevel = LogLevel.INFO ) { super(accountId, functionName); } @@ -101,7 +102,7 @@ export default class IndexerConfig extends ProvisioningConfig { public readonly schema: string, public readonly logLevel: LogLevel ) { - super(accountId, functionName, schema); + super(accountId, functionName, schema, logLevel); const hash = crypto.createHash('sha256'); hash.update(`${accountId}/${functionName}`); this.executorId = hash.digest('hex'); diff --git a/runner/src/indexer-meta/indexer-meta.ts b/runner/src/indexer-meta/indexer-meta.ts index f0da10b9..d09b4139 100644 --- a/runner/src/indexer-meta/indexer-meta.ts +++ b/runner/src/indexer-meta/indexer-meta.ts @@ -4,7 +4,7 @@ import PgClient, { type PostgresConnectionParams } from '../pg-client'; import { trace } from '@opentelemetry/api'; import type LogEntry from './log-entry'; import { LogLevel } from './log-entry'; -import type IndexerConfig from '../indexer-config'; +import { type ProvisioningConfig } from 
'../indexer-config/indexer-config'; export enum IndexerStatus { PROVISIONING = 'PROVISIONING', @@ -29,11 +29,11 @@ export default class IndexerMeta implements IndexerMetaInterface { tracer = trace.getTracer('queryapi-runner-indexer-logger'); private readonly pgClient: PgClient; - private readonly indexerConfig: IndexerConfig; + private readonly indexerConfig: ProvisioningConfig; private readonly logInsertQueryTemplate: string = 'INSERT INTO %I.sys_logs (block_height, date, timestamp, type, level, message) VALUES %L'; constructor ( - indexerConfig: IndexerConfig, + indexerConfig: ProvisioningConfig, databaseConnectionParameters: PostgresConnectionParams, pgClientInstance: PgClient | undefined = undefined, ) { diff --git a/runner/src/provisioner/provisioner.test.ts b/runner/src/provisioner/provisioner.test.ts index 8ef12371..0b58c486 100644 --- a/runner/src/provisioner/provisioner.test.ts +++ b/runner/src/provisioner/provisioner.test.ts @@ -72,7 +72,13 @@ describe('Provisioner', () => { }; }); - provisioner = new Provisioner(hasuraClient, adminPgClient, cronPgClient, undefined, crypto, pgFormat, PgClient as any, testingRetryConfig); + const IndexerMeta = jest.fn().mockImplementation(() => { + return { + writeLogs: jest.fn() + }; + }); + + provisioner = new Provisioner(hasuraClient, adminPgClient, cronPgClient, undefined, crypto, pgFormat, PgClient as any, testingRetryConfig, IndexerMeta); indexerConfig = new IndexerConfig('', accountId, functionName, 0, '', databaseSchema, LogLevel.INFO); }); diff --git a/runner/src/provisioner/provisioner.ts b/runner/src/provisioner/provisioner.ts index f9d4e511..237bdb1e 100644 --- a/runner/src/provisioner/provisioner.ts +++ b/runner/src/provisioner/provisioner.ts @@ -10,7 +10,8 @@ import { logsTableDDL } from './schemas/logs-table'; import { metadataTableDDL } from './schemas/metadata-table'; import PgClientClass, { type PostgresConnectionParams } from '../pg-client'; import { type ProvisioningConfig } from 
'../indexer-config/indexer-config'; -import { METADATA_TABLE_UPSERT, MetadataFields, IndexerStatus } from '../indexer-meta'; +import IndexerMetaClass, { METADATA_TABLE_UPSERT, MetadataFields, IndexerStatus, LogEntry } from '../indexer-meta'; +import logger from '../logger'; const DEFAULT_PASSWORD_LENGTH = 16; @@ -60,6 +61,9 @@ const defaultRetryConfig: RetryConfig = { export default class Provisioner { tracer: Tracer = trace.getTracer('queryapi-runner-provisioner'); + private readonly SYSTEM_TABLES = ['sys_logs', 'sys_metadata']; + private readonly logger: typeof logger; + constructor ( private readonly hasuraClient: HasuraClient = new HasuraClient(), private readonly adminDefaultPgClient: PgClientClass = adminDefaultPgClientGlobal, @@ -69,7 +73,10 @@ export default class Provisioner { private readonly pgFormat: typeof pgFormatLib = pgFormatLib, private readonly PgClient: typeof PgClientClass = PgClientClass, private readonly retryConfig: RetryConfig = defaultRetryConfig, - ) {} + private readonly IndexerMeta: typeof IndexerMetaClass = IndexerMetaClass + ) { + this.logger = logger.child({ service: 'Provisioner' }); + } generatePassword (length: number = DEFAULT_PASSWORD_LENGTH): string { return this.crypto @@ -322,42 +329,82 @@ export default class Provisioner { }, 'Failed to deprovision'); } - async provisionUserApi (indexerConfig: ProvisioningConfig): Promise { // replace any with actual type + async provisionUserApi (indexerConfig: ProvisioningConfig): Promise { + const logger = this.logger.child({ accountId: indexerConfig.accountId, functionName: indexerConfig.functionName }); + + await wrapSpan(async () => { + await wrapError(async () => { + try { + await this.provisionSystemResources(indexerConfig); + } catch (error) { + logger.error('Failed to provision system resources', error); + throw error; + } + + try { + await this.provisionUserResources(indexerConfig); + } catch (err) { + const error = err as Error; + + try { + await 
this.writeFailureToUserLogs(indexerConfig, error); + } catch (error) { + logger.error('Failed to log provisioning failure', error); + } + + logger.warn('Failed to provision user resources', error); + throw error; + } + }, 'Failed to provision endpoint'); + }, this.tracer, 'provision indexer resources'); + } + + async writeFailureToUserLogs (indexerConfig: ProvisioningConfig, error: Error): Promise { + const indexerMeta = new this.IndexerMeta(indexerConfig, await this.getPostgresConnectionParameters(indexerConfig.userName())); + await indexerMeta.writeLogs([LogEntry.systemError(error.message)]); + } + + async provisionSystemResources (indexerConfig: ProvisioningConfig): Promise { const userName = indexerConfig.userName(); const databaseName = indexerConfig.databaseName(); const schemaName = indexerConfig.schemaName(); - await wrapSpan(async () => { - await wrapError( - async () => { - if (!await this.hasuraClient.doesSourceExist(databaseName)) { - const password = this.generatePassword(); - await this.createUserDb(userName, password, databaseName); - await this.addDatasource(userName, password, databaseName); - } + if (!await this.hasuraClient.doesSourceExist(databaseName)) { + const password = this.generatePassword(); + await this.createUserDb(userName, password, databaseName); + await this.addDatasource(userName, password, databaseName); + } - await this.createSchema(databaseName, schemaName); + await this.createSchema(databaseName, schemaName); - await this.createMetadataTable(databaseName, schemaName); - await this.setProvisioningStatus(userName, schemaName); - await this.setupPartitionedLogsTable(userName, databaseName, schemaName); - await this.runIndexerSql(databaseName, schemaName, indexerConfig.schema); + await this.createMetadataTable(databaseName, schemaName); + await this.setProvisioningStatus(userName, schemaName); + await this.setupPartitionedLogsTable(userName, databaseName, schemaName); - const updatedTableNames = await this.getTableNames(schemaName, 
databaseName); + await this.trackTables(schemaName, this.SYSTEM_TABLES, databaseName); - await this.trackTables(schemaName, updatedTableNames, databaseName); + await this.exponentialRetry(async () => { + await this.addPermissionsToTables(indexerConfig, this.SYSTEM_TABLES, ['select', 'insert', 'update', 'delete']); + }); + } - await this.exponentialRetry(async () => { - await this.trackForeignKeyRelationships(schemaName, databaseName); - }); + async provisionUserResources (indexerConfig: ProvisioningConfig): Promise { + const databaseName = indexerConfig.databaseName(); + const schemaName = indexerConfig.schemaName(); - await this.exponentialRetry(async () => { - await this.addPermissionsToTables(indexerConfig, updatedTableNames, ['select', 'insert', 'update', 'delete']); - }); - }, - 'Failed to provision endpoint' - ); - }, this.tracer, 'provision indexer resources'); + await this.runIndexerSql(databaseName, schemaName, indexerConfig.schema); + + const userTableNames = (await this.getTableNames(schemaName, databaseName)).filter((tableName) => !this.SYSTEM_TABLES.includes(tableName)); + + await this.trackTables(schemaName, userTableNames, databaseName); + + await this.exponentialRetry(async () => { + await this.trackForeignKeyRelationships(schemaName, databaseName); + }); + + await this.exponentialRetry(async () => { + await this.addPermissionsToTables(indexerConfig, userTableNames, ['select', 'insert', 'update', 'delete']); + }); } async exponentialRetry (fn: () => Promise): Promise { diff --git a/runner/src/server/services/data-layer/data-layer-service.ts b/runner/src/server/services/data-layer/data-layer-service.ts index d24befa0..61a1c94e 100644 --- a/runner/src/server/services/data-layer/data-layer-service.ts +++ b/runner/src/server/services/data-layer/data-layer-service.ts @@ -111,16 +111,12 @@ export function createDataLayerService ( .then(() => { logger.info('Successfully provisioned Data Layer'); }) - .catch((err) => { - logger.error('Failed to provision 
Data Layer', err); - throw err; - }) ); callback(null, { taskId }); }) .catch((err) => { - logger.error('Failed to check if Data Layer is provisioned', err); + logger.warn('Failed to check if Data Layer is provisioned', err); const internal = new StatusBuilder() .withCode(status.INTERNAL) @@ -148,7 +144,7 @@ export function createDataLayerService ( logger.info('Successfully deprovisioned Data Layer'); }) .catch((err) => { - logger.error('Failed to deprovision Data Layer', err); + logger.warn('Failed to deprovision Data Layer', err); throw err; }) ); diff --git a/runner/src/server/services/runner/runner-service.ts b/runner/src/server/services/runner/runner-service.ts index 6299beac..dd29d392 100644 --- a/runner/src/server/services/runner/runner-service.ts +++ b/runner/src/server/services/runner/runner-service.ts @@ -81,7 +81,7 @@ export function getRunnerService ( executors.set(indexerConfig.executorId, streamHandler); callback(null, { executorId: indexerConfig.executorId }); streamHandler.start().catch((error: Error) => { - logger.error('Failed to start executor', error); + logger.warn('Failed to start executor', error); }); } catch (e) { const error = e as Error; diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts index b6742840..9b9ea9ee 100644 --- a/runner/src/stream-handler/stream-handler.ts +++ b/runner/src/stream-handler/stream-handler.ts @@ -74,7 +74,7 @@ export default class StreamHandler { this.executorContext.executionState = ExecutionState.RUNNING; } catch (error: any) { const errorContent = error instanceof Error ? 
error.toString() : JSON.stringify(error); - this.logger.error('Terminating thread', error); + this.logger.warn('Terminating thread', error); this.executorContext.executionState = ExecutionState.STALLED; throw new Error(`Failed to start Indexer: ${errorContent}`); } @@ -92,7 +92,7 @@ export default class StreamHandler { } private handleError (error: Error): void { - this.logger.error('Terminating thread', error); + this.logger.warn('Terminating thread', error); this.executorContext.executionState = ExecutionState.STALLED; if (this.indexerMeta) { diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index 3ca3e5c9..38a6483b 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -189,7 +189,7 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise const error = err as Error; if (previousError !== error.message) { previousError = error.message; - workerContext.logger.error(`Failed on block ${currBlockHeight}`, err); + workerContext.logger.warn(`Failed on block ${currBlockHeight}`, err); } const sleepSpan = tracer.startSpan('Sleep for 10 seconds after failing', {}, context.active()); await sleep(10000); diff --git a/runner/tests/integration.test.ts b/runner/tests/integration.test.ts index 1fd71d9c..cca35999 100644 --- a/runner/tests/integration.test.ts +++ b/runner/tests/integration.test.ts @@ -270,6 +270,23 @@ describe('Indexer integration', () => { await expect(pgClient.query('SELECT * FROM cron.job WHERE jobname like $1', ['provisioning_near_test_provisioning%']).then(({ rows }) => rows)).resolves.toHaveLength(0); await expect(hasuraClient.doesSourceExist(testConfig1.databaseName())).resolves.toBe(false); }); + + it('Writes provisioning errors to user logs table', async () => { + const testConfig = new IndexerConfig( + 'test:stream', + 'user-failures.near', // must be unique to prevent conflicts with other tests + 'test', + 0, + '', + 'broken schema', + LogLevel.INFO + ); + + 
await expect(provisioner.provisionUserApi(testConfig)).rejects.toThrow(); + + const logs: any = await indexerLogsQuery(testConfig.schemaName(), graphqlClient); + expect(logs[0].message).toContain('Failed to run user script'); + }); }); async function prepareIndexer (indexerConfig: IndexerConfig, provisioner: Provisioner, hasuraContainer: StartedHasuraGraphQLContainer): Promise {