Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[6.x] [ML] Removing hardcoded datafeed ID in jobs list (#20815) #20816

Merged
merged 1 commit into from
Jul 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 40 additions & 21 deletions x-pack/plugins/ml/server/models/job_service/datafeeds.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@
export function datafeedsProvider(callWithRequest) {

async function forceStartDatafeeds(datafeedIds, start, end) {
const jobIds = {};
const doStartsCalled = {};
const jobIds = await getJobIdsByDatafeedId();
const doStartsCalled = datafeedIds.reduce((p, c) => {
p[c] = false;
return p;
}, {});

const results = {};
const START_TIMEOUT = 10000; // 10s

Expand All @@ -26,29 +30,26 @@ export function datafeedsProvider(callWithRequest) {
}
}

datafeedIds.forEach((dId) => {
const jId = dId.replace('datafeed-', ''); // change this. this should be from a look up from the datafeeds endpoint
jobIds[dId] = jId;
doStartsCalled[dId] = false;
});

for (const datafeedId of datafeedIds) {
const jobId = jobIds[datafeedId];

setTimeout(async () => {
// in 10 seconds start the datafeed.
// this should give the openJob enough time.
// if not, the start request will be queued
// behind the open request on the server.
results[datafeedId] = await doStart(datafeedId);
}, START_TIMEOUT);

try {
if (await openJob(jobId)) {
if (jobId !== undefined) {
setTimeout(async () => {
// in 10 seconds start the datafeed.
// this should give the openJob enough time.
// if not, the start request will be queued
// behind the open request on the server.
results[datafeedId] = await doStart(datafeedId);
}, START_TIMEOUT);

try {
if (await openJob(jobId)) {
results[datafeedId] = await doStart(datafeedId);
}
} catch (error) {
results[datafeedId] = { started: false, error };
}
} catch (error) {
results[datafeedId] = { started: false, error };
} else {
results[datafeedId] = { started: false, error: 'Job has no datafeed' };
}
}

Expand Down Expand Up @@ -88,9 +89,27 @@ export function datafeedsProvider(callWithRequest) {
return callWithRequest('ml.deleteDatafeed', { datafeedId, force: true });
}

async function getDatafeedIdsByJobId() {
const datafeeds = await callWithRequest('ml.datafeeds');
return datafeeds.datafeeds.reduce((p, c) => {
p[c.job_id] = c.datafeed_id;
return p;
}, {});
}

async function getJobIdsByDatafeedId() {
const datafeeds = await callWithRequest('ml.datafeeds');
return datafeeds.datafeeds.reduce((p, c) => {
p[c.datafeed_id] = c.job_id;
return p;
}, {});
}

return {
forceStartDatafeeds,
stopDatafeeds,
forceDeleteDatafeed,
getDatafeedIdsByJobId,
getJobIdsByDatafeedId,
};
}
16 changes: 8 additions & 8 deletions x-pack/plugins/ml/server/models/job_service/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const TIME_FORMAT = 'YYYY-MM-DD HH:mm:ss';

export function jobsProvider(callWithRequest) {

const { forceDeleteDatafeed } = datafeedsProvider(callWithRequest);
const { forceDeleteDatafeed, getDatafeedIdsByJobId } = datafeedsProvider(callWithRequest);
const { getAuditMessagesSummary } = jobAuditMessagesProvider(callWithRequest);
const calMngr = new CalendarManager(callWithRequest);

Expand All @@ -24,14 +24,14 @@ export function jobsProvider(callWithRequest) {

async function deleteJobs(jobIds) {
const results = {};
const datafeedIds = jobIds.reduce((p, c) => {
p[c] = `datafeed-${c}`;
return p;
}, {});
const datafeedIds = await getDatafeedIdsByJobId();

for (const jobId of jobIds) {
try {
const datafeedResp = await forceDeleteDatafeed(datafeedIds[jobId]);
const datafeedResp = (datafeedIds[jobId] === undefined) ?
{ acknowledged: true } :
await forceDeleteDatafeed(datafeedIds[jobId]);

if (datafeedResp.acknowledged) {
try {
await forceDeleteJob(jobId);
Expand All @@ -56,15 +56,15 @@ export function jobsProvider(callWithRequest) {
}, {});

const jobs = fullJobsList.map((job) => {
const hasDatafeed = (job.datafeed_config !== undefined);
const hasDatafeed = (typeof job.datafeed_config === 'object' && Object.keys(job.datafeed_config).length);
const {
earliest: earliestTimeStamp,
latest: latestTimeStamp } = earliestAndLatestTimeStamps(job.data_counts);

const tempJob = {
id: job.job_id,
description: (job.description || ''),
groups: (job.groups || []),
groups: (Array.isArray(job.groups) ? job.groups.sort() : []),
processed_record_count: job.data_counts.processed_record_count,
memory_status: (job.model_size_stats) ? job.model_size_stats.memory_status : '',
jobState: job.state,
Expand Down