From 0891db0d53135b50f57298f9e8ed92c49f498266 Mon Sep 17 00:00:00 2001 From: Mengwei Ding Date: Mon, 12 Aug 2019 00:33:06 -0700 Subject: [PATCH] [Code] Cancel clone/update job in the middle if disk space over the watermark (#42890) * [Code] interrupt clone if disk usage goes above watermark during clone * minor refactor * add unit tests * minor change * fix type check * minor fix * fix test --- .../legacy/plugins/code/model/repository.ts | 4 +- .../code/server/__tests__/clone_worker.ts | 66 +++++++++++++++---- .../esqueue/helpers/cancellation_token.d.ts | 4 +- .../lib/esqueue/helpers/cancellation_token.js | 5 +- .../code/server/queue/abstract_git_worker.ts | 34 ++++++---- .../server/queue/cancellation_service.test.ts | 10 +-- .../code/server/queue/cancellation_service.ts | 28 +++++--- .../plugins/code/server/queue/clone_worker.ts | 50 +++++++++----- .../code/server/queue/delete_worker.ts | 8 +-- .../code/server/queue/index_worker.test.ts | 4 +- .../plugins/code/server/queue/index_worker.ts | 4 +- .../code/server/queue/update_worker.test.ts | 41 ++++++++++-- .../code/server/queue/update_worker.ts | 43 +++++++++--- .../plugins/code/server/repository_service.ts | 26 ++++++-- 14 files changed, 239 insertions(+), 88 deletions(-) diff --git a/x-pack/legacy/plugins/code/model/repository.ts b/x-pack/legacy/plugins/code/model/repository.ts index e74305ae21e6..959869a60856 100644 --- a/x-pack/legacy/plugins/code/model/repository.ts +++ b/x-pack/legacy/plugins/code/model/repository.ts @@ -5,6 +5,7 @@ */ import { IndexRequest } from './search'; +import { CancellationReason } from '../server/queue/cancellation_service'; export type RepositoryUri = string; @@ -86,12 +87,13 @@ export enum FileTreeItemType { export interface WorkerResult { uri: string; cancelled?: boolean; + cancelledReason?: CancellationReason; } // TODO(mengwei): create a AbstractGitWorkerResult since we now have an // AbstractGitWorker now. export interface CloneWorkerResult extends WorkerResult { - repo: Repository; + repo?: Repository; } export interface DeleteWorkerResult extends WorkerResult { diff --git a/x-pack/legacy/plugins/code/server/__tests__/clone_worker.ts b/x-pack/legacy/plugins/code/server/__tests__/clone_worker.ts index 988f4d75d53e..2046a973f0c8 100644 --- a/x-pack/legacy/plugins/code/server/__tests__/clone_worker.ts +++ b/x-pack/legacy/plugins/code/server/__tests__/clone_worker.ts @@ -12,13 +12,13 @@ import path from 'path'; import rimraf from 'rimraf'; import sinon from 'sinon'; -import { Repository } from '../../model'; +import { CloneWorkerResult, Repository } from '../../model'; import { DiskWatermarkService } from '../disk_watermark'; import { GitOperations } from '../git_operations'; import { EsClient, Esqueue } from '../lib/esqueue'; import { Logger } from '../log'; import { CloneWorker, IndexWorker } from '../queue'; -import { CancellationSerivce } from '../queue/cancellation_service'; +import { CancellationReason, CancellationSerivce } from '../queue/cancellation_service'; import { RepositoryServiceFactory } from '../repository_service_factory'; import { createTestServerOption, emptyAsyncFunc } from '../test_utils'; import { ConsoleLoggerFactory } from '../utils/console_logger_factory'; @@ -372,34 +372,34 @@ describe('clone_worker_tests', () => { diskWatermarkService as DiskWatermarkService ); - const result1 = await cloneWorker.executeJob({ + const result1 = (await cloneWorker.executeJob({ payload: { url: 'file:///foo/bar.git', }, options: {}, timestamp: 0, - }); + })) as CloneWorkerResult; - assert.ok(result1.repo === null); + assert.ok(!result1.repo); assert.ok(newInstanceSpy.notCalled); assert.ok(cloneSpy.notCalled); assert.ok(isLowWatermarkSpy.calledOnce); - const result2 = await cloneWorker.executeJob({ + const result2 = (await cloneWorker.executeJob({ payload: { url: '/foo/bar.git', }, options: {}, timestamp: 0, - }); + })) as CloneWorkerResult; - assert.ok(result2.repo === null); + assert.ok(!result2.repo); assert.ok(newInstanceSpy.notCalled); assert.ok(cloneSpy.notCalled); assert.ok(isLowWatermarkSpy.calledTwice); }); - it('Execute clone job failed because of low disk watermark', async () => { + it('Execute clone job failed because of low available disk space', async () => { // Setup RepositoryService const cloneSpy = sinon.spy(); const repoService = { @@ -428,28 +428,44 @@ describe('clone_worker_tests', () => { const isLowWatermarkSpy = sinon.stub().resolves(true); const diskWatermarkService: any = { isLowWatermark: isLowWatermarkSpy, + diskWatermarkViolationMessage: sinon.stub().returns('No enough disk space'), + }; + + // Setup EsClient + const updateSpy = sinon.spy(); + const esClient = { + update: emptyAsyncFunc, }; + esClient.update = updateSpy; + + // Setup IndexWorker + const enqueueJobSpy = sinon.spy(); + const indexWorker = { + enqueueJob: emptyAsyncFunc, + }; + indexWorker.enqueueJob = enqueueJobSpy; const cloneWorker = new CloneWorker( esQueue as Esqueue, log, - {} as EsClient, + esClient as EsClient, serverOptions, gitOps, - {} as IndexWorker, + (indexWorker as any) as IndexWorker, (repoServiceFactory as any) as RepositoryServiceFactory, cancellationService as CancellationSerivce, diskWatermarkService as DiskWatermarkService ); + let res: CloneWorkerResult = { uri: 'github.com/Microsoft/TypeScript-Node-Starter' }; try { - await cloneWorker.executeJob({ + res = (await cloneWorker.executeJob({ payload: { url: 'https://github.com/Microsoft/TypeScript-Node-Starter.git', }, options: {}, timestamp: 0, - }); + })) as CloneWorkerResult; // This step should not be touched. assert.ok(false); } catch (error) { @@ -457,5 +473,29 @@ describe('clone_worker_tests', () => { assert.ok(newInstanceSpy.notCalled); assert.ok(cloneSpy.notCalled); } + + assert.ok(res.cancelled); + assert.ok(res.cancelledReason === CancellationReason.LOW_DISK_SPACE); + + const onJobExecutionErrorSpy = sinon.spy(); + cloneWorker.onJobExecutionError = onJobExecutionErrorSpy; + + await cloneWorker.onJobCompleted( + { + payload: { + url: 'https://github.com/Microsoft/TypeScript-Node-Starter.git', + }, + options: {}, + timestamp: 0, + }, + res + ); + + assert.ok(onJobExecutionErrorSpy.calledOnce); + // Non of the follow up steps of a normal complete job should not be called + // because the job is going to be forwarded as execution error. + assert.ok(updateSpy.notCalled); + await delay(1000); + assert.ok(enqueueJobSpy.notCalled); }); }); diff --git a/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.d.ts b/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.d.ts index 0cdee0927a91..b0452b61022d 100644 --- a/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.d.ts +++ b/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.d.ts @@ -5,6 +5,6 @@ */ export class CancellationToken { - public on(callback: () => void): void; - public cancel(): void; + public on(callback: (reason: string) => void): void; + public cancel(reason: string): void; } diff --git a/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.js b/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.js index f874b857889f..0996d9725132 100644 --- a/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.js +++ b/x-pack/legacy/plugins/code/server/lib/esqueue/helpers/cancellation_token.js @@ -16,15 +16,14 @@ export class CancellationToken { } if (this.isCancelled) { - callback(); return; } this._callbacks.push(callback); }; - cancel = () => { + cancel = (reason) => { this.isCancelled = true; - this._callbacks.forEach(callback => callback()); + this._callbacks.forEach(callback => callback(reason)); }; } diff --git a/x-pack/legacy/plugins/code/server/queue/abstract_git_worker.ts b/x-pack/legacy/plugins/code/server/queue/abstract_git_worker.ts index ae8e7ecae665..57d051f870f6 100644 --- a/x-pack/legacy/plugins/code/server/queue/abstract_git_worker.ts +++ b/x-pack/legacy/plugins/code/server/queue/abstract_git_worker.ts @@ -18,6 +18,7 @@ import { Logger } from '../log'; import { RepositoryObjectClient } from '../search'; import { ServerOptions } from '../server_options'; import { AbstractWorker } from './abstract_worker'; +import { CancellationReason } from './cancellation_service'; import { Job } from './job'; export abstract class AbstractGitWorker extends AbstractWorker { @@ -36,23 +37,21 @@ export abstract class AbstractGitWorker extends AbstractWorker { this.objectClient = new RepositoryObjectClient(client); } - public async executeJob(_: Job): Promise { + public async executeJob(job: Job): Promise { + const uri = job.payload.uri; if (await this.watermarkService.isLowWatermark()) { - const msg = this.watermarkService.diskWatermarkViolationMessage(); - this.log.error(msg); - throw new Error(msg); + // Return job result as cancelled. + return { + uri, + cancelled: true, + cancelledReason: CancellationReason.LOW_DISK_SPACE, + }; } - return new Promise((resolve, reject) => { - resolve(); - }); + return { uri }; } public async onJobCompleted(job: Job, res: CloneWorkerResult) { - if (res.cancelled) { - // Skip updating job progress if the job is done because of cancellation. - return; - } await super.onJobCompleted(job, res); // Update the default branch. @@ -108,4 +107,17 @@ export abstract class AbstractGitWorker extends AbstractWorker { // this.log.warn(err); } } + + protected async onJobCancelled(job: Job, reason?: CancellationReason) { + if (reason && reason === CancellationReason.LOW_DISK_SPACE) { + // If the clone/update job is cancelled because of the disk watermark, manually + // trigger onJobExecutionError. + const msg = this.watermarkService.diskWatermarkViolationMessage(); + this.log.error( + 'Git clone/update job completed because of low disk space. Move forward as error.' + ); + const error = new Error(msg); + await this.onJobExecutionError({ job, error }); + } + } } diff --git a/x-pack/legacy/plugins/code/server/queue/cancellation_service.test.ts b/x-pack/legacy/plugins/code/server/queue/cancellation_service.test.ts index 735a32a05594..475925d7c82c 100644 --- a/x-pack/legacy/plugins/code/server/queue/cancellation_service.test.ts +++ b/x-pack/legacy/plugins/code/server/queue/cancellation_service.test.ts @@ -8,7 +8,7 @@ import { CancellationToken } from '../lib/esqueue'; import sinon from 'sinon'; -import { CancellationSerivce } from './cancellation_service'; +import { CancellationReason, CancellationSerivce } from './cancellation_service'; afterEach(() => { sinon.restore(); @@ -30,9 +30,9 @@ test('Register and cancel cancellation token', async () => { const promise = new Promise(resolve => { promiseResolve = resolve; }); - await service.registerCancelableIndexJob(repoUri, token as CancellationToken, promise); + await service.registerCancelableIndexJob(repoUri, (token as any) as CancellationToken, promise); // do not wait on the promise, or there will be a dead lock - const cancelPromise = service.cancelIndexJob(repoUri); + const cancelPromise = service.cancelIndexJob(repoUri, CancellationReason.NEW_JOB_OVERRIDEN); // resolve the promise now promiseResolve(); @@ -57,10 +57,10 @@ test('Register and cancel cancellation token while an exception is thrown from t const promise = new Promise((resolve, reject) => { promiseReject = reject; }); - await service.registerCancelableIndexJob(repoUri, token as CancellationToken, promise); + await service.registerCancelableIndexJob(repoUri, (token as any) as CancellationToken, promise); // expect no exceptions are thrown when cancelling the job // do not wait on the promise, or there will be a dead lock - const cancelPromise = service.cancelIndexJob(repoUri); + const cancelPromise = service.cancelIndexJob(repoUri, CancellationReason.NEW_JOB_OVERRIDEN); // reject the promise now promiseReject(); diff --git a/x-pack/legacy/plugins/code/server/queue/cancellation_service.ts b/x-pack/legacy/plugins/code/server/queue/cancellation_service.ts index 251d767ce26d..0679ec7986c7 100644 --- a/x-pack/legacy/plugins/code/server/queue/cancellation_service.ts +++ b/x-pack/legacy/plugins/code/server/queue/cancellation_service.ts @@ -12,6 +12,12 @@ interface CancellableJob { jobPromise: Promise; } +export enum CancellationReason { + REPOSITORY_DELETE = 'Cancel job because of deleting the entire repository', + LOW_DISK_SPACE = 'Cancel job because of low available disk space', + NEW_JOB_OVERRIDEN = 'Cancel job because of a new job of the same type has been registered', +} + export class CancellationSerivce { private cloneCancellationMap: Map; private updateCancellationMap: Map; @@ -23,16 +29,16 @@ export class CancellationSerivce { this.indexCancellationMap = new Map(); } - public async cancelCloneJob(repoUri: RepositoryUri) { - await this.cancelJob(this.cloneCancellationMap, repoUri); + public async cancelCloneJob(repoUri: RepositoryUri, reason: CancellationReason) { + await this.cancelJob(this.cloneCancellationMap, repoUri, reason); } - public async cancelUpdateJob(repoUri: RepositoryUri) { - await this.cancelJob(this.updateCancellationMap, repoUri); + public async cancelUpdateJob(repoUri: RepositoryUri, reason: CancellationReason) { + await this.cancelJob(this.updateCancellationMap, repoUri, reason); } - public async cancelIndexJob(repoUri: RepositoryUri) { - await this.cancelJob(this.indexCancellationMap, repoUri); + public async cancelIndexJob(repoUri: RepositoryUri, reason: CancellationReason) { + await this.cancelJob(this.indexCancellationMap, repoUri, reason); } public async registerCancelableCloneJob( @@ -66,7 +72,7 @@ export class CancellationSerivce { jobPromise: Promise ) { // Try to cancel the job first. - await this.cancelJob(jobMap, repoUri); + await this.cancelJob(jobMap, repoUri, CancellationReason.NEW_JOB_OVERRIDEN); jobMap.set(repoUri, { token, jobPromise }); // remove the record from the cancellation service when the promise is fulfilled or rejected. jobPromise.finally(() => { @@ -74,12 +80,16 @@ export class CancellationSerivce { }); } - private async cancelJob(jobMap: Map, repoUri: RepositoryUri) { + private async cancelJob( + jobMap: Map, + repoUri: RepositoryUri, + reason: CancellationReason + ) { const payload = jobMap.get(repoUri); if (payload) { const { token, jobPromise } = payload; // 1. Use the cancellation token to pass cancel message to job - token.cancel(); + token.cancel(reason); // 2. waiting on the actual job promise to be resolved try { await jobPromise; diff --git a/x-pack/legacy/plugins/code/server/queue/clone_worker.ts b/x-pack/legacy/plugins/code/server/queue/clone_worker.ts index 4761a2bb20b2..2c6e004a062b 100644 --- a/x-pack/legacy/plugins/code/server/queue/clone_worker.ts +++ b/x-pack/legacy/plugins/code/server/queue/clone_worker.ts @@ -21,7 +21,7 @@ import { Logger } from '../log'; import { RepositoryServiceFactory } from '../repository_service_factory'; import { ServerOptions } from '../server_options'; import { AbstractGitWorker } from './abstract_git_worker'; -import { CancellationSerivce } from './cancellation_service'; +import { CancellationReason, CancellationSerivce } from './cancellation_service'; import { IndexWorker } from './index_worker'; import { Job } from './job'; @@ -43,7 +43,10 @@ export class CloneWorker extends AbstractGitWorker { } public async executeJob(job: Job) { - await super.executeJob(job); + const superRes = await super.executeJob(job); + if (superRes.cancelled) { + return superRes; + } const { payload, cancellationToken } = job; const { url } = payload; @@ -58,9 +61,7 @@ export class CloneWorker extends AbstractGitWorker { this.log.error(error); return { uri: url, - // Return a null repo for invalid git url. - repo: null, - }; + } as CloneWorkerResult; } this.log.info(`Execute clone job for ${url}`); @@ -73,22 +74,36 @@ export class CloneWorker extends AbstractGitWorker { const repo = RepositoryUtils.buildRepository(url); // Try to cancel any existing clone job for this repository. - this.cancellationService.cancelCloneJob(repo.uri); + this.cancellationService.cancelCloneJob(repo.uri, CancellationReason.NEW_JOB_OVERRIDEN); let cancelled = false; + let cancelledReason; if (cancellationToken) { - cancellationToken.on(() => { + cancellationToken.on((reason: string) => { cancelled = true; + cancelledReason = reason; }); } const cloneJobPromise = repoService.clone( repo, - (progress: number, cloneProgress?: CloneProgress) => { + async (progress: number, cloneProgress?: CloneProgress) => { if (cancelled) { // return false to stop the clone progress return false; } + + // Keep an eye on the disk usage during clone in case it goes above the + // disk watermark config. + if (await this.watermarkService.isLowWatermark()) { + // Cancel this clone job + if (cancellationToken) { + cancellationToken.cancel(CancellationReason.LOW_DISK_SPACE); + } + // return false to stop the clone progress + return false; + } + // For clone job payload, it only has the url. Populate back the // repository uri before update progress. job.payload.uri = repo.uri; @@ -104,26 +119,31 @@ export class CloneWorker extends AbstractGitWorker { cloneJobPromise ); } - return await cloneJobPromise; + const res = await cloneJobPromise; + return { + ...res, + cancelled, + cancelledReason, + }; } public async onJobCompleted(job: Job, res: CloneWorkerResult) { if (res.cancelled) { + await this.onJobCancelled(job, res.cancelledReason); // Skip updating job progress if the job is done because of cancellation. return; } - this.log.info(`Clone job done for ${res.repo.uri}`); + + const { uri, revision } = res.repo!; + this.log.info(`Clone job done for ${uri}`); // For clone job payload, it only has the url. Populate back the // repository uri. - job.payload.uri = res.repo.uri; + job.payload.uri = uri; await super.onJobCompleted(job, res); // Throw out a repository index request after 1 second. return delay(async () => { - const payload = { - uri: res.repo.uri, - revision: res.repo.revision, - }; + const payload = { uri, revision }; await this.indexWorker.enqueueJob(payload, {}); }, 1000); } diff --git a/x-pack/legacy/plugins/code/server/queue/delete_worker.ts b/x-pack/legacy/plugins/code/server/queue/delete_worker.ts index d0ab45abb1ce..e75867141322 100644 --- a/x-pack/legacy/plugins/code/server/queue/delete_worker.ts +++ b/x-pack/legacy/plugins/code/server/queue/delete_worker.ts @@ -17,7 +17,7 @@ import { RepositoryServiceFactory } from '../repository_service_factory'; import { RepositoryObjectClient } from '../search'; import { ServerOptions } from '../server_options'; import { AbstractWorker } from './abstract_worker'; -import { CancellationSerivce } from './cancellation_service'; +import { CancellationReason, CancellationSerivce } from './cancellation_service'; import { Job } from './job'; export class DeleteWorker extends AbstractWorker { @@ -42,9 +42,9 @@ export class DeleteWorker extends AbstractWorker { const { uri } = job.payload; // 1. Cancel running workers - await this.cancellationService.cancelCloneJob(uri); - await this.cancellationService.cancelUpdateJob(uri); - await this.cancellationService.cancelIndexJob(uri); + await this.cancellationService.cancelCloneJob(uri, CancellationReason.REPOSITORY_DELETE); + await this.cancellationService.cancelUpdateJob(uri, CancellationReason.REPOSITORY_DELETE); + await this.cancellationService.cancelIndexJob(uri, CancellationReason.REPOSITORY_DELETE); // 2. Delete git repository and all related data. const repoService = this.repoServiceFactory.newInstance( diff --git a/x-pack/legacy/plugins/code/server/queue/index_worker.test.ts b/x-pack/legacy/plugins/code/server/queue/index_worker.test.ts index 360fdc002970..39ac1fc75f29 100644 --- a/x-pack/legacy/plugins/code/server/queue/index_worker.test.ts +++ b/x-pack/legacy/plugins/code/server/queue/index_worker.test.ts @@ -17,7 +17,7 @@ import { CancellationToken, EsClient, Esqueue } from '../lib/esqueue'; import { Logger } from '../log'; import { emptyAsyncFunc } from '../test_utils'; import { ConsoleLoggerFactory } from '../utils/console_logger_factory'; -import { CancellationSerivce } from './cancellation_service'; +import { CancellationReason, CancellationSerivce } from './cancellation_service'; import { IndexWorker } from './index_worker'; const log: Logger = new ConsoleLoggerFactory().getLogger(['test']); @@ -164,7 +164,7 @@ test('Execute index job and then cancel.', async () => { }); // Cancel the index job. - cToken.cancel(); + cToken.cancel(CancellationReason.REPOSITORY_DELETE); expect(cancelIndexJobSpy.calledOnce).toBeTruthy(); expect(getSpy.calledOnce).toBeTruthy(); diff --git a/x-pack/legacy/plugins/code/server/queue/index_worker.ts b/x-pack/legacy/plugins/code/server/queue/index_worker.ts index 85f315b48d02..76f7a8b9a8f7 100644 --- a/x-pack/legacy/plugins/code/server/queue/index_worker.ts +++ b/x-pack/legacy/plugins/code/server/queue/index_worker.ts @@ -23,7 +23,7 @@ import { Logger } from '../log'; import { RepositoryObjectClient } from '../search'; import { aggregateIndexStats } from '../utils/index_stats_aggregator'; import { AbstractWorker } from './abstract_worker'; -import { CancellationSerivce } from './cancellation_service'; +import { CancellationReason, CancellationSerivce } from './cancellation_service'; import { Job } from './job'; export class IndexWorker extends AbstractWorker { @@ -84,7 +84,7 @@ export class IndexWorker extends AbstractWorker { // Binding the index cancellation logic let cancelled = false; - this.cancellationService.cancelIndexJob(uri); + this.cancellationService.cancelIndexJob(uri, CancellationReason.NEW_JOB_OVERRIDEN); const indexPromises: Array> = this.indexerFactories.map( async (indexerFactory: IndexerFactory, index: number) => { const indexer = await indexerFactory.create(uri, revision, enforceReindex); diff --git a/x-pack/legacy/plugins/code/server/queue/update_worker.test.ts b/x-pack/legacy/plugins/code/server/queue/update_worker.test.ts index 2d4a70a101a9..b868ee067e92 100644 --- a/x-pack/legacy/plugins/code/server/queue/update_worker.test.ts +++ b/x-pack/legacy/plugins/code/server/queue/update_worker.test.ts @@ -7,7 +7,7 @@ import sinon from 'sinon'; import { EsClient, Esqueue } from '../lib/esqueue'; -import { Repository } from '../../model'; +import { Repository, UpdateWorkerResult } from '../../model'; import { DiskWatermarkService } from '../disk_watermark'; import { GitOperations } from '../git_operations'; import { Logger } from '../log'; @@ -15,7 +15,7 @@ import { RepositoryServiceFactory } from '../repository_service_factory'; import { ServerOptions } from '../server_options'; import { emptyAsyncFunc } from '../test_utils'; import { ConsoleLoggerFactory } from '../utils/console_logger_factory'; -import { CancellationSerivce } from './cancellation_service'; +import { CancellationReason, CancellationSerivce } from './cancellation_service'; import { UpdateWorker } from './update_worker'; const log: Logger = new ConsoleLoggerFactory().getLogger(['test']); @@ -141,6 +141,7 @@ test('On update job completed because of cancellation ', async () => { } as any) as Repository, // Update job is done because of cancellation. cancelled: true, + cancelledReason: CancellationReason.REPOSITORY_DELETE, } ); @@ -149,7 +150,7 @@ test('On update job completed because of cancellation ', async () => { expect(updateSpy.notCalled).toBeTruthy(); }); -test('Execute update job failed because of low disk watermark ', async () => { +test('Execute update job failed because of low available disk space', async () => { // Setup RepositoryService const updateSpy = sinon.spy(); const repoService = { @@ -178,6 +179,7 @@ test('Execute update job failed because of low disk watermark ', async () => { const isLowWatermarkSpy = sinon.stub().resolves(true); const diskWatermarkService: any = { isLowWatermark: isLowWatermarkSpy, + diskWatermarkViolationMessage: sinon.stub().returns('No enough disk space'), }; const updateWorker = new UpdateWorker( @@ -199,14 +201,19 @@ test('Execute update job failed because of low disk watermark ', async () => { diskWatermarkService as DiskWatermarkService ); + let res: UpdateWorkerResult = { + uri: 'mockrepo', + branch: 'mockbranch', + revision: 'mockrevision', + }; try { - await updateWorker.executeJob({ + res = (await updateWorker.executeJob({ payload: { uri: 'mockrepo', }, options: {}, timestamp: 0, - }); + })) as UpdateWorkerResult; // This step should not be touched. expect(false).toBeTruthy(); } catch (error) { @@ -215,9 +222,31 @@ test('Execute update job failed because of low disk watermark ', async () => { expect(newInstanceSpy.notCalled).toBeTruthy(); expect(updateSpy.notCalled).toBeTruthy(); } + + expect(res.cancelled).toBeTruthy(); + expect(res.cancelledReason).toEqual(CancellationReason.LOW_DISK_SPACE); + + const onJobExecutionErrorSpy = sinon.spy(); + updateWorker.onJobExecutionError = onJobExecutionErrorSpy; + + await updateWorker.onJobCompleted( + { + payload: { + uri: 'mockrepo', + }, + options: {}, + timestamp: 0, + }, + res + ); + + expect(onJobExecutionErrorSpy.calledOnce).toBeTruthy(); + // Non of the follow up steps of a normal complete job should not be called + // because the job is going to be forwarded as execution error. + expect(updateSpy.notCalled).toBeTruthy(); }); -test('On update job error or timeout will not persis error', async () => { +test('On update job error or timeout will not persist as error', async () => { // Setup EsClient const esUpdateSpy = sinon.spy(); esClient.update = esUpdateSpy; diff --git a/x-pack/legacy/plugins/code/server/queue/update_worker.ts b/x-pack/legacy/plugins/code/server/queue/update_worker.ts index 60411c4dcae6..a434ffd61ac8 100644 --- a/x-pack/legacy/plugins/code/server/queue/update_worker.ts +++ b/x-pack/legacy/plugins/code/server/queue/update_worker.ts @@ -12,7 +12,7 @@ import { Logger } from '../log'; import { RepositoryServiceFactory } from '../repository_service_factory'; import { ServerOptions } from '../server_options'; import { AbstractGitWorker } from './abstract_git_worker'; -import { CancellationSerivce } from './cancellation_service'; +import { CancellationReason, CancellationSerivce } from './cancellation_service'; import { Job } from './job'; export class UpdateWorker extends AbstractGitWorker { @@ -32,7 +32,10 @@ export class UpdateWorker extends AbstractGitWorker { } public async executeJob(job: Job) { - await super.executeJob(job); + const superRes = await super.executeJob(job); + if (superRes.cancelled) { + return superRes; + } const { payload, cancellationToken } = job; const repo: Repository = payload; @@ -45,20 +48,34 @@ export class UpdateWorker extends AbstractGitWorker { ); // Try to cancel any existing update job for this repository. - this.cancellationService.cancelUpdateJob(repo.uri); + this.cancellationService.cancelUpdateJob(repo.uri, CancellationReason.NEW_JOB_OVERRIDEN); let cancelled = false; + let cancelledReason; if (cancellationToken) { - cancellationToken.on(() => { + cancellationToken.on((reason: string) => { cancelled = true; + cancelledReason = reason; }); } - const updateJobPromise = repoService.update(repo, () => { + const updateJobPromise = repoService.update(repo, async () => { if (cancelled) { - // return false to stop the clone progress + // return false to stop the update progress + return false; + } + + // Keep an eye on the disk usage during update in case it goes above the + // disk watermark config. + if (await this.watermarkService.isLowWatermark()) { + // Cancel this update job + if (cancellationToken) { + cancellationToken.cancel(CancellationReason.LOW_DISK_SPACE); + } + // return false to stop the update progress return false; } + return true; }); @@ -69,11 +86,21 @@ export class UpdateWorker extends AbstractGitWorker { updateJobPromise ); } - - return await updateJobPromise; + const res = await updateJobPromise; + return { + ...res, + cancelled, + cancelledReason, + }; } public async onJobCompleted(job: Job, res: CloneWorkerResult) { + if (res.cancelled) { + await this.onJobCancelled(job, res.cancelledReason); + // Skip updating job progress if the job is done because of cancellation. + return; + } + this.log.info(`Update job done for ${job.payload.uri}`); return await super.onJobCompleted(job, res); } diff --git a/x-pack/legacy/plugins/code/server/repository_service.ts b/x-pack/legacy/plugins/code/server/repository_service.ts index e0a39efb5eb2..213590285c07 100644 --- a/x-pack/legacy/plugins/code/server/repository_service.ts +++ b/x-pack/legacy/plugins/code/server/repository_service.ts @@ -22,8 +22,11 @@ import { import { Logger } from './log'; // Return false to stop the clone progress. Return true to keep going; -export type CloneProgressHandler = (progress: number, cloneProgress?: CloneProgress) => boolean; -export type UpdateProgressHandler = () => boolean; +export type CloneProgressHandler = ( + progress: number, + cloneProgress?: CloneProgress +) => Promise; +export type UpdateProgressHandler = () => Promise; const GIT_FETCH_PROGRESS_CANCEL = -1; // TODO: Cannot directly access Git.Error.CODE.EUSER (-7). Investigate why. @@ -127,10 +130,18 @@ export class RepositoryService { let repo: Git.Repository | undefined; try { repo = await Git.Repository.open(localPath); + let lastProgressUpdate = moment(); const cbs: RemoteCallbacks = { - transferProgress: (_: any) => { + transferProgress: async (_: any) => { + // Update progress update throttling. + const now = moment(); + if (now.diff(lastProgressUpdate) < this.PROGRESS_UPDATE_THROTTLING_FREQ_MS) { + return 0; + } + lastProgressUpdate = now; + if (handler) { - const resumeUpdate = handler(); + const resumeUpdate = await handler(); if (!resumeUpdate) { return GIT_FETCH_PROGRESS_CANCEL; } @@ -219,6 +230,7 @@ export class RepositoryService { throw SSH_AUTH_ERROR; } + private PROGRESS_UPDATE_THROTTLING_FREQ_MS = 1000; private async doClone( repo: Repository, localPath: string, @@ -228,10 +240,10 @@ export class RepositoryService { try { let lastProgressUpdate = moment(); const cbs: RemoteCallbacks = { - transferProgress: (stats: any) => { + transferProgress: async (stats: any) => { // Clone progress update throttling. const now = moment(); - if (now.diff(lastProgressUpdate) < 1000) { + if (now.diff(lastProgressUpdate) < this.PROGRESS_UPDATE_THROTTLING_FREQ_MS) { return 0; } lastProgressUpdate = now; @@ -250,7 +262,7 @@ export class RepositoryService { indexedDeltas: stats.indexedDeltas(), receivedBytes: stats.receivedBytes(), }; - const resumeClone = handler(progress, cloneProgress); + const resumeClone = await handler(progress, cloneProgress); if (!resumeClone) { return GIT_FETCH_PROGRESS_CANCEL; }