Skip to content

Commit

Permalink
[Task Manager] ignore version conflicts that exceed max_docs in the c…
Browse files Browse the repository at this point in the history
…laiming process (elastic#89415)

This is a first step in attempting to address the over zealous shifting we've identified in TM.

It [turns out](elastic/elasticsearch#63671) `version_conflicts` don't always count against `max_docs`, so in this PR we correct the `version_conflicts` returned by updateByQuery in TaskManager to only count the conflicts that _may_ have counted against `max_docs`.
This correction isn't necessarily accurate, but it will ensure we don't shift if we are in fact managing to claim tasks.
  • Loading branch information
gmmorris committed Jan 28, 2021
1 parent ab1699d commit c0fd92f
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 3 deletions.
74 changes: 73 additions & 1 deletion x-pack/plugins/task_manager/server/task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,13 @@ describe('TaskStore', () => {
opts = {},
hits = generateFakeTasks(1),
claimingOpts,
versionConflicts = 2,
}: {
opts: Partial<StoreOpts>;
hits?: unknown[];
claimingOpts: OwnershipClaimingOpts;
versionConflicts?: number;
}) {
const versionConflicts = 2;
const esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
esClient.search.mockResolvedValue(asApiResponse({ hits: { hits } }));
esClient.updateByQuery.mockResolvedValue(
Expand Down Expand Up @@ -971,6 +972,77 @@ if (doc['task.runAt'].size()!=0) {
]);
});

test('it returns version_conflicts that do not include conflicts that were proceeded against', async () => {
const taskManagerId = uuid.v1();
const claimOwnershipUntil = new Date(Date.now());
const runAt = new Date();
const tasks = [
{
_id: 'task:aaa',
_source: {
type: 'task',
task: {
runAt,
taskType: 'foo',
schedule: undefined,
attempts: 0,
status: 'claiming',
params: '{ "hello": "world" }',
state: '{ "baby": "Henhen" }',
user: 'jimbo',
scope: ['reporting'],
ownerId: taskManagerId,
},
},
_seq_no: 1,
_primary_term: 2,
sort: ['a', 1],
},
{
_id: 'task:bbb',
_source: {
type: 'task',
task: {
runAt,
taskType: 'bar',
schedule: { interval: '5m' },
attempts: 2,
status: 'claiming',
params: '{ "shazm": 1 }',
state: '{ "henry": "The 8th" }',
user: 'dabo',
scope: ['reporting', 'ceo'],
ownerId: taskManagerId,
},
},
_seq_no: 3,
_primary_term: 4,
sort: ['b', 2],
},
];
const maxDocs = 10;
const {
result: { stats: { tasksUpdated, tasksConflicted, tasksClaimed } = {} } = {},
} = await testClaimAvailableTasks({
opts: {
taskManagerId,
},
claimingOpts: {
claimOwnershipUntil,
size: maxDocs,
},
hits: tasks,
// assume there were 20 version conflists, but thanks to `conflicts="proceed"`
// we proceeded to claim tasks
versionConflicts: 20,
});

expect(tasksUpdated).toEqual(2);
// ensure we only count conflicts that *may* have counted against max_docs, no more than that
expect(tasksConflicted).toEqual(10 - tasksUpdated!);
expect(tasksClaimed).toEqual(2);
});

test('pushes error from saved objects client to errors$', async () => {
const esClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const store = new TaskStore({
Expand Down
16 changes: 14 additions & 2 deletions x-pack/plugins/task_manager/server/task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ export class TaskStore {
private async updateByQuery(
opts: UpdateByQuerySearchOpts = {},
// eslint-disable-next-line @typescript-eslint/naming-convention
{ max_docs }: UpdateByQueryOpts = {}
{ max_docs: max_docs }: UpdateByQueryOpts = {}
): Promise<UpdateByQueryResult> {
const { query } = ensureQueryOnlyReturnsTaskObjects(opts);
try {
Expand All @@ -548,10 +548,22 @@ export class TaskStore {
},
});

/**
* When we run updateByQuery with conflicts='proceed', it's possible for the `version_conflicts`
* to count against the specified `max_docs`, as per https://github.com/elastic/elasticsearch/issues/63671
* In order to correct for that happening, we only count `version_conflicts` if we haven't updated as
* many docs as we could have.
* This is still no more than an estimation, as there might have been less docuemnt to update that the
* `max_docs`, but we bias in favour of over zealous `version_conflicts` as that's the best indicator we
* have for an unhealthy cluster distribution of Task Manager polling intervals
*/
const conflictsCorrectedForContinuation =
max_docs && version_conflicts + updated > max_docs ? max_docs - updated : version_conflicts;

return {
total,
updated,
version_conflicts,
version_conflicts: conflictsCorrectedForContinuation,
};
} catch (e) {
this.errors$.next(e);
Expand Down

0 comments on commit c0fd92f

Please sign in to comment.