From 359793117cce584857e8379476c6627b7e9ed981 Mon Sep 17 00:00:00 2001 From: Alexander Date: Wed, 16 Aug 2023 17:15:16 -0400 Subject: [PATCH 01/35] First pass at refactoring operator for stability, readability, logging, error handling --- src/commands/operator/index.ts | 247 ++++++++++++++++++--------------- src/utils/operator-job.ts | 121 +++++++++++----- 2 files changed, 224 insertions(+), 144 deletions(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index 2faf08a2..b9453300 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -346,130 +346,159 @@ export default class Operator extends OperatorJobAwareCommand { ) this.networkMonitor.structuredLog(job.network, `Identified this as a ${interestingTransaction.bloomId} event`, tags) - // Depending on the event type, perform relevant actions try { switch (type) { - case EventType.CrossChainMessageSent: { - try { - const crossChainMessageSentEvent: CrossChainMessageSentEvent | null = this.bloomFilters[ - type - ]!.bloomEvent.decode(type, interestingTransaction.log!) - if (crossChainMessageSentEvent !== null) { - this.networkMonitor.structuredLog( - job.network, - `Bridge request found for job hash ${crossChainMessageSentEvent.messageHash}`, - tags, - ) - } - } catch (error: any) { - this.networkMonitor.structuredLogError( - job.network, - this.errorColor(`Decoding CrossChainMessageSentEvent error: `, error), - tags, - ) - } - + case EventType.CrossChainMessageSent: + await this.handleCrossChainMessageSentEvent(job, type, interestingTransaction, tags) break - } - - case EventType.AvailableOperatorJob: { - try { - const availableOperatorJobEvent: AvailableOperatorJobEvent | null = this.bloomFilters[ - type - ]!.bloomEvent.decode(type, interestingTransaction.log!) - if (availableOperatorJobEvent !== null) { - this.networkMonitor.structuredLog( - job.network, - `Found a new job ${availableOperatorJobEvent.jobHash}`, - tags, - ) - // first update operator details, in case operator was selected for a job, or any data changed - this.networkMonitor.structuredLog(job.network, `Updating operator status`, tags) - await this.updateOperatorStatus(job.network) - // then add operator job to internal list of jobs to monitor and work on - this.networkMonitor.structuredLog(job.network, `Adding job to list of available jobs`, tags) - await this.decodeOperatorJob( - job.network, - availableOperatorJobEvent.jobHash, - availableOperatorJobEvent.payload, - tags, - ) - } - } catch (error: any) { - this.networkMonitor.structuredLogError( - job.network, - this.errorColor(`Decoding AvailableOperatorJobEvent error: `, error), - tags, - ) - } - + case EventType.AvailableOperatorJob: + await this.handleAvailableOperatorJobEvent(job, type, interestingTransaction, tags) + break + case EventType.FinishedOperatorJob: + await this.handleFinishedOperatorJobEvent(job, type, interestingTransaction, tags) + break + case EventType.FailedOperatorJob: + await this.handleFailedOperatorJobEvent(job, type, interestingTransaction, tags) break - } - case EventType.FinishedOperatorJob: { - try { - const finishedOperatorJobEvent: FinishedOperatorJobEvent | null = this.bloomFilters[ - type - ]!.bloomEvent.decode(type, interestingTransaction.log!) - if (finishedOperatorJobEvent !== null) { - this.networkMonitor.structuredLog( - job.network, - `Operator executed job ${finishedOperatorJobEvent.jobHash}`, - tags, - ) - // remove job from operatorJobs if it exists - if (finishedOperatorJobEvent.jobHash in this.operatorJobs) { - this.networkMonitor.structuredLog(job.network, `Removing job from list of available jobs`, tags) - delete this.operatorJobs[finishedOperatorJobEvent.jobHash] - } - - // update operator details, in case operator was selected for a job, or any data changed - this.networkMonitor.structuredLog(job.network, `Updating operator status`, tags) - await this.updateOperatorStatus(job.network) - } - } catch (error: any) { - this.networkMonitor.structuredLogError( - job.network, - this.errorColor(`Decoding FinishedOperatorJobEvent error: `, error), - tags, - ) - } + default: + this.networkMonitor.structuredLogError(job.network, `UNKNOWN EVENT`, tags) + } + } catch (error: any) { + this.networkMonitor.structuredLogError( + job.network, + this.errorColor(`Error processing transaction: `, error), + tags, + ) + } + } - break + async handleCrossChainMessageSentEvent( + job: BlockJob, + type: EventType, + interestingTransaction: InterestingTransaction, + tags: (string | number)[], + ) { + try { + const crossChainMessageSentEvent: CrossChainMessageSentEvent | null = this.bloomFilters[ + type + ]!.bloomEvent.decode(type, interestingTransaction.log!) + if (crossChainMessageSentEvent !== null) { + this.networkMonitor.structuredLog( + job.network, + `Bridge request found for job hash ${crossChainMessageSentEvent.messageHash}`, + tags, + ) + } + } catch (error: any) { + this.networkMonitor.structuredLogError( + job.network, + this.errorColor(`Decoding CrossChainMessageSentEvent error: `, error), + tags, + ) + } + } + + async handleAvailableOperatorJobEvent( + job: BlockJob, + type: EventType, + interestingTransaction: InterestingTransaction, + tags: (string | number)[], + ) { + try { + const availableOperatorJobEvent: AvailableOperatorJobEvent | null = this.bloomFilters[ + type + ]!.bloomEvent.decode(type, interestingTransaction.log!) + + if (availableOperatorJobEvent !== null) { + this.networkMonitor.structuredLog(job.network, `Found a new job ${availableOperatorJobEvent.jobHash}`, tags) + + // first update operator details, in case operator was selected for a job, or any data changed + this.networkMonitor.structuredLog(job.network, `Updating operator status`, tags) + const statusUpdateSuccessful = await this.updateOperatorStatus(job.network) + + if (!statusUpdateSuccessful) { + this.networkMonitor.structuredLogError( + job.network, + `Failed to update operator status. Proceeding with last known status for job ${availableOperatorJobEvent.jobHash}`, + tags, + ) } - case EventType.FailedOperatorJob: { - try { - const failedOperatorJobEvent: FailedOperatorJobEvent | null = this.bloomFilters[ - type - ]!.bloomEvent.decode(type, interestingTransaction.log!) - if (failedOperatorJobEvent !== null) { - this.networkMonitor.structuredLog( - job.network, - `Operator job finished but with failed code ${failedOperatorJobEvent.jobHash}`, - tags, - ) - } - } catch (error: any) { - this.networkMonitor.structuredLogError( - job.network, - this.errorColor(`Decoding FailedOperatorJobEvent error: `, error), - tags, - ) - } + // then add operator job to internal list of jobs to monitor and work on + this.networkMonitor.structuredLog(job.network, `Adding job to list of available jobs`, tags) + await this.decodeOperatorJob( + job.network, + availableOperatorJobEvent.jobHash, + availableOperatorJobEvent.payload, + tags, + ) + } + } catch (error: any) { + this.networkMonitor.structuredLogError( + job.network, + this.errorColor(`Decoding AvailableOperatorJobEvent error: `, error), + tags, + ) + } + } - break + async handleFinishedOperatorJobEvent( + job: BlockJob, + type: EventType, + interestingTransaction: InterestingTransaction, + tags: (string | number)[], + ) { + try { + const finishedOperatorJobEvent: FinishedOperatorJobEvent | null = this.bloomFilters[ + type + ]!.bloomEvent.decode(type, interestingTransaction.log!) + if (finishedOperatorJobEvent !== null) { + this.networkMonitor.structuredLog( + job.network, + `Operator executed job ${finishedOperatorJobEvent.jobHash}`, + tags, + ) + // remove job from operatorJobs if it exists + if (finishedOperatorJobEvent.jobHash in this.operatorJobs) { + this.networkMonitor.structuredLog(job.network, `Removing job from list of available jobs`, tags) + delete this.operatorJobs[finishedOperatorJobEvent.jobHash] } - default: { - this.networkMonitor.structuredLogError(job.network, `UNKNOWN EVENT`, tags) - break - } + // update operator details, in case operator was selected for a job, or any data changed + this.networkMonitor.structuredLog(job.network, `Updating operator status`, tags) + await this.updateOperatorStatus(job.network) } } catch (error: any) { this.networkMonitor.structuredLogError( job.network, - this.errorColor(`Error processing transaction: `, error), + this.errorColor(`Decoding FinishedOperatorJobEvent error: `, error), + tags, + ) + } + } + + async handleFailedOperatorJobEvent( + job: BlockJob, + type: EventType, + interestingTransaction: InterestingTransaction, + tags: (string | number)[], + ) { + try { + const failedOperatorJobEvent: FailedOperatorJobEvent | null = this.bloomFilters[ + type + ]!.bloomEvent.decode(type, interestingTransaction.log!) + if (failedOperatorJobEvent !== null) { + this.networkMonitor.structuredLog( + job.network, + `Operator job finished but with failed code ${failedOperatorJobEvent.jobHash}`, + tags, + ) + } + } catch (error: any) { + this.networkMonitor.structuredLogError( + job.network, + this.errorColor(`Decoding FailedOperatorJobEvent error: `, error), tags, ) } diff --git a/src/utils/operator-job.ts b/src/utils/operator-job.ts index d2d07782..9721bf3d 100644 --- a/src/utils/operator-job.ts +++ b/src/utils/operator-job.ts @@ -85,33 +85,49 @@ export abstract class OperatorJobAwareCommand extends HealthCheck { operatorJobPayload: string, tags: (string | number)[], ): Promise { - const contract: Contract = this.networkMonitor.operatorContract.connect(this.networkMonitor.providers[network]) - const rawJobDetails: any[] = await contract.getJobDetails(operatorJobHash) - const jobDetails: OperatorJobDetails = { - pod: rawJobDetails[0] as number, - blockTimes: rawJobDetails[1] as number, - operator: (rawJobDetails[2] as string).toLowerCase(), - startBlock: rawJobDetails[3] as number, - startTimestamp: BigNumber.from(rawJobDetails[4]), - fallbackOperators: rawJobDetails[5] as number[], - } as OperatorJobDetails - if (jobDetails.startBlock > 0) { - // for some reason these logs dont have tag attached. + try { + const contract: Contract = this.networkMonitor.operatorContract.connect(this.networkMonitor.providers[network]) + + // Try to fetch job details + const rawJobDetails: any[] = await contract.getJobDetails(operatorJobHash) + + // Validate rawJobDetails before processing + if (!rawJobDetails || rawJobDetails.length < 6) { + throw new Error(`Invalid job details for job ${operatorJobHash}`) + } + + const jobDetails: OperatorJobDetails = { + pod: rawJobDetails[0] as number, + blockTimes: rawJobDetails[1] as number, + operator: (rawJobDetails[2] as string).toLowerCase(), + startBlock: rawJobDetails[3] as number, + startTimestamp: BigNumber.from(rawJobDetails[4]), + fallbackOperators: rawJobDetails[5] as number[], + } as OperatorJobDetails + + if (jobDetails.startBlock <= 0) { + throw new Error(`Invalid startBlock for job ${operatorJobHash}`) + } this.networkMonitor.structuredLog(network, `Decoded valid job ${operatorJobHash}`, tags) this.networkMonitor.structuredLog(network, `Selected operator for job is ${jobDetails.operator}`, tags) + const targetTime: number = this.getTargetTime(network, jobDetails) - // extract gasLimit and gasPrice from payload + + // Extract gasLimit and gasPrice from payload const gasLimit: BigNumber = BigNumber.from('0x' + operatorJobPayload.slice(-128, -64)) this.networkMonitor.structuredLog(network, `Job gas limit is ${gasLimit.toNumber()}`, tags) + const gasPrice: BigNumber = BigNumber.from('0x' + operatorJobPayload.slice(-64)) this.networkMonitor.structuredLog(network, `Job maximum gas price is ${formatUnits(gasPrice, 'gwei')} GWEI`, tags) + const remainingTime: number = Math.round((targetTime - Date.now()) / 1000) this.networkMonitor.structuredLog( network, `Job can be operated ${remainingTime <= 0 ? 'immediately' : 'in ' + remainingTime + ' seconds'}`, tags, ) + this.operatorJobs[operatorJobHash] = { network, hash: operatorJobHash, @@ -122,17 +138,12 @@ export abstract class OperatorJobAwareCommand extends HealthCheck { jobDetails, tags, } as OperatorJob - // process.stdout.write('\n\n' + JSON.stringify(this.operatorJobs[operatorJobHash],undefined,2) + '\n\n') + return this.operatorJobs[operatorJobHash] + } catch (error: any) { + this.networkMonitor.structuredLogError(network, `Error decoding job ${operatorJobHash}: ${error.message}`, tags) + return undefined } - - this.networkMonitor.structuredLogError( - network, - `Could not decode job ${operatorJobHash} (invalid or already completed)`, - tags, - ) - - return undefined } updateJobTimes(): void { @@ -147,22 +158,62 @@ export abstract class OperatorJobAwareCommand extends HealthCheck { which index position inside of the pod doe they hold (used for fallback operator calculations), the current pod size (used for fallback operator calculations). */ - async updateOperatorStatus(network: string): Promise { + async updateOperatorStatus(network: string): Promise { const contract: Contract = this.networkMonitor.operatorContract.connect(this.networkMonitor.providers[network]) - this.operatorStatus.active[network] = !BigNumber.from( - await contract.getBondedAmount(this.operatorStatus.address), - ).isZero() - this.operatorStatus.currentPod[network] = BigNumber.from( - await contract.getBondedPod(this.operatorStatus.address), - ).toNumber() - this.operatorStatus.podIndex[network] = BigNumber.from( - await contract.getBondedPodIndex(this.operatorStatus.address), - ).toNumber() + + // A flag indicating the success of all contract calls + let allCallsSuccessful = true + + const contractCall = async (method: () => Promise, errorMessage: string): Promise => { + try { + return await method() + } catch (error: any) { + this.networkMonitor.structuredLogError(network, errorMessage + ': ' + error) + allCallsSuccessful = false // Mark the flag as false on any error + return null + } + } + + const bondedAmount = await contractCall( + () => contract.getBondedAmount(this.operatorStatus.address), + 'Error getting Bonded Amount', + ) + + if (bondedAmount) { + this.operatorStatus.active[network] = !BigNumber.from(bondedAmount).isZero() + } + + const bondedPod = await contractCall( + () => contract.getBondedPod(this.operatorStatus.address), + 'Error getting Bonded Pod', + ) + + if (bondedPod) { + this.operatorStatus.currentPod[network] = BigNumber.from(bondedPod).toNumber() + } + + const bondedPodIndex = await contractCall( + () => contract.getBondedPodIndex(this.operatorStatus.address), + 'Error getting Bonded Pod Index', + ) + + if (bondedPodIndex) { + this.operatorStatus.podIndex[network] = BigNumber.from(bondedPodIndex).toNumber() + } + if (this.operatorStatus.currentPod[network] > 0) { - this.operatorStatus.podSize[network] = BigNumber.from( - await contract.getPodOperatorsLength(this.operatorStatus.currentPod[network]), - ).toNumber() + const podOperatorsLength = await contractCall( + () => contract.getPodOperatorsLength(this.operatorStatus.currentPod[network]), + 'Error getting Pod Operators Length', + ) + + if (podOperatorsLength) { + this.operatorStatus.podSize[network] = BigNumber.from(podOperatorsLength).toNumber() + } } + + // Return the final status of all contract calls + return allCallsSuccessful } async checkJobStatus(operatorJobHash: string, tags?: (string | number)[]): Promise { From 53b56b309e06efdac995849c979e780c6a0c2fbd Mon Sep 17 00:00:00 2001 From: Alexander Date: Wed, 16 Aug 2023 17:15:50 -0400 Subject: [PATCH 02/35] Lint --- src/utils/operator-job.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/operator-job.ts b/src/utils/operator-job.ts index 9721bf3d..3d58e9fd 100644 --- a/src/utils/operator-job.ts +++ b/src/utils/operator-job.ts @@ -158,7 +158,7 @@ export abstract class OperatorJobAwareCommand extends HealthCheck { which index position inside of the pod doe they hold (used for fallback operator calculations), the current pod size (used for fallback operator calculations). */ - async updateOperatorStatus(network: string): Promise { + async updateOperatorStatus(network: string): Promise { const contract: Contract = this.networkMonitor.operatorContract.connect(this.networkMonitor.providers[network]) // A flag indicating the success of all contract calls From d031416f444ef7feebd56c9a7945a8009b384322 Mon Sep 17 00:00:00 2001 From: Alexander Date: Wed, 16 Aug 2023 21:28:25 -0400 Subject: [PATCH 03/35] Make suggested changes --- src/commands/operator/index.ts | 4 ++-- src/utils/operator-job.ts | 37 +++++++++++++++++++++++++++++----- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index b9453300..7d1526a1 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -413,7 +413,7 @@ export default class Operator extends OperatorJobAwareCommand { if (availableOperatorJobEvent !== null) { this.networkMonitor.structuredLog(job.network, `Found a new job ${availableOperatorJobEvent.jobHash}`, tags) - // first update operator details, in case operator was selected for a job, or any data changed + // First update operator details, in case operator was selected for a job, or any data changed this.networkMonitor.structuredLog(job.network, `Updating operator status`, tags) const statusUpdateSuccessful = await this.updateOperatorStatus(job.network) @@ -425,7 +425,7 @@ export default class Operator extends OperatorJobAwareCommand { ) } - // then add operator job to internal list of jobs to monitor and work on + // Then add operator job to internal list of jobs to monitor and work on this.networkMonitor.structuredLog(job.network, `Adding job to list of available jobs`, tags) await this.decodeOperatorJob( job.network, diff --git a/src/utils/operator-job.ts b/src/utils/operator-job.ts index 3d58e9fd..7674dc84 100644 --- a/src/utils/operator-job.ts +++ b/src/utils/operator-job.ts @@ -128,6 +128,7 @@ export abstract class OperatorJobAwareCommand extends HealthCheck { tags, ) + // Add job to list this.operatorJobs[operatorJobHash] = { network, hash: operatorJobHash, @@ -139,6 +140,12 @@ export abstract class OperatorJobAwareCommand extends HealthCheck { tags, } as OperatorJob + this.networkMonitor.structuredLog( + network, + `Added job. Total jobs count: ${Object.keys(this.operatorJobs).length}`, + tags, + ) + return this.operatorJobs[operatorJobHash] } catch (error: any) { this.networkMonitor.structuredLogError(network, `Error decoding job ${operatorJobHash}: ${error.message}`, tags) @@ -216,17 +223,37 @@ export abstract class OperatorJobAwareCommand extends HealthCheck { return allCallsSuccessful } - async checkJobStatus(operatorJobHash: string, tags?: (string | number)[]): Promise { - if (operatorJobHash !== undefined && operatorJobHash !== '' && operatorJobHash in this.operatorJobs) { - const job: OperatorJob = this.operatorJobs[operatorJobHash] - if ((await this.decodeOperatorJob(job.network, job.hash, job.payload, tags ?? ([] as string[]))) === undefined) { + async checkJobStatus(operatorJobHash: string, tags: (string | number)[] = []): Promise { + // First validate input (Network is not known until job is decoded) + if (!operatorJobHash || !(operatorJobHash in this.operatorJobs)) { + this.networkMonitor.structuredLogError('Unknown', `Invalid job hash provided: ${operatorJobHash}`, tags) + return + } + + // Fetch job from list + this.networkMonitor.structuredLog('Unknown', `Total jobs count: ${Object.keys(this.operatorJobs.length)}`, tags) + const job: OperatorJob = this.operatorJobs[operatorJobHash] + this.networkMonitor.structuredLog(job.network, `Checking status for job ${job.hash}.`, tags) + + // Then try to decode the job + try { + const decodedJob = await this.decodeOperatorJob(job.network, job.hash, job.payload, tags) + + // If the job is no longer valid, remove it from the list + if (decodedJob === undefined) { this.networkMonitor.structuredLogError( job.network, `Job ${job.hash} is no longer active/valid, removing it from list`, - tags ?? ([] as string[]), + tags, ) delete this.operatorJobs[job.hash] } + } catch (error: any) { + this.networkMonitor.structuredLogError( + job.network, + `Error while checking job ${job.hash}: ${error.message}`, + tags, + ) } } } From e48002c342b116d939d82f3d3d6df320370c99eb Mon Sep 17 00:00:00 2001 From: Alexander Date: Wed, 16 Aug 2023 21:37:47 -0400 Subject: [PATCH 04/35] Downgrade to prettier 3.3 on CI/CD --- .github/workflows/enforce_prettier.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/enforce_prettier.yml b/.github/workflows/enforce_prettier.yml index a05b29a5..7e830b8a 100644 --- a/.github/workflows/enforce_prettier.yml +++ b/.github/workflows/enforce_prettier.yml @@ -28,7 +28,7 @@ jobs: persist-credentials: false - name: Prettify code - uses: creyD/prettier_action@v4.3 + uses: creyD/prettier_action@v3.3 with: prettier_options: --check src/**/*.+(js|ts) dry: True From f51f8ec71cb78e863579e308982ea9f2586ac186 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 17 Aug 2023 16:22:39 -0400 Subject: [PATCH 05/35] Refactor operator with better logging, error handling, organization, locking --- src/commands/operator/index.ts | 401 +++++++++++++++++++-------------- 1 file changed, 228 insertions(+), 173 deletions(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index 7d1526a1..ab460324 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -79,15 +79,18 @@ export default class Operator extends OperatorJobAwareCommand { ...HealthCheck.flags, } + private processingJobsForNetworks: {[network: string]: boolean} = {} + private isJobBeingExecuted: {[jobHash: string]: boolean} = {} + // API Params BASE_URL?: string apiService!: ApiService apiColor = color.keyword('orange') errorColor = color.keyword('red') bloomFilters!: BloomFilterMap + updateBlockHeight!: string environment!: Environment - legacyBlocks = true /** @@ -100,78 +103,98 @@ export default class Operator extends OperatorJobAwareCommand { * Command Entry Point */ async run(): Promise { - this.log(`Operator command has begun!!!`) - const {flags} = await this.parse(Operator) - this.BASE_URL = flags.host - const enableHealthCheckServer = flags.healthCheck - const healthCheckPort = flags.healthCheckPort - let updateBlockHeight = flags.updateBlockHeight - const syncFlag = flags.sync - const processBlockRange = flags['process-block-range'] - this.legacyBlocks = !processBlockRange - const unsafePassword = flags.unsafePassword - - this.operatorMode = - OperatorMode[ - (await checkOptionFlag( - Object.values(OperatorMode), - flags.mode, - 'Select the mode in which to run the operator', - )) as keyof typeof OperatorMode - ] - - this.log(`Operator mode: ${this.operatorMode}`) + try { + this.log(`Operator command has begun!!!`) + const {flags} = await this.parse(Operator) + this.BASE_URL = flags.host + this.updateBlockHeight = flags.updateBlockHeight + const processBlockRange = flags['process-block-range'] + this.legacyBlocks = !processBlockRange - this.log('Loading user configurations...') - const {environment, userWallet, configFile} = await ensureConfigFileIsValid( - this.config.configDir, - unsafePassword, - true, - ) - this.log('User configurations loaded.') + this.operatorMode = await this.setOperatorMode(flags.mode) + + const {environment, userWallet, configFile} = await this.loadConfigurations(flags.unsafePassword) + this.environment = environment + + if (flags.replay !== '0') { + this.log('Replay flag enabled, will not load or save block heights.') + this.updateBlockHeight = BlockHeightOptions.DISABLE + } + + await this.authenticateApi() + this.initializeNetworkMonitor(flags, userWallet, configFile) + await this.manageBlockHeights(flags) + this.setApiServiceLogs() + + if (!(await shouldSync(flags.sync, this.networkMonitor.latestBlockHeight))) { + this.resetBlockHeights() + } - this.environment = environment + this.operatorStatus.address = userWallet.address.toLowerCase() + this.networkMonitor.exitCallback = this.exitCallback.bind(this) - if (flags.replay !== '0') { - this.log('Replay flag enabled, will not load or save block heights.') - updateBlockHeight = BlockHeightOptions.DISABLE + await this.startNetworkMonitor(flags) + await this.processSavedJobs() + this.scheduleJobsProcessing() + + if (flags.healthCheck) { + await this.startHealthCheckServer(flags.healthCheckPort || 6000) + } + + this.log(`Operator started running successfully.`) + } catch (error) { + this.handleError('An error occurred in the run method', error) } + } - if ( - this.BASE_URL !== undefined && - updateBlockHeight !== undefined && - updateBlockHeight === BlockHeightOptions.API - ) { + async loadConfigurations(unsafePassword: any): Promise<{environment: any; userWallet: any; configFile: any}> { + this.log('Loading user configurations...') + const configurations = await ensureConfigFileIsValid(this.config.configDir, unsafePassword, true) + this.log('User configurations loaded.') + return configurations + } + + async authenticateApi(): Promise { + if (this.BASE_URL && this.updateBlockHeight === BlockHeightOptions.API) { if (this.environment === Environment.experimental || this.environment === Environment.localhost) { this.log(`Skipping API authentication for ${Environment[this.environment]} environment`) } else { this.log(`Using API for block height track ...`) - // Create API Service for GraphQL requests + const logger: Logger = { + log: this.log, + warn: this.warn, + debug: this.debug, + error: this.error, + jsonEnabled: () => false, + } try { - const logger: Logger = { - log: this.log, - warn: this.warn, - debug: this.debug, - error: this.error, - jsonEnabled: () => false, - } - this.apiService = new ApiService(this.BASE_URL!, logger) + this.apiService = new ApiService(this.BASE_URL, logger) await this.apiService.operatorLogin() + this.log(this.apiColor(`Successfully authenticated into API ${this.BASE_URL}`)) } catch (error: any) { - this.log('Error: Failed to get Operator Token from API') - // NOTE: sample of how to do logs when in production mode - this.log(JSON.stringify({...error, stack: error.stack})) - this.exit() - } - - if (this.apiService === undefined) { - throw new Error('API service is not defined') + this.handleError('Failed to get Operator Token from API', error) } - - this.log(this.apiColor(`Successfully authenticated into API ${flags.host}`)) } } + } + + handleError(message: string, error: any): void { + this.log(`Error: ${message}`) + this.log(JSON.stringify({...error, stack: error.stack})) + this.exit() + } + async setOperatorMode(mode: any): Promise { + return OperatorMode[ + (await checkOptionFlag( + Object.values(OperatorMode), + mode, + 'Select the mode in which to run the operator', + )) as keyof typeof OperatorMode + ] + } + + initializeNetworkMonitor(flags: any, userWallet: any, configFile: any): void { this.networkMonitor = new NetworkMonitor({ enableV2: true, parent: this, @@ -183,18 +206,18 @@ export default class Operator extends OperatorJobAwareCommand { lastBlockFilename: 'operator-blocks.json', replay: flags.replay, apiService: this.apiService, - BlockHeightOptions: updateBlockHeight as BlockHeightOptions, - processBlockRange: processBlockRange, + BlockHeightOptions: this.updateBlockHeight as BlockHeightOptions, + processBlockRange: flags['process-block-range'], }) - this.jobsFile = path.join(this.config.configDir, this.networkMonitor.environment + '.operator-job-details.json') + } - switch (updateBlockHeight) { + async manageBlockHeights(flags: any): Promise { + switch (this.updateBlockHeight) { case BlockHeightOptions.API: if (flags.host === undefined) { this.errorColor(`--blockHeight flag option API requires the --host flag`) } - this.networkMonitor.latestBlockHeight = await this.networkMonitor.loadLastBlocksHeights( BlockHeightProcessType.OPERATOR, ) @@ -208,57 +231,66 @@ export default class Operator extends OperatorJobAwareCommand { this.networkMonitor.currentBlockHeight = {} break } + } + setApiServiceLogs(): void { if (this.apiService !== undefined) { this.apiService.setStructuredLog(this.networkMonitor.structuredLog.bind(this.networkMonitor)) this.apiService.setStructuredLogError(this.networkMonitor.structuredLogError.bind(this.networkMonitor)) } + } - if ((await shouldSync(syncFlag, this.networkMonitor.latestBlockHeight)) === false) { - this.networkMonitor.latestBlockHeight = {} - this.networkMonitor.currentBlockHeight = {} - } - - this.operatorStatus.address = userWallet.address.toLowerCase() - - this.networkMonitor.exitCallback = this.exitCallback.bind(this) + resetBlockHeights(): void { + this.networkMonitor.latestBlockHeight = {} + this.networkMonitor.currentBlockHeight = {} + } + async startNetworkMonitor(flags: any): Promise { CliUx.ux.action.start(`Starting operator in mode: ${OperatorMode[this.operatorMode]}`) - const continuous = flags.replay === '0' // If replay is set, run network monitor stops after catching up to the latest block + const continuous = flags.replay === '0' await this.networkMonitor.run(continuous, undefined, this.filterBuilder2) CliUx.ux.action.stop('🚀') + } - // check if file exists - if (await fs.pathExists(this.jobsFile)) { - this.log('Saved jobs file exists, parsing it for valid/active jobs.') - // if file exists, need to add it to list of jobs to process - this.operatorJobs = (await fs.readJson(this.jobsFile)) as {[key: string]: OperatorJob} - // need to check each job and make sure it's still valid - for (const jobHash of Object.keys(this.operatorJobs)) { - this.operatorJobs[jobHash].gasLimit = BigNumber.from(this.operatorJobs[jobHash].gasLimit) - this.operatorJobs[jobHash].gasPrice = BigNumber.from(this.operatorJobs[jobHash].gasPrice) - this.operatorJobs[jobHash].jobDetails.startTimestamp = BigNumber.from( - this.operatorJobs[jobHash].jobDetails.startTimestamp, - ) - // if job is still valid, it will stay in object, otherwise it will be removed - // Tags not passed in because they do not exist - // Maybe save tags with the job hash so we can pass it back in here - await this.checkJobStatus(jobHash) + async processSavedJobs(): Promise { + try { + // Check if file exists + if (await fs.pathExists(this.jobsFile)) { + this.log('Saved jobs file exists, parsing it for valid/active jobs.') + // if file exists, need to add it to list of jobs to process + this.operatorJobs = (await fs.readJson(this.jobsFile)) as {[key: string]: OperatorJob} + + // Need to check each job and make sure it's still valid + for (const jobHash of Object.keys(this.operatorJobs)) { + this.operatorJobs[jobHash].gasLimit = BigNumber.from(this.operatorJobs[jobHash].gasLimit) + this.operatorJobs[jobHash].gasPrice = BigNumber.from(this.operatorJobs[jobHash].gasPrice) + this.operatorJobs[jobHash].jobDetails.startTimestamp = BigNumber.from( + this.operatorJobs[jobHash].jobDetails.startTimestamp, + ) + + // If job is still valid, it will stay in object, otherwise it will be removed + await this.checkJobStatus(jobHash) + this.log('Saved jobs parsing completed.') + } + } else { + this.log('Saved jobs file not found (not loaded).') } - } else { - this.log('Saved jobs file not found (not loaded).') + } catch (error) { + this.handleError('An error occurred while processing saved jobs', error) } + } + scheduleJobsProcessing(): void { for (const network of this.networkMonitor.networks) { - // instantiate all network operator job watchers - setTimeout(this.processOperatorJobs.bind(this, network), 60_000) + // Instantiate all network operator job watchers + setTimeout(this.processOperatorJobs.bind(this, network), 60_000) // Run every 60 seconds } + } - // Start health check server on port 6000 or healthCheckPort + async startHealthCheckServer(port: number): Promise { + // Start health check server // Can be used to monitor that the operator is online and running - if (enableHealthCheckServer) { - await this.config.runHook('healthCheck', {networkMonitor: this.networkMonitor, healthCheckPort}) - } + await this.config.runHook('healthCheck', {networkMonitor: this.networkMonitor, port}) } exitCallback(): void { @@ -519,83 +551,101 @@ export default class Operator extends OperatorJobAwareCommand { } } - // This method is call to cycle through all operator jobs that were previously detected processOperatorJobs = (network: string, jobHash?: string): void => { - // IF processOperatorJobs has a jobHash, delete it from this.operatorJobs? Why do this here? why not delete it before? - if (jobHash !== undefined && jobHash !== '' && jobHash in this.operatorJobs) { - delete this.operatorJobs[jobHash] + // NOTE: It is possible that with only a 1 second delay before recalling this function via setTimeout + // on the same network, it could interupt the current process before it completes + // + // This lock is put in place to prevent race conditions and / or concurrency issues and ensure that the + // current processing is complete before new jobs are processed + if (this.processingJobsForNetworks[network]) { + this.log(`Previous job processing for network: ${network} still in progress, skipping this cycle.`) + return } - const gasPricing: GasPricing = this.networkMonitor.gasPrices[network] - let highestGas: BigNumber = BigNumber.from('0') - const now: number = Date.now() - // update wait times really quickly - this.updateJobTimes() - // DO LOGIC HERE FOR FINDING VALID JOB - const jobs: OperatorJob[] = [] - // extract jobs for network - for (const job of Object.values(this.operatorJobs)) { - if (job.network === network) { - jobs.push(job) - } - } + try { + this.processingJobsForNetworks[network] = true + this.log(`Starting job processing for network: ${network}.`) - // sort jobs based on target time, to prioritize ones that need to be finished first - jobs.sort((a: OperatorJob, b: OperatorJob): number => { - return a.targetTime - b.targetTime - }) - const candidates: OperatorJob[] = [] - for (const job of jobs) { - // check that time is within scope - if (job.targetTime < now) { - // add to list of candidates - candidates.push(job) - // find highest gas candidate first - if (BigNumber.from(job.gasPrice).gt(highestGas)) { - highestGas = BigNumber.from(job.gasPrice) - } + // If a specific jobHash is provided, delete it. + // NOTE: This logic might be better placed elsewhere. + if (jobHash && this.operatorJobs[jobHash]) { + delete this.operatorJobs[jobHash] } - } - if (candidates.length > 0) { - // sort candidates by gas priority - // returning highest gas first - candidates.sort((a: OperatorJob, b: OperatorJob): number => { - return BigNumber.from(b.gasPrice).sub(BigNumber.from(a.gasPrice)).toNumber() - }) - const compareGas: BigNumber = gasPricing.isEip1559 ? gasPricing.nextBlockFee! : gasPricing.gasPrice! - let foundCandidate = false - for (const candidate of candidates) { - if (BigNumber.from(candidate.gasPrice).gte(compareGas)) { - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore - const tags = this.operatorJobs[candidate.hash].tags ?? [this.networkMonitor.randomTag()] - this.networkMonitor.structuredLog(network, `Sending job ${candidate.hash} for execution`, tags) - // have a valid job to do right away - this.processOperatorJob(network, candidate.hash, tags) - foundCandidate = true - break - } + const gasPricing: GasPricing = this.networkMonitor.gasPrices[network] + if (!gasPricing) { + this.networkMonitor.structuredLogError(network, `Missing gas pricing data for network ${network}`) + return } - if (!foundCandidate) { + this.updateJobTimes() + const jobs: OperatorJob[] = Object.values(this.operatorJobs).filter(job => job.network === network) + const sortedJobs = this.sortJobsByPriority(jobs) + const chosenJob = this.selectJob(sortedJobs, gasPricing) + + if (chosenJob) { + const tags = this.operatorJobs[chosenJob.hash]?.tags ?? [this.networkMonitor.randomTag()] + this.networkMonitor.structuredLog(network, `Sending job ${chosenJob.hash} for execution`, tags) + this.processOperatorJob(network, chosenJob.hash, tags) + } else { setTimeout(this.processOperatorJobs.bind(this, network), 1000) } - } else { - setTimeout(this.processOperatorJobs.bind(this, network), 1000) + this.log(`Job processing for network: ${network} completed.`) + } catch (error) { + this.handleError(`An error occurred while processing jobs for network: ${network}`, error) + } finally { + this.processingJobsForNetworks[network] = false } } + // This function sorts jobs based on target time and then by gas price. + sortJobsByPriority(jobs: OperatorJob[]): OperatorJob[] { + const now = Date.now() + const validJobs = jobs.filter(job => job.targetTime < now) + return validJobs.sort((a, b) => { + const timeDiff = a.targetTime - b.targetTime + if (timeDiff !== 0) return timeDiff + return BigNumber.from(b.gasPrice).sub(BigNumber.from(a.gasPrice)).toNumber() + }) + } + + // This function selects the best job based on the provided gas pricing. + selectJob(jobs: OperatorJob[], gasPricing: GasPricing): OperatorJob | null { + const compareGas: BigNumber = gasPricing.isEip1559 ? gasPricing.nextBlockFee! : gasPricing.gasPrice! + for (const job of jobs) { + if (BigNumber.from(job.gasPrice).gte(compareGas)) { + return job + } + } + return null + } + /** * Execute the job */ async executeJob(jobHash: string, tags: (string | number)[]): Promise { - // quickly check that job is still valid - await this.checkJobStatus(jobHash, tags) - if (jobHash in this.operatorJobs) { + try { + // Idempotency check + if (this.isJobBeingExecuted[jobHash]) { + this.log('Job is already being executed', tags) + + return false + } + + this.isJobBeingExecuted[jobHash] = true + + // Check job status + await this.checkJobStatus(jobHash, tags) + + if (!(jobHash in this.operatorJobs)) { + return true + } + const job: OperatorJob = this.operatorJobs[jobHash] const network: string = job.network let operate = this.operatorMode === OperatorMode.auto + + // Operator mode handling if (this.operatorMode === OperatorMode.manual) { const operatorPrompt: any = await inquirer.prompt([ { @@ -608,31 +658,36 @@ export default class Operator extends OperatorJobAwareCommand { operate = operatorPrompt.shouldContinue } - if (operate) { - const receipt: TransactionReceipt | null = await this.networkMonitor.executeTransaction({ - network, - tags, - contract: this.networkMonitor.operatorContract, - methodName: 'executeJob', - args: [job.payload], - gasPrice: BigNumber.from(job.gasPrice), - gasLimit: BigNumber.from(job.gasLimit).mul(BigNumber.from('2')), - canFail: true, - waitForReceipt: true, - interval: 5000, - attempts: 30, - }) - if (receipt !== null && receipt.status === 1) { - delete this.operatorJobs[jobHash] - } + if (!operate) { + this.networkMonitor.structuredLog(network, 'Available job will not be executed', tags) + return false + } - return receipt !== null + // Transaction handling + const receipt: TransactionReceipt | null = await this.networkMonitor.executeTransaction({ + network, + tags, + contract: this.networkMonitor.operatorContract, + methodName: 'executeJob', + args: [job.payload], + gasPrice: BigNumber.from(job.gasPrice), + gasLimit: BigNumber.from(job.gasLimit).mul(BigNumber.from('2')), + canFail: true, + waitForReceipt: true, + interval: 5000, + attempts: 30, + }) + + if (receipt && receipt.status === 1) { + delete this.operatorJobs[jobHash] } - this.networkMonitor.structuredLog(network, 'Available job will not be executed', tags) + return receipt !== null + } catch (error: any) { + this.networkMonitor.structuredLogError(`An error occurred while executing job: ${jobHash}`, error) return false + } finally { + this.isJobBeingExecuted[jobHash] = false } - - return true } } From 6ab10a9f016e764982a99ac67ada36cda0939082 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 17 Aug 2023 16:24:19 -0400 Subject: [PATCH 06/35] Lint --- src/commands/operator/index.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index ab460324..8721082c 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -218,6 +218,7 @@ export default class Operator extends OperatorJobAwareCommand { if (flags.host === undefined) { this.errorColor(`--blockHeight flag option API requires the --host flag`) } + this.networkMonitor.latestBlockHeight = await this.networkMonitor.loadLastBlocksHeights( BlockHeightProcessType.OPERATOR, ) @@ -590,6 +591,7 @@ export default class Operator extends OperatorJobAwareCommand { } else { setTimeout(this.processOperatorJobs.bind(this, network), 1000) } + this.log(`Job processing for network: ${network} completed.`) } catch (error) { this.handleError(`An error occurred while processing jobs for network: ${network}`, error) @@ -617,6 +619,7 @@ export default class Operator extends OperatorJobAwareCommand { return job } } + return null } From 46a441fadec59d0fd480ab81f47e10e46cc2cfb3 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 17 Aug 2023 16:46:59 -0400 Subject: [PATCH 07/35] Add more logging to processOperatorJob --- src/commands/operator/index.ts | 35 +++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index 8721082c..d3f92f3f 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -284,7 +284,7 @@ export default class Operator extends OperatorJobAwareCommand { scheduleJobsProcessing(): void { for (const network of this.networkMonitor.networks) { // Instantiate all network operator job watchers - setTimeout(this.processOperatorJobs.bind(this, network), 60_000) // Run every 60 seconds + setTimeout(this.processOperatorJobs.bind(this, network), 60_000) // Wait 60 seconds before processing jobs starts } } @@ -538,16 +538,37 @@ export default class Operator extends OperatorJobAwareCommand { } processOperatorJob = async (network: string, jobHash: string, tags: (string | number)[]): Promise => { - // if success then pass back payload hash to remove it from list - if (await this.executeJob(jobHash, tags)) { - // check job status just in case + this.networkMonitor.structuredLog(network, `Beginning to process job with hash: ${jobHash}`, tags) + + const isJobExecutedSuccessfully = await this.executeJob(jobHash, tags) + + if (isJobExecutedSuccessfully) { + this.networkMonitor.structuredLog( + network, + `Job with hash: ${jobHash} was executed successfully. Checking its status...`, + tags, + ) await this.checkJobStatus(jobHash, tags) - // job was a success + + this.networkMonitor.structuredLog( + network, + `Reprocessing operator jobs after successful execution of job: ${jobHash}`, + tags, + ) this.processOperatorJobs(network, jobHash) // here the jobHash will be deleted } else { - // check job status just in case + this.networkMonitor.structuredLog( + network, + `Job with hash: ${jobHash} failed to execute. Checking its status...`, + tags, + ) await this.checkJobStatus(jobHash, tags) - // job failed, gotta try again + + this.networkMonitor.structuredLog( + network, + `Reprocessing operator jobs after failed execution of job: ${jobHash}`, + tags, + ) this.processOperatorJobs(network) } } From 54fda4fe2d968a566380d05c0b49b1f4230b33f7 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 17 Aug 2023 17:24:44 -0400 Subject: [PATCH 08/35] Add a bit more logging in the executeJob function --- src/commands/operator/index.ts | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index d3f92f3f..c9304237 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -574,6 +574,7 @@ export default class Operator extends OperatorJobAwareCommand { } processOperatorJobs = (network: string, jobHash?: string): void => { + this.log(`Starting job processing for network: ${network}.`) // NOTE: It is possible that with only a 1 second delay before recalling this function via setTimeout // on the same network, it could interupt the current process before it completes // @@ -586,7 +587,7 @@ export default class Operator extends OperatorJobAwareCommand { try { this.processingJobsForNetworks[network] = true - this.log(`Starting job processing for network: ${network}.`) + this.log(`Continue job processing for network: ${network}.`) // If a specific jobHash is provided, delete it. // NOTE: This logic might be better placed elsewhere. @@ -617,6 +618,7 @@ export default class Operator extends OperatorJobAwareCommand { } catch (error) { this.handleError(`An error occurred while processing jobs for network: ${network}`, error) } finally { + this.log(`Resetting lock on processOperatorJobs`) this.processingJobsForNetworks[network] = false } } @@ -648,6 +650,7 @@ export default class Operator extends OperatorJobAwareCommand { * Execute the job */ async executeJob(jobHash: string, tags: (string | number)[]): Promise { + this.log(`Starting execute job`) try { // Idempotency check if (this.isJobBeingExecuted[jobHash]) { @@ -659,9 +662,11 @@ export default class Operator extends OperatorJobAwareCommand { this.isJobBeingExecuted[jobHash] = true // Check job status + this.log(`Checking job status...`) await this.checkJobStatus(jobHash, tags) if (!(jobHash in this.operatorJobs)) { + this.log(`Job hash is not in the operator jobs... returning`) return true } @@ -683,11 +688,12 @@ export default class Operator extends OperatorJobAwareCommand { } if (!operate) { - this.networkMonitor.structuredLog(network, 'Available job will not be executed', tags) + this.networkMonitor.structuredLog(network, 'Not in mode to execute. Available job will not be executed', tags) return false } // Transaction handling + this.log(`About to execute the transaction`) const receipt: TransactionReceipt | null = await this.networkMonitor.executeTransaction({ network, tags, @@ -703,6 +709,7 @@ export default class Operator extends OperatorJobAwareCommand { }) if (receipt && receipt.status === 1) { + this.log(`Execution succeeded. Removing job ${jobHash} from the operator jobs queue`) delete this.operatorJobs[jobHash] } @@ -711,6 +718,8 @@ export default class Operator extends OperatorJobAwareCommand { this.networkMonitor.structuredLogError(`An error occurred while executing job: ${jobHash}`, error) return false } finally { + // TODO: We might need to just delete the finished job hashes so they don't build up + this.log(`Removing lock on job hash`) this.isJobBeingExecuted[jobHash] = false } } From 6398fb90fd967542710b9cf2b321086fa49e1962 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 17 Aug 2023 17:32:16 -0400 Subject: [PATCH 09/35] Add a trace to the exitRouter --- src/utils/network-monitor.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/utils/network-monitor.ts b/src/utils/network-monitor.ts index 7db455e6..57dbab8a 100644 --- a/src/utils/network-monitor.ts +++ b/src/utils/network-monitor.ts @@ -833,6 +833,7 @@ export class NetworkMonitor { } else { this.debug('exitRouter triggered') this.debug(`\nError: ${exitCode}`) + console.trace() } } From 89b969e8772c49c0ed161081372fffeda4b79d9a Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 17 Aug 2023 18:04:09 -0400 Subject: [PATCH 10/35] Add more logging. Disable deleting operator job --- src/commands/operator/index.ts | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index c9304237..2eb3a26a 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -575,6 +575,8 @@ export default class Operator extends OperatorJobAwareCommand { processOperatorJobs = (network: string, jobHash?: string): void => { this.log(`Starting job processing for network: ${network}.`) + this.log(`Current job count: ${Object.keys(this.operatorJobs).length}`) + // NOTE: It is possible that with only a 1 second delay before recalling this function via setTimeout // on the same network, it could interupt the current process before it completes // @@ -587,30 +589,39 @@ export default class Operator extends OperatorJobAwareCommand { try { this.processingJobsForNetworks[network] = true - this.log(`Continue job processing for network: ${network}.`) + this.log(`Continue job processing for network: ${network}. Current job hash: ${jobHash}`) + this.log(`NOTE: Deleting job is currently disabled`) // If a specific jobHash is provided, delete it. // NOTE: This logic might be better placed elsewhere. - if (jobHash && this.operatorJobs[jobHash]) { - delete this.operatorJobs[jobHash] - } + // if (jobHash && this.operatorJobs[jobHash]) { + // delete this.operatorJobs[jobHash] + // } + this.log(`Getting gas pricing for network: ${network}`) const gasPricing: GasPricing = this.networkMonitor.gasPrices[network] if (!gasPricing) { this.networkMonitor.structuredLogError(network, `Missing gas pricing data for network ${network}`) return } + this.log(`Updating job times`) this.updateJobTimes() const jobs: OperatorJob[] = Object.values(this.operatorJobs).filter(job => job.network === network) + + this.log(`Sorting jobs by priority`) const sortedJobs = this.sortJobsByPriority(jobs) + + this.log(`Selecting job`) const chosenJob = this.selectJob(sortedJobs, gasPricing) if (chosenJob) { + this.log(`Chosen job: ${chosenJob?.hash}`) const tags = this.operatorJobs[chosenJob.hash]?.tags ?? [this.networkMonitor.randomTag()] this.networkMonitor.structuredLog(network, `Sending job ${chosenJob.hash} for execution`, tags) this.processOperatorJob(network, chosenJob.hash, tags) } else { + this.log(`No job selected. Waiting 1 second before trying again.`) setTimeout(this.processOperatorJobs.bind(this, network), 1000) } From 495f1e7fc4ba44d18cc2aa1a91f92e9460e36fcc Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 17 Aug 2023 18:11:13 -0400 Subject: [PATCH 11/35] Rearrange the logic the deletes the job from the list --- src/commands/operator/index.ts | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index 2eb3a26a..29d1cf7f 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -591,13 +591,6 @@ export default class Operator extends OperatorJobAwareCommand { this.processingJobsForNetworks[network] = true this.log(`Continue job processing for network: ${network}. Current job hash: ${jobHash}`) - this.log(`NOTE: Deleting job is currently disabled`) - // If a specific jobHash is provided, delete it. - // NOTE: This logic might be better placed elsewhere. - // if (jobHash && this.operatorJobs[jobHash]) { - // delete this.operatorJobs[jobHash] - // } - this.log(`Getting gas pricing for network: ${network}`) const gasPricing: GasPricing = this.networkMonitor.gasPrices[network] if (!gasPricing) { @@ -613,13 +606,18 @@ export default class Operator extends OperatorJobAwareCommand { const sortedJobs = this.sortJobsByPriority(jobs) this.log(`Selecting job`) - const chosenJob = this.selectJob(sortedJobs, gasPricing) + const selectedJob = this.selectJob(sortedJobs, gasPricing) + + if (selectedJob) { + this.log(`Chosen job: ${selectedJob?.hash}`) + const tags = this.operatorJobs[selectedJob.hash]?.tags ?? [this.networkMonitor.randomTag()] + this.networkMonitor.structuredLog(network, `Sending job ${selectedJob.hash} for execution`, tags) + this.processOperatorJob(network, selectedJob.hash, tags) - if (chosenJob) { - this.log(`Chosen job: ${chosenJob?.hash}`) - const tags = this.operatorJobs[chosenJob.hash]?.tags ?? [this.networkMonitor.randomTag()] - this.networkMonitor.structuredLog(network, `Sending job ${chosenJob.hash} for execution`, tags) - this.processOperatorJob(network, chosenJob.hash, tags) + // We can now delete the job hash from the list of jobs being processed + if (jobHash && this.operatorJobs[jobHash]) { + delete this.operatorJobs[jobHash] + } } else { this.log(`No job selected. Waiting 1 second before trying again.`) setTimeout(this.processOperatorJobs.bind(this, network), 1000) From 4b49d114c0e2d49ca60bc7f5d771977760d0bd3c Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 17 Aug 2023 18:19:19 -0400 Subject: [PATCH 12/35] Add a little more logging around gas pricing and selecting the job --- src/commands/operator/index.ts | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index 29d1cf7f..06335b0b 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -646,12 +646,23 @@ export default class Operator extends OperatorJobAwareCommand { // This function selects the best job based on the provided gas pricing. selectJob(jobs: OperatorJob[], gasPricing: GasPricing): OperatorJob | null { const compareGas: BigNumber = gasPricing.isEip1559 ? gasPricing.nextBlockFee! : gasPricing.gasPrice! + + let totalGas: BigNumber = BigNumber.from(0) for (const job of jobs) { + totalGas = totalGas.add(BigNumber.from(job.gasPrice)) if (BigNumber.from(job.gasPrice).gte(compareGas)) { return job } } + // Calculate average gas price. + const averageGasPrice: BigNumber = jobs.length > 0 ? totalGas.div(jobs.length) : BigNumber.from(0) + + this.log( + `None of the jobs in queue can be executed with the current gas pricing. ${ + jobs.length + } jobs in queue. Gas price: ${compareGas.toString()}. Average gas provided: ${averageGasPrice.toString()}`, + ) return null } From cf2d25e0a96aa70727784217b6a44fc763be32e4 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 17 Aug 2023 18:23:56 -0400 Subject: [PATCH 13/35] Just select the first job if there is one --- src/commands/operator/index.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index 06335b0b..98946be6 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -605,8 +605,13 @@ export default class Operator extends OperatorJobAwareCommand { this.log(`Sorting jobs by priority`) const sortedJobs = this.sortJobsByPriority(jobs) - this.log(`Selecting job`) - const selectedJob = this.selectJob(sortedJobs, gasPricing) + // this.log(`Selecting job`) + // const selectedJob = this.selectJob(sortedJobs, gasPricing) + // TEMPORARY: Select the first job in the list + let selectedJob: OperatorJob | null = null + if (sortedJobs.length > 0) { + selectedJob = sortedJobs[0] + } if (selectedJob) { this.log(`Chosen job: ${selectedJob?.hash}`) From f861ce6f5ec4b6285ad707990e4dca9e739efe42 Mon Sep 17 00:00:00 2001 From: Alexander Date: Thu, 17 Aug 2023 23:58:31 -0400 Subject: [PATCH 14/35] Bubble up uncaughtExceptions --- src/utils/network-monitor.ts | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/utils/network-monitor.ts b/src/utils/network-monitor.ts index 57dbab8a..f9f74b2e 100644 --- a/src/utils/network-monitor.ts +++ b/src/utils/network-monitor.ts @@ -566,11 +566,17 @@ export class NetworkMonitor { }, 10_000) } - // Catch all exit events - for (const eventType of [`EEXIT`, `SIGINT`, `SIGUSR1`, `SIGUSR2`, `uncaughtException`, `SIGTERM`]) { + // Catch signals for graceful shutdown + for (const eventType of [`SIGINT`, `SIGUSR1`, `SIGUSR2`, `SIGTERM`]) { process.on(eventType, this.exitRouter.bind(this, {exit: true})) } + // Uncaught exceptions: log and then allow the process to exit. + process.on('uncaughtException', error => { + console.error('Uncaught Exception:', error) + this.exitRouter({exit: true}, error.message) + }) + process.on('exit', this.exitHandler) } @@ -804,7 +810,7 @@ export class NetworkMonitor { } } - exitRouter = (options: {[key: string]: boolean | string | number}, exitCode: number | string): void => { + exitRouter = (options: {[key: string]: boolean | string | number}, exitCode: number | string | Error): void => { /** * Before exit, save the block heights to the local db */ @@ -826,17 +832,18 @@ export class NetworkMonitor { if (this.exitCallback !== undefined) { this.exitCallback() } - - // eslint-disable-next-line no-process-exit, unicorn/no-process-exit process.exit() } + } else if (exitCode instanceof Error) { + // This handles the case where the exitCode is actually an Error object. + this.debug(`Uncaught exception: ${exitCode.message}`) + console.error(exitCode.stack) // This will print the stack trace of the actual error. } else { this.debug('exitRouter triggered') this.debug(`\nError: ${exitCode}`) - console.trace() + console.trace() // This is the trace for the exitRouter call } } - monitorBuilder: (network: string) => () => void = (network: string): (() => void) => { return () => { this.blockJobMonitor.bind(this)(network) From 0b787e6174c04a9942dbdacd88e13a36975d3954 Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 18 Aug 2023 00:11:01 -0400 Subject: [PATCH 15/35] Lint --- src/utils/network-monitor.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/utils/network-monitor.ts b/src/utils/network-monitor.ts index f9f74b2e..b6ce9d80 100644 --- a/src/utils/network-monitor.ts +++ b/src/utils/network-monitor.ts @@ -832,6 +832,8 @@ export class NetworkMonitor { if (this.exitCallback !== undefined) { this.exitCallback() } + + // eslint-disable-next-line no-process-exit, unicorn/no-process-exit process.exit() } } else if (exitCode instanceof Error) { @@ -844,6 +846,7 @@ export class NetworkMonitor { console.trace() // This is the trace for the exitRouter call } } + monitorBuilder: (network: string) => () => void = (network: string): (() => void) => { return () => { this.blockJobMonitor.bind(this)(network) From cff231c305ffd74b491f82276c2f21663d35374d Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 18 Aug 2023 00:34:22 -0400 Subject: [PATCH 16/35] Found the cause of the TypeError: Cannot read properties of undefined (reading 'name') --- src/commands/operator/index.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index 98946be6..8a5cf14e 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -740,7 +740,9 @@ export default class Operator extends OperatorJobAwareCommand { return receipt !== null } catch (error: any) { - this.networkMonitor.structuredLogError(`An error occurred while executing job: ${jobHash}`, error) + // Network might not have been extracted from the job if there was an error so it is unknown + console.error('Original Error:', error) + this.networkMonitor.structuredLogError('Unknown', `An error occurred while executing job: ${jobHash}`, error) return false } finally { // TODO: We might need to just delete the finished job hashes so they don't build up From 9b0cb82efb6531c47252bb3c7f6cf9c3b099fbea Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 18 Aug 2023 01:13:03 -0400 Subject: [PATCH 17/35] Allow unknown network in our structuredLog functions so they don't panic if we don't have the network --- src/commands/operator/index.ts | 2 +- src/utils/network-monitor.ts | 25 +++++++++++++++++-------- src/utils/operator-job.ts | 4 ++-- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index 8a5cf14e..d1d7e44c 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -742,7 +742,7 @@ export default class Operator extends OperatorJobAwareCommand { } catch (error: any) { // Network might not have been extracted from the job if there was an error so it is unknown console.error('Original Error:', error) - this.networkMonitor.structuredLogError('Unknown', `An error occurred while executing job: ${jobHash}`, error) + this.networkMonitor.structuredLogError(undefined, `An error occurred while executing job: ${jobHash}`, error) return false } finally { // TODO: We might need to just delete the finished job hashes so they don't build up diff --git a/src/utils/network-monitor.ts b/src/utils/network-monitor.ts index b6ce9d80..31502522 100644 --- a/src/utils/network-monitor.ts +++ b/src/utils/network-monitor.ts @@ -1507,18 +1507,23 @@ export class NetworkMonitor { }) } - structuredLog(network: string, msg: string, tagId?: string | number | (number | string)[]): void { + structuredLog(network: string | undefined, msg: string, tagId?: string | number | (number | string)[]): void { const timestamp = new Date(Date.now()).toISOString() const timestampColor = color.keyword('green') + const parentName = this.parent?.constructor?.name || 'UNKNOWN_COMMAND' + + const networkName = (network ? networks[network]?.name : undefined) || 'UNKNOWN_NETWORK' + const networkColor = (network ? this.networkColors[network] : undefined) ?? color.keyword('white') + this.log( - `[${timestampColor(timestamp)}] [${this.parent.constructor.name}] [${this.networkColors[network]( - capitalize(networks[network].name), - )}]${cleanTags(tagId)} ${msg}`, + `[${timestampColor(timestamp)}] [${parentName}] [${networkColor(capitalize(networkName))}]${cleanTags( + tagId, + )} ${msg}`, ) } structuredLogError( - network: string, + network: string | undefined, error: string | Error | AbstractError, tagId?: string | number | (number | string)[], ): void { @@ -1536,11 +1541,15 @@ export class NetworkMonitor { const timestamp = new Date(Date.now()).toISOString() const timestampColor = color.keyword('green') const errorColor = color.keyword('red') + const parentName = this.parent?.constructor?.name || 'UNKNOWN_COMMAND' + + const networkName = (network ? networks[network]?.name : undefined) || 'UNKNOWN_NETWORK' + const networkColor = (network ? this.networkColors[network] : undefined) ?? color.keyword('white') this.warn( - `[${timestampColor(timestamp)}] [${this.parent.constructor.name}] [${this.networkColors[network]( - capitalize(networks[network].name), - )}] [${errorColor('error')}]${cleanTags(tagId)} ${errorMessage}`, + `[${timestampColor(timestamp)}] [${parentName}] [${networkColor(capitalize(networkName))}] [${errorColor( + 'error', + )}]${cleanTags(tagId)} ${errorMessage}`, ) } diff --git a/src/utils/operator-job.ts b/src/utils/operator-job.ts index 7674dc84..3bee546a 100644 --- a/src/utils/operator-job.ts +++ b/src/utils/operator-job.ts @@ -226,12 +226,12 @@ export abstract class OperatorJobAwareCommand extends HealthCheck { async checkJobStatus(operatorJobHash: string, tags: (string | number)[] = []): Promise { // First validate input (Network is not known until job is decoded) if (!operatorJobHash || !(operatorJobHash in this.operatorJobs)) { - this.networkMonitor.structuredLogError('Unknown', `Invalid job hash provided: ${operatorJobHash}`, tags) + this.networkMonitor.structuredLogError(undefined, `Invalid job hash provided: ${operatorJobHash}`, tags) return } // Fetch job from list - this.networkMonitor.structuredLog('Unknown', `Total jobs count: ${Object.keys(this.operatorJobs.length)}`, tags) + this.networkMonitor.structuredLog(undefined, `Total jobs count: ${Object.keys(this.operatorJobs).length}`, tags) const job: OperatorJob = this.operatorJobs[operatorJobHash] this.networkMonitor.structuredLog(job.network, `Checking status for job ${job.hash}.`, tags) From 91ae77f9fb4a87762888299e81f6b640ff3bb849 Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 18 Aug 2023 14:46:28 -0400 Subject: [PATCH 18/35] Reenable logic for selecting job by gasPricing --- src/commands/operator/index.ts | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index d1d7e44c..d191c9dd 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -605,13 +605,15 @@ export default class Operator extends OperatorJobAwareCommand { this.log(`Sorting jobs by priority`) const sortedJobs = this.sortJobsByPriority(jobs) - // this.log(`Selecting job`) - // const selectedJob = this.selectJob(sortedJobs, gasPricing) - // TEMPORARY: Select the first job in the list - let selectedJob: OperatorJob | null = null - if (sortedJobs.length > 0) { - selectedJob = sortedJobs[0] - } + // TODO: This is a temporary fix to ensure that the operator is always working on a job + // let selectedJob: OperatorJob | null = null + // if (sortedJobs.length > 0) { + // selectedJob = sortedJobs[0] + // } + + // TODO: This is the original code that selects the best job based on the provided gas pricing. + this.log(`Selecting job`) + const selectedJob = this.selectJob(sortedJobs, gasPricing) if (selectedJob) { this.log(`Chosen job: ${selectedJob?.hash}`) From 90bd34757fa7676953a1bbfc38ebbfa54a31c031 Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 18 Aug 2023 16:50:47 -0400 Subject: [PATCH 19/35] Move the logic that deletes job hash back to top of processing --- src/commands/operator/index.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index d191c9dd..796ecb72 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -591,6 +591,11 @@ export default class Operator extends OperatorJobAwareCommand { this.processingJobsForNetworks[network] = true this.log(`Continue job processing for network: ${network}. Current job hash: ${jobHash}`) + // Remove the job hash from the list of jobs being executed so it isn't reprocessed multiple times + if (jobHash && this.operatorJobs[jobHash]) { + delete this.operatorJobs[jobHash] + } + this.log(`Getting gas pricing for network: ${network}`) const gasPricing: GasPricing = this.networkMonitor.gasPrices[network] if (!gasPricing) { @@ -620,11 +625,6 @@ export default class Operator extends OperatorJobAwareCommand { const tags = this.operatorJobs[selectedJob.hash]?.tags ?? [this.networkMonitor.randomTag()] this.networkMonitor.structuredLog(network, `Sending job ${selectedJob.hash} for execution`, tags) this.processOperatorJob(network, selectedJob.hash, tags) - - // We can now delete the job hash from the list of jobs being processed - if (jobHash && this.operatorJobs[jobHash]) { - delete this.operatorJobs[jobHash] - } } else { this.log(`No job selected. Waiting 1 second before trying again.`) setTimeout(this.processOperatorJobs.bind(this, network), 1000) From 1f279851461624cb160b0ecb0b8a5deac63b3de4 Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 18 Aug 2023 17:13:21 -0400 Subject: [PATCH 20/35] Move the removing of job to after successful job is executed --- src/commands/operator/index.ts | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index 796ecb72..ae6af3ab 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -545,17 +545,14 @@ export default class Operator extends OperatorJobAwareCommand { if (isJobExecutedSuccessfully) { this.networkMonitor.structuredLog( network, - `Job with hash: ${jobHash} was executed successfully. Checking its status...`, + `Job with hash: ${jobHash} was executed successfully. Removing it from the list of jobs being processed.`, tags, ) - await this.checkJobStatus(jobHash, tags) - this.networkMonitor.structuredLog( - network, - `Reprocessing operator jobs after successful execution of job: ${jobHash}`, - tags, - ) - this.processOperatorJobs(network, jobHash) // here the jobHash will be deleted + // We can now delete the job hash from the list of jobs being processed + if (jobHash && this.operatorJobs[jobHash]) { + delete this.operatorJobs[jobHash] + } } else { this.networkMonitor.structuredLog( network, @@ -569,8 +566,8 @@ export default class Operator extends OperatorJobAwareCommand { `Reprocessing operator jobs after failed execution of job: ${jobHash}`, tags, ) - this.processOperatorJobs(network) } + this.processOperatorJobs(network) } processOperatorJobs = (network: string, jobHash?: string): void => { @@ -591,11 +588,6 @@ export default class Operator extends OperatorJobAwareCommand { this.processingJobsForNetworks[network] = true this.log(`Continue job processing for network: ${network}. Current job hash: ${jobHash}`) - // Remove the job hash from the list of jobs being executed so it isn't reprocessed multiple times - if (jobHash && this.operatorJobs[jobHash]) { - delete this.operatorJobs[jobHash] - } - this.log(`Getting gas pricing for network: ${network}`) const gasPricing: GasPricing = this.networkMonitor.gasPrices[network] if (!gasPricing) { From 27c5a40e66eec4a7b561f2579b75d80216605c72 Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 18 Aug 2023 17:13:52 -0400 Subject: [PATCH 21/35] Lint --- src/commands/operator/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index ae6af3ab..48694f16 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -567,6 +567,7 @@ export default class Operator extends OperatorJobAwareCommand { tags, ) } + this.processOperatorJobs(network) } From 7cefad2ccbeed6b50b33603d552d67651caab8e0 Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 21 Aug 2023 15:36:43 -0400 Subject: [PATCH 22/35] Swap setTimeout for setInterval skip logic if no jobs are available --- src/commands/operator/index.ts | 36 +++++++++++++++++----------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index 48694f16..0385905d 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -283,8 +283,11 @@ export default class Operator extends OperatorJobAwareCommand { scheduleJobsProcessing(): void { for (const network of this.networkMonitor.networks) { - // Instantiate all network operator job watchers - setTimeout(this.processOperatorJobs.bind(this, network), 60_000) // Wait 60 seconds before processing jobs starts + // Wait 60 seconds before processing jobs starts + setTimeout(() => { + // Then start processing jobs every second + setInterval(this.processOperatorJobs.bind(this, network), 1000) + }, 60_000) } } @@ -567,15 +570,20 @@ export default class Operator extends OperatorJobAwareCommand { tags, ) } - - this.processOperatorJobs(network) } - processOperatorJobs = (network: string, jobHash?: string): void => { + processOperatorJobs = (network: string): void => { + const jobCount = Object.keys(this.operatorJobs).length this.log(`Starting job processing for network: ${network}.`) - this.log(`Current job count: ${Object.keys(this.operatorJobs).length}`) - // NOTE: It is possible that with only a 1 second delay before recalling this function via setTimeout + if (jobCount === 0) { + this.log(`No jobs to process for network: ${network}. We'll check again in 1 second.`) + return + } + + this.log(`New jobs to process detected. Current job count for ${network}: ${jobCount}`) + + // NOTE: It is possible that with only a 1 second delay before recalling this function via setInterval // on the same network, it could interupt the current process before it completes // // This lock is put in place to prevent race conditions and / or concurrency issues and ensure that the @@ -587,7 +595,7 @@ export default class Operator extends OperatorJobAwareCommand { try { this.processingJobsForNetworks[network] = true - this.log(`Continue job processing for network: ${network}. Current job hash: ${jobHash}`) + this.log(`Continue job processing for network: ${network}`) this.log(`Getting gas pricing for network: ${network}`) const gasPricing: GasPricing = this.networkMonitor.gasPrices[network] @@ -603,24 +611,16 @@ export default class Operator extends OperatorJobAwareCommand { this.log(`Sorting jobs by priority`) const sortedJobs = this.sortJobsByPriority(jobs) - // TODO: This is a temporary fix to ensure that the operator is always working on a job - // let selectedJob: OperatorJob | null = null - // if (sortedJobs.length > 0) { - // selectedJob = sortedJobs[0] - // } - - // TODO: This is the original code that selects the best job based on the provided gas pricing. this.log(`Selecting job`) const selectedJob = this.selectJob(sortedJobs, gasPricing) if (selectedJob) { - this.log(`Chosen job: ${selectedJob?.hash}`) + this.log(`Selected job job: ${selectedJob?.hash}`) const tags = this.operatorJobs[selectedJob.hash]?.tags ?? [this.networkMonitor.randomTag()] this.networkMonitor.structuredLog(network, `Sending job ${selectedJob.hash} for execution`, tags) this.processOperatorJob(network, selectedJob.hash, tags) } else { - this.log(`No job selected. Waiting 1 second before trying again.`) - setTimeout(this.processOperatorJobs.bind(this, network), 1000) + this.log(`No job selected. Will check again in the next interval.`) } this.log(`Job processing for network: ${network} completed.`) From 0b7f14108ca063174197cfea8b453c564d480c01 Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 21 Aug 2023 16:13:06 -0400 Subject: [PATCH 23/35] Add logic to handle failed jobs in the case of error --- src/commands/operator/index.ts | 29 ++++++++++++++++++++--------- src/utils/operator-job.ts | 1 + 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index 0385905d..eecb49ec 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -583,21 +583,17 @@ export default class Operator extends OperatorJobAwareCommand { this.log(`New jobs to process detected. Current job count for ${network}: ${jobCount}`) - // NOTE: It is possible that with only a 1 second delay before recalling this function via setInterval - // on the same network, it could interupt the current process before it completes - // - // This lock is put in place to prevent race conditions and / or concurrency issues and ensure that the - // current processing is complete before new jobs are processed if (this.processingJobsForNetworks[network]) { this.log(`Previous job processing for network: ${network} still in progress, skipping this cycle.`) return } + let selectedJob: OperatorJob | null = null + try { this.processingJobsForNetworks[network] = true this.log(`Continue job processing for network: ${network}`) - this.log(`Getting gas pricing for network: ${network}`) const gasPricing: GasPricing = this.networkMonitor.gasPrices[network] if (!gasPricing) { this.networkMonitor.structuredLogError(network, `Missing gas pricing data for network ${network}`) @@ -612,10 +608,10 @@ export default class Operator extends OperatorJobAwareCommand { const sortedJobs = this.sortJobsByPriority(jobs) this.log(`Selecting job`) - const selectedJob = this.selectJob(sortedJobs, gasPricing) + selectedJob = this.selectJob(sortedJobs, gasPricing) if (selectedJob) { - this.log(`Selected job job: ${selectedJob?.hash}`) + this.log(`Selected job: ${selectedJob.hash}`) const tags = this.operatorJobs[selectedJob.hash]?.tags ?? [this.networkMonitor.randomTag()] this.networkMonitor.structuredLog(network, `Sending job ${selectedJob.hash} for execution`, tags) this.processOperatorJob(network, selectedJob.hash, tags) @@ -624,8 +620,23 @@ export default class Operator extends OperatorJobAwareCommand { } this.log(`Job processing for network: ${network} completed.`) - } catch (error) { + } catch (error: any) { this.handleError(`An error occurred while processing jobs for network: ${network}`, error) + // Add the failed job to failed jobs list + if (selectedJob && selectedJob.hash) { + this.failedOperatorJobs[selectedJob.hash] = this.operatorJobs[selectedJob.hash] + + // Delete the job from the original operatorJobs + delete this.operatorJobs[selectedJob.hash] + + this.networkMonitor.structuredLogError( + network, + `Error processing job ${selectedJob.hash} for network: ${network}. Current failed jobs count: ${ + Object.keys(this.failedOperatorJobs).length + }`, + error, + ) + } } finally { this.log(`Resetting lock on processOperatorJobs`) this.processingJobsForNetworks[network] = false diff --git a/src/utils/operator-job.ts b/src/utils/operator-job.ts index 3bee546a..68216c19 100644 --- a/src/utils/operator-job.ts +++ b/src/utils/operator-job.ts @@ -45,6 +45,7 @@ export abstract class OperatorJobAwareCommand extends HealthCheck { } operatorJobs: {[key: string]: OperatorJob} = {} + failedOperatorJobs: {[key: string]: OperatorJob} = {} getTargetTime(network: string, jobDetails: OperatorJobDetails): number { let targetTime: number = new Date(BigNumber.from(jobDetails.startTimestamp).toNumber() * 1000).getTime() From c9c05968d6163ab920824cb5a11ba26fd68a0c52 Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 21 Aug 2023 16:24:46 -0400 Subject: [PATCH 24/35] Make the job processing start up faster --- src/commands/operator/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index eecb49ec..f856feed 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -287,7 +287,7 @@ export default class Operator extends OperatorJobAwareCommand { setTimeout(() => { // Then start processing jobs every second setInterval(this.processOperatorJobs.bind(this, network), 1000) - }, 60_000) + }, 20_000) } } From 2a8473799613112a26dfd352060e97a5dc924ceb Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 21 Aug 2023 17:41:24 -0400 Subject: [PATCH 25/35] Clean up logs, add better validation logic in decodeOperatorJob --- src/commands/operator/index.ts | 81 ++++++++++++++++---------- src/utils/operator-job.ts | 103 ++++++++++++++++++++------------- 2 files changed, 114 insertions(+), 70 deletions(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index f856feed..81b2952c 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -143,7 +143,7 @@ export default class Operator extends OperatorJobAwareCommand { this.log(`Operator started running successfully.`) } catch (error) { - this.handleError('An error occurred in the run method', error) + this.handleErrorAndExit('An error occurred in the run method', error) } } @@ -172,13 +172,13 @@ export default class Operator extends OperatorJobAwareCommand { await this.apiService.operatorLogin() this.log(this.apiColor(`Successfully authenticated into API ${this.BASE_URL}`)) } catch (error: any) { - this.handleError('Failed to get Operator Token from API', error) + this.handleErrorAndExit('Failed to get Operator Token from API', error) } } } } - handleError(message: string, error: any): void { + handleErrorAndExit(message: string, error: any): void { this.log(`Error: ${message}`) this.log(JSON.stringify({...error, stack: error.stack})) this.exit() @@ -277,7 +277,7 @@ export default class Operator extends OperatorJobAwareCommand { this.log('Saved jobs file not found (not loaded).') } } catch (error) { - this.handleError('An error occurred while processing saved jobs', error) + this.handleErrorAndExit('An error occurred while processing saved jobs', error) } } @@ -287,7 +287,7 @@ export default class Operator extends OperatorJobAwareCommand { setTimeout(() => { // Then start processing jobs every second setInterval(this.processOperatorJobs.bind(this, network), 1000) - }, 20_000) + }, 30_000) } } @@ -574,17 +574,27 @@ export default class Operator extends OperatorJobAwareCommand { processOperatorJobs = (network: string): void => { const jobCount = Object.keys(this.operatorJobs).length - this.log(`Starting job processing for network: ${network}.`) + + this.networkMonitor.structuredLog(network, `Starting job processing for network: ${network}.`) if (jobCount === 0) { - this.log(`No jobs to process for network: ${network}. We'll check again in 1 second.`) + this.networkMonitor.structuredLog( + network, + `No jobs to process for network: ${network}. We'll check again in 1 second.`, + ) return } - this.log(`New jobs to process detected. Current job count for ${network}: ${jobCount}`) + this.networkMonitor.structuredLog( + network, + `New jobs to process detected. Current job count for ${network}: ${jobCount}`, + ) if (this.processingJobsForNetworks[network]) { - this.log(`Previous job processing for network: ${network} still in progress, skipping this cycle.`) + this.networkMonitor.structuredLog( + network, + `Previous job processing for network: ${network} still in progress, skipping this cycle.`, + ) return } @@ -592,7 +602,8 @@ export default class Operator extends OperatorJobAwareCommand { try { this.processingJobsForNetworks[network] = true - this.log(`Continue job processing for network: ${network}`) + + this.networkMonitor.structuredLog(network, `Continue job processing for network: ${network}`) const gasPricing: GasPricing = this.networkMonitor.gasPrices[network] if (!gasPricing) { @@ -600,28 +611,32 @@ export default class Operator extends OperatorJobAwareCommand { return } - this.log(`Updating job times`) + this.networkMonitor.structuredLog(network, `Updating job times`) this.updateJobTimes() const jobs: OperatorJob[] = Object.values(this.operatorJobs).filter(job => job.network === network) - this.log(`Sorting jobs by priority`) + this.networkMonitor.structuredLog(network, `Sorting jobs by priority`) const sortedJobs = this.sortJobsByPriority(jobs) - this.log(`Selecting job`) + this.networkMonitor.structuredLog(network, `Selecting job`) selectedJob = this.selectJob(sortedJobs, gasPricing) if (selectedJob) { - this.log(`Selected job: ${selectedJob.hash}`) + this.networkMonitor.structuredLog(network, `Selected job: ${selectedJob.hash}`) const tags = this.operatorJobs[selectedJob.hash]?.tags ?? [this.networkMonitor.randomTag()] this.networkMonitor.structuredLog(network, `Sending job ${selectedJob.hash} for execution`, tags) this.processOperatorJob(network, selectedJob.hash, tags) } else { - this.log(`No job selected. Will check again in the next interval.`) + this.networkMonitor.structuredLog(network, `No job selected. Will check again in the next interval.`) } - this.log(`Job processing for network: ${network} completed.`) + this.networkMonitor.structuredLog(network, `Job processing for network: ${network} completed.`) } catch (error: any) { - this.handleError(`An error occurred while processing jobs for network: ${network}`, error) + this.networkMonitor.structuredLogError( + network, + `An error occurred while processing jobs for network: ${network}`, + error, + ) // Add the failed job to failed jobs list if (selectedJob && selectedJob.hash) { this.failedOperatorJobs[selectedJob.hash] = this.operatorJobs[selectedJob.hash] @@ -638,7 +653,7 @@ export default class Operator extends OperatorJobAwareCommand { ) } } finally { - this.log(`Resetting lock on processOperatorJobs`) + this.networkMonitor.structuredLog(network, `Resetting lock on processOperatorJobs`) this.processingJobsForNetworks[network] = false } } @@ -669,7 +684,8 @@ export default class Operator extends OperatorJobAwareCommand { // Calculate average gas price. const averageGasPrice: BigNumber = jobs.length > 0 ? totalGas.div(jobs.length) : BigNumber.from(0) - this.log( + this.networkMonitor.structuredLog( + undefined, `None of the jobs in queue can be executed with the current gas pricing. ${ jobs.length } jobs in queue. Gas price: ${compareGas.toString()}. Average gas provided: ${averageGasPrice.toString()}`, @@ -681,28 +697,29 @@ export default class Operator extends OperatorJobAwareCommand { * Execute the job */ async executeJob(jobHash: string, tags: (string | number)[]): Promise { - this.log(`Starting execute job`) + this.networkMonitor.structuredLog(undefined, `Starting execute job`, tags) + let network: string | undefined + try { // Idempotency check if (this.isJobBeingExecuted[jobHash]) { - this.log('Job is already being executed', tags) - + this.networkMonitor.structuredLog(undefined, 'Job is already being executed', tags) return false } this.isJobBeingExecuted[jobHash] = true // Check job status - this.log(`Checking job status...`) + this.networkMonitor.structuredLog(undefined, `Checking job status...`, tags) await this.checkJobStatus(jobHash, tags) if (!(jobHash in this.operatorJobs)) { - this.log(`Job hash is not in the operator jobs... returning`) + this.networkMonitor.structuredLog(undefined, `Job hash is not in the operator jobs... returning`, tags) return true } const job: OperatorJob = this.operatorJobs[jobHash] - const network: string = job.network + network = job.network let operate = this.operatorMode === OperatorMode.auto // Operator mode handling @@ -724,7 +741,7 @@ export default class Operator extends OperatorJobAwareCommand { } // Transaction handling - this.log(`About to execute the transaction`) + this.networkMonitor.structuredLog(network, `About to execute the transaction`, tags) const receipt: TransactionReceipt | null = await this.networkMonitor.executeTransaction({ network, tags, @@ -740,19 +757,21 @@ export default class Operator extends OperatorJobAwareCommand { }) if (receipt && receipt.status === 1) { - this.log(`Execution succeeded. Removing job ${jobHash} from the operator jobs queue`) + this.networkMonitor.structuredLog( + network, + `Execution succeeded. Removing job ${jobHash} from the operator jobs queue`, + tags, + ) delete this.operatorJobs[jobHash] } return receipt !== null } catch (error: any) { - // Network might not have been extracted from the job if there was an error so it is unknown console.error('Original Error:', error) - this.networkMonitor.structuredLogError(undefined, `An error occurred while executing job: ${jobHash}`, error) + this.networkMonitor.structuredLogError(network, `An error occurred while executing job: ${jobHash}`, error) return false } finally { - // TODO: We might need to just delete the finished job hashes so they don't build up - this.log(`Removing lock on job hash`) + this.networkMonitor.structuredLog(network, `Removing lock on job hash`, tags) this.isJobBeingExecuted[jobHash] = false } } diff --git a/src/utils/operator-job.ts b/src/utils/operator-job.ts index 68216c19..ba5b1dfd 100644 --- a/src/utils/operator-job.ts +++ b/src/utils/operator-job.ts @@ -1,6 +1,5 @@ import {BigNumber, BigNumberish} from '@ethersproject/bignumber' import {Contract} from '@ethersproject/contracts' -import {formatUnits} from '@ethersproject/units' import {NetworkMonitor} from './network-monitor' import {zeroAddress} from './web3' @@ -89,48 +88,16 @@ export abstract class OperatorJobAwareCommand extends HealthCheck { try { const contract: Contract = this.networkMonitor.operatorContract.connect(this.networkMonitor.providers[network]) - // Try to fetch job details - const rawJobDetails: any[] = await contract.getJobDetails(operatorJobHash) - - // Validate rawJobDetails before processing - if (!rawJobDetails || rawJobDetails.length < 6) { - throw new Error(`Invalid job details for job ${operatorJobHash}`) - } - - const jobDetails: OperatorJobDetails = { - pod: rawJobDetails[0] as number, - blockTimes: rawJobDetails[1] as number, - operator: (rawJobDetails[2] as string).toLowerCase(), - startBlock: rawJobDetails[3] as number, - startTimestamp: BigNumber.from(rawJobDetails[4]), - fallbackOperators: rawJobDetails[5] as number[], - } as OperatorJobDetails - - if (jobDetails.startBlock <= 0) { - throw new Error(`Invalid startBlock for job ${operatorJobHash}`) - } + const rawJobDetails = await this.fetchJobDetails(contract, operatorJobHash) + const jobDetails = this.validateAndParseJobDetails(rawJobDetails, operatorJobHash) this.networkMonitor.structuredLog(network, `Decoded valid job ${operatorJobHash}`, tags) - this.networkMonitor.structuredLog(network, `Selected operator for job is ${jobDetails.operator}`, tags) const targetTime: number = this.getTargetTime(network, jobDetails) - // Extract gasLimit and gasPrice from payload - const gasLimit: BigNumber = BigNumber.from('0x' + operatorJobPayload.slice(-128, -64)) - this.networkMonitor.structuredLog(network, `Job gas limit is ${gasLimit.toNumber()}`, tags) - - const gasPrice: BigNumber = BigNumber.from('0x' + operatorJobPayload.slice(-64)) - this.networkMonitor.structuredLog(network, `Job maximum gas price is ${formatUnits(gasPrice, 'gwei')} GWEI`, tags) + const {gasLimit, gasPrice} = this.extractGasDetailsFromPayload(operatorJobPayload) - const remainingTime: number = Math.round((targetTime - Date.now()) / 1000) - this.networkMonitor.structuredLog( - network, - `Job can be operated ${remainingTime <= 0 ? 'immediately' : 'in ' + remainingTime + ' seconds'}`, - tags, - ) - - // Add job to list - this.operatorJobs[operatorJobHash] = { + const operatorJob: OperatorJob = { network, hash: operatorJobHash, payload: operatorJobPayload, @@ -139,7 +106,9 @@ export abstract class OperatorJobAwareCommand extends HealthCheck { gasPrice, jobDetails, tags, - } as OperatorJob + } + + this.operatorJobs[operatorJobHash] = operatorJob this.networkMonitor.structuredLog( network, @@ -147,13 +116,69 @@ export abstract class OperatorJobAwareCommand extends HealthCheck { tags, ) - return this.operatorJobs[operatorJobHash] + return operatorJob } catch (error: any) { this.networkMonitor.structuredLogError(network, `Error decoding job ${operatorJobHash}: ${error.message}`, tags) return undefined } } + private async fetchJobDetails(contract: Contract, operatorJobHash: string): Promise { + const rawJobDetails: any[] = await contract.getJobDetails(operatorJobHash) + if (!rawJobDetails || rawJobDetails.length < 6) { + throw new Error(`Invalid job details for job ${operatorJobHash}`) + } + + return rawJobDetails + } + + private validateAndParseJobDetails(rawJobDetails: any[], operatorJobHash: string): OperatorJobDetails { + if (!rawJobDetails) { + throw new Error(`No job details found for job ${operatorJobHash}`) + } + + if (rawJobDetails.length < 6) { + throw new Error(`Incomplete job details for job ${operatorJobHash}`) + } + + const validators = { + pod: (value: any) => typeof value === 'number', + blockTimes: (value: any) => typeof value === 'number', + operator: (value: any) => typeof value === 'string', + startBlock: (value: any) => typeof value === 'number' && value > 0, + startTimestamp: (value: any) => typeof value === 'string', // assuming it's a string for BigNumber conversion + fallbackOperators: (value: any) => Array.isArray(value) && value.every((v: any) => typeof v === 'number'), + } + + const jobDetails: Partial = {} + + for (const [key, validator] of Object.entries(validators)) { + const index = Number.parseInt(key, 10) + + if (!validator(rawJobDetails[index])) { + throw new Error(`Invalid ${key} value for job ${operatorJobHash}`) + } + + jobDetails[key as keyof OperatorJobDetails] = rawJobDetails[index] + } + + return { + pod: jobDetails.pod as number, + blockTimes: jobDetails.blockTimes as number, + operator: (jobDetails.operator as string).toLowerCase(), + startBlock: jobDetails.startBlock as number, + startTimestamp: BigNumber.from(jobDetails.startTimestamp as string), + fallbackOperators: jobDetails.fallbackOperators as number[], + } + } + + private extractGasDetailsFromPayload(operatorJobPayload: string): {gasLimit: BigNumber; gasPrice: BigNumber} { + const gasLimit: BigNumber = BigNumber.from('0x' + operatorJobPayload.slice(-128, -64)) + const gasPrice: BigNumber = BigNumber.from('0x' + operatorJobPayload.slice(-64)) + + return {gasLimit, gasPrice} + } + updateJobTimes(): void { for (const hash of Object.keys(this.operatorJobs)) { const job: OperatorJob = this.operatorJobs[hash] From a6e6a50672c78d2de71d64ec1dab69fd4942be6e Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 21 Aug 2023 18:15:43 -0400 Subject: [PATCH 26/35] Fix bug with validation logic in validateAndParseJobDetails, fix missing await --- src/commands/operator/index.ts | 14 ++++++++++---- src/utils/operator-job.ts | 12 +++++++++++- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index 81b2952c..cb1bf9cf 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -283,10 +283,16 @@ export default class Operator extends OperatorJobAwareCommand { scheduleJobsProcessing(): void { for (const network of this.networkMonitor.networks) { - // Wait 60 seconds before processing jobs starts + // Wait 30 seconds before processing jobs starts setTimeout(() => { // Then start processing jobs every second - setInterval(this.processOperatorJobs.bind(this, network), 1000) + setInterval(async () => { + try { + await this.processOperatorJobs(network) + } catch (error) { + console.error(`Error processing jobs for network ${network}:`, error) + } + }, 1000) }, 30_000) } } @@ -572,7 +578,7 @@ export default class Operator extends OperatorJobAwareCommand { } } - processOperatorJobs = (network: string): void => { + processOperatorJobs = async (network: string): Promise => { const jobCount = Object.keys(this.operatorJobs).length this.networkMonitor.structuredLog(network, `Starting job processing for network: ${network}.`) @@ -625,7 +631,7 @@ export default class Operator extends OperatorJobAwareCommand { this.networkMonitor.structuredLog(network, `Selected job: ${selectedJob.hash}`) const tags = this.operatorJobs[selectedJob.hash]?.tags ?? [this.networkMonitor.randomTag()] this.networkMonitor.structuredLog(network, `Sending job ${selectedJob.hash} for execution`, tags) - this.processOperatorJob(network, selectedJob.hash, tags) + await this.processOperatorJob(network, selectedJob.hash, tags) } else { this.networkMonitor.structuredLog(network, `No job selected. Will check again in the next interval.`) } diff --git a/src/utils/operator-job.ts b/src/utils/operator-job.ts index ba5b1dfd..56a36c19 100644 --- a/src/utils/operator-job.ts +++ b/src/utils/operator-job.ts @@ -150,10 +150,20 @@ export abstract class OperatorJobAwareCommand extends HealthCheck { fallbackOperators: (value: any) => Array.isArray(value) && value.every((v: any) => typeof v === 'number'), } + // Create a mapping from key names to their expected indices in the rawJobDetails array + const keyToIndexMapping: {[key: string]: number} = { + pod: 0, + blockTimes: 1, + operator: 2, + startBlock: 3, + startTimestamp: 4, + fallbackOperators: 5, + } + const jobDetails: Partial = {} for (const [key, validator] of Object.entries(validators)) { - const index = Number.parseInt(key, 10) + const index = keyToIndexMapping[key] if (!validator(rawJobDetails[index])) { throw new Error(`Invalid ${key} value for job ${operatorJobHash}`) From b293c6e74ae7f180037246f55892ee2d67ab50f9 Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 22 Aug 2023 01:03:24 -0400 Subject: [PATCH 27/35] Allow startTimestamp to be a number too in the validation --- src/utils/operator-job.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/operator-job.ts b/src/utils/operator-job.ts index 56a36c19..1e0aff58 100644 --- a/src/utils/operator-job.ts +++ b/src/utils/operator-job.ts @@ -146,7 +146,7 @@ export abstract class OperatorJobAwareCommand extends HealthCheck { blockTimes: (value: any) => typeof value === 'number', operator: (value: any) => typeof value === 'string', startBlock: (value: any) => typeof value === 'number' && value > 0, - startTimestamp: (value: any) => typeof value === 'string', // assuming it's a string for BigNumber conversion + startTimestamp: (value: any) => typeof value === 'string' || typeof value === 'number', fallbackOperators: (value: any) => Array.isArray(value) && value.every((v: any) => typeof v === 'number'), } From c26371cdb25f3984566094664b5386452afbd7a8 Mon Sep 17 00:00:00 2001 From: Alexander Date: Wed, 23 Aug 2023 01:08:29 -0400 Subject: [PATCH 28/35] Fix bug with startTimestamp --- src/utils/operator-job.ts | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/utils/operator-job.ts b/src/utils/operator-job.ts index 1e0aff58..ae8b0a84 100644 --- a/src/utils/operator-job.ts +++ b/src/utils/operator-job.ts @@ -146,7 +146,9 @@ export abstract class OperatorJobAwareCommand extends HealthCheck { blockTimes: (value: any) => typeof value === 'number', operator: (value: any) => typeof value === 'string', startBlock: (value: any) => typeof value === 'number' && value > 0, - startTimestamp: (value: any) => typeof value === 'string' || typeof value === 'number', + startTimestamp: (value: any) => { + return BigNumber.isBigNumber(value) + }, fallbackOperators: (value: any) => Array.isArray(value) && value.every((v: any) => typeof v === 'number'), } @@ -166,7 +168,11 @@ export abstract class OperatorJobAwareCommand extends HealthCheck { const index = keyToIndexMapping[key] if (!validator(rawJobDetails[index])) { - throw new Error(`Invalid ${key} value for job ${operatorJobHash}`) + throw new Error( + `Invalid ${key} value for job ${operatorJobHash}. Value: ${ + rawJobDetails[index] + }. Type: ${typeof rawJobDetails[index]}`, + ) } jobDetails[key as keyof OperatorJobDetails] = rawJobDetails[index] @@ -177,7 +183,7 @@ export abstract class OperatorJobAwareCommand extends HealthCheck { blockTimes: jobDetails.blockTimes as number, operator: (jobDetails.operator as string).toLowerCase(), startBlock: jobDetails.startBlock as number, - startTimestamp: BigNumber.from(jobDetails.startTimestamp as string), + startTimestamp: jobDetails.startTimestamp!, // We know this is defined because of the validator fallbackOperators: jobDetails.fallbackOperators as number[], } } From a82fa8ade8366d93793da690deccd1df3dce1964 Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 25 Aug 2023 12:47:33 -0400 Subject: [PATCH 29/35] Add special handling for gas limit too low --- src/utils/network-monitor.ts | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/utils/network-monitor.ts b/src/utils/network-monitor.ts index 31502522..4baea8b7 100644 --- a/src/utils/network-monitor.ts +++ b/src/utils/network-monitor.ts @@ -1858,12 +1858,18 @@ export class NetworkMonitor { const tx = await provider.sendTransaction(signedTx) if (tx === null) { - throw new Error('Failed submitting transaction') + throw new Error('Failed sending transaction') } else { this.structuredLog(network, `Transaction sent to mempool ${tx.hash}`, tags) return tx } } catch (error: any) { + this.structuredLogError(network, `Send transaction failed ${error}`, tags) + // Handle the "intrinsic gas too low" error and throw a unique error to prevent retries + if (error.message.includes('intrinsic gas too low')) { + throw new Error('IntrinsicGasTooLowError') + } + if (error.message === 'already known' || error.message === 'nonce has already been used') { const tx = await this.getTransaction({ transactionHash: txHash!, @@ -1893,6 +1899,12 @@ export class NetworkMonitor { try { return await this.retry(network, sendTransactionAttempt, attempts, interval) } catch (error: any) { + // Check for our unique error and log it differently + if (error.message === 'IntrinsicGasTooLowError') { + this.structuredLogError(network, 'Transaction gas limit too low, not retrying', tags) + return null + } + this.structuredLogError(network, 'Failed submitting transaction', tags) throw error } From 48f114f36356a015d26a857980c938a8a413a555 Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 25 Aug 2023 17:58:29 -0400 Subject: [PATCH 30/35] Refactor sendTransaction, add handling for IntrinsicGasTooLowError, prevent continous loop --- src/commands/operator/index.ts | 12 ++- src/types/network-monitor.ts | 1 + src/utils/errors.ts | 13 ++++ src/utils/network-monitor.ts | 134 +++++++++++++++------------------ 4 files changed, 87 insertions(+), 73 deletions(-) create mode 100644 src/utils/errors.ts diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index cb1bf9cf..6ff027f6 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -45,6 +45,7 @@ import {BigNumber} from '@ethersproject/bignumber' import {GasPricing} from '../../utils/gas' import {checkOptionFlag} from '../../utils/validation' import {OperatorJobAwareCommand, OperatorJob} from '../../utils/operator-job' +import {IntrinsicGasTooLowError} from '../../utils/errors' /* END NEED TO CHECK */ @@ -773,8 +774,17 @@ export default class Operator extends OperatorJobAwareCommand { return receipt !== null } catch (error: any) { - console.error('Original Error:', error) this.networkMonitor.structuredLogError(network, `An error occurred while executing job: ${jobHash}`, error) + + if (error instanceof IntrinsicGasTooLowError) { + this.networkMonitor.structuredLogError( + network, + `IntrinsicGasTooLowError occurred while executing job: ${jobHash}. The job's gas limit is to low to ever succeed. Removing job from queue.`, + error.message, + ) + delete this.operatorJobs[jobHash] + } + return false } finally { this.networkMonitor.structuredLog(network, `Removing lock on job hash`, tags) diff --git a/src/types/network-monitor.ts b/src/types/network-monitor.ts index 062aecf4..cce7c00f 100644 --- a/src/types/network-monitor.ts +++ b/src/types/network-monitor.ts @@ -33,6 +33,7 @@ export type SendTransactionParams = { attempts?: number canFail?: boolean interval?: number + greedy?: boolean } export type PopulateTransactionParams = { diff --git a/src/utils/errors.ts b/src/utils/errors.ts new file mode 100644 index 00000000..56c2384b --- /dev/null +++ b/src/utils/errors.ts @@ -0,0 +1,13 @@ +export class IntrinsicGasTooLowError extends Error { + constructor() { + super('IntrinsicGasTooLowError') + this.name = 'IntrinsicGasTooLowError' + } +} + +export class KnownTransactionError extends Error { + constructor(message: string) { + super(message) + this.name = 'KnownTransactionError' + } +} diff --git a/src/utils/network-monitor.ts b/src/utils/network-monitor.ts index 4baea8b7..5e97c8f5 100644 --- a/src/utils/network-monitor.ts +++ b/src/utils/network-monitor.ts @@ -60,6 +60,7 @@ import { } from '../types/network-monitor' import {BlockHeightOptions} from '../flags/update-block-height.flag' import {NETWORK_COLORS, zeroAddress} from './web3' +import {IntrinsicGasTooLowError, KnownTransactionError} from './errors' export const replayFlag = { replay: Flags.string({ @@ -1816,97 +1817,78 @@ export class NetworkMonitor { tags = [] as (string | number)[], attempts = 10, interval = 3000, + greedy = false, }: SendTransactionParams): Promise { const wallet = this.wallets[network] const provider = this.providers[network] const gasPricing: GasPricing = this.gasPrices[network] - let gasPrice: BigNumber | undefined - const rawTxGasPrice: BigNumber = BigNumber.from(rawTx.gasPrice ?? 0) + let gasPrice: BigNumber | undefined = BigNumber.from(rawTx.gasPrice ?? 0) - // Remove the gasPrice from rawTx to avoid EIP1559 error that type2 tx does not allow for use of gasPrice - delete rawTx.gasPrice + const prepareTransaction = () => { + delete rawTx.gasPrice // Remove gasPrice to avoid EIP1559 error - // Define function that attempts to send transaction - const sendTransactionAttempt = async (): Promise => { - let txHash: string | null = null - - try { - // move gas price info around to support EIP-1559 - if (gasPricing.isEip1559) { - if (gasPrice === undefined) { - gasPrice = BigNumber.from(rawTxGasPrice) - } - - rawTx.type = 2 - rawTx.maxPriorityFeePerGas = gasPrice! - rawTx.maxFeePerGas = gasPrice! - } - - if ('value' in rawTx && rawTx.value!.eq(ZERO)) { - delete rawTx.value - } - - const populatedTx = await wallet.populateTransaction(rawTx) - const signedTx = await wallet.signTransaction(populatedTx) + if (gasPricing.isEip1559) { + rawTx.type = 2 + rawTx.maxPriorityFeePerGas = gasPrice! + rawTx.maxFeePerGas = gasPrice! + } + if ('value' in rawTx && rawTx.value!.eq(ZERO)) { + delete rawTx.value + } + } - if (txHash === null) { - txHash = keccak256(signedTx) - } + const sendTransactionAttempt = async (increaseGasAttempts = 0): Promise => { + prepareTransaction() - this.structuredLog(network, 'Attempting to send transaction -> ' + JSON.stringify(populatedTx), tags) + const populatedTx = await wallet.populateTransaction(rawTx) + const signedTx = await wallet.signTransaction(populatedTx) + const txHash = keccak256(signedTx) + this.structuredLog(network, 'Attempting to send transaction -> ' + JSON.stringify(populatedTx), tags) + try { const tx = await provider.sendTransaction(signedTx) + if (!tx) throw new Error('Failed sending transaction') - if (tx === null) { - throw new Error('Failed sending transaction') - } else { - this.structuredLog(network, `Transaction sent to mempool ${tx.hash}`, tags) - return tx - } + this.structuredLog(network, `Transaction sent to mempool ${tx.hash}`, tags) + return tx } catch (error: any) { this.structuredLogError(network, `Send transaction failed ${error}`, tags) - // Handle the "intrinsic gas too low" error and throw a unique error to prevent retries + if (error.message.includes('intrinsic gas too low')) { - throw new Error('IntrinsicGasTooLowError') - } + if (greedy && increaseGasAttempts < attempts) { + // Increase the gas limit by 10% and retry + rawTx.gasLimit = rawTx.gasLimit?.mul(110).div(100) + return sendTransactionAttempt(increaseGasAttempts + 1) + } - if (error.message === 'already known' || error.message === 'nonce has already been used') { - const tx = await this.getTransaction({ - transactionHash: txHash!, - network, - tags, - attempts, - interval, - }) + throw new IntrinsicGasTooLowError() + } else if (error.message === 'already known' || error.message === 'nonce has already been used') { + const tx = await this.getTransaction({transactionHash: txHash, network, tags, attempts, interval}) - if (tx === null) { - throw error - } else { - this.structuredLog( - network, - error.message === 'already known' ? 'Transaction already submitted' : 'Transaction already mined', - tags, - ) - return tx - } + if (!tx) throw new KnownTransactionError(error.message) + return tx } else { throw error } } } - // Attempt to send the transaction, with retries + // Primary flow with retries + // If greedy is true, we will increase the gas limit by 10% and retry + // If greedy is false, we will not increase the gas limit and not retry try { return await this.retry(network, sendTransactionAttempt, attempts, interval) } catch (error: any) { - // Check for our unique error and log it differently - if (error.message === 'IntrinsicGasTooLowError') { + if (error instanceof IntrinsicGasTooLowError) { this.structuredLogError(network, 'Transaction gas limit too low, not retrying', tags) - return null + throw error + } else if (error instanceof KnownTransactionError) { + this.structuredLogError(network, error.message, tags) + throw error + } else { + this.structuredLogError(network, 'Failed submitting transaction', tags) + throw error } - - this.structuredLogError(network, 'Failed submitting transaction', tags) - throw error } } @@ -2100,24 +2082,32 @@ export class NetworkMonitor { // Generic retry function async retry(network: string, func: () => Promise, attempts = 10, interval = 5000): Promise { let result: T | null = null + let i = 0 - let i = 0 // declare i outside of loop so it can be used later - for (i; i < attempts; i++) { + for (; i < attempts; i++) { try { result = await func() if (result !== null) { return result } } catch (error: any) { - this.structuredLogError(network, error.message) + // Exit immediately if error is IntrinsicGasTooLowError + if (error instanceof IntrinsicGasTooLowError) { + throw error + } + + // Log the error with more context + this.structuredLogError(network, `Attempt ${i + 1} failed: ${error.message}`) } - // If we haven't returned by now, it means the function call was unsuccessful. - // We sleep for the specified interval before the next attempt. - await sleep(interval) + // Sleep before the next attempt + if (i < attempts - 1) { + // To avoid unnecessary sleep after the last failed attempt + await sleep(interval) + } } - // If we've exited the loop without returning, it means all attempts were unsuccessful. - throw new Error(`Maximum attempts reached for ${func.name}, function did not succeed after ${attempts} attempts`) + const functionName = func.name || 'Anonymous function' + throw new Error(`Maximum attempts reached for ${functionName}. Did not succeed after ${attempts} attempts.`) } } From 3ff25078e7334df20bd7b3960e8abfa0d9ef2226 Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 25 Aug 2023 17:59:23 -0400 Subject: [PATCH 31/35] Lint --- src/utils/network-monitor.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/utils/network-monitor.ts b/src/utils/network-monitor.ts index 5e97c8f5..cf7957cd 100644 --- a/src/utils/network-monitor.ts +++ b/src/utils/network-monitor.ts @@ -1822,7 +1822,7 @@ export class NetworkMonitor { const wallet = this.wallets[network] const provider = this.providers[network] const gasPricing: GasPricing = this.gasPrices[network] - let gasPrice: BigNumber | undefined = BigNumber.from(rawTx.gasPrice ?? 0) + const gasPrice: BigNumber | undefined = BigNumber.from(rawTx.gasPrice ?? 0) const prepareTransaction = () => { delete rawTx.gasPrice // Remove gasPrice to avoid EIP1559 error @@ -1832,6 +1832,7 @@ export class NetworkMonitor { rawTx.maxPriorityFeePerGas = gasPrice! rawTx.maxFeePerGas = gasPrice! } + if ('value' in rawTx && rawTx.value!.eq(ZERO)) { delete rawTx.value } From 3111c9a1eb1ba02cc7d5bd202fb4027e175725df Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 28 Aug 2023 17:46:08 -0400 Subject: [PATCH 32/35] Add greedy mode, removing locking in favor of sequential processing --- src/commands/operator/index.ts | 41 +++++++++++++--------------------- src/utils/network-monitor.ts | 22 +++++++++++++----- 2 files changed, 31 insertions(+), 32 deletions(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index 6ff027f6..4a746d37 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -72,6 +72,10 @@ export default class Operator extends OperatorJobAwareCommand { char: 'h', required: false, }), + greedy: Flags.boolean({ + description: 'Enable greedy mode', + default: false, + }), ...syncFlag, ...blockHeightFlag, ...networksFlag, @@ -80,7 +84,6 @@ export default class Operator extends OperatorJobAwareCommand { ...HealthCheck.flags, } - private processingJobsForNetworks: {[network: string]: boolean} = {} private isJobBeingExecuted: {[jobHash: string]: boolean} = {} // API Params @@ -209,6 +212,7 @@ export default class Operator extends OperatorJobAwareCommand { apiService: this.apiService, BlockHeightOptions: this.updateBlockHeight as BlockHeightOptions, processBlockRange: flags['process-block-range'], + greedy: flags.greedy, }) this.jobsFile = path.join(this.config.configDir, this.networkMonitor.environment + '.operator-job-details.json') } @@ -284,17 +288,17 @@ export default class Operator extends OperatorJobAwareCommand { scheduleJobsProcessing(): void { for (const network of this.networkMonitor.networks) { - // Wait 30 seconds before processing jobs starts - setTimeout(() => { - // Then start processing jobs every second - setInterval(async () => { + // This starts processing jobs after an initial delay (currently set to 0 seconds) + setTimeout(async () => { + while (true) { try { await this.processOperatorJobs(network) + await new Promise(resolve => setTimeout(resolve, 1000)) // Waits for 1 second } catch (error) { console.error(`Error processing jobs for network ${network}:`, error) } - }, 1000) - }, 30_000) + } + }, 0) // You can increase this value for a longer initial delay } } @@ -582,13 +586,10 @@ export default class Operator extends OperatorJobAwareCommand { processOperatorJobs = async (network: string): Promise => { const jobCount = Object.keys(this.operatorJobs).length - this.networkMonitor.structuredLog(network, `Starting job processing for network: ${network}.`) + this.debug(`Starting job processing for network: ${network}.`) if (jobCount === 0) { - this.networkMonitor.structuredLog( - network, - `No jobs to process for network: ${network}. We'll check again in 1 second.`, - ) + this.debug(`No jobs to process for network: ${network}.`) return } @@ -597,20 +598,10 @@ export default class Operator extends OperatorJobAwareCommand { `New jobs to process detected. Current job count for ${network}: ${jobCount}`, ) - if (this.processingJobsForNetworks[network]) { - this.networkMonitor.structuredLog( - network, - `Previous job processing for network: ${network} still in progress, skipping this cycle.`, - ) - return - } - let selectedJob: OperatorJob | null = null try { - this.processingJobsForNetworks[network] = true - - this.networkMonitor.structuredLog(network, `Continue job processing for network: ${network}`) + this.debug(`Continue job processing for network: ${network}`) const gasPricing: GasPricing = this.networkMonitor.gasPrices[network] if (!gasPricing) { @@ -644,6 +635,7 @@ export default class Operator extends OperatorJobAwareCommand { `An error occurred while processing jobs for network: ${network}`, error, ) + // Add the failed job to failed jobs list if (selectedJob && selectedJob.hash) { this.failedOperatorJobs[selectedJob.hash] = this.operatorJobs[selectedJob.hash] @@ -659,9 +651,6 @@ export default class Operator extends OperatorJobAwareCommand { error, ) } - } finally { - this.networkMonitor.structuredLog(network, `Resetting lock on processOperatorJobs`) - this.processingJobsForNetworks[network] = false } } diff --git a/src/utils/network-monitor.ts b/src/utils/network-monitor.ts index cf7957cd..d2054ecc 100644 --- a/src/utils/network-monitor.ts +++ b/src/utils/network-monitor.ts @@ -275,6 +275,7 @@ type NetworkMonitorOptions = { apiService?: ApiService BlockHeightOptions?: BlockHeightOptions processBlockRange?: boolean + greedy?: boolean } export class NetworkMonitor { @@ -317,6 +318,7 @@ export class NetworkMonitor { lastProcessBlockDone: {[key: string]: number} = {} lastBlockJobDone: {[key: string]: number} = {} processBlocksByRange: {[key: string]: boolean} = {} + greedy = false blockJobMonitorProcess: {[key: string]: NodeJS.Timer} = {} gasPrices: {[key: string]: GasPricing} = {} contracts: Partial = {} @@ -428,6 +430,11 @@ export class NetworkMonitor { this.verbose = options.verbose } + if (options.greedy !== undefined) { + this.log('Greedy mode enabled') + this.greedy = options.greedy + } + if (options.processTransactions !== undefined) { this.processTransactions = options.processTransactions.bind(this.parent) } @@ -1817,7 +1824,6 @@ export class NetworkMonitor { tags = [] as (string | number)[], attempts = 10, interval = 3000, - greedy = false, }: SendTransactionParams): Promise { const wallet = this.wallets[network] const provider = this.providers[network] @@ -1856,9 +1862,14 @@ export class NetworkMonitor { this.structuredLogError(network, `Send transaction failed ${error}`, tags) if (error.message.includes('intrinsic gas too low')) { - if (greedy && increaseGasAttempts < attempts) { - // Increase the gas limit by 10% and retry - rawTx.gasLimit = rawTx.gasLimit?.mul(110).div(100) + if (this.greedy && increaseGasAttempts < attempts) { + this.structuredLog( + network, + 'Gas limit is too low and greedy mode is active. Increasing gas limit by double and retrying', + tags, + ) + // Increase the gas limit by double and retry + rawTx.gasLimit = rawTx.gasLimit?.mul(200).div(100) return sendTransactionAttempt(increaseGasAttempts + 1) } @@ -1875,7 +1886,7 @@ export class NetworkMonitor { } // Primary flow with retries - // If greedy is true, we will increase the gas limit by 10% and retry + // If greedy is true, we will increase the gas limit by double and retry // If greedy is false, we will not increase the gas limit and not retry try { return await this.retry(network, sendTransactionAttempt, attempts, interval) @@ -2041,7 +2052,6 @@ export class NetworkMonitor { tags, rawTx, attempts, - interval, }) if (tx === null) { From f68c27b1a3a4959cc7903085e92b110756049007 Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 28 Aug 2023 17:49:37 -0400 Subject: [PATCH 33/35] Lint --- src/commands/operator/index.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index 4a746d37..3016d868 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -290,9 +290,11 @@ export default class Operator extends OperatorJobAwareCommand { for (const network of this.networkMonitor.networks) { // This starts processing jobs after an initial delay (currently set to 0 seconds) setTimeout(async () => { + // eslint-disable-next-line no-constant-condition while (true) { try { await this.processOperatorJobs(network) + // eslint-disable-next-line no-promise-executor-return await new Promise(resolve => setTimeout(resolve, 1000)) // Waits for 1 second } catch (error) { console.error(`Error processing jobs for network ${network}:`, error) From f26fea53f9da97150331044750b16689e421b045 Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 29 Aug 2023 16:16:47 -0400 Subject: [PATCH 34/35] Make greedy mode smart --- src/commands/operator/index.ts | 2 +- src/utils/network-monitor.ts | 57 ++++++++++++++++++++-------------- 2 files changed, 34 insertions(+), 25 deletions(-) diff --git a/src/commands/operator/index.ts b/src/commands/operator/index.ts index 3016d868..9453b7b0 100644 --- a/src/commands/operator/index.ts +++ b/src/commands/operator/index.ts @@ -73,7 +73,7 @@ export default class Operator extends OperatorJobAwareCommand { required: false, }), greedy: Flags.boolean({ - description: 'Enable greedy mode', + description: 'Enable greedy mode which will retry failed jobs with a higher gas limit in order to execute', default: false, }), ...syncFlag, diff --git a/src/utils/network-monitor.ts b/src/utils/network-monitor.ts index d2054ecc..a31f22ff 100644 --- a/src/utils/network-monitor.ts +++ b/src/utils/network-monitor.ts @@ -1844,7 +1844,7 @@ export class NetworkMonitor { } } - const sendTransactionAttempt = async (increaseGasAttempts = 0): Promise => { + const sendTransactionAttempt = async (): Promise => { prepareTransaction() const populatedTx = await wallet.populateTransaction(rawTx) @@ -1862,15 +1862,26 @@ export class NetworkMonitor { this.structuredLogError(network, `Send transaction failed ${error}`, tags) if (error.message.includes('intrinsic gas too low')) { - if (this.greedy && increaseGasAttempts < attempts) { + if (this.greedy) { this.structuredLog( network, - 'Gas limit is too low and greedy mode is active. Increasing gas limit by double and retrying', + 'Gas limit issue detected and greedy mode is active. Estimating required gas limit.', tags, ) - // Increase the gas limit by double and retry - rawTx.gasLimit = rawTx.gasLimit?.mul(200).div(100) - return sendTransactionAttempt(increaseGasAttempts + 1) + + const bufferFactor = BigNumber.from('125').div(100) + + // Create a gas-less version of the transaction for estimating gas. + const gaslessTx = { + ...rawTx, + gasLimit: undefined, + maxPriorityFeePerGas: undefined, + maxFeePerGas: undefined, + } + + const estimatedGasLimit = await provider.estimateGas(gaslessTx) + rawTx.gasLimit = estimatedGasLimit.mul(bufferFactor) + this.structuredLog(network, `Adjusted gas limit to: ${rawTx.gasLimit}`, tags) } throw new IntrinsicGasTooLowError() @@ -1886,7 +1897,7 @@ export class NetworkMonitor { } // Primary flow with retries - // If greedy is true, we will increase the gas limit by double and retry + // If greedy is true, the operator will estimate the required gas limit and retry // If greedy is false, we will not increase the gas limit and not retry try { return await this.retry(network, sendTransactionAttempt, attempts, interval) @@ -2090,35 +2101,33 @@ export class NetworkMonitor { return receipt } - // Generic retry function + /** + * Retries a function multiple times if it fails. + * @param network - The network name. + * @param func - The function to retry. + * @param attempts - The maximum number of attempts. + * @param interval - The interval between retries. + */ async retry(network: string, func: () => Promise, attempts = 10, interval = 5000): Promise { - let result: T | null = null - let i = 0 - - for (; i < attempts; i++) { + for (let i = 0; i < attempts; i++) { try { - result = await func() + const result = await func() if (result !== null) { return result } } catch (error: any) { - // Exit immediately if error is IntrinsicGasTooLowError - if (error instanceof IntrinsicGasTooLowError) { + this.structuredLogError(network, `Attempt ${i + 1} failed: ${error.message}`) + + if (i === attempts - 1) { + // If this was the last attempt, throw the error. throw error } - // Log the error with more context - this.structuredLogError(network, `Attempt ${i + 1} failed: ${error.message}`) - } - - // Sleep before the next attempt - if (i < attempts - 1) { - // To avoid unnecessary sleep after the last failed attempt + // Sleep before the next attempt. await sleep(interval) } } - const functionName = func.name || 'Anonymous function' - throw new Error(`Maximum attempts reached for ${functionName}. Did not succeed after ${attempts} attempts.`) + throw new Error(`Maximum attempts reached. Did not succeed after ${attempts} attempts.`) } } From b73cea45d86040e4e2901daee738137674446710 Mon Sep 17 00:00:00 2001 From: Alexander Date: Tue, 29 Aug 2023 16:22:11 -0400 Subject: [PATCH 35/35] Bring back the logging of func name in the retry function --- src/utils/network-monitor.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/network-monitor.ts b/src/utils/network-monitor.ts index a31f22ff..68f03e33 100644 --- a/src/utils/network-monitor.ts +++ b/src/utils/network-monitor.ts @@ -2128,6 +2128,6 @@ export class NetworkMonitor { } } - throw new Error(`Maximum attempts reached. Did not succeed after ${attempts} attempts.`) + throw new Error(`Maximum attempts reached for ${func.name}, function did not succeed after ${attempts} attempts`) } }