Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fleet] refactored bulk update tags retry #147594

Merged
merged 13 commits into from
Dec 20, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,16 @@ describe('TagsAddRemove', () => {

expect(mockBulkUpdateTags).toHaveBeenCalledWith(
'query',
['newTag2', 'newTag'],
['newTag'],
[],
expect.anything(),
'Tag created',
'Tag creation failed'
);

expect(mockBulkUpdateTags).toHaveBeenCalledWith(
'query',
['newTag2'],
[],
expect.anything(),
'Tag created',
Expand All @@ -316,7 +325,16 @@ describe('TagsAddRemove', () => {
expect(mockBulkUpdateTags).toHaveBeenCalledWith(
'',
[],
['tag2', 'tag1'],
['tag1'],
expect.anything(),
undefined,
undefined
);

expect(mockBulkUpdateTags).toHaveBeenCalledWith(
'',
[],
['tag2'],
expect.anything(),
undefined,
undefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,32 +120,10 @@ export const TagsAddRemove: React.FC<Props> = ({
errorMessage
);
} else {
// sending updated tags to add/remove, in case multiple actions are done quickly and the first one is not yet propagated
const updatedTagsToAdd = tagsToAdd.concat(
labels
.filter(
(tag) =>
tag.checked === 'on' &&
!selectedTags.includes(tag.label) &&
!tagsToRemove.includes(tag.label)
)
.map((tag) => tag.label)
);
const updatedTagsToRemove = tagsToRemove.concat(
labels
.filter(
(tag) =>
tag.checked !== 'on' &&
selectedTags.includes(tag.label) &&
!tagsToAdd.includes(tag.label)
)
.map((tag) => tag.label)
);

updateTagsHook.bulkUpdateTags(
agents!,
updatedTagsToAdd,
updatedTagsToRemove,
tagsToAdd,
tagsToRemove,
(hasCompleted) => handleTagsUpdated(tagsToAdd, tagsToRemove, hasCompleted),
successMessage,
errorMessage
Expand Down
5 changes: 4 additions & 1 deletion x-pack/plugins/fleet/server/services/agents/crud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ export function getElasticsearchQuery(
kuery: string,
showInactive = false,
includeHosted = false,
hostedPolicies: string[] = []
hostedPolicies: string[] = [],
extraFilters: string[] = []
): estypes.QueryDslQueryContainer | undefined {
const filters = [];

Expand All @@ -171,6 +172,8 @@ export function getElasticsearchQuery(
filters.push('NOT (policy_id:{policyIds})'.replace('{policyIds}', hostedPolicies.join(',')));
}

filters.push(...extraFilters);

const kueryNode = _joinFilters(filters);
return kueryNode ? toElasticsearchQuery(kueryNode) : undefined;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { elasticsearchServiceMock, savedObjectsClientMock } from '@kbn/core/serv

import { createClientMock } from './action.mock';
import { updateAgentTags } from './update_agent_tags';
import { updateTagsBatch } from './update_agent_tags_action_runner';

jest.mock('../app_context', () => {
return {
Expand All @@ -28,6 +29,7 @@ jest.mock('../agent_policy', () => {
return {
agentPolicyService: {
getByIDs: jest.fn().mockResolvedValue([{ id: 'hosted-agent-policy', is_managed: true }]),
list: jest.fn().mockResolvedValue({ items: [] }),
},
};
});
Expand Down Expand Up @@ -73,7 +75,7 @@ describe('update_agent_tags', () => {

expect(esClient.updateByQuery).toHaveBeenCalledWith(
expect.objectContaining({
conflicts: 'abort',
conflicts: 'proceed',
index: '.fleet-agents',
query: { terms: { _id: ['agent1'] } },
script: expect.objectContaining({
Expand Down Expand Up @@ -152,6 +154,19 @@ describe('update_agent_tags', () => {
expect(errorResults.body[1].error).toEqual('error reason');
});

it('should throw error on version conflicts', async () => {
esClient.updateByQuery.mockReset();
esClient.updateByQuery.mockResolvedValue({
failures: [],
updated: 0,
version_conflicts: 100,
} as any);

await expect(
updateAgentTags(soClient, esClient, { agentIds: ['agent1'] }, ['one'], [])
).rejects.toThrowError('version conflict of 100 agents');
});

it('should run add tags async when actioning more agents than batch size', async () => {
esClient.search.mockResolvedValue({
hits: {
Expand Down Expand Up @@ -180,4 +195,79 @@ describe('update_agent_tags', () => {

expect(mockRunAsync).toHaveBeenCalled();
});

it('should add tags filter if only one tag to add', async () => {
await updateTagsBatch(
soClient,
esClient,
[],
{},
{
tagsToAdd: ['new'],
tagsToRemove: [],
kuery: '',
}
);

const updateByQuery = esClient.updateByQuery.mock.calls[0][0] as any;
expect(updateByQuery.query).toEqual({
bool: {
filter: [
{ bool: { minimum_should_match: 1, should: [{ match: { active: true } }] } },
{
bool: {
must_not: { bool: { minimum_should_match: 1, should: [{ match: { tags: 'new' } }] } },
},
},
],
},
});
});

it('should add tags filter if only one tag to remove', async () => {
await updateTagsBatch(
soClient,
esClient,
[],
{},
{
tagsToAdd: [],
tagsToRemove: ['remove'],
kuery: '',
}
);

const updateByQuery = esClient.updateByQuery.mock.calls[0][0] as any;
expect(JSON.stringify(updateByQuery.query)).toContain(
'{"bool":{"should":[{"match":{"tags":"remove"}}],"minimum_should_match":1}}'
);
});

it('should write total from updateByQuery result if query returns less results', async () => {
esClient.updateByQuery.mockReset();
esClient.updateByQuery.mockResolvedValue({ failures: [], updated: 0, total: 50 } as any);

await updateTagsBatch(
soClient,
esClient,
[],
{},
{
tagsToAdd: ['new'],
tagsToRemove: [],
kuery: '',
total: 100,
}
);

const agentAction = esClient.create.mock.calls[0][0] as any;
expect(agentAction?.body).toEqual(
expect.objectContaining({
action_id: expect.anything(),
agents: [],
type: 'UPDATE_TAGS',
total: 50,
})
);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export class UpdateAgentTagsActionRunner extends ActionRunner {
actionId: this.actionParams.actionId,
total: this.actionParams.total,
kuery: this.actionParams.kuery,
isRetry: !!this.retryParams.retryCount,
}
);

Expand All @@ -82,6 +83,7 @@ export async function updateTagsBatch(
actionId?: string;
total?: number;
kuery?: string;
isRetry?: boolean;
}
): Promise<{ actionId: string; updated?: number; took?: number }> {
const errors: Record<Agent['id'], Error> = { ...outgoingErrors };
Expand All @@ -108,7 +110,13 @@ export async function updateTagsBatch(
});
const hostedIds = hostedPolicies.items.map((item) => item.id);

query = getElasticsearchQuery(options.kuery, false, false, hostedIds);
const extraFilters = [];
if (options.tagsToAdd.length === 1 && options.tagsToRemove.length === 0) {
hop-dev marked this conversation as resolved.
Show resolved Hide resolved
extraFilters.push(`NOT (tags:${options.tagsToAdd[0]})`);
} else if (options.tagsToRemove.length === 1) {
extraFilters.push(`tags:${options.tagsToRemove[0]}`);
}
query = getElasticsearchQuery(options.kuery, false, false, hostedIds, extraFilters);
} else {
query = {
terms: {
Expand Down Expand Up @@ -150,7 +158,7 @@ export async function updateTagsBatch(
updatedAt: new Date().toISOString(),
},
},
conflicts: 'abort', // relying on the task to retry in case of conflicts
conflicts: 'proceed', // relying on the task to retry in case of conflicts - retry only conflicted agents
});
} catch (error) {
throw new Error('Caught error: ' + JSON.stringify(error).slice(0, 1000));
Expand All @@ -161,14 +169,17 @@ export async function updateTagsBatch(
const actionId = options.actionId ?? uuid();
const total = options.total ?? givenAgents.length;

// creating an action doc so that update tags shows up in activity
await createAgentAction(esClient, {
id: actionId,
agents: options.kuery === undefined ? agentIds : [],
created_at: new Date().toISOString(),
type: 'UPDATE_TAGS',
total,
});
if (!options.isRetry) {
// creating an action doc so that update tags shows up in activity
await createAgentAction(esClient, {
id: actionId,
agents: options.kuery === undefined ? agentIds : [],
created_at: new Date().toISOString(),
type: 'UPDATE_TAGS',
// for kuery cases, the total can be less than the initial selection, as we filter out tags that are already added/removed
total: options.kuery === undefined ? total : res.total,
});
}

// creating unique 0...n ids to use as agentId, as we don't have all agent ids in case of action by kuery
const getArray = (count: number) => [...Array(count).keys()];
Expand Down Expand Up @@ -198,18 +209,20 @@ export async function updateTagsBatch(
}

// writing hosted agent errors - hosted agents filtered out
if ((res.total ?? total) < total) {
if (options.kuery === undefined && hostedAgentIds.length > 0) {
await bulkCreateAgentActionResults(
esClient,
(options.kuery === undefined ? hostedAgentIds : getArray(total - (res.total ?? total))).map(
(id) => ({
agentId: id + '',
actionId,
error: hostedAgentError,
})
)
hostedAgentIds.map((id) => ({
agentId: id + '',
actionId,
error: hostedAgentError,
}))
);
}

if (res.version_conflicts ?? 0 > 0) {
throw new Error(`version conflict of ${res.version_conflicts} agents`);
}

return { actionId, updated: res.updated, took: res.took };
}