diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts index ecba02e0d5466..7e921d8c571df 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts @@ -50,6 +50,9 @@ const orderQueuedReindexOperations = ({ ), }); +export const isQueuedOp = (op: ReindexSavedObject) => + Boolean(op.attributes.reindexOptions?.queueSettings); + export const queuedOpHasStarted = (op: ReindexSavedObject) => Boolean(op.attributes.reindexOptions?.queueSettings?.startedAt); diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts index 06349d069a813..6c73b861a162d 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts @@ -10,12 +10,18 @@ import { Credential, CredentialStore } from './credential_store'; import { reindexActionsFactory } from './reindex_actions'; import { ReindexService, reindexServiceFactory } from './reindex_service'; import { LicensingPluginSetup } from '../../../../licensing/server'; -import { sortAndOrderReindexOperations, queuedOpHasStarted } from './op_utils'; +import { sortAndOrderReindexOperations, queuedOpHasStarted, isQueuedOp } from './op_utils'; const POLL_INTERVAL = 30000; // If no nodes have been able to update this index in 2 minutes (due to missing credentials), set to paused. const PAUSE_WINDOW = POLL_INTERVAL * 4; +/** + * To avoid running the worker loop very tightly and causing a CPU bottleneck we use this + * padding to simulate an asynchronous sleep. See the description of the tight loop below. + */ +const WORKER_PADDING_MS = 1000; + /** * A singleton worker that will coordinate two polling loops: * (1) A longer loop that polls for reindex operations that are in progress. 
If any are found, loop (2) is started. @@ -104,16 +110,25 @@ export class ReindexWorker { /** * Runs an async loop until all inProgress jobs are complete or failed. */ - private startUpdateOperationLoop = async () => { + private startUpdateOperationLoop = async (): Promise<void> => { this.updateOperationLoopRunning = true; - try { while (this.inProgressOps.length > 0) { this.log.debug(`Updating ${this.inProgressOps.length} reindex operations`); // Push each operation through the state machine and refresh. await Promise.all(this.inProgressOps.map(this.processNextStep)); + await this.refresh(); + + if ( + this.inProgressOps.length && + this.inProgressOps.every(op => !this.credentialStore.get(op)) + ) { + // TODO: This tight loop needs something to relax potentially high CPU demands so this padding is added. + // This scheduler should be revisited in future. + await new Promise(resolve => setTimeout(resolve, WORKER_PADDING_MS)); + } } } finally { this.updateOperationLoopRunning = false; @@ -181,20 +196,32 @@ export class ReindexWorker { } }; - private processNextStep = async (reindexOp: ReindexSavedObject) => { + private lastCheckedQueuedOpId: string | undefined; + private processNextStep = async (reindexOp: ReindexSavedObject): Promise<void> => { const credential = this.credentialStore.get(reindexOp); if (!credential) { - // Set to paused state if the job hasn't been updated in PAUSE_WINDOW. + // If this is a queued reindex op, and we know there can only ever be one in progress at a + // given time, there is a small chance it may have just reached the front of the queue so + // we give it a chance to be updated by another worker with credentials by making this a + // noop once. If it has not been updated by the next loop we will mark it paused if it + // falls outside of PAUSE_WINDOW. 
+ if (isQueuedOp(reindexOp)) { + if (this.lastCheckedQueuedOpId !== reindexOp.id) { + this.lastCheckedQueuedOpId = reindexOp.id; + return; + } + } // This indicates that no Kibana nodes currently have credentials to update this job. const now = moment(); const updatedAt = moment(reindexOp.updated_at); if (updatedAt < now.subtract(PAUSE_WINDOW)) { - return this.reindexService.pauseReindexOperation(reindexOp.attributes.indexName); + await this.reindexService.pauseReindexOperation(reindexOp.attributes.indexName); + return; } else { // If it has been updated recently, we assume another node has the necessary credentials, // and this becomes a noop. - return reindexOp; + return; } }