Skip to content

Commit

Permalink
Remove types from Esqueue (elastic#32146)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshdover committed Feb 28, 2019
1 parent f6ea650 commit cb76047
Show file tree
Hide file tree
Showing 13 changed files with 16 additions and 107 deletions.
2 changes: 0 additions & 2 deletions x-pack/plugins/reporting/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
* you may not use this file except in compliance with the Elastic License.
*/

export const QUEUE_DOCTYPE = 'esqueue';

export const JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY =
'xpack.reporting.jobCompletionNotifications';

Expand Down
2 changes: 0 additions & 2 deletions x-pack/plugins/reporting/server/lib/create_queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import { Esqueue } from './esqueue';
import { createWorkersFactory } from './create_workers';
import { QUEUE_DOCTYPE } from '../../common/constants';
import { oncePerServer } from './once_per_server';
import { createTaggedLogger } from './create_tagged_logger';

Expand All @@ -20,7 +19,6 @@ function createQueueFn(server) {

const logger = createTaggedLogger(server, ['reporting', 'esqueue']);
const queueOptions = {
doctype: QUEUE_DOCTYPE,
interval: queueConfig.indexInterval,
timeout: queueConfig.timeout,
dateSeparator: dateSeparator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ ClientMock.prototype.index = function (params = {}) {
const shardCount = 2;
return Promise.resolve({
_index: params.index || 'index',
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
_id: params.id || uniqueId('testDoc'),
_seq_no: 1,
_primary_term: 1,
Expand Down Expand Up @@ -52,7 +51,6 @@ ClientMock.prototype.get = function (params = {}, source = {}) {

return Promise.resolve({
_index: params.index || 'index',
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
_id: params.id || 'AVRPRLnlp7Ur1SZXfT-T',
_seq_no: params._seq_no || 1,
_primary_term: params._primary_term || 1,
Expand All @@ -65,7 +63,6 @@ ClientMock.prototype.search = function (params = {}, count = 5, source = {}) {
const hits = times(count, () => {
return {
_index: params.index || 'index',
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
_id: uniqueId('documentId'),
_seq_no: random(1, 5),
_primar_term: random(1, 5),
Expand Down Expand Up @@ -97,7 +94,6 @@ ClientMock.prototype.update = function (params = {}) {
const shardCount = 2;
return Promise.resolve({
_index: params.index || 'index',
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
_id: params.id || uniqueId('testDoc'),
_seq_no: params.if_seq_no + 1 || 2,
_primary_term: params.if_primary_term + 1 || 2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ describe('Create Index', function () {
.then((exists) => expect(exists).to.be(true));
});

it('should create the index with type mappings and default settings', function () {
it('should create the index with mappings and default settings', function () {
const indexName = 'test-index';
const docType = constants.DEFAULT_SETTING_DOCTYPE;
const settings = constants.DEFAULT_SETTING_INDEX_SETTINGS;
const result = createIndex(client, indexName);

Expand All @@ -44,42 +43,20 @@ describe('Create Index', function () {
expect(payload.body).to.have.property('settings');
expect(payload.body.settings).to.eql(settings);
expect(payload.body).to.have.property('mappings');
expect(payload.body.mappings).to.have.property(docType);
expect(payload.body.mappings[docType]).to.have.property('properties');
});
});

it('should accept a custom doctype', function () {
const indexName = 'test-index';
const docType = 'my_type';
const settings = constants.DEFAULT_SETTING_INDEX_SETTINGS;
const result = createIndex(client, indexName, docType);

return result
.then(function () {
const payload = createSpy.getCall(0).args[0];
sinon.assert.callCount(createSpy, 1);
expect(payload).to.have.property('index', indexName);
expect(payload).to.have.property('body');
expect(payload.body).to.have.property('settings');
expect(payload.body.settings).to.eql(settings);
expect(payload.body).to.have.property('mappings');
expect(payload.body.mappings).to.have.property(docType);
expect(payload.body.mappings[docType]).to.have.property('properties');
expect(payload.body.mappings).to.have.property('properties');
});
});

it('should create the index with custom settings', function () {
const indexName = 'test-index';
const docType = constants.DEFAULT_SETTING_DOCTYPE;
const settings = {
...constants.DEFAULT_SETTING_INDEX_SETTINGS,
auto_expand_replicas: false,
number_of_shards: 3000,
number_of_replicas: 1,
format: '3000',
};
const result = createIndex(client, indexName, docType, settings);
const result = createIndex(client, indexName, settings);

return result
.then(function () {
Expand All @@ -90,8 +67,7 @@ describe('Create Index', function () {
expect(payload.body).to.have.property('settings');
expect(payload.body.settings).to.eql(settings);
expect(payload.body).to.have.property('mappings');
expect(payload.body.mappings).to.have.property(docType);
expect(payload.body.mappings[docType]).to.have.property('properties');
expect(payload.body.mappings).to.have.property('properties');
});
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ describe('Esqueue class', function () {
const job = queue.addJob(jobType, payload);
const options = job.getProp('options');
expect(options).to.have.property('timeout', constants.DEFAULT_SETTING_TIMEOUT);
expect(options).to.have.property('doctype', constants.DEFAULT_SETTING_DOCTYPE);
});

it('should pass queue index settings', function () {
Expand Down Expand Up @@ -149,14 +148,12 @@ describe('Esqueue class', function () {
it('should pass worker options', function () {
const workerOptions = {
size: 12,
doctype: 'tests'
};

queue = new Esqueue('esqueue', { client });
const worker = queue.registerWorker('type', noop, workerOptions);
const options = worker.getProp('options');
expect(options.size).to.equal(workerOptions.size);
expect(options.doctype).to.equal(workerOptions.doctype);
});
});

Expand Down
8 changes: 0 additions & 8 deletions x-pack/plugins/reporting/server/lib/esqueue/__tests__/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ describe('Job Class', function () {
const args = createIndexMock.getCall(0).args;
expect(args[0]).to.equal(client);
expect(args[1]).to.equal(index);
expect(args[2]).to.equal(constants.DEFAULT_SETTING_DOCTYPE);
});
});

Expand All @@ -92,7 +91,6 @@ describe('Job Class', function () {
return job.ready.then(() => {
const indexArgs = validateDoc(client.index);
expect(indexArgs).to.have.property('index', index);
expect(indexArgs).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
expect(indexArgs).to.have.property('body');
expect(indexArgs.body).to.have.property('payload', payload);
});
Expand All @@ -103,7 +101,6 @@ describe('Job Class', function () {
return job.ready.then(() => {
const indexArgs = validateDoc(client.index);
expect(indexArgs).to.have.property('index', index);
expect(indexArgs).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
expect(indexArgs).to.have.property('body');
expect(indexArgs.body).to.have.property('jobtype', type);
});
Expand Down Expand Up @@ -134,7 +131,6 @@ describe('Job Class', function () {
try {
expect(jobDoc).to.have.property('id');
expect(jobDoc).to.have.property('index');
expect(jobDoc).to.have.property('type');
expect(jobDoc).to.have.property('_seq_no');
expect(jobDoc).to.have.property('_primary_term');
done();
Expand Down Expand Up @@ -351,7 +347,6 @@ describe('Job Class', function () {
const args = createIndexMock.getCall(0).args;
expect(args[0]).to.equal(newClient);
expect(args[1]).to.equal(index);
expect(args[2]).to.equal(constants.DEFAULT_SETTING_DOCTYPE);
});
});

Expand All @@ -362,7 +357,6 @@ describe('Job Class', function () {

const newDoc = newClient.index.getCall(0).args[0];
expect(newDoc).to.have.property('index', index);
expect(newDoc).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
expect(newDoc).to.have.property('body');
expect(newDoc.body).to.have.property('payload', payload);
});
Expand All @@ -382,7 +376,6 @@ describe('Job Class', function () {
.then((doc) => {
const jobDoc = job.document; // document should be resolved
expect(doc).to.have.property('index', index);
expect(doc).to.have.property('type', jobDoc.type);
expect(doc).to.have.property('id', jobDoc.id);
expect(doc).to.have.property('_seq_no', jobDoc._seq_no);
expect(doc).to.have.property('_primary_term', jobDoc._primary_term);
Expand Down Expand Up @@ -433,7 +426,6 @@ describe('Job Class', function () {

const doc = job.toJSON();
expect(doc).to.have.property('index', index);
expect(doc).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
expect(doc).to.have.property('jobtype', type);
expect(doc).to.have.property('created_by', defaultCreatedBy);
expect(doc).to.have.property('timeout', options.timeout);
Expand Down
25 changes: 0 additions & 25 deletions x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ describe('Worker class', function () {
expect(worker).to.have.property('jobtype', jobtype);
expect(worker).to.have.property('workerFn', workerFn);
expect(worker).to.have.property('checkSize');
expect(worker).to.have.property('doctype');
});

it('should have a unique ID', function () {
Expand Down Expand Up @@ -330,24 +329,6 @@ describe('Worker class', function () {
});
});


describe('query parameters', function () {
beforeEach(() => {
searchStub = sinon.stub(mockQueue.client, 'search').callsFake(() => Promise.resolve({ hits: { hits: [] } }));
});

it('should query by default doctype', function () {
const params = getSearchParams();
expect(params).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
});

it('should query by custom doctype', function () {
const doctype = 'custom_test';
const params = getSearchParams('type', { doctype });
expect(params).to.have.property('type', doctype);
});
});

describe('query body', function () {
const conditionPath = 'query.bool.filter.bool';
const jobtype = 'test_jobtype';
Expand Down Expand Up @@ -449,7 +430,6 @@ describe('Worker class', function () {
worker._claimJob(job);
const query = updateSpy.firstCall.args[0];
expect(query).to.have.property('index', job._index);
expect(query).to.have.property('type', job._type);
expect(query).to.have.property('id', job._id);
expect(query).to.have.property('if_seq_no', job._seq_no);
expect(query).to.have.property('if_primary_term', job._primary_term);
Expand Down Expand Up @@ -522,7 +502,6 @@ describe('Worker class', function () {
describe('find a pending job to claim', function () {
const getMockJobs = (status = 'pending') => ([{
_index: 'myIndex',
_type: 'test',
_id: 12345,
_seq_no: 3,
_primary_term: 3,
Expand Down Expand Up @@ -580,7 +559,6 @@ describe('Worker class', function () {
return worker._claimPendingJobs(getMockJobs())
.then(claimedJob => {
expect(claimedJob._index).to.be('myIndex');
expect(claimedJob._type).to.be('test');
expect(claimedJob._source.jobtype).to.be('jobtype');
expect(claimedJob._source.status).to.be('processing');
expect(claimedJob.test).to.be('cool');
Expand Down Expand Up @@ -613,7 +591,6 @@ describe('Worker class', function () {
worker._failJob(job);
const query = updateSpy.firstCall.args[0];
expect(query).to.have.property('index', job._index);
expect(query).to.have.property('type', job._type);
expect(query).to.have.property('id', job._id);
expect(query).to.have.property('if_seq_no', job._seq_no);
expect(query).to.have.property('if_primary_term', job._primary_term);
Expand Down Expand Up @@ -735,7 +712,6 @@ describe('Worker class', function () {
sinon.assert.calledOnce(updateSpy);
const query = updateSpy.firstCall.args[0];
expect(query).to.have.property('index', job._index);
expect(query).to.have.property('type', job._type);
expect(query).to.have.property('id', job._id);
expect(query).to.have.property('if_seq_no', job._seq_no);
expect(query).to.have.property('if_primary_term', job._primary_term);
Expand Down Expand Up @@ -776,7 +752,6 @@ describe('Worker class', function () {
expect(workerJob).to.have.property('job');
expect(workerJob.job).to.have.property('id');
expect(workerJob.job).to.have.property('index');
expect(workerJob.job).to.have.property('type');

expect(workerJob).to.have.property('output');
expect(workerJob.output).to.have.property('content');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ export const defaultSettings = {
DEFAULT_SETTING_TIMEOUT: 10000,
DEFAULT_SETTING_DATE_SEPARATOR: '-',
DEFAULT_SETTING_INTERVAL: 'week',
DEFAULT_SETTING_DOCTYPE: 'esqueue',
DEFAULT_SETTING_INDEX_SETTINGS: {
number_of_shards: 1,
auto_expand_replicas: '0-1',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,14 @@ const schema = {
}
};

export function createIndex(client, indexName,
doctype = constants.DEFAULT_SETTING_DOCTYPE,
indexSettings = { }) {
export function createIndex(client, indexName, indexSettings = {}) {
const body = {
settings: {
...constants.DEFAULT_SETTING_INDEX_SETTINGS,
...indexSettings
},
mappings: {
[doctype]: {
properties: schema
}
properties: schema
}
};

Expand All @@ -88,7 +84,6 @@ export function createIndex(client, indexName,
if (!exists) {
return client.indices.create({
index: indexName,
include_type_name: true,
body: body
})
.then(() => true)
Expand Down
6 changes: 2 additions & 4 deletions x-pack/plugins/reporting/server/lib/esqueue/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ export class Esqueue extends EventEmitter {
this.settings = {
interval: constants.DEFAULT_SETTING_INTERVAL,
timeout: constants.DEFAULT_SETTING_TIMEOUT,
doctype: constants.DEFAULT_SETTING_DOCTYPE,
dateSeparator: constants.DEFAULT_SETTING_DATE_SEPARATOR,
...omit(options, [ 'client' ])
};
Expand All @@ -44,20 +43,19 @@ export class Esqueue extends EventEmitter {
});
}

addJob(type, payload, opts = {}) {
addJob(jobtype, payload, opts = {}) {
const timestamp = indexTimestamp(this.settings.interval, this.settings.dateSeparator);
const index = `${this.index}-${timestamp}`;
const defaults = {
timeout: this.settings.timeout,
};

const options = Object.assign(defaults, opts, {
doctype: this.settings.doctype,
indexSettings: this.settings.indexSettings,
logger: this._logger
});

return new Job(this, index, type, payload, options);
return new Job(this, index, jobtype, payload, options);
}

registerWorker(type, workerFn, opts) {
Expand Down
Loading

0 comments on commit cb76047

Please sign in to comment.