Skip to content

Commit

Permalink
Reporting: register a single ESQueue worker, simultaneous poll for a…
Browse files Browse the repository at this point in the history
…ll export types (elastic#32839)

* Reporting: register a single ESQueue worker, simultaneous poll for all export types

* more typescript

* PLUGIN_ID constant

* move down log / internal state

* fix tests

* jest test for createWorker

* assert arguments to queue.registerWorker

* logic move

* make ts ignore specific

* minor reversion to fix some esqueue worker tests
  • Loading branch information
tsullivan committed Mar 15, 2019
1 parent 87b7899 commit bef356f
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 73 deletions.
4 changes: 2 additions & 2 deletions docs/settings/reporting-settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ security is enabled, `xpack.security.encryptionKey`.
============

`xpack.reporting.queue.pollInterval`::
Specifies the number of milliseconds that idle workers wait between polling the
index for pending jobs. Defaults to `3000` (3 seconds).
Specifies the number of milliseconds that the reporting poller waits between polling the
index for any pending Reporting jobs. Defaults to `3000` (3 seconds).

[[xpack-reporting-q-timeout]]`xpack.reporting.queue.timeout`::
How long each worker has to produce a report. If your machine is slow or under
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/reporting/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/

export const PLUGIN_ID = 'reporting';

export const JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY =
'xpack.reporting.jobCompletionNotifications';

Expand Down
6 changes: 3 additions & 3 deletions x-pack/plugins/reporting/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

import { resolve } from 'path';
import { UI_SETTINGS_CUSTOM_PDF_LOGO } from './common/constants';
import { PLUGIN_ID, UI_SETTINGS_CUSTOM_PDF_LOGO } from './common/constants';
import { mirrorPluginStatus } from '../../server/lib/mirror_plugin_status';
import { registerRoutes } from './server/routes';

Expand All @@ -26,7 +26,7 @@ const kbToBase64Length = (kb) => {

export const reporting = (kibana) => {
return new kibana.Plugin({
id: 'reporting',
id: PLUGIN_ID,
configPrefix: 'xpack.reporting',
publicDir: resolve(__dirname, 'public'),
require: ['kibana', 'elasticsearch', 'xpack_main'],
Expand Down Expand Up @@ -60,7 +60,7 @@ export const reporting = (kibana) => {
description: '200 kB',
}
},
category: ['reporting'],
category: [PLUGIN_ID],
}
}
},
Expand Down
6 changes: 3 additions & 3 deletions x-pack/plugins/reporting/server/lib/create_queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

import { Esqueue } from './esqueue';
import { createWorkersFactory } from './create_workers';
import { createWorkerFactory } from './create_worker';
import { oncePerServer } from './once_per_server';
import { createTaggedLogger } from './create_tagged_logger';

Expand All @@ -14,7 +14,7 @@ const dateSeparator = '.';
function createQueueFn(server) {
const queueConfig = server.config().get('xpack.reporting.queue');
const index = server.config().get('xpack.reporting.index');
const createWorkers = createWorkersFactory(server);
const createWorker = createWorkerFactory(server);

const logger = createTaggedLogger(server, ['reporting', 'esqueue']);
const queueOptions = {
Expand All @@ -29,7 +29,7 @@ function createQueueFn(server) {

if (queueConfig.pollEnabled) {
// create workers to poll the index for idle jobs waiting to be claimed and executed
createWorkers(queue);
createWorker(queue);
} else {
logger(
'xpack.reporting.queue.pollEnabled is set to false. This Kibana instance ' +
Expand Down
99 changes: 99 additions & 0 deletions x-pack/plugins/reporting/server/lib/create_worker.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import * as sinon from 'sinon';
import { KbnServer } from '../../types';
import { createWorkerFactory } from './create_worker';
// @ts-ignore
import { Esqueue } from './esqueue';
// @ts-ignore
import { ClientMock } from './esqueue/__tests__/fixtures/elasticsearch';

const configGetStub = sinon.stub();
configGetStub.withArgs('xpack.reporting.queue').returns({
pollInterval: 3300,
pollIntervalErrorMultiplier: 10,
});
configGetStub.withArgs('server.name').returns('test-server-123');
configGetStub.withArgs('server.uuid').returns('g9ymiujthvy6v8yrh7567g6fwzgzftzfr');

const executeJobFactoryStub = sinon.stub();

const getMockServer = (
exportTypes: any[] = [{ executeJobFactory: executeJobFactoryStub }]
): Partial<KbnServer> => ({
log: sinon.stub(),
expose: sinon.stub(),
config: () => ({ get: configGetStub }),
plugins: { reporting: { exportTypesRegistry: { getAll: () => exportTypes } } },
});

describe('Create Worker', () => {
let queue: Esqueue;
let client: ClientMock;

beforeEach(() => {
client = new ClientMock();
queue = new Esqueue('reporting-queue', { client });
executeJobFactoryStub.reset();
});

test('Creates a single Esqueue worker for Reporting', async () => {
const createWorker = createWorkerFactory(getMockServer());
const registerWorkerSpy = sinon.spy(queue, 'registerWorker');

createWorker(queue);

sinon.assert.callCount(executeJobFactoryStub, 1);
sinon.assert.callCount(registerWorkerSpy, 1);

const { firstCall } = registerWorkerSpy;
const [workerName, workerFn, workerOpts] = firstCall.args;

expect(workerName).toBe('reporting');
expect(workerFn).toMatchInlineSnapshot(`[Function]`);
expect(workerOpts).toMatchInlineSnapshot(`
Object {
"interval": 3300,
"intervalErrorMultiplier": 10,
"kibanaId": "g9ymiujthvy6v8yrh7567g6fwzgzftzfr",
"kibanaName": "test-server-123",
}
`);
});

test('Creates a single Esqueue worker for Reporting, even if there are multiple export types', async () => {
const createWorker = createWorkerFactory(
getMockServer([
{ executeJobFactory: executeJobFactoryStub },
{ executeJobFactory: executeJobFactoryStub },
{ executeJobFactory: executeJobFactoryStub },
{ executeJobFactory: executeJobFactoryStub },
{ executeJobFactory: executeJobFactoryStub },
])
);
const registerWorkerSpy = sinon.spy(queue, 'registerWorker');

createWorker(queue);

sinon.assert.callCount(executeJobFactoryStub, 5);
sinon.assert.callCount(registerWorkerSpy, 1);

const { firstCall } = registerWorkerSpy;
const [workerName, workerFn, workerOpts] = firstCall.args;

expect(workerName).toBe('reporting');
expect(workerFn).toMatchInlineSnapshot(`[Function]`);
expect(workerOpts).toMatchInlineSnapshot(`
Object {
"interval": 3300,
"intervalErrorMultiplier": 10,
"kibanaId": "g9ymiujthvy6v8yrh7567g6fwzgzftzfr",
"kibanaName": "test-server-123",
}
`);
});
});
70 changes: 70 additions & 0 deletions x-pack/plugins/reporting/server/lib/create_worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

// @ts-ignore
import { PLUGIN_ID } from '../../common/constants';
import {
ESQueueInstance,
ESQueueWorkerExecuteFn,
ExportType,
JobDoc,
JobSource,
KbnServer,
} from '../../types';
// @ts-ignore untyped dependency
import { events as esqueueEvents } from './esqueue';
// @ts-ignore untyped dependency
import { LevelLogger } from './level_logger';
// @ts-ignore untyped dependency
import { oncePerServer } from './once_per_server';

function createWorkerFn(server: KbnServer) {
const config = server.config();
const queueConfig = config.get('xpack.reporting.queue');
const kibanaName = config.get('server.name');
const kibanaId = config.get('server.uuid');
const exportTypesRegistry = server.plugins.reporting.exportTypesRegistry;
const logger = LevelLogger.createForServer(server, [PLUGIN_ID, 'queue', 'worker']);

// Once more document types are added, this will need to be passed in
return function createWorker(queue: ESQueueInstance) {
// export type / execute job map
const jobExectors: Map<string, ESQueueWorkerExecuteFn> = new Map();

for (const exportType of exportTypesRegistry.getAll() as ExportType[]) {
const executeJob = exportType.executeJobFactory(server);
jobExectors.set(exportType.jobType, executeJob);
}

const workerFn = (job: JobSource, jobdoc: JobDoc, cancellationToken: any) => {
// pass the work to the jobExecutor
const jobExecutor = jobExectors.get(job._source.jobtype);
if (!jobExecutor) {
throw new Error(`Unable to find a job executor for the claimed job: [${job._id}]`);
}
return jobExecutor(jobdoc, cancellationToken);
};
const workerOptions = {
kibanaName,
kibanaId,
interval: queueConfig.pollInterval,
intervalErrorMultiplier: queueConfig.pollIntervalErrorMultiplier,
};
const worker = queue.registerWorker(PLUGIN_ID, workerFn, workerOptions);

worker.on(esqueueEvents.EVENT_WORKER_COMPLETE, (res: any) => {
logger.debug(`Worker completed: (${res.job.id})`);
});
worker.on(esqueueEvents.EVENT_WORKER_JOB_EXECUTION_ERROR, (res: any) => {
logger.debug(`Worker error: (${res.job.id})`);
});
worker.on(esqueueEvents.EVENT_WORKER_JOB_TIMEOUT, (res: any) => {
logger.debug(`Job timeout exceeded: (${res.job.id})`);
});
};
}

export const createWorkerFactory = oncePerServer(createWorkerFn);
45 changes: 0 additions & 45 deletions x-pack/plugins/reporting/server/lib/create_workers.js

This file was deleted.

13 changes: 4 additions & 9 deletions x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,6 @@ describe('Worker class', function () {
expect(body._source).to.eql({ excludes: excludedFields });
});

it('should search by job type', function () {
const { body } = getSearchParams(jobtype);
const conditions = get(body, conditionPath);
expect(conditions.must).to.eql({ term: { jobtype: jobtype } });
});

it('should search for pending or expired jobs', function () {
const { body } = getSearchParams(jobtype);
const conditions = get(body, conditionPath);
Expand Down Expand Up @@ -709,7 +703,7 @@ describe('Worker class', function () {
});

it('should update the job with the workerFn output', function () {
const workerFn = function (jobPayload) {
const workerFn = function (job, jobPayload) {
expect(jobPayload).to.eql(payload);
return payload;
};
Expand All @@ -719,6 +713,7 @@ describe('Worker class', function () {
.then(() => {
sinon.assert.calledOnce(updateSpy);
const query = updateSpy.firstCall.args[1];

expect(query).to.have.property('index', job._index);
expect(query).to.have.property('id', job._id);
expect(query).to.have.property('if_seq_no', job._seq_no);
Expand All @@ -731,7 +726,7 @@ describe('Worker class', function () {

it('should update the job status and completed time', function () {
const startTime = moment().valueOf();
const workerFn = function (jobPayload) {
const workerFn = function (job, jobPayload) {
expect(jobPayload).to.eql(payload);
return new Promise(function (resolve) {
setTimeout(() => resolve(payload), 10);
Expand Down Expand Up @@ -906,7 +901,7 @@ describe('Worker class', function () {
const timeout = 20;
cancellationCallback = function () {};

const workerFn = function (payload, cancellationToken) {
const workerFn = function (job, payload, cancellationToken) {
cancellationToken.on(cancellationCallback);
return new Promise(function (resolve) {
setTimeout(() => {
Expand Down
24 changes: 14 additions & 10 deletions x-pack/plugins/reporting/server/lib/esqueue/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class Worker extends events.EventEmitter {
this.warn = getLogger(opts, this.id, 'warn');

this._running = true;
this.debug(`Created worker for job type ${this.jobtype}`);
this.debug(`Created worker for ${this.jobtype} jobs`);

this._poller = new Poller({
functionToPoll: () => {
Expand Down Expand Up @@ -201,27 +201,32 @@ export class Worker extends events.EventEmitter {
// run the worker's workerFn
let isResolved = false;
const cancellationToken = new CancellationToken();
Promise.resolve(this.workerFn.call(null, job._source.payload, cancellationToken))
.then((res) => {
const jobSource = job._source;

Promise.resolve(this.workerFn.call(null, job, jobSource.payload, cancellationToken))
.then(res => {
isResolved = true;
resolve(res);
})
.catch((err) => {
.catch(err => {
isResolved = true;
reject(err);
});

// fail if workerFn doesn't finish before timeout
const { timeout } = jobSource;
setTimeout(() => {
if (isResolved) return;

cancellationToken.cancel();
this.warn(`Timeout processing job ${job._id}`);
reject(new WorkerTimeoutError(`Worker timed out, timeout = ${job._source.timeout}`, {
timeout: job._source.timeout,
jobId: job._id,
}));
}, job._source.timeout);
reject(
new WorkerTimeoutError(`Worker timed out, timeout = ${timeout}`, {
jobId: job._id,
timeout,
})
);
}, timeout);
});

return workerOutput.then((output) => {
Expand Down Expand Up @@ -357,7 +362,6 @@ export class Worker extends events.EventEmitter {
filter: {
bool: {
minimum_should_match: 1,
must: { term: { jobtype: this.jobtype } },
should: [
{ term: { status: 'pending' } },
{
Expand Down
Loading

0 comments on commit bef356f

Please sign in to comment.