From f05d0fa0bf2235950326dd82ffa78731255aab9b Mon Sep 17 00:00:00 2001 From: Mengwei Ding Date: Thu, 8 Aug 2019 11:29:57 -0700 Subject: [PATCH 1/7] [Code] interrupt clone if disk usage goes above watermark during clone --- .../legacy/plugins/code/model/repository.ts | 2 + .../esqueue/helpers/cancellation_token.d.ts | 4 +- .../lib/esqueue/helpers/cancellation_token.js | 7 +-- .../code/server/queue/cancellation_service.ts | 28 ++++++--- .../plugins/code/server/queue/clone_worker.ts | 59 +++++++++++++++++-- .../code/server/queue/delete_worker.ts | 8 +-- .../plugins/code/server/queue/index_worker.ts | 4 +- .../code/server/queue/update_worker.ts | 4 +- .../plugins/code/server/repository_service.ts | 12 ++-- 9 files changed, 96 insertions(+), 32 deletions(-) diff --git a/x-pack/legacy/plugins/code/model/repository.ts b/x-pack/legacy/plugins/code/model/repository.ts index e74305ae21e6..e01902fd01a5 100644 --- a/x-pack/legacy/plugins/code/model/repository.ts +++ b/x-pack/legacy/plugins/code/model/repository.ts @@ -5,6 +5,7 @@ */ import { IndexRequest } from './search'; +import { CancellationReason } from '../server/queue/cancellation_service'; export type RepositoryUri = string; @@ -86,6 +87,7 @@ export enum FileTreeItemType { export interface WorkerResult { uri: string; cancelled?: boolean; + cancelledReason?: CancellationReason; } // TODO(mengwei): create a AbstractGitWorkerResult since we now have an diff --git a/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.d.ts b/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.d.ts index 0cdee0927a91..b0452b61022d 100644 --- a/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.d.ts +++ b/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.d.ts @@ -5,6 +5,6 @@ */ export class CancellationToken { - public on(callback: () => void): void; - public cancel(): void; + public on(callback: (reason: string) => void): void; + public cancel(reason: string): void; } diff --git a/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.js b/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.js index f874b857889f..afb9fa79f0d3 100644 --- a/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.js +++ b/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.js @@ -16,15 +16,14 @@ export class CancellationToken { } if (this.isCancelled) { - callback(); - return; + throw new Error('Cancellation token has already been cancelled'); } this._callbacks.push(callback); }; - cancel = () => { + cancel = (reason) => { this.isCancelled = true; - this._callbacks.forEach(callback => callback()); + this._callbacks.forEach(callback => callback(reason)); }; } diff --git a/x-pack/legacy/plugins/code/server/queue/cancellation_service.ts b/x-pack/legacy/plugins/code/server/queue/cancellation_service.ts index 251d767ce26d..cc6631c5247b 100644 --- a/x-pack/legacy/plugins/code/server/queue/cancellation_service.ts +++ b/x-pack/legacy/plugins/code/server/queue/cancellation_service.ts @@ -12,6 +12,12 @@ interface CancellableJob { jobPromise: Promise; } +export enum CancellationReason { + REPOSITORY_DELETE = 'Cancel job because of deleting the entire repository', + CLONE_OVER_DISK_WATERMARK = 'Cancel job because of low disk waterwark', + NEW_JOB_OVERRIDEN = 'Cancel job because of a new job of the same type has been registered', +} + export class CancellationSerivce { private cloneCancellationMap: Map; private updateCancellationMap: Map; @@ -23,16 +29,16 @@ export class CancellationSerivce { this.indexCancellationMap = new Map(); } - public async cancelCloneJob(repoUri: RepositoryUri) { - await this.cancelJob(this.cloneCancellationMap, repoUri); + public async cancelCloneJob(repoUri: RepositoryUri, reason: CancellationReason) { + await this.cancelJob(this.cloneCancellationMap, repoUri, reason); } - public async cancelUpdateJob(repoUri: RepositoryUri) { - await this.cancelJob(this.updateCancellationMap, repoUri); + public async cancelUpdateJob(repoUri: RepositoryUri, reason: CancellationReason) { + await this.cancelJob(this.updateCancellationMap, repoUri, reason); } - public async cancelIndexJob(repoUri: RepositoryUri) { - await this.cancelJob(this.indexCancellationMap, repoUri); + public async cancelIndexJob(repoUri: RepositoryUri, reason: CancellationReason) { + await this.cancelJob(this.indexCancellationMap, repoUri, reason); } public async registerCancelableCloneJob( @@ -66,7 +72,7 @@ export class CancellationSerivce { jobPromise: Promise ) { // Try to cancel the job first. - await this.cancelJob(jobMap, repoUri); + await this.cancelJob(jobMap, repoUri, CancellationReason.NEW_JOB_OVERRIDEN); jobMap.set(repoUri, { token, jobPromise }); // remove the record from the cancellation service when the promise is fulfilled or rejected. jobPromise.finally(() => { @@ -74,12 +80,16 @@ export class CancellationSerivce { }); } - private async cancelJob(jobMap: Map, repoUri: RepositoryUri) { + private async cancelJob( + jobMap: Map, + repoUri: RepositoryUri, + reason: CancellationReason + ) { const payload = jobMap.get(repoUri); if (payload) { const { token, jobPromise } = payload; // 1. Use the cancellation token to pass cancel message to job - token.cancel(); + token.cancel(reason); // 2. waiting on the actual job promise to be resolved try { await jobPromise; diff --git a/x-pack/legacy/plugins/code/server/queue/clone_worker.ts b/x-pack/legacy/plugins/code/server/queue/clone_worker.ts index 4761a2bb20b2..f5ca1b7a9ef2 100644 --- a/x-pack/legacy/plugins/code/server/queue/clone_worker.ts +++ b/x-pack/legacy/plugins/code/server/queue/clone_worker.ts @@ -5,6 +5,7 @@ */ import { delay } from 'lodash'; +import { i18n } from '@kbn/i18n'; import { validateGitUrl } from '../../common/git_url_utils'; import { RepositoryUtils } from '../../common/repository_utils'; @@ -21,7 +22,7 @@ import { Logger } from '../log'; import { RepositoryServiceFactory } from '../repository_service_factory'; import { ServerOptions } from '../server_options'; import { AbstractGitWorker } from './abstract_git_worker'; -import { CancellationSerivce } from './cancellation_service'; +import { CancellationReason, CancellationSerivce } from './cancellation_service'; import { IndexWorker } from './index_worker'; import { Job } from './job'; @@ -73,22 +74,48 @@ export class CloneWorker extends AbstractGitWorker { const repo = RepositoryUtils.buildRepository(url); // Try to cancel any existing clone job for this repository. - this.cancellationService.cancelCloneJob(repo.uri); + this.cancellationService.cancelCloneJob(repo.uri, CancellationReason.NEW_JOB_OVERRIDEN); let cancelled = false; + let cancelledReason; if (cancellationToken) { - cancellationToken.on(() => { + cancellationToken.on((reason: string) => { cancelled = true; + cancelledReason = reason; }); } + const { thresholdEnabled, watermarkLowMb } = this.serverOptions.disk; + const cloneJobPromise = repoService.clone( repo, - (progress: number, cloneProgress?: CloneProgress) => { + async (progress: number, cloneProgress?: CloneProgress) => { if (cancelled) { // return false to stop the clone progress return false; } + + // Keep an eye on the disk usage during clone in case it goes above the + // disk watermark config. + if (thresholdEnabled) { + const isLowWatermark = await this.watermarkService.isLowWatermark(); + if (isLowWatermark) { + const msg = i18n.translate('xpack.code.git.diskWatermarkLowMessage', { + defaultMessage: `Disk watermark level lower than {watermarkLowMb} MB`, + values: { + watermarkLowMb, + }, + }); + this.log.error(msg); + // Cancel this clone job + if (cancellationToken) { + cancellationToken.cancel(CancellationReason.CLONE_OVER_DISK_WATERMARK); + } + // return false to stop the clone progress + return false; + } + } + // For clone job payload, it only has the url. Populate back the // repository uri before update progress. job.payload.uri = repo.uri; @@ -104,11 +131,33 @@ export class CloneWorker extends AbstractGitWorker { cloneJobPromise ); } - return await cloneJobPromise; + const res = await cloneJobPromise; + return { + ...res, + cancelled, + cancelledReason, + }; } public async onJobCompleted(job: Job, res: CloneWorkerResult) { if (res.cancelled) { + // If the clone job is cancelled because of the disk watermark, manually + // trigger onJobExecutionError to persist the clone error message. + if ( + res.cancelledReason && + res.cancelledReason === CancellationReason.CLONE_OVER_DISK_WATERMARK + ) { + const { watermarkLowMb } = this.serverOptions.disk; + const error = new Error( + i18n.translate('xpack.code.git.diskWatermarkLowMessage', { + defaultMessage: `Disk watermark level lower than {watermarkLowMb} MB`, + values: { + watermarkLowMb, + }, + }) + ); + await this.onJobExecutionError({ job, error }); + } // Skip updating job progress if the job is done because of cancellation. return; } diff --git a/x-pack/legacy/plugins/code/server/queue/delete_worker.ts b/x-pack/legacy/plugins/code/server/queue/delete_worker.ts index d0ab45abb1ce..e75867141322 100644 --- a/x-pack/legacy/plugins/code/server/queue/delete_worker.ts +++ b/x-pack/legacy/plugins/code/server/queue/delete_worker.ts @@ -17,7 +17,7 @@ import { RepositoryServiceFactory } from '../repository_service_factory'; import { RepositoryObjectClient } from '../search'; import { ServerOptions } from '../server_options'; import { AbstractWorker } from './abstract_worker'; -import { CancellationSerivce } from './cancellation_service'; +import { CancellationReason, CancellationSerivce } from './cancellation_service'; import { Job } from './job'; export class DeleteWorker extends AbstractWorker { @@ -42,9 +42,9 @@ export class DeleteWorker extends AbstractWorker { const { uri } = job.payload; // 1. Cancel running workers - await this.cancellationService.cancelCloneJob(uri); - await this.cancellationService.cancelUpdateJob(uri); - await this.cancellationService.cancelIndexJob(uri); + await this.cancellationService.cancelCloneJob(uri, CancellationReason.REPOSITORY_DELETE); + await this.cancellationService.cancelUpdateJob(uri, CancellationReason.REPOSITORY_DELETE); + await this.cancellationService.cancelIndexJob(uri, CancellationReason.REPOSITORY_DELETE); // 2. Delete git repository and all related data. const repoService = this.repoServiceFactory.newInstance( diff --git a/x-pack/legacy/plugins/code/server/queue/index_worker.ts b/x-pack/legacy/plugins/code/server/queue/index_worker.ts index 85f315b48d02..76f7a8b9a8f7 100644 --- a/x-pack/legacy/plugins/code/server/queue/index_worker.ts +++ b/x-pack/legacy/plugins/code/server/queue/index_worker.ts @@ -23,7 +23,7 @@ import { Logger } from '../log'; import { RepositoryObjectClient } from '../search'; import { aggregateIndexStats } from '../utils/index_stats_aggregator'; import { AbstractWorker } from './abstract_worker'; -import { CancellationSerivce } from './cancellation_service'; +import { CancellationReason, CancellationSerivce } from './cancellation_service'; import { Job } from './job'; export class IndexWorker extends AbstractWorker { @@ -84,7 +84,7 @@ export class IndexWorker extends AbstractWorker { // Binding the index cancellation logic let cancelled = false; - this.cancellationService.cancelIndexJob(uri); + this.cancellationService.cancelIndexJob(uri, CancellationReason.NEW_JOB_OVERRIDEN); const indexPromises: Array> = this.indexerFactories.map( async (indexerFactory: IndexerFactory, index: number) => { const indexer = await indexerFactory.create(uri, revision, enforceReindex); diff --git a/x-pack/legacy/plugins/code/server/queue/update_worker.ts b/x-pack/legacy/plugins/code/server/queue/update_worker.ts index 60411c4dcae6..d56e612b7622 100644 --- a/x-pack/legacy/plugins/code/server/queue/update_worker.ts +++ b/x-pack/legacy/plugins/code/server/queue/update_worker.ts @@ -12,7 +12,7 @@ import { Logger } from '../log'; import { RepositoryServiceFactory } from '../repository_service_factory'; import { ServerOptions } from '../server_options'; import { AbstractGitWorker } from './abstract_git_worker'; -import { CancellationSerivce } from './cancellation_service'; +import { CancellationReason, CancellationSerivce } from './cancellation_service'; import { Job } from './job'; export class UpdateWorker extends AbstractGitWorker { @@ -45,7 +45,7 @@ export class UpdateWorker extends AbstractGitWorker { ); // Try to cancel any existing update job for this repository. - this.cancellationService.cancelUpdateJob(repo.uri); + this.cancellationService.cancelUpdateJob(repo.uri, CancellationReason.NEW_JOB_OVERRIDEN); let cancelled = false; if (cancellationToken) { diff --git a/x-pack/legacy/plugins/code/server/repository_service.ts b/x-pack/legacy/plugins/code/server/repository_service.ts index e0a39efb5eb2..b7159f000f10 100644 --- a/x-pack/legacy/plugins/code/server/repository_service.ts +++ b/x-pack/legacy/plugins/code/server/repository_service.ts @@ -22,7 +22,10 @@ import { import { Logger } from './log'; // Return false to stop the clone progress. Return true to keep going; -export type CloneProgressHandler = (progress: number, cloneProgress?: CloneProgress) => boolean; +export type CloneProgressHandler = ( + progress: number, + cloneProgress?: CloneProgress +) => Promise; export type UpdateProgressHandler = () => boolean; const GIT_FETCH_PROGRESS_CANCEL = -1; @@ -219,6 +222,7 @@ export class RepositoryService { throw SSH_AUTH_ERROR; } + private PROGRESS_UPDATE_THROTTLING_FREQ_MS = 1000; private async doClone( repo: Repository, localPath: string, @@ -228,10 +232,10 @@ export class RepositoryService { try { let lastProgressUpdate = moment(); const cbs: RemoteCallbacks = { - transferProgress: (stats: any) => { + transferProgress: async (stats: any) => { // Clone progress update throttling. const now = moment(); - if (now.diff(lastProgressUpdate) < 1000) { + if (now.diff(lastProgressUpdate) < this.PROGRESS_UPDATE_THROTTLING_FREQ_MS) { return 0; } lastProgressUpdate = now; @@ -250,7 +254,7 @@ export class RepositoryService { indexedDeltas: stats.indexedDeltas(), receivedBytes: stats.receivedBytes(), }; - const resumeClone = handler(progress, cloneProgress); + const resumeClone = await handler(progress, cloneProgress); if (!resumeClone) { return GIT_FETCH_PROGRESS_CANCEL; } From 41b59337385283bd2e7a4e1f0f32201be833b6f2 Mon Sep 17 00:00:00 2001 From: Mengwei Ding Date: Thu, 8 Aug 2019 11:56:09 -0700 Subject: [PATCH 2/7] minor refactor --- .../code/server/queue/abstract_git_worker.ts | 44 ++++++++++---- .../code/server/queue/cancellation_service.ts | 2 +- .../plugins/code/server/queue/clone_worker.ts | 59 +++++-------------- .../code/server/queue/update_worker.ts | 39 ++++++++++-- .../plugins/code/server/repository_service.ts | 14 ++++- 5 files changed, 94 insertions(+), 64 deletions(-) diff --git a/x-pack/legacy/plugins/code/server/queue/abstract_git_worker.ts b/x-pack/legacy/plugins/code/server/queue/abstract_git_worker.ts index ae8e7ecae665..bd87719fd32c 100644 --- a/x-pack/legacy/plugins/code/server/queue/abstract_git_worker.ts +++ b/x-pack/legacy/plugins/code/server/queue/abstract_git_worker.ts @@ -18,6 +18,7 @@ import { Logger } from '../log'; import { RepositoryObjectClient } from '../search'; import { ServerOptions } from '../server_options'; import { AbstractWorker } from './abstract_worker'; +import { CancellationReason } from './cancellation_service'; import { Job } from './job'; export abstract class AbstractGitWorker extends AbstractWorker { @@ -36,23 +37,21 @@ export abstract class AbstractGitWorker extends AbstractWorker { this.objectClient = new RepositoryObjectClient(client); } - public async executeJob(_: Job): Promise { + public async executeJob(job: Job): Promise { + const uri = job.payload.uri; if (await this.watermarkService.isLowWatermark()) { - const msg = this.watermarkService.diskWatermarkViolationMessage(); - this.log.error(msg); - throw new Error(msg); + // Return job result as cancelled. + return { + uri, + cancelled: true, + cancelledReason: CancellationReason.LOW_DISK_SPACE, + }; } - return new Promise((resolve, reject) => { - resolve(); - }); + return { uri }; } public async onJobCompleted(job: Job, res: CloneWorkerResult) { - if (res.cancelled) { - // Skip updating job progress if the job is done because of cancellation. - return; - } await super.onJobCompleted(job, res); // Update the default branch. @@ -108,4 +107,27 @@ export abstract class AbstractGitWorker extends AbstractWorker { // this.log.warn(err); } } + + protected async onJobCancelled(job: Job, reason?: CancellationReason) { + if (reason && reason === CancellationReason.LOW_DISK_SPACE) { + // If the clone/update job is cancelled because of the disk watermark, manually + // trigger onJobExecutionError. + await this.handleLowDiskSpaceCancellation(job); + } + } + + private async handleLowDiskSpaceCancellation(job: Job) { + const { watermarkLowMb } = this.serverOptions.disk; + const msg = i18n.translate('xpack.code.git.diskWatermarkLowMessage', { + defaultMessage: `Disk watermark level lower than {watermarkLowMb} MB`, + values: { + watermarkLowMb, + }, + }); + this.log.error( + 'Git clone/update job completed because of low disk space. Move forward as error.' + ); + const error = new Error(msg); + await this.onJobExecutionError({ job, error }); + } } diff --git a/x-pack/legacy/plugins/code/server/queue/cancellation_service.ts b/x-pack/legacy/plugins/code/server/queue/cancellation_service.ts index cc6631c5247b..0679ec7986c7 100644 --- a/x-pack/legacy/plugins/code/server/queue/cancellation_service.ts +++ b/x-pack/legacy/plugins/code/server/queue/cancellation_service.ts @@ -14,7 +14,7 @@ interface CancellableJob { export enum CancellationReason { REPOSITORY_DELETE = 'Cancel job because of deleting the entire repository', - CLONE_OVER_DISK_WATERMARK = 'Cancel job because of low disk waterwark', + LOW_DISK_SPACE = 'Cancel job because of low available disk space', NEW_JOB_OVERRIDEN = 'Cancel job because of a new job of the same type has been registered', } diff --git a/x-pack/legacy/plugins/code/server/queue/clone_worker.ts b/x-pack/legacy/plugins/code/server/queue/clone_worker.ts index f5ca1b7a9ef2..9e406c200862 100644 --- a/x-pack/legacy/plugins/code/server/queue/clone_worker.ts +++ b/x-pack/legacy/plugins/code/server/queue/clone_worker.ts @@ -5,7 +5,6 @@ */ import { delay } from 'lodash'; -import { i18n } from '@kbn/i18n'; import { validateGitUrl } from '../../common/git_url_utils'; import { RepositoryUtils } from '../../common/repository_utils'; @@ -44,7 +43,10 @@ export class CloneWorker extends AbstractGitWorker { } public async executeJob(job: Job) { - await super.executeJob(job); + const superRes = await super.executeJob(job); + if (superRes.cancelled) { + return superRes; + } const { payload, cancellationToken } = job; const { url } = payload; @@ -85,8 +87,6 @@ export class CloneWorker extends AbstractGitWorker { }); } - const { thresholdEnabled, watermarkLowMb } = this.serverOptions.disk; - const cloneJobPromise = repoService.clone( repo, async (progress: number, cloneProgress?: CloneProgress) => { @@ -97,23 +97,13 @@ export class CloneWorker extends AbstractGitWorker { // Keep an eye on the disk usage during clone in case it goes above the // disk watermark config. - if (thresholdEnabled) { - const isLowWatermark = await this.watermarkService.isLowWatermark(); - if (isLowWatermark) { - const msg = i18n.translate('xpack.code.git.diskWatermarkLowMessage', { - defaultMessage: `Disk watermark level lower than {watermarkLowMb} MB`, - values: { - watermarkLowMb, - }, - }); - this.log.error(msg); - // Cancel this clone job - if (cancellationToken) { - cancellationToken.cancel(CancellationReason.CLONE_OVER_DISK_WATERMARK); - } - // return false to stop the clone progress - return false; + if (await this.watermarkService.isLowWatermark()) { + // Cancel this clone job + if (cancellationToken) { + cancellationToken.cancel(CancellationReason.LOW_DISK_SPACE); } + // return false to stop the clone progress + return false; } // For clone job payload, it only has the url. Populate back the @@ -141,38 +131,21 @@ export class CloneWorker extends AbstractGitWorker { public async onJobCompleted(job: Job, res: CloneWorkerResult) { if (res.cancelled) { - // If the clone job is cancelled because of the disk watermark, manually - // trigger onJobExecutionError to persist the clone error message. - if ( - res.cancelledReason && - res.cancelledReason === CancellationReason.CLONE_OVER_DISK_WATERMARK - ) { - const { watermarkLowMb } = this.serverOptions.disk; - const error = new Error( - i18n.translate('xpack.code.git.diskWatermarkLowMessage', { - defaultMessage: `Disk watermark level lower than {watermarkLowMb} MB`, - values: { - watermarkLowMb, - }, - }) - ); - await this.onJobExecutionError({ job, error }); - } + await this.onJobCancelled(job, res.cancelledReason); // Skip updating job progress if the job is done because of cancellation. return; } - this.log.info(`Clone job done for ${res.repo.uri}`); + + const { uri, revision } = res.repo!; + this.log.info(`Clone job done for ${uri}`); // For clone job payload, it only has the url. Populate back the // repository uri. - job.payload.uri = res.repo.uri; + job.payload.uri = uri; await super.onJobCompleted(job, res); // Throw out a repository index request after 1 second. return delay(async () => { - const payload = { - uri: res.repo.uri, - revision: res.repo.revision, - }; + const payload = { uri, revision }; await this.indexWorker.enqueueJob(payload, {}); }, 1000); } diff --git a/x-pack/legacy/plugins/code/server/queue/update_worker.ts b/x-pack/legacy/plugins/code/server/queue/update_worker.ts index d56e612b7622..82e787023645 100644 --- a/x-pack/legacy/plugins/code/server/queue/update_worker.ts +++ b/x-pack/legacy/plugins/code/server/queue/update_worker.ts @@ -32,7 +32,10 @@ export class UpdateWorker extends AbstractGitWorker { } public async executeJob(job: Job) { - await super.executeJob(job); + const superRes = await super.executeJob(job); + if (superRes.cancelled) { + return superRes; + } const { payload, cancellationToken } = job; const repo: Repository = payload; @@ -48,17 +51,31 @@ export class UpdateWorker extends AbstractGitWorker { this.cancellationService.cancelUpdateJob(repo.uri, CancellationReason.NEW_JOB_OVERRIDEN); let cancelled = false; + let cancelledReason; if (cancellationToken) { - cancellationToken.on(() => { + cancellationToken.on((reason: string) => { cancelled = true; + cancelledReason = reason; }); } - const updateJobPromise = repoService.update(repo, () => { + const updateJobPromise = repoService.update(repo, async () => { if (cancelled) { - // return false to stop the clone progress + // return false to stop the update progress + return false; + } + + // Keep an eye on the disk usage during update in case it goes above the + // disk watermark config. + if (await this.watermarkService.isLowWatermark()) { + // Cancel this update job + if (cancellationToken) { + cancellationToken.cancel(CancellationReason.LOW_DISK_SPACE); + } + // return false to stop the update progress return false; } + return true; }); @@ -69,11 +86,21 @@ export class UpdateWorker extends AbstractGitWorker { updateJobPromise ); } - - return await updateJobPromise; + const res = await updateJobPromise; + return { + ...res, + cancelled, + cancelledReason, + }; } public async onJobCompleted(job: Job, res: CloneWorkerResult) { + if (res.cancelled) { + await this.onJobCancelled(job); + // Skip updating job progress if the job is done because of cancellation. + return; + } + this.log.info(`Update job done for ${job.payload.uri}`); return await super.onJobCompleted(job, res); } diff --git a/x-pack/legacy/plugins/code/server/repository_service.ts b/x-pack/legacy/plugins/code/server/repository_service.ts index b7159f000f10..213590285c07 100644 --- a/x-pack/legacy/plugins/code/server/repository_service.ts +++ b/x-pack/legacy/plugins/code/server/repository_service.ts @@ -26,7 +26,7 @@ export type CloneProgressHandler = ( progress: number, cloneProgress?: CloneProgress ) => Promise; -export type UpdateProgressHandler = () => boolean; +export type UpdateProgressHandler = () => Promise; const GIT_FETCH_PROGRESS_CANCEL = -1; // TODO: Cannot directly access Git.Error.CODE.EUSER (-7). Investigate why. @@ -130,10 +130,18 @@ export class RepositoryService { let repo: Git.Repository | undefined; try { repo = await Git.Repository.open(localPath); + let lastProgressUpdate = moment(); const cbs: RemoteCallbacks = { - transferProgress: (_: any) => { + transferProgress: async (_: any) => { + // Update progress update throttling. + const now = moment(); + if (now.diff(lastProgressUpdate) < this.PROGRESS_UPDATE_THROTTLING_FREQ_MS) { + return 0; + } + lastProgressUpdate = now; + if (handler) { - const resumeUpdate = handler(); + const resumeUpdate = await handler(); if (!resumeUpdate) { return GIT_FETCH_PROGRESS_CANCEL; } From 324b45950d2098ab564a08228e77dd7a37baafa9 Mon Sep 17 00:00:00 2001 From: Mengwei Ding Date: Thu, 8 Aug 2019 13:02:37 -0700 Subject: [PATCH 3/7] add unit tests --- .../legacy/plugins/code/model/repository.ts | 2 +- .../code/server/__tests__/clone_worker.ts | 65 +++++++++++++++---- .../plugins/code/server/queue/clone_worker.ts | 4 +- .../code/server/queue/update_worker.test.ts | 40 ++++++++++-- .../code/server/queue/update_worker.ts | 2 +- 5 files changed, 89 insertions(+), 24 deletions(-) diff --git a/x-pack/legacy/plugins/code/model/repository.ts b/x-pack/legacy/plugins/code/model/repository.ts index e01902fd01a5..959869a60856 100644 --- a/x-pack/legacy/plugins/code/model/repository.ts +++ b/x-pack/legacy/plugins/code/model/repository.ts @@ -93,7 +93,7 @@ export interface WorkerResult { // TODO(mengwei): create a AbstractGitWorkerResult since we now have an // AbstractGitWorker now. export interface CloneWorkerResult extends WorkerResult { - repo: Repository; + repo?: Repository; } export interface DeleteWorkerResult extends WorkerResult { diff --git a/x-pack/legacy/plugins/code/server/__tests__/clone_worker.ts b/x-pack/legacy/plugins/code/server/__tests__/clone_worker.ts index 988f4d75d53e..964b663d9d16 100644 --- a/x-pack/legacy/plugins/code/server/__tests__/clone_worker.ts +++ b/x-pack/legacy/plugins/code/server/__tests__/clone_worker.ts @@ -12,13 +12,13 @@ import path from 'path'; import rimraf from 'rimraf'; import sinon from 'sinon'; -import { Repository } from '../../model'; +import { CloneWorkerResult, Repository } from '../../model'; import { DiskWatermarkService } from '../disk_watermark'; import { GitOperations } from '../git_operations'; import { EsClient, Esqueue } from '../lib/esqueue'; import { Logger } from '../log'; import { CloneWorker, IndexWorker } from '../queue'; -import { CancellationSerivce } from '../queue/cancellation_service'; +import { CancellationReason, CancellationSerivce } from '../queue/cancellation_service'; import { RepositoryServiceFactory } from '../repository_service_factory'; import { createTestServerOption, emptyAsyncFunc } from '../test_utils'; import { ConsoleLoggerFactory } from '../utils/console_logger_factory'; @@ -372,34 +372,34 @@ describe('clone_worker_tests', () => { diskWatermarkService as DiskWatermarkService ); - const result1 = await cloneWorker.executeJob({ + const result1 = (await cloneWorker.executeJob({ payload: { url: 'file:///foo/bar.git', }, options: {}, timestamp: 0, - }); + })) as CloneWorkerResult; - assert.ok(result1.repo === null); + assert.ok(!result1.repo); assert.ok(newInstanceSpy.notCalled); assert.ok(cloneSpy.notCalled); assert.ok(isLowWatermarkSpy.calledOnce); - const result2 = await cloneWorker.executeJob({ + const result2 = (await cloneWorker.executeJob({ payload: { url: '/foo/bar.git', }, options: {}, timestamp: 0, - }); + })) as CloneWorkerResult; - assert.ok(result2.repo === null); + assert.ok(!result2.repo); assert.ok(newInstanceSpy.notCalled); assert.ok(cloneSpy.notCalled); assert.ok(isLowWatermarkSpy.calledTwice); }); - it('Execute clone job failed because of low disk watermark', async () => { + it('Execute clone job failed because of low available disk space', async () => { // Setup RepositoryService const cloneSpy = sinon.spy(); const repoService = { @@ -430,26 +430,41 @@ describe('clone_worker_tests', () => { isLowWatermark: isLowWatermarkSpy, }; + // Setup EsClient + const updateSpy = sinon.spy(); + const esClient = { + update: emptyAsyncFunc, + }; + esClient.update = updateSpy; + + // Setup IndexWorker + const enqueueJobSpy = sinon.spy(); + const indexWorker = { + enqueueJob: emptyAsyncFunc, + }; + indexWorker.enqueueJob = enqueueJobSpy; + const cloneWorker = new CloneWorker( esQueue as Esqueue, log, - {} as EsClient, + esClient as EsClient, serverOptions, gitOps, - {} as IndexWorker, + (indexWorker as any) as IndexWorker, (repoServiceFactory as any) as RepositoryServiceFactory, cancellationService as CancellationSerivce, diskWatermarkService as DiskWatermarkService ); + let res: CloneWorkerResult = { uri: 'github.com/Microsoft/TypeScript-Node-Starter' }; try { - await cloneWorker.executeJob({ + res = (await cloneWorker.executeJob({ payload: { url: 'https://github.com/Microsoft/TypeScript-Node-Starter.git', }, options: {}, timestamp: 0, - }); + })) as CloneWorkerResult; // This step should not be touched. assert.ok(false); } catch (error) { @@ -457,5 +472,29 @@ describe('clone_worker_tests', () => { assert.ok(newInstanceSpy.notCalled); assert.ok(cloneSpy.notCalled); } + + assert.ok(res.cancelled); + assert.ok(res.cancelledReason === CancellationReason.LOW_DISK_SPACE); + + const onJobExecutionErrorSpy = sinon.spy(); + cloneWorker.onJobExecutionError = onJobExecutionErrorSpy; + + await cloneWorker.onJobCompleted( + { + payload: { + url: 'https://github.com/Microsoft/TypeScript-Node-Starter.git', + }, + options: {}, + timestamp: 0, + }, + res + ); + + assert.ok(onJobExecutionErrorSpy.calledOnce); + // Non of the follow up steps of a normal complete job should not be called + // because the job is going to be forwarded as execution error. + assert.ok(updateSpy.notCalled); + await delay(1000); + assert.ok(enqueueJobSpy.notCalled); }); }); diff --git a/x-pack/legacy/plugins/code/server/queue/clone_worker.ts b/x-pack/legacy/plugins/code/server/queue/clone_worker.ts index 9e406c200862..2c6e004a062b 100644 --- a/x-pack/legacy/plugins/code/server/queue/clone_worker.ts +++ b/x-pack/legacy/plugins/code/server/queue/clone_worker.ts @@ -61,9 +61,7 @@ export class CloneWorker extends AbstractGitWorker { this.log.error(error); return { uri: url, - // Return a null repo for invalid git url. - repo: null, - }; + } as CloneWorkerResult; } this.log.info(`Execute clone job for ${url}`); diff --git a/x-pack/legacy/plugins/code/server/queue/update_worker.test.ts b/x-pack/legacy/plugins/code/server/queue/update_worker.test.ts index 2d4a70a101a9..bae9b0fc023c 100644 --- a/x-pack/legacy/plugins/code/server/queue/update_worker.test.ts +++ b/x-pack/legacy/plugins/code/server/queue/update_worker.test.ts @@ -7,7 +7,7 @@ import sinon from 'sinon'; import { EsClient, Esqueue } from '../lib/esqueue'; -import { Repository } from '../../model'; +import { Repository, UpdateWorkerResult } from '../../model'; import { DiskWatermarkService } from '../disk_watermark'; import { GitOperations } from '../git_operations'; import { Logger } from '../log'; @@ -15,7 +15,7 @@ import { RepositoryServiceFactory } from '../repository_service_factory'; import { ServerOptions } from '../server_options'; import { emptyAsyncFunc } from '../test_utils'; import { ConsoleLoggerFactory } from '../utils/console_logger_factory'; -import { CancellationSerivce } from './cancellation_service'; +import { CancellationReason, CancellationSerivce } from './cancellation_service'; import { UpdateWorker } from './update_worker'; const log: Logger = new ConsoleLoggerFactory().getLogger(['test']); @@ -141,6 +141,7 @@ test('On update job completed because of cancellation ', async () => { } as any) as Repository, // Update job is done because of cancellation. cancelled: true, + cancelledReason: CancellationReason.REPOSITORY_DELETE, } ); @@ -149,7 +150,7 @@ test('On update job completed because of cancellation ', async () => { expect(updateSpy.notCalled).toBeTruthy(); }); -test('Execute update job failed because of low disk watermark ', async () => { +test('Execute update job failed because of low available disk space', async () => { // Setup RepositoryService const updateSpy = sinon.spy(); const repoService = { @@ -199,14 +200,19 @@ test('Execute update job failed because of low disk watermark ', async () => { diskWatermarkService as DiskWatermarkService ); + let res: UpdateWorkerResult = { + uri: 'mockrepo', + branch: 'mockbranch', + revision: 'mockrevision', + }; try { - await updateWorker.executeJob({ + res = (await updateWorker.executeJob({ payload: { uri: 'mockrepo', }, options: {}, timestamp: 0, - }); + })) as UpdateWorkerResult; // This step should not be touched. expect(false).toBeTruthy(); } catch (error) { @@ -215,9 +221,31 @@ test('Execute update job failed because of low disk watermark ', async () => { expect(newInstanceSpy.notCalled).toBeTruthy(); expect(updateSpy.notCalled).toBeTruthy(); } + + expect(res.cancelled).toBeTruthy(); + expect(res.cancelledReason).toEqual(CancellationReason.LOW_DISK_SPACE); + + const onJobExecutionErrorSpy = sinon.spy(); + updateWorker.onJobExecutionError = onJobExecutionErrorSpy; + + await updateWorker.onJobCompleted( + { + payload: { + uri: 'mockrepo', + }, + options: {}, + timestamp: 0, + }, + res + ); + + expect(onJobExecutionErrorSpy.calledOnce).toBeTruthy(); + // Non of the follow up steps of a normal complete job should not be called + // because the job is going to be forwarded as execution error. + expect(updateSpy.notCalled).toBeTruthy(); }); -test('On update job error or timeout will not persis error', async () => { +test('On update job error or timeout will not persist as error', async () => { // Setup EsClient const esUpdateSpy = sinon.spy(); esClient.update = esUpdateSpy; diff --git a/x-pack/legacy/plugins/code/server/queue/update_worker.ts b/x-pack/legacy/plugins/code/server/queue/update_worker.ts index 82e787023645..a434ffd61ac8 100644 --- a/x-pack/legacy/plugins/code/server/queue/update_worker.ts +++ b/x-pack/legacy/plugins/code/server/queue/update_worker.ts @@ -96,7 +96,7 @@ export class UpdateWorker extends AbstractGitWorker { public async onJobCompleted(job: Job, res: CloneWorkerResult) { if (res.cancelled) { - await this.onJobCancelled(job); + await this.onJobCancelled(job, res.cancelledReason); // Skip updating job progress if the job is done because of cancellation. return; } From b53a5ad66d892fbff85c05e9a214088b5a352292 Mon Sep 17 00:00:00 2001 From: Mengwei Ding Date: Thu, 8 Aug 2019 13:07:57 -0700 Subject: [PATCH 4/7] minor change --- .../server/lib/esqueue/helpers/cancellation_token.js | 2 +- .../code/server/queue/cancellation_service.test.ts | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.js b/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.js index afb9fa79f0d3..0996d9725132 100644 --- a/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.js +++ b/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.js @@ -16,7 +16,7 @@ export class CancellationToken { } if (this.isCancelled) { - throw new Error('Cancellation token has already been cancelled'); + return; } this._callbacks.push(callback); diff --git a/x-pack/legacy/plugins/code/server/queue/cancellation_service.test.ts b/x-pack/legacy/plugins/code/server/queue/cancellation_service.test.ts index 735a32a05594..475925d7c82c 100644 --- a/x-pack/legacy/plugins/code/server/queue/cancellation_service.test.ts +++ b/x-pack/legacy/plugins/code/server/queue/cancellation_service.test.ts @@ -8,7 +8,7 @@ import { CancellationToken } from '../lib/esqueue'; import sinon from 'sinon'; -import { CancellationSerivce } from './cancellation_service'; +import { CancellationReason, CancellationSerivce } from './cancellation_service'; afterEach(() => { sinon.restore(); @@ -30,9 +30,9 @@ test('Register and cancel cancellation token', async () => { const promise = new Promise(resolve => { promiseResolve = resolve; }); - await service.registerCancelableIndexJob(repoUri, token as CancellationToken, promise); + await service.registerCancelableIndexJob(repoUri, (token as any) as CancellationToken, promise); // do not wait on the promise, or there will be a dead lock - const cancelPromise = service.cancelIndexJob(repoUri); + const cancelPromise = service.cancelIndexJob(repoUri, CancellationReason.NEW_JOB_OVERRIDEN); // resolve the promise now promiseResolve(); @@ -57,10 +57,10 @@ test('Register and cancel cancellation token while an exception is thrown from t const promise = new Promise((resolve, reject) => { promiseReject = reject; }); - await service.registerCancelableIndexJob(repoUri, token as CancellationToken, promise); + await service.registerCancelableIndexJob(repoUri, (token as any) as CancellationToken, promise); // expect no exceptions are thrown when cancelling the job // do not wait on the promise, or there will be a dead lock - const cancelPromise = service.cancelIndexJob(repoUri); + const cancelPromise = service.cancelIndexJob(repoUri, CancellationReason.NEW_JOB_OVERRIDEN); // reject the promise now promiseReject(); From 57e2b829600270fd32d71167f95c270a22a814ec Mon Sep 17 00:00:00 2001 From: Mengwei Ding Date: Thu, 8 Aug 2019 15:19:46 -0700 Subject: [PATCH 5/7] fix type check --- x-pack/legacy/plugins/code/server/queue/index_worker.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/legacy/plugins/code/server/queue/index_worker.test.ts b/x-pack/legacy/plugins/code/server/queue/index_worker.test.ts index 360fdc002970..39ac1fc75f29 100644 --- a/x-pack/legacy/plugins/code/server/queue/index_worker.test.ts +++ b/x-pack/legacy/plugins/code/server/queue/index_worker.test.ts @@ -17,7 +17,7 @@ import { CancellationToken, EsClient, Esqueue } from '../lib/esqueue'; import { Logger } from '../log'; import { emptyAsyncFunc } from '../test_utils'; import { ConsoleLoggerFactory } from '../utils/console_logger_factory'; -import { CancellationSerivce } from './cancellation_service'; +import { CancellationReason, CancellationSerivce } from './cancellation_service'; import { IndexWorker } from './index_worker'; const log: Logger = new ConsoleLoggerFactory().getLogger(['test']); @@ -164,7 +164,7 @@ test('Execute index job and then cancel.', async () => { }); // Cancel the index job. - cToken.cancel(); + cToken.cancel(CancellationReason.REPOSITORY_DELETE); expect(cancelIndexJobSpy.calledOnce).toBeTruthy(); expect(getSpy.calledOnce).toBeTruthy(); From 91612b5a969dcda2e2ca5913842681b900c1feee Mon Sep 17 00:00:00 2001 From: Mengwei Ding Date: Fri, 9 Aug 2019 12:50:39 -0700 Subject: [PATCH 6/7] minor fix --- .../code/server/queue/abstract_git_worker.ts | 22 +++++-------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/x-pack/legacy/plugins/code/server/queue/abstract_git_worker.ts b/x-pack/legacy/plugins/code/server/queue/abstract_git_worker.ts index bd87719fd32c..57d051f870f6 100644 --- a/x-pack/legacy/plugins/code/server/queue/abstract_git_worker.ts +++ b/x-pack/legacy/plugins/code/server/queue/abstract_git_worker.ts @@ -112,22 +112,12 @@ export abstract class AbstractGitWorker extends AbstractWorker { if (reason && reason === CancellationReason.LOW_DISK_SPACE) { // If the clone/update job is cancelled because of the disk watermark, manually // trigger onJobExecutionError. - await this.handleLowDiskSpaceCancellation(job); + const msg = this.watermarkService.diskWatermarkViolationMessage(); + this.log.error( + 'Git clone/update job completed because of low disk space. Move forward as error.' + ); + const error = new Error(msg); + await this.onJobExecutionError({ job, error }); } } - - private async handleLowDiskSpaceCancellation(job: Job) { - const { watermarkLowMb } = this.serverOptions.disk; - const msg = i18n.translate('xpack.code.git.diskWatermarkLowMessage', { - defaultMessage: `Disk watermark level lower than {watermarkLowMb} MB`, - values: { - watermarkLowMb, - }, - }); - this.log.error( - 'Git clone/update job completed because of low disk space. Move forward as error.' - ); - const error = new Error(msg); - await this.onJobExecutionError({ job, error }); - } } From dd95bdfff85aab7db5823617a0d33d36792b0f70 Mon Sep 17 00:00:00 2001 From: Mengwei Ding Date: Fri, 9 Aug 2019 13:34:49 -0700 Subject: [PATCH 7/7] fix test --- x-pack/legacy/plugins/code/server/__tests__/clone_worker.ts | 1 + x-pack/legacy/plugins/code/server/queue/update_worker.test.ts | 1 + 2 files changed, 2 insertions(+) diff --git a/x-pack/legacy/plugins/code/server/__tests__/clone_worker.ts b/x-pack/legacy/plugins/code/server/__tests__/clone_worker.ts index 964b663d9d16..2046a973f0c8 100644 --- a/x-pack/legacy/plugins/code/server/__tests__/clone_worker.ts +++ b/x-pack/legacy/plugins/code/server/__tests__/clone_worker.ts @@ -428,6 +428,7 @@ describe('clone_worker_tests', () => { const isLowWatermarkSpy = sinon.stub().resolves(true); const diskWatermarkService: any = { isLowWatermark: isLowWatermarkSpy, + diskWatermarkViolationMessage: sinon.stub().returns('No enough disk space'), }; // Setup EsClient diff --git a/x-pack/legacy/plugins/code/server/queue/update_worker.test.ts b/x-pack/legacy/plugins/code/server/queue/update_worker.test.ts index bae9b0fc023c..b868ee067e92 100644 --- a/x-pack/legacy/plugins/code/server/queue/update_worker.test.ts +++ b/x-pack/legacy/plugins/code/server/queue/update_worker.test.ts @@ -179,6 +179,7 @@ test('Execute update job failed because of low available disk space', async () = const isLowWatermarkSpy = sinon.stub().resolves(true); const diskWatermarkService: any = { isLowWatermark: isLowWatermarkSpy, + diskWatermarkViolationMessage: sinon.stub().returns('No enough disk space'), }; const updateWorker = new UpdateWorker(