From ae6358a4da0889a8b74a8ba7717d1c91f489780f Mon Sep 17 00:00:00 2001 From: Jean-Louis Leysens Date: Tue, 24 Mar 2020 11:23:49 +0100 Subject: [PATCH] Update worker scheduler and add a new util The worker scheduler should only sleep when it cannot process any in progress operations. Additionally, logic has been added for handling of queue operations that have been in the queue for a long time and may be viewed as still in small window of time by wokers that do not have the credentials to process those reindex operations. --- .../server/lib/reindexing/op_utils.ts | 3 ++ .../server/lib/reindexing/worker.ts | 34 ++++++++++++++----- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts index ecba02e0d5466..7e921d8c571df 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/op_utils.ts @@ -50,6 +50,9 @@ const orderQueuedReindexOperations = ({ ), }); +export const isQueuedOp = (op: ReindexSavedObject) => + Boolean(op.attributes.reindexOptions?.queueSettings); + export const queuedOpHasStarted = (op: ReindexSavedObject) => Boolean(op.attributes.reindexOptions?.queueSettings?.startedAt); diff --git a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts index 2a045ad7a57d8..9a597932963e1 100644 --- a/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts +++ b/x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts @@ -10,7 +10,7 @@ import { Credential, CredentialStore } from './credential_store'; import { reindexActionsFactory } from './reindex_actions'; import { ReindexService, reindexServiceFactory } from './reindex_service'; import { LicensingPluginSetup } from '../../../../licensing/server'; -import { sortAndOrderReindexOperations, queuedOpHasStarted } from './op_utils'; +import { sortAndOrderReindexOperations, queuedOpHasStarted, isQueuedOp } from './op_utils'; const POLL_INTERVAL = 30000; // If no nodes have been able to update this index in 2 minutes (due to missing credentials), set to paused. @@ -20,7 +20,7 @@ const PAUSE_WINDOW = POLL_INTERVAL * 4; * To avoid running the worker loop very tightly and causing a CPU bottleneck we use this * padding to simulate an asynchronous sleep. See the description of the tight loop below. */ -const WORKER_PADDING_MS = 500; +const WORKER_PADDING_MS = 1000; /** * A singleton worker that will coordinate two polling loops: @@ -108,19 +108,25 @@ export class ReindexWorker { /** * Runs an async loop until all inProgress jobs are complete or failed. */ - private startUpdateOperationLoop = async () => { + private startUpdateOperationLoop = async (): Promise => { this.updateOperationLoopRunning = true; - try { while (this.inProgressOps.length > 0) { this.log.debug(`Updating ${this.inProgressOps.length} reindex operations`); // Push each operation through the state machine and refresh. await Promise.all(this.inProgressOps.map(this.processNextStep)); - // TODO: This tight loop needs something to relax potentially high CPU demands so this padding is added. - // This scheduler should be revisited in future. - await new Promise(res => setTimeout(res, WORKER_PADDING_MS)); + await this.refresh(); + + if ( + this.inProgressOps.length && + this.inProgressOps.every(op => !this.credentialStore.get(op)) + ) { + // TODO: This tight loop needs something to relax potentially high CPU demands so this padding is added. + // This scheduler should be revisited in future. + await new Promise(res => setTimeout(res, WORKER_PADDING_MS)); + } } } finally { this.updateOperationLoopRunning = false; @@ -182,10 +188,22 @@ export class ReindexWorker { } }; - private processNextStep = async (reindexOp: ReindexSavedObject): void => { + private lastCheckedQueuedOpId: string | undefined; + private processNextStep = async (reindexOp: ReindexSavedObject): Promise => { const credential = this.credentialStore.get(reindexOp); if (!credential) { + // If this is a queued reindex op, and we know there can only ever be one in progress at a + // given time, there is a small chance it may have just reached the front of the queue so + // we give it a chance to be updated by another worker with credentials by making this a + // noop once. If it has not been updated by the next loop we will mark it paused if it + // falls outside of PAUSE_WINDOW. + if (isQueuedOp(reindexOp)) { + if (this.lastCheckedQueuedOpId !== reindexOp.id) { + this.lastCheckedQueuedOpId = reindexOp.id; + return; + } + } // This indicates that no Kibana nodes currently have credentials to update this job. const now = moment(); const updatedAt = moment(reindexOp.updated_at);