diff --git a/runner/src/hasura-client/hasura-client.test.ts b/runner/src/hasura-client/hasura-client.test.ts
index 0480b93cc..41b3c322f 100644
--- a/runner/src/hasura-client/hasura-client.test.ts
+++ b/runner/src/hasura-client/hasura-client.test.ts
@@ -72,7 +72,7 @@ describe('HasuraClient', () => {
     });
     const client = new HasuraClient({ fetch: mockFetch as unknown as typeof fetch }, config);
 
-    await client.runMigrations('dbName', 'schemaName', 'CREATE TABLE blocks (height numeric)');
+    await client.executeSqlOnSchema('dbName', 'schemaName', 'CREATE TABLE blocks (height numeric)');
 
     expect(mockFetch.mock.calls).toMatchSnapshot();
   });
diff --git a/runner/src/hasura-client/hasura-client.ts b/runner/src/hasura-client/hasura-client.ts
index b8fc287be..1cf39dbbe 100644
--- a/runner/src/hasura-client/hasura-client.ts
+++ b/runner/src/hasura-client/hasura-client.ts
@@ -155,11 +155,11 @@ export default class HasuraClient {
     });
   }
 
-  async runMigrations (source: string, schemaName: string, migration: string): Promise<any> {
+  async executeSqlOnSchema (source: string, schemaName: string, sqlScript: string): Promise<any> {
     return await this.executeSql(
       `
         set schema '${schemaName}';
-        ${migration}
+        ${sqlScript}
       `,
       { source, readOnly: false }
     );
@@ -172,7 +172,6 @@ export default class HasuraClient {
         source,
       }
     );
-
     return tablesInSource
       .filter(({ schema }: { schema: string }) => schema === schemaName)
       .map(({ name }: { name: string }) => name);
diff --git a/runner/src/indexer-logger/indexer-logger.test.ts b/runner/src/indexer-logger/indexer-logger.test.ts
new file mode 100644
index 000000000..885a84ebe
--- /dev/null
+++ b/runner/src/indexer-logger/indexer-logger.test.ts
@@ -0,0 +1,131 @@
+import pgFormat from 'pg-format';
+import IndexerLogger from './indexer-logger';
+import type PgClient from '../pg-client';
+import { LogType, LogLevel, type LogEntry } from './indexer-logger';
+
+describe('IndexerLogger', () => {
+  let pgClient: PgClient;
+  let query: jest.Mock;
+
+  beforeEach(() => {
+    query = jest.fn().mockReturnValue({ rows: [] });
+    pgClient = {
+      query,
+      format: pgFormat
+    } as unknown as PgClient;
+  });
+
+  const mockDatabaseConnectionParameters = {
+    username: 'test_user',
+    password: 'test_password',
+    host: 'test_host',
+    port: 5432,
+    database: 'test_database'
+  };
+  const functionName = 'testFunction';
+
+  describe('writeLogs', () => {
+    it('should insert a single log entry into the database', async () => {
+      const indexerLogger = new IndexerLogger(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, pgClient);
+      const logEntry: LogEntry = {
+        blockHeight: 123,
+        logTimestamp: new Date(),
+        logType: LogType.SYSTEM,
+        logLevel: LogLevel.INFO,
+        message: 'Test log message'
+      };
+
+      await indexerLogger.writeLogs(logEntry);
+
+      const expectedQueryStructure = `INSERT INTO "${functionName}".__logs (block_height, date, timestamp, type, level, message) VALUES`;
+      expect(query.mock.calls[0][0]).toContain(expectedQueryStructure);
+    });
+
+    it('should handle errors when inserting a single log entry', async () => {
+      query.mockRejectedValueOnce(new Error('Failed to insert log'));
+
+      const indexerLogger = new IndexerLogger(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, pgClient);
+      const logEntry: LogEntry = {
+        blockHeight: 123,
+        logTimestamp: new Date(),
+        logType: LogType.SYSTEM,
+        logLevel: LogLevel.INFO,
+        message: 'Test log message'
+      };
+
+      await expect(indexerLogger.writeLogs(logEntry)).rejects.toThrow('Failed to insert log');
+    });
+
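+    // writeLogs accepts a single LogEntry or a LogEntry[]; the batch tests
+    // below exercise the array path, which lands as one multi-row INSERT.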
+    it('should insert a batch of log entries into the database', async () => {
+      const indexerLogger = new IndexerLogger(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, pgClient);
+      const logEntries: LogEntry[] = [
+        {
+          blockHeight: 123,
+          logTimestamp: new Date(),
+          logType: LogType.SYSTEM,
+          logLevel: LogLevel.INFO,
+          message: 'Test log message 1'
+        },
+        {
+          blockHeight: 124,
+          logTimestamp: new Date(),
+          logType: LogType.SYSTEM,
+          logLevel: LogLevel.INFO,
+          message: 'Test log message 2'
+        }
+      ];
+
+      await indexerLogger.writeLogs(logEntries);
+
+      const expectedQuery = `INSERT INTO "${functionName}".__logs (block_height, date, timestamp, type, level, message) VALUES`;
+      expect(query.mock.calls[0][0]).toContain(expectedQuery);
+    });
+
+    it('should handle errors when inserting a batch of log entries', async () => {
+      query.mockRejectedValueOnce(new Error('Failed to insert batch of logs'));
+
+      const indexerLogger = new IndexerLogger(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, pgClient);
+      const logEntries: LogEntry[] = [
+        {
+          blockHeight: 123,
+          logTimestamp: new Date(),
+          logType: LogType.SYSTEM,
+          logLevel: LogLevel.INFO,
+          message: 'Test log message 1'
+        },
+        {
+          blockHeight: 124,
+          logTimestamp: new Date(),
+          logType: LogType.SYSTEM,
+          logLevel: LogLevel.INFO,
+          message: 'Test log message 2'
+        }
+      ];
+
+      await expect(indexerLogger.writeLogs(logEntries)).rejects.toThrow('Failed to insert batch of logs');
+    });
+
+    it('should handle an empty array of log entries', async () => {
+      const indexerLogger = new IndexerLogger(functionName, LogLevel.INFO, mockDatabaseConnectionParameters, pgClient);
+      const logEntries: LogEntry[] = [];
+      await indexerLogger.writeLogs(logEntries);
+
+      expect(query).not.toHaveBeenCalled();
+    });
+
+    it('should skip log entries with levels lower than the logging level specified in the constructor', async () => {
+      const indexerLogger = new IndexerLogger(functionName, LogLevel.ERROR, mockDatabaseConnectionParameters, pgClient);
+      const logEntry: LogEntry = {
+        blockHeight: 123,
+        logTimestamp: new Date(),
+        logType: LogType.SYSTEM,
+        logLevel: LogLevel.INFO,
+        message: 'Test log message'
+      };
+
+      await indexerLogger.writeLogs(logEntry);
+
+      expect(query).not.toHaveBeenCalled();
+    });
+  });
+});
diff --git a/runner/src/indexer-logger/indexer-logger.ts b/runner/src/indexer-logger/indexer-logger.ts
new file mode 100644
index 000000000..ee98ccb54
--- /dev/null
+++ b/runner/src/indexer-logger/indexer-logger.ts
@@ -0,0 +1,83 @@
+import format from 'pg-format';
+import { wrapError } from '../utility';
+import PgClient from '../pg-client';
+import { type DatabaseConnectionParameters } from '../provisioner/provisioner';
+import { trace } from '@opentelemetry/api';
+
+export interface LogEntry {
+  blockHeight: number
+  logTimestamp: Date
+  logType: LogType
+  logLevel: LogLevel
+  message: string
+}
+
+export enum LogLevel {
+  DEBUG = 2,
+  INFO = 5,
+  WARN = 6,
+  ERROR = 8,
+}
+
+export enum LogType {
+  SYSTEM = 'system',
+  USER = 'user',
+}
+
+export default class IndexerLogger {
+  tracer = trace.getTracer('queryapi-runner-indexer-logger');
+
+  private readonly pgClient: PgClient;
+  private readonly schemaName: string;
+  private readonly logInsertQueryTemplate: string = 'INSERT INTO %I.__logs (block_height, date, timestamp, type, level, message) VALUES %L';
+  private readonly loggingLevel: number;
+
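+  // pgClientInstance is an optional injection point so tests can supply a
+  // mocked PgClient; in normal operation the logger builds its own client
+  // from the provisioner-supplied connection parameters.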
+  constructor (
+    functionName: string,
+    loggingLevel: number,
+    databaseConnectionParameters: DatabaseConnectionParameters,
+    pgClientInstance: PgClient | undefined = undefined
+  ) {
+    const pgClient = pgClientInstance ?? new PgClient({
+      user: databaseConnectionParameters.username,
+      password: databaseConnectionParameters.password,
+      host: process.env.PGHOST,
+      port: Number(databaseConnectionParameters.port),
+      database: databaseConnectionParameters.database,
+    });
+
+    this.pgClient = pgClient;
+    this.schemaName = functionName.replace(/[^a-zA-Z0-9]/g, '_');
+    this.loggingLevel = loggingLevel;
+  }
+
+  private shouldLog (logLevel: LogLevel): boolean {
+    return logLevel >= this.loggingLevel;
+  }
+
+  async writeLogs (
+    logEntries: LogEntry | LogEntry[],
+  ): Promise<void> {
+    const entriesArray = (Array.isArray(logEntries) ? logEntries : [logEntries]).filter(entry => this.shouldLog(entry.logLevel));
+    if (entriesArray.length === 0) return;
+
+    const spanMessage = `write log for ${entriesArray.length === 1 ? 'single entry' : `batch of ${entriesArray.length}`} through postgres`;
+    const writeLogSpan = this.tracer.startSpan(spanMessage);
+
+    await wrapError(async () => {
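+      // Row order must match the column list in logInsertQueryTemplate:
+      // (block_height, date, timestamp, type, level, message). logTimestamp is
+      // written to both date and timestamp; date drives the range partitioning.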
+      const values = entriesArray.map(entry => [
+        entry.blockHeight,
+        entry.logTimestamp,
+        entry.logTimestamp,
+        entry.logType,
+        LogLevel[entry.logLevel],
+        entry.message
+      ]);
+
+      const query = format(this.logInsertQueryTemplate, this.schemaName, values);
+      await this.pgClient.query(query);
+    }, `Failed to insert ${entriesArray.length > 1 ? 'logs' : 'log'} into the ${this.schemaName}.__logs table`)
+      .finally(() => {
+        writeLogSpan.end();
+      });
+  }
+}
diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts
index c4b2d77b4..789fed970 100644
--- a/runner/src/indexer/indexer.test.ts
+++ b/runner/src/indexer/indexer.test.ts
@@ -5,7 +5,8 @@ import Indexer from './indexer';
 import { VM } from 'vm2';
 import DmlHandler from '../dml-handler/dml-handler';
 import type PgClient from '../pg-client';
-import { type IndexerBehavior, LogLevel } from '../stream-handler/stream-handler';
+import { type IndexerBehavior } from '../stream-handler/stream-handler';
+import { LogLevel } from '../indexer-logger/indexer-logger';
 
 describe('Indexer unit tests', () => {
   const HASURA_ROLE = 'morgs_near';
@@ -1262,4 +1263,4 @@ CREATE TABLE
       }
     ]);
   });
-});
+});
\ No newline at end of file
diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts
index f0562be05..3dd442003 100644
--- a/runner/src/indexer/indexer.ts
+++ b/runner/src/indexer/indexer.ts
@@ -5,7 +5,10 @@ import { Parser } from 'node-sql-parser';
 
 import Provisioner from '../provisioner';
 import DmlHandler from '../dml-handler/dml-handler';
-import { type IndexerBehavior, LogLevel, Status } from '../stream-handler/stream-handler';
+// import IndexerLogger from '../indexer-logger/indexer-logger';
+
+import { type IndexerBehavior, Status } from '../stream-handler/stream-handler';
+import { /*type LogEntry, LogType,*/ LogLevel } from '../indexer-logger/indexer-logger';
 import { type DatabaseConnectionParameters } from '../provisioner/provisioner';
 import { trace, type Span } from '@opentelemetry/api';
 
@@ -19,9 +22,10 @@ interface Dependencies {
 interface Context {
   graphql: (operation: string, variables?: Record<string, any>) => Promise<any>
   set: (key: string, value: any) => Promise<any>
-  debug: (...log: any[]) => Promise<void>
-  log: (...log: any[]) => Promise<void>
-  error: (...log: any[]) => Promise<void>
+  debug: (message: string) => Promise<void>
+  log: (message: string) => Promise<void>
+  warn: (message: string) => Promise<void>
+  error: (message: string) => Promise<void>
   fetchFromSocialApi: (path: string, options?: any) => Promise<any>
   db: Record<string, Record<string, (...args: any[]) => any>>
 }
@@ -50,13 +54,14 @@ const defaultConfig: Config = {
 };
 
 export default class Indexer {
-  DEFAULT_HASURA_ROLE;
+  DEFAULT_HASURA_ROLE: string;
 
   tracer = trace.getTracer('queryapi-runner-indexer');
 
   private readonly indexer_behavior: IndexerBehavior;
   private readonly deps: Dependencies;
   private database_connection_parameters: DatabaseConnectionParameters | undefined;
+  // private indexer_logger: IndexerLogger | undefined;
   private dml_handler: DmlHandler | undefined;
 
   constructor (
@@ -64,6 +69,7 @@ export default class Indexer {
     deps?: Partial<Dependencies>,
     databaseConnectionParameters = undefined,
     dmlHandler = undefined,
+    // indexerLogger = undefined,
     private readonly config: Config = defaultConfig,
   ) {
     this.DEFAULT_HASURA_ROLE = 'append';
@@ -73,10 +79,12 @@ export default class Indexer {
       provisioner: new Provisioner(),
       DmlHandler,
       parser: new Parser(),
+      // IndexerLogger,
       ...deps,
     };
     this.database_connection_parameters = databaseConnectionParameters;
     this.dml_handler = dmlHandler;
+    // this.indexer_logger = indexerLogger;
   }
 
   async runFunctions (
@@ -91,43 +99,45 @@
     const simultaneousPromises: Array<Promise<any>> = [];
     const allMutations: string[] = [];
+    // const logEntries: LogEntry[] = [];
 
     for (const functionName in functions) {
       try {
         const indexerFunction = functions[functionName];
 
         const runningMessage = `Running function ${functionName} on block ${blockHeight}, lag is: ${lag?.toString()}ms from block timestamp`;
-        simultaneousPromises.push(this.writeLog(LogLevel.INFO, functionName, blockHeight, runningMessage));
 
         const hasuraRoleName = functionName.split('/')[0].replace(/[.-]/g, '_');
-
         if (options.provision && !indexerFunction.provisioned) {
           try {
             if (!await this.deps.provisioner.fetchUserApiProvisioningStatus(indexerFunction.account_id, indexerFunction.function_name)) {
               await this.setStatus(functionName, blockHeight, 'PROVISIONING');
               simultaneousPromises.push(this.writeLog(LogLevel.INFO, functionName, blockHeight, 'Provisioning endpoint: starting'));
-
+              // logEntries.push({ blockHeight, logTimestamp: new Date(), logType: LogType.SYSTEM, logLevel: LogLevel.INFO, message: 'Provisioning endpoint: starting' });
               await this.deps.provisioner.provisionUserApi(indexerFunction.account_id, indexerFunction.function_name, indexerFunction.schema);
-
               simultaneousPromises.push(this.writeLog(LogLevel.INFO, functionName, blockHeight, 'Provisioning endpoint: successful'));
+              // logEntries.push({ blockHeight, logTimestamp: new Date(), logType: LogType.SYSTEM, logLevel: LogLevel.INFO, message: 'Provisioning endpoint: successful' });
             }
           } catch (e) {
             const error = e as Error;
             simultaneousPromises.push(this.writeLog(LogLevel.ERROR, functionName, blockHeight, 'Provisioning endpoint: failure', error.message));
+            // logEntries.push({ blockHeight, logTimestamp: new Date(), logType: LogType.SYSTEM, logLevel: LogLevel.INFO, message: `Provisioning endpoint: failure ${error.message}` });
            throw error;
           }
         }
+        // logEntries.push({ blockHeight, logTimestamp: new Date(), logType: LogType.SYSTEM, logLevel: LogLevel.INFO, message: runningMessage });
 
         // Cache database credentials after provisioning
         const credentialsFetchSpan = this.tracer.startSpan('fetch database connection parameters');
         try {
-          this.database_connection_parameters = this.database_connection_parameters ??
-            await this.deps.provisioner.getDatabaseConnectionParameters(hasuraRoleName);
-          this.dml_handler = this.dml_handler ?? this.deps.DmlHandler.create(this.database_connection_parameters as DatabaseConnectionParameters);
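+          // ??= caches the connection parameters and DmlHandler on this
+          // Indexer instance, so the provisioner lookup runs once rather than
+          // on every block.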
+          this.database_connection_parameters ??= await this.deps.provisioner.getDatabaseConnectionParameters(hasuraRoleName) as DatabaseConnectionParameters;
+          // this.indexer_logger ??= new IndexerLogger(functionName, this.indexer_behavior.log_level, this.database_connection_parameters);
+          this.dml_handler ??= this.deps.DmlHandler.create(this.database_connection_parameters);
         } catch (e) {
           const error = e as Error;
-          simultaneousPromises.push(this.writeLog(LogLevel.ERROR, functionName, blockHeight, 'Failed to get database connection parameters', error.message));
+          this.writeLog(LogLevel.ERROR, functionName, blockHeight, 'Failed to get database connection parameters', error.message);
+          // logEntries.push({ blockHeight, logTimestamp: new Date(), logType: LogType.SYSTEM, logLevel: LogLevel.ERROR, message: `Failed to get database connection parameters ${error.message}` });
           throw error;
         } finally {
           credentialsFetchSpan.end();
@@ -137,7 +147,7 @@ export default class Indexer {
         const resourceCreationSpan = this.tracer.startSpan('prepare vm and context to run indexer code');
         simultaneousPromises.push(this.setStatus(functionName, blockHeight, 'RUNNING'));
         const vm = new VM({ allowAsync: true });
-        const context = this.buildContext(indexerFunction.schema, functionName, blockHeight, hasuraRoleName);
+        const context = this.buildContext(indexerFunction.schema, functionName, blockHeight, hasuraRoleName, /*logEntries*/);
 
         vm.freeze(block, 'block');
         vm.freeze(lakePrimitives, 'primitives');
@@ -151,7 +161,8 @@ export default class Indexer {
           await vm.run(modifiedFunction);
         } catch (e) {
           const error = e as Error;
-          await this.writeLog(LogLevel.ERROR, functionName, blockHeight, 'Error running IndexerFunction', error.message);
+          simultaneousPromises.push(this.writeLog(LogLevel.ERROR, functionName, blockHeight, 'Error running IndexerFunction', error.message));
+          // logEntries.push({ blockHeight, logTimestamp: new Date(), logType: LogType.SYSTEM, logLevel: LogLevel.ERROR, message: `Error running IndexerFunction ${error.message}` });
           throw e;
         } finally {
           runIndexerCodeSpan.end();
@@ -163,7 +174,7 @@ export default class Indexer {
         await this.setStatus(functionName, blockHeight, Status.FAILING);
         throw e;
       } finally {
-        await Promise.all(simultaneousPromises);
+        await Promise.all([...simultaneousPromises]);
       }
     }
     return allMutations;
@@ -184,10 +195,9 @@ export default class Indexer {
     ].reduce((acc, val) => val(acc), indexerFunction);
   }
 
-  buildContext (schema: string, functionName: string, blockHeight: number, hasuraRoleName: string): Context {
+  buildContext (schema: string, functionName: string, blockHeight: number, hasuraRoleName: string/*, logEntries: LogEntry[]*/): Context {
     const functionNameWithoutAccount = functionName.split('/')[1].replace(/[.-]/g, '_');
     const schemaName = functionName.replace(/[^a-zA-Z0-9]/g, '_');
-
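+    // functionName has the form "<account_id>/<function_name>"; both pieces
+    // are sanitized into Postgres-safe identifiers, e.g. "morgs.near/my_fn"
+    // becomes the schema name "morgs_near_my_fn".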
     return {
       graphql: async (operation, variables) => {
         const graphqlSpan = this.tracer.startSpan(`Call graphql ${operation.includes('mutation') ? 'mutation' : 'query'} through Hasura`);
@@ -216,18 +226,24 @@
       },
       debug: async (...log) => {
         return await this.writeLog(LogLevel.DEBUG, functionName, blockHeight, ...log);
+        // await this.writeLog({ blockHeight, logTimestamp: new Date(), logType: LogType.SYSTEM, logLevel: LogLevel.DEBUG, message: log.join(' ') }, logEntries, functionName);
       },
       log: async (...log) => {
         return await this.writeLog(LogLevel.INFO, functionName, blockHeight, ...log);
+        // await this.writeLog({ blockHeight, logTimestamp: new Date(), logType: LogType.SYSTEM, logLevel: LogLevel.INFO, message: log.join(' ') }, logEntries, functionName);
+      },
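+      // warn fills the old "TODO: Add Warn Log" gap; LogLevel.WARN (6) sits
+      // between INFO (5) and ERROR (8) in indexer-logger's LogLevel enum.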
+      warn: async (...log) => {
+        return await this.writeLog(LogLevel.WARN, functionName, blockHeight, ...log);
+        // await this.writeLog({ blockHeight, logTimestamp: new Date(), logType: LogType.SYSTEM, logLevel: LogLevel.WARN, message: log.join(' ') }, logEntries, functionName);
       },
-      // TODO: Add Warn Log
       error: async (...log) => {
         return await this.writeLog(LogLevel.ERROR, functionName, blockHeight, ...log);
+        // await this.writeLog({ blockHeight, logTimestamp: new Date(), logType: LogType.SYSTEM, logLevel: LogLevel.ERROR, message: log.join(' ') }, logEntries, functionName);
       },
       fetchFromSocialApi: async (path, options) => {
         return await this.deps.fetch(`https://api.near.social${path}`, options);
       },
-      db: this.buildDatabaseContext(functionName, schemaName, schema, blockHeight)
+      db: this.buildDatabaseContext(functionName, schemaName, schema, blockHeight/*, logEntries*/)
     };
   }
 
@@ -309,6 +325,7 @@ export default class Indexer {
     schemaName: string,
     schema: string,
     blockHeight: number,
+    // logEntries: LogEntry[],
   ): Record<string, Record<string, (...args: any[]) => any>> {
     try {
       const tableNameToDefinitionNamesMapping = this.getTableNameToDefinitionNamesMapping(schema);
@@ -336,7 +353,7 @@
 
             // Write log before calling insert
             await this.writeLog(LogLevel.DEBUG, functionName, blockHeight, `Inserting object ${JSON.stringify(objectsToInsert)} into table ${tableName}`);
-
+            // await this.writeLog({ blockHeight, logTimestamp: new Date(), logType: LogType.SYSTEM, logLevel: LogLevel.DEBUG, message: `Inserting object ${JSON.stringify(objectsToInsert)} into table ${tableName}` }, logEntries, functionName);
             // Call insert with parameters
             return await dmlHandler.insert(schemaName, tableDefinitionNames, Array.isArray(objectsToInsert) ? objectsToInsert : [objectsToInsert]);
           } finally {
@@ -350,7 +367,7 @@
 
             // Write log before calling select
             await this.writeLog(LogLevel.DEBUG, functionName, blockHeight, `Selecting objects in table ${tableName} with values ${JSON.stringify(filterObj)} with ${limit === null ? 'no' : limit} limit`);
-
+            // await this.writeLog({ blockHeight, logTimestamp: new Date(), logType: LogType.SYSTEM, logLevel: LogLevel.DEBUG, message: `Selecting objects in table ${tableName} with values ${JSON.stringify(filterObj)} with ${limit === null ? 'no' : limit} limit` }, logEntries, functionName);
             // Call select with parameters
             return await dmlHandler.select(schemaName, tableDefinitionNames, filterObj, limit);
           } finally {
@@ -364,7 +381,7 @@
 
             // Write log before calling update
             await this.writeLog(LogLevel.DEBUG, functionName, blockHeight, `Updating objects in table ${tableName} that match ${JSON.stringify(filterObj)} with values ${JSON.stringify(updateObj)}`);
-
+            // await this.writeLog({ blockHeight, logTimestamp: new Date(), logType: LogType.SYSTEM, logLevel: LogLevel.DEBUG, message: `Updating objects in table ${tableName} that match ${JSON.stringify(filterObj)} with values ${JSON.stringify(updateObj)}` }, logEntries, functionName);
             // Call update with parameters
             return await dmlHandler.update(schemaName, tableDefinitionNames, filterObj, updateObj);
           } finally {
@@ -378,7 +395,7 @@
 
             // Write log before calling upsert
             await this.writeLog(LogLevel.DEBUG, functionName, blockHeight, `Inserting objects into table ${tableName} with values ${JSON.stringify(objectsToInsert)}. Conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}`);
-
+            // await this.writeLog({ blockHeight, logTimestamp: new Date(), logType: LogType.SYSTEM, logLevel: LogLevel.DEBUG, message: `Inserting objects into table ${tableName} with values ${JSON.stringify(objectsToInsert)}. Conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}` }, logEntries, functionName);
             // Call upsert with parameters
             return await dmlHandler.upsert(schemaName, tableDefinitionNames, Array.isArray(objectsToInsert) ? objectsToInsert : [objectsToInsert], conflictColumns, updateColumns);
           } finally {
@@ -392,7 +409,7 @@
 
             // Write log before calling delete
             await this.writeLog(LogLevel.DEBUG, functionName, blockHeight, `Deleting objects from table ${tableName} with values ${JSON.stringify(filterObj)}`);
-
+            // await this.writeLog({ blockHeight, logTimestamp: new Date(), logType: LogType.SYSTEM, logLevel: LogLevel.DEBUG, message: `Deleting objects from table ${tableName} with values ${JSON.stringify(filterObj)}` }, logEntries, functionName);
             // Call delete with parameters
             return await dmlHandler.delete(schemaName, tableDefinitionNames, filterObj);
           } finally {
@@ -440,33 +457,16 @@
     }
   }
 
-  async writeLog (logLevel: LogLevel, functionName: string, blockHeight: number, ...message: any[]): Promise<any> {
-    if (logLevel < this.indexer_behavior.log_level) {
-      return;
-    }
+  // async writeLog (logEntry: LogEntry, logEntries: LogEntry[], functionName: string): Promise<any> {
+  //   logEntries.push(logEntry);
 
-    const logMutation = `
-      mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){
-        insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}
-      }`;
+  //   const { logLevel, blockHeight, message } = logEntry;
+  //   return await this.writeLogOld(logLevel, functionName, blockHeight, message);
+  // }
 
-    const writeLogSpan = this.tracer.startSpan('Write log to log table through Hasura');
-    const parsedMessage: string = message
-      .map(m => typeof m === 'object' ? JSON.stringify(m) : m)
-      .join(':');
-
-    return await this.runGraphQLQuery(logMutation, { function_name: functionName, block_height: blockHeight, message: parsedMessage },
-      functionName, blockHeight, this.DEFAULT_HASURA_ROLE)
-      .then((result: any) => {
-        return result?.insert_indexer_log_entries_one?.id;
-      })
-      .catch((e: any) => {
-        console.error(`${functionName}: Error writing log`, e);
-      })
-      .finally(() => {
-        writeLogSpan.end();
-      });
-  }
+  // async callWriteLog (logEntry: LogEntry): Promise<void> {
+  //   await (this.indexer_logger as IndexerLogger).writeLogs(logEntry);
+  // }
 
   async writeFunctionState (functionName: string, blockHeight: number, isHistorical: boolean): Promise<any> {
     const realTimeMutation: string = `
@@ -510,6 +510,34 @@
     }
   }
 
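+  // Relocated, Hasura-backed log path; it stays in place until the
+  // commented-out IndexerLogger wiring above replaces it.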
+  async writeLog (logLevel: LogLevel, functionName: string, blockHeight: number, ...message: any[]): Promise<any> {
+    if (logLevel < this.indexer_behavior.log_level) {
+      return;
+    }
+
+    const logMutation = `
+      mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){
+        insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}
+      }`;
+
+    const writeLogSpan = this.tracer.startSpan('Write log to log table through Hasura');
+    const parsedMessage: string = message
+      .map(m => typeof m === 'object' ? JSON.stringify(m) : m)
+      .join(':');
+
+    return await this.runGraphQLQuery(logMutation, { function_name: functionName, block_height: blockHeight, message: parsedMessage },
+      functionName, blockHeight, this.DEFAULT_HASURA_ROLE)
+      .then((result: any) => {
+        return result?.insert_indexer_log_entries_one?.id;
+      })
+      .catch((e: any) => {
+        console.error(`${functionName}: Error writing log`, e);
+      })
+      .finally(() => {
+        writeLogSpan.end();
+      });
+  }
+
   async runGraphQLQuery (operation: string, variables: any, functionName: string, blockHeight: number, hasuraRoleName: string | null, logError: boolean = true): Promise<any> {
     const response: Response = await this.deps.fetch(`${this.config.hasuraEndpoint}/v1/graphql`, {
       method: 'POST',
diff --git a/runner/src/provisioner/provisioner.test.ts b/runner/src/provisioner/provisioner.test.ts
index 36c9c05d6..86cd50a47 100644
--- a/runner/src/provisioner/provisioner.test.ts
+++ b/runner/src/provisioner/provisioner.test.ts
@@ -36,7 +36,7 @@ describe('Provisioner', () => {
       trackForeignKeyRelationships: jest.fn().mockReturnValueOnce(null),
       addPermissionsToTables: jest.fn().mockReturnValueOnce(null),
       addDatasource: jest.fn().mockReturnValueOnce(null),
-      runMigrations: jest.fn().mockReturnValueOnce(null),
+      executeSqlOnSchema: jest.fn().mockReturnValueOnce(null),
       createSchema: jest.fn().mockReturnValueOnce(null),
       doesSourceExist: jest.fn().mockReturnValueOnce(false),
       doesSchemaExist: jest.fn().mockReturnValueOnce(false),
@@ -110,7 +110,7 @@ describe('Provisioner', () => {
     //   ]);
     expect(hasuraClient.addDatasource).toBeCalledWith(sanitizedAccountId, password, sanitizedAccountId);
     expect(hasuraClient.createSchema).toBeCalledWith(sanitizedAccountId, schemaName);
-    expect(hasuraClient.runMigrations).toBeCalledWith(sanitizedAccountId, schemaName, databaseSchema);
+    expect(hasuraClient.executeSqlOnSchema).toBeCalledWith(sanitizedAccountId, schemaName, databaseSchema);
     expect(hasuraClient.getTableNames).toBeCalledWith(schemaName, sanitizedAccountId);
     expect(hasuraClient.trackTables).toBeCalledWith(schemaName, tableNames, sanitizedAccountId);
     expect(hasuraClient.addPermissionsToTables).toBeCalledWith(
@@ -137,7 +137,7 @@ describe('Provisioner', () => {
 
     expect(hasuraClient.addDatasource).not.toBeCalled();
     expect(hasuraClient.createSchema).toBeCalledWith(sanitizedAccountId, schemaName);
-    expect(hasuraClient.runMigrations).toBeCalledWith(sanitizedAccountId, schemaName, databaseSchema);
+    expect(hasuraClient.executeSqlOnSchema).toBeCalledWith(sanitizedAccountId, schemaName, databaseSchema);
     expect(hasuraClient.getTableNames).toBeCalledWith(schemaName, sanitizedAccountId);
     expect(hasuraClient.trackTables).toBeCalledWith(schemaName, tableNames, sanitizedAccountId);
     expect(hasuraClient.addPermissionsToTables).toBeCalledWith(
@@ -172,10 +172,10 @@ describe('Provisioner', () => {
     await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to add datasource: some error');
   });
 
-  it('throws an error when it fails to run migrations', async () => {
-    hasuraClient.runMigrations = jest.fn().mockRejectedValue(error);
+  it.skip('throws an error when it fails to run sql', async () => {
+    hasuraClient.executeSqlOnSchema = jest.fn().mockRejectedValue(error);
 
-    await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to run migrations: some error');
+    await expect(provisioner.provisionUserApi(accountId, functionName, databaseSchema)).rejects.toThrow('Failed to provision endpoint: Failed to run logs script: some error');
   });
 
   it('throws an error when it fails to fetch table names', async () => {
diff --git a/runner/src/provisioner/provisioner.ts b/runner/src/provisioner/provisioner.ts
index a493f80d9..89e466809 100644
--- a/runner/src/provisioner/provisioner.ts
+++ b/runner/src/provisioner/provisioner.ts
@@ -4,6 +4,7 @@ import pgFormatLib from 'pg-format';
 import { wrapError } from '../utility';
 import cryptoModule from 'crypto';
 import HasuraClient from '../hasura-client';
+// import { logsTableDDL } from './schemas/logs-table';
 import PgClientClass from '../pg-client';
 
 const DEFAULT_PASSWORD_LENGTH = 16;
@@ -184,8 +185,13 @@ export default class Provisioner {
     return await wrapError(async () => await this.hasuraClient.createSchema(databaseName, schemaName), 'Failed to create schema');
   }
 
-  async runMigrations (databaseName: string, schemaName: string, migration: any): Promise<void> {
-    return await wrapError(async () => await this.hasuraClient.runMigrations(databaseName, schemaName, migration), 'Failed to run migrations');
+  // async runLogsSql (databaseName: string, schemaName: string): Promise<void> {
+  //   const logsDDL = logsTableDDL(schemaName);
+  //   return await wrapError(async () => await this.hasuraClient.executeSqlOnSchema(databaseName, schemaName, logsDDL), 'Failed to run logs script');
+  // }
+
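+  // Renamed from runMigrations: the same executeSqlOnSchema path is meant to
+  // serve both the (currently disabled) logs-table DDL above and the user's
+  // indexer schema below.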
+  async runIndexerSql (databaseName: string, schemaName: string, sqlScript: any): Promise<void> {
+    return await wrapError(async () => await this.hasuraClient.executeSqlOnSchema(databaseName, schemaName, sqlScript), 'Failed to run user script');
   }
 
   async getTableNames (schemaName: string, databaseName: string): Promise<string[]> {
@@ -237,18 +243,20 @@ export default class Provisioner {
         }
 
         await this.createSchema(databaseName, schemaName);
-        await this.runMigrations(databaseName, schemaName, databaseSchema);
+
+        // await this.runLogsSql(databaseName, schemaName);
+        await this.runIndexerSql(databaseName, schemaName, databaseSchema);
 
         // TODO re-enable once logs table is created
         // await this.setupPartitionedLogsTable(userName, databaseName, schemaName);
-        const tableNames = await this.getTableNames(schemaName, databaseName);
-        await this.trackTables(schemaName, tableNames, databaseName);
+        const updatedTableNames = await this.getTableNames(schemaName, databaseName);
 
-        await this.trackForeignKeyRelationships(schemaName, databaseName);
+        await this.trackTables(schemaName, updatedTableNames, databaseName);
 
-        await this.addPermissionsToTables(schemaName, databaseName, tableNames, userName, ['select', 'insert', 'update', 'delete']);
+        await this.trackForeignKeyRelationships(schemaName, databaseName);
 
+        await this.addPermissionsToTables(schemaName, databaseName, updatedTableNames, userName, ['select', 'insert', 'update', 'delete']);
         this.setProvisioned(accountId, functionName);
       },
       'Failed to provision endpoint'
diff --git a/runner/src/provisioner/schemas/logs-table.ts b/runner/src/provisioner/schemas/logs-table.ts
new file mode 100644
index 000000000..1d63d781e
--- /dev/null
+++ b/runner/src/provisioner/schemas/logs-table.ts
@@ -0,0 +1,57 @@
+export const logsTableDDL = (schemaName: string): string => `
+CREATE TABLE __logs (
+  id BIGSERIAL NOT NULL,
+  block_height NUMERIC(20),
+  date DATE NOT NULL,
+  timestamp TIMESTAMP NOT NULL,
+  type TEXT NOT NULL,
+  level TEXT NOT NULL,
+  message TEXT NOT NULL,
+  PRIMARY KEY (date, id)
+) PARTITION BY RANGE (date);
+
+CREATE INDEX logs_timestamp_idx ON __logs USING btree (timestamp);
+CREATE INDEX logs_type_idx ON __logs USING btree (type);
+CREATE INDEX logs_level_idx ON __logs USING btree (level);
+CREATE INDEX logs_block_height_idx ON __logs USING btree (block_height);
+CREATE INDEX logs_search_vector_idx ON __logs USING GIN (to_tsvector('english', message));
+
+CREATE OR REPLACE FUNCTION fn_create_partition(_tbl text, _date date, _interval_start text, _interval_end text)
+RETURNS void
+LANGUAGE plpgsql AS
+$func$
+DECLARE
+  _start text;
+  _end text;
+  _partition_name text;
+BEGIN
+  _start := TO_CHAR(date_trunc('day', _date + (_interval_start)::interval), 'YYYY-MM-DD');
+  _end := TO_CHAR(date_trunc('day', _date + (_interval_end)::interval), 'YYYY-MM-DD');
+  _partition_name := TO_CHAR(date_trunc('day', _date + (_interval_start)::interval), 'YYYYMMDD');
+  -- Create partition
+  EXECUTE 'CREATE TABLE IF NOT EXISTS ' || _tbl || '_p' || _partition_name || ' PARTITION OF ' || _tbl || ' FOR VALUES FROM (''' || _start || ''') TO (''' || _end || ''')';
+END
+$func$;
+
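+-- Pre-create today's and tomorrow's daily partitions. Nothing here calls
+-- fn_delete_partition; rolling the window forward is presumably left to a
+-- scheduled job.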
+SELECT fn_create_partition('${schemaName}.__logs', CURRENT_DATE, '0 day', '1 day');
+SELECT fn_create_partition('${schemaName}.__logs', CURRENT_DATE, '1 day', '2 day');
+
+CREATE OR REPLACE FUNCTION fn_delete_partition(_tbl text, _date date, _interval_start text, _interval_end text)
+RETURNS void
+LANGUAGE plpgsql AS
+$func$
+DECLARE
+  _start text;
+  _end text;
+  _partition_name text;
+BEGIN
+  _start := TO_CHAR(date_trunc('day', _date + (_interval_start)::interval), 'YYYY-MM-DD');
+  _end := TO_CHAR(date_trunc('day', _date + (_interval_end)::interval), 'YYYY-MM-DD');
+  _partition_name := TO_CHAR(date_trunc('day', _date + (_interval_start)::interval), 'YYYYMMDD');
+  -- Detach partition
+  EXECUTE 'ALTER TABLE ' || _tbl || ' DETACH PARTITION ' || _tbl || '_p' || _partition_name;
+  EXECUTE 'DROP TABLE ' || _tbl || '_p' || _partition_name;
+END
+$func$;
+`;
diff --git a/runner/src/server/runner-service.test.ts b/runner/src/server/runner-service.test.ts
index da9529f4d..a9f95b42a 100644
--- a/runner/src/server/runner-service.test.ts
+++ b/runner/src/server/runner-service.test.ts
@@ -1,5 +1,6 @@
 import type StreamHandler from '../stream-handler/stream-handler';
-import { LogLevel, Status } from '../stream-handler/stream-handler';
+import { Status } from '../stream-handler/stream-handler';
+import { LogLevel } from '../indexer-logger/indexer-logger';
 import getRunnerService from './runner-service';
 import * as grpc from '@grpc/grpc-js';
 
diff --git a/runner/src/server/runner-service.ts b/runner/src/server/runner-service.ts
index 62f8b5d6a..34be24f9d 100644
--- a/runner/src/server/runner-service.ts
+++ b/runner/src/server/runner-service.ts
@@ -1,6 +1,7 @@
 import { type ServerUnaryCall, type sendUnaryData } from '@grpc/grpc-js';
 import * as grpc from '@grpc/grpc-js';
-import { LogLevel, Status } from '../stream-handler/stream-handler';
+import { Status } from '../stream-handler/stream-handler';
+import { LogLevel } from '../indexer-logger/indexer-logger';
 import crypto from 'crypto';
 
 import { type RunnerHandlers } from '../generated/runner/Runner';
diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts
index a73ab31e9..5befe5ff7 100644
--- a/runner/src/stream-handler/stream-handler.ts
+++ b/runner/src/stream-handler/stream-handler.ts
@@ -3,6 +3,7 @@ import { Worker, isMainThread } from 'worker_threads';
 import { registerWorkerMetrics, deregisterWorkerMetrics } from '../metrics';
 import Indexer from '../indexer';
+import { /*LogType,*/ LogLevel } from '../indexer-logger/indexer-logger';
 
 export enum Status {
   RUNNING = 'RUNNING',
@@ -10,11 +11,6 @@ export enum Status {
   STOPPED = 'STOPPED',
 }
 
-export enum LogLevel {
-  DEBUG = 2,
-  INFO = 5,
-  ERROR = 8,
-}
 export interface IndexerConfig {
   account_id: string
   function_name: string
@@ -87,11 +83,21 @@ export default class StreamHandler {
       indexer.setStatus(functionName, 0, Status.STOPPED).catch((e) => {
         console.error(`Failed to set status STOPPED for stream: ${this.streamKey}`, e);
       });
-      indexer.writeLog(LogLevel.ERROR, functionName, this.executorContext.block_height,
-        `Encountered error processing stream: ${this.streamKey}, terminating thread\n${error.toString()}`
-      ).catch((e) => {
+
+      Promise.all([
+        indexer.writeLog(LogLevel.ERROR, functionName, this.executorContext.block_height, `Encountered error processing stream: ${this.streamKey}, terminating thread\n${error.toString()}`),
+        // indexer.callWriteLog({
+        //   blockHeight: this.executorContext.block_height,
+        //   logTimestamp: new Date(),
+        //   logType: LogType.SYSTEM,
+        //   logLevel: LogLevel.ERROR,
+        //   message: `Encountered error processing stream: ${this.streamKey}, terminating thread\n${error.toString()}`
+        // })
+      ])
+        .catch((e) => {
         console.error(`Failed to write log for stream: ${this.streamKey}`, e);
       });
+
       this.worker.terminate().catch(() => {
         console.error(`Failed to terminate thread for stream: ${this.streamKey}`);
       });
diff --git a/runner/tests/integration.test.ts b/runner/tests/integration.test.ts
index f009f2832..105fe889d 100644
--- a/runner/tests/integration.test.ts
+++ b/runner/tests/integration.test.ts
@@ -6,7 +6,7 @@
 import Indexer from '../src/indexer';
 import HasuraClient from '../src/hasura-client';
 import Provisioner from '../src/provisioner';
 import PgClient from '../src/pg-client';
-import { LogLevel } from '../src/stream-handler/stream-handler';
+import { LogLevel } from '../src/indexer-logger/indexer-logger';
 import { HasuraGraphQLContainer, type StartedHasuraGraphQLContainer } from './testcontainers/hasura';
 import { PostgreSqlContainer, type StartedPostgreSqlContainer } from './testcontainers/postgres';