Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Search Sessions] fix updating deleting sessions from non-default space #96123

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ import {
EQL_SEARCH_STRATEGY,
} from '../../../common';
import { savedObjectsClientMock } from '../../../../../../src/core/server/mocks';
import type { SavedObjectsClientContract } from 'kibana/server';
import { SearchSessionsConfig, SearchStatus } from './types';
import moment from 'moment';
import {
SavedObjectsBulkUpdateObject,
SavedObjectsDeleteOptions,
SavedObjectsClientContract,
} from '../../../../../../src/core/server';

describe('getSearchStatus', () => {
let mockClient: any;
Expand Down Expand Up @@ -263,6 +267,45 @@ describe('getSearchStatus', () => {
expect(savedObjectsClient.delete).not.toBeCalled();
});

test('deletes in space', async () => {
savedObjectsClient.find.mockResolvedValue({
saved_objects: [
{
id: '123',
namespaces: ['awesome'],
attributes: {
persisted: false,
status: SearchSessionStatus.IN_PROGRESS,
created: moment().subtract(moment.duration(3, 'm')),
touched: moment().subtract(moment.duration(2, 'm')),
idMapping: {
'map-key': {
strategy: ENHANCED_ES_SEARCH_STRATEGY,
id: 'async-id',
},
},
},
},
],
total: 1,
} as any);

await checkRunningSessions(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config
);

expect(savedObjectsClient.delete).toBeCalled();

const [, id, opts] = savedObjectsClient.delete.mock.calls[0];
expect(id).toBe('123');
expect((opts as SavedObjectsDeleteOptions).namespace).toBe('awesome');
});

test('deletes a non persisted, abandoned session', async () => {
savedObjectsClient.find.mockResolvedValue({
saved_objects: [
Expand Down Expand Up @@ -479,6 +522,50 @@ describe('getSearchStatus', () => {
expect(savedObjectsClient.delete).not.toBeCalled();
});

test('updates in space', async () => {
savedObjectsClient.bulkUpdate = jest.fn();
const so = {
namespaces: ['awesome'],
attributes: {
status: SearchSessionStatus.IN_PROGRESS,
touched: '123',
idMapping: {
'search-hash': {
id: 'search-id',
strategy: 'cool',
status: SearchStatus.IN_PROGRESS,
},
},
},
};
savedObjectsClient.find.mockResolvedValue({
saved_objects: [so],
total: 1,
} as any);

mockClient.asyncSearch.status.mockResolvedValue({
body: {
is_partial: false,
is_running: false,
completion_status: 200,
},
});

await checkRunningSessions(
{
savedObjectsClient,
client: mockClient,
logger: mockLogger,
},
config
);

expect(mockClient.asyncSearch.status).toBeCalledWith({ id: 'search-id' });
const [updateInput] = savedObjectsClient.bulkUpdate.mock.calls[0];
const updatedAttributes = updateInput[0] as SavedObjectsBulkUpdateObject;
expect(updatedAttributes.namespace).toBe('awesome');
});

test('updates to complete if the search is done', async () => {
savedObjectsClient.bulkUpdate = jest.fn();
const so = {
Expand Down Expand Up @@ -563,7 +650,6 @@ describe('getSearchStatus', () => {
config
);
const [updateInput] = savedObjectsClient.bulkUpdate.mock.calls[0];

const updatedAttributes = updateInput[0].attributes as SearchSessionSavedObjectAttributes;
expect(updatedAttributes.status).toBe(SearchSessionStatus.ERROR);
expect(updatedAttributes.idMapping['search-hash'].status).toBe(SearchStatus.ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
Logger,
SavedObjectsClientContract,
SavedObjectsFindResult,
SavedObjectsUpdateResponse,
} from 'kibana/server';
import moment from 'moment';
import { EMPTY, from } from 'rxjs';
Expand Down Expand Up @@ -169,12 +170,20 @@ export async function checkRunningSessions(

if (!session.attributes.persisted) {
if (isSessionStale(session, config, logger)) {
deleted = true;
// delete saved object to free up memory
// TODO: there's a potential rare edge case of deleting an object and then receiving a new trackId for that same session!
// Maybe we want to change state to deleted and cleanup later?
logger.debug(`Deleting stale session | ${session.id}`);
await savedObjectsClient.delete(SEARCH_SESSION_TYPE, session.id);
try {
await savedObjectsClient.delete(SEARCH_SESSION_TYPE, session.id, {
namespace: session.namespaces?.[0],
});
deleted = true;
} catch (e) {
logger.error(
`Error while deleting stale search session ${session.id}: ${e.message}`
);
}

// Send a delete request for each async search to ES
Object.keys(session.attributes.idMapping).map(async (searchKey: string) => {
Expand All @@ -183,8 +192,8 @@ export async function checkRunningSessions(
try {
await client.asyncSearch.delete({ id: searchInfo.id });
} catch (e) {
logger.debug(
`Error ignored while deleting async_search ${searchInfo.id}: ${e.message}`
logger.error(
`Error while deleting async_search ${searchInfo.id}: ${e.message}`
);
}
}
Expand All @@ -202,9 +211,31 @@ export async function checkRunningSessions(
if (updatedSessions.length) {
// If there's an error, we'll try again in the next iteration, so there's no need to check the output.
const updatedResponse = await savedObjectsClient.bulkUpdate<SearchSessionSavedObjectAttributes>(
updatedSessions
updatedSessions.map((session) => ({
...session,
namespace: session.namespaces?.[0],
}))
);

const success: Array<
SavedObjectsUpdateResponse<SearchSessionSavedObjectAttributes>
> = [];
const fail: Array<SavedObjectsUpdateResponse<SearchSessionSavedObjectAttributes>> = [];

updatedResponse.saved_objects.forEach((savedObjectResponse) => {
if ('error' in savedObjectResponse) {
fail.push(savedObjectResponse);
logger.error(
`Error while updating search session ${savedObjectResponse?.id}: ${savedObjectResponse.error?.message}`
);
} else {
success.push(savedObjectResponse);
}
});

logger.debug(
`Updating search sessions: success: ${success.length}, fail: ${fail.length}`
);
logger.debug(`Updated ${updatedResponse.saved_objects.length} search sessions`);
}
})
)
Expand Down
153 changes: 153 additions & 0 deletions x-pack/test/api_integration/apis/search/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export default function ({ getService }: FtrProviderContext) {
const supertestWithoutAuth = getService('supertestWithoutAuth');
const security = getService('security');
const retry = getService('retry');
const spacesService = getService('spaces');

describe('search session', () => {
describe('session management', () => {
Expand Down Expand Up @@ -596,5 +597,157 @@ export default function ({ getService }: FtrProviderContext) {
.expect(403);
});
});

describe('in non-default space', () => {
const spaceId = 'foo-space';
before(async () => {
try {
await spacesService.create({
id: spaceId,
name: 'Foo Space',
});
} catch {
// might already be created
}
});

after(async () => {
await spacesService.delete(spaceId);
});

it('should complete and delete non-persistent sessions', async () => {
const sessionId = `my-session-${Math.random()}`;

// run search
const searchRes = await supertest
.post(`/s/${spaceId}/internal/search/ese`)
.set('kbn-xsrf', 'foo')
.send({
sessionId,
params: {
body: {
query: {
term: {
agent: '1',
},
},
},
wait_for_completion_timeout: '1ms',
},
})
.expect(200);

const { id } = searchRes.body;

await retry.waitForWithTimeout('searches persisted into session', 5000, async () => {
const resp = await supertest
.get(`/s/${spaceId}/internal/session/${sessionId}`)
.set('kbn-xsrf', 'foo')
.expect(200);

const { touched, created, persisted, idMapping } = resp.body.attributes;
expect(persisted).to.be(false);
expect(touched).not.to.be(undefined);
expect(created).not.to.be(undefined);

const idMappings = Object.values(idMapping).map((value: any) => value.id);
expect(idMappings).to.contain(id);
return true;
});

// not touched timeout in tests is 15s, wait to give a chance for status to update
await new Promise((resolve) =>
setTimeout(() => {
resolve(void 0);
}, 15_000)
);

await retry.waitForWithTimeout(
'searches eventually complete and session gets into the complete state',
30_000,
async () => {
await supertest
.get(`/s/${spaceId}/internal/session/${sessionId}`)
.set('kbn-xsrf', 'foo')
.expect(404);

return true;
}
);
});

it('should complete persisten session', async () => {
const sessionId = `my-session-${Math.random()}`;

// run search
const searchRes = await supertest
.post(`/s/${spaceId}/internal/search/ese`)
.set('kbn-xsrf', 'foo')
.send({
sessionId,
params: {
body: {
query: {
term: {
agent: '1',
},
},
},
wait_for_completion_timeout: '1ms',
},
})
.expect(200);

const { id } = searchRes.body;

// persist session
await supertest
.post(`/s/${spaceId}/internal/session`)
.set('kbn-xsrf', 'foo')
.send({
sessionId,
name: 'My Session',
appId: 'discover',
expires: '123',
urlGeneratorId: 'discover',
})
.expect(200);

await retry.waitForWithTimeout('searches persisted into session', 5000, async () => {
const resp = await supertest
.get(`/s/${spaceId}/internal/session/${sessionId}`)
.set('kbn-xsrf', 'foo')
.expect(200);

const { touched, created, persisted, idMapping } = resp.body.attributes;
expect(persisted).to.be(true);
expect(touched).not.to.be(undefined);
expect(created).not.to.be(undefined);

const idMappings = Object.values(idMapping).map((value: any) => value.id);
expect(idMappings).to.contain(id);
return true;
});

// session refresh interval is 5 seconds, wait to give a chance for status to update
await new Promise((resolve) => setTimeout(resolve, 5000));

await retry.waitForWithTimeout(
'searches eventually complete and session gets into the complete state',
5000,
async () => {
const resp = await supertest
.get(`/s/${spaceId}/internal/session/${sessionId}`)
.set('kbn-xsrf', 'foo')
.expect(200);

const { status } = resp.body.attributes;

expect(status).to.be(SearchSessionStatus.COMPLETE);
return true;
}
);
});
});
});
}