Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.7] [UA] Tight worker loop can cause high CPU usage (#60950) #62001

Merged
merged 1 commit into from
Apr 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ const orderQueuedReindexOperations = ({
),
});

export const isQueuedOp = (op: ReindexSavedObject) =>
Boolean(op.attributes.reindexOptions?.queueSettings);

export const queuedOpHasStarted = (op: ReindexSavedObject) =>
Boolean(op.attributes.reindexOptions?.queueSettings?.startedAt);

Expand Down
41 changes: 34 additions & 7 deletions x-pack/plugins/upgrade_assistant/server/lib/reindexing/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@ import { Credential, CredentialStore } from './credential_store';
import { reindexActionsFactory } from './reindex_actions';
import { ReindexService, reindexServiceFactory } from './reindex_service';
import { LicensingPluginSetup } from '../../../../licensing/server';
import { sortAndOrderReindexOperations, queuedOpHasStarted } from './op_utils';
import { sortAndOrderReindexOperations, queuedOpHasStarted, isQueuedOp } from './op_utils';

const POLL_INTERVAL = 30000;
// If no nodes have been able to update this index in 2 minutes (due to missing credentials), set to paused.
const PAUSE_WINDOW = POLL_INTERVAL * 4;

/**
* To avoid running the worker loop very tightly and causing a CPU bottleneck we use this
* padding to simulate an asynchronous sleep. See the description of the tight loop below.
*/
const WORKER_PADDING_MS = 1000;

/**
* A singleton worker that will coordinate two polling loops:
* (1) A longer loop that polls for reindex operations that are in progress. If any are found, loop (2) is started.
Expand Down Expand Up @@ -104,16 +110,25 @@ export class ReindexWorker {
/**
* Runs an async loop until all inProgress jobs are complete or failed.
*/
private startUpdateOperationLoop = async () => {
private startUpdateOperationLoop = async (): Promise<void> => {
this.updateOperationLoopRunning = true;

try {
while (this.inProgressOps.length > 0) {
this.log.debug(`Updating ${this.inProgressOps.length} reindex operations`);

// Push each operation through the state machine and refresh.
await Promise.all(this.inProgressOps.map(this.processNextStep));

await this.refresh();

if (
this.inProgressOps.length &&
this.inProgressOps.every(op => !this.credentialStore.get(op))
) {
// TODO: This tight loop needs something to relax potentially high CPU demands so this padding is added.
// This scheduler should be revisited in future.
await new Promise(resolve => setTimeout(resolve, WORKER_PADDING_MS));
}
}
} finally {
this.updateOperationLoopRunning = false;
Expand Down Expand Up @@ -181,20 +196,32 @@ export class ReindexWorker {
}
};

private processNextStep = async (reindexOp: ReindexSavedObject) => {
private lastCheckedQueuedOpId: string | undefined;
private processNextStep = async (reindexOp: ReindexSavedObject): Promise<void> => {
const credential = this.credentialStore.get(reindexOp);

if (!credential) {
// Set to paused state if the job hasn't been updated in PAUSE_WINDOW.
// If this is a queued reindex op, and we know there can only ever be one in progress at a
// given time, there is a small chance it may have just reached the front of the queue so
// we give it a chance to be updated by another worker with credentials by making this a
// noop once. If it has not been updated by the next loop we will mark it paused if it
// falls outside of PAUSE_WINDOW.
if (isQueuedOp(reindexOp)) {
if (this.lastCheckedQueuedOpId !== reindexOp.id) {
this.lastCheckedQueuedOpId = reindexOp.id;
return;
}
}
// This indicates that no Kibana nodes currently have credentials to update this job.
const now = moment();
const updatedAt = moment(reindexOp.updated_at);
if (updatedAt < now.subtract(PAUSE_WINDOW)) {
return this.reindexService.pauseReindexOperation(reindexOp.attributes.indexName);
await this.reindexService.pauseReindexOperation(reindexOp.attributes.indexName);
return;
} else {
// If it has been updated recently, we assume another node has the necessary credentials,
// and this becomes a noop.
return reindexOp;
return;
}
}

Expand Down