Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle content stream errors in report pre-deletion #173792

Merged
merged 6 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,43 @@ export const commonJobsRouteHandlerFactory = (reporting: ReportingCore) => {
return jobManagementPreRouting(reporting, res, docId, user, counters, async (doc) => {
const docIndex = doc.index;
const stream = await getContentStream(reporting, { id: docId, index: docIndex });
/** @note Overwriting existing content with an empty buffer to remove all the chunks. */
await new Promise<void>((resolve) => {
stream.end('', 'utf8', () => {
resolve();
});
const reportingSetup = reporting.getPluginSetupDeps();
const logger = reportingSetup.logger.get('delete-report');

// An "error" event is emitted if an error is
// passed to the `stream.end` callback from
// the _final method of the ContentStream.
// This event must be handled.
stream.on('error', (err) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

noice!!

logger.error(err);
});
await jobsQuery.delete(docIndex, docId);

return res.ok({
body: { deleted: true },
});
try {
// Overwriting existing content with an
// empty buffer to remove all the chunks.
await new Promise<void>((resolve, reject) => {
stream.end('', 'utf8', (error?: Error) => {
if (error) {
// handle error that could be thrown
// from the _write method of the ContentStream
reject(error);
} else {
resolve();
}
});
});

await jobsQuery.delete(docIndex, docId);

return res.ok({
body: { deleted: true },
});
} catch (error) {
logger.error(error);
return res.customError({
statusCode: 500,
});
}
});
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import { registerJobInfoRoutesInternal as registerJobInfoRoutes } from '../jobs'

type SetupServerReturn = Awaited<ReturnType<typeof setupServer>>;

describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => {
describe(`Reporting Job Management Routes: Internal`, () => {
const reportingSymbol = Symbol('reporting');
let server: SetupServerReturn['server'];
let usageCounter: IUsageCounter;
Expand Down Expand Up @@ -144,148 +144,148 @@ describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => {
await server.stop();
});

it('fails on malformed download IDs', async () => {
mockEsClient.search.mockResponseOnce(getHits());
registerJobInfoRoutes(core);
describe('download report', () => {
it('fails on malformed download IDs', async () => {
mockEsClient.search.mockResponseOnce(getHits());
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/1`)
.expect(400)
.then(({ body }) =>
expect(body.message).toMatchInlineSnapshot(
'"[request params.docId]: value has length [1] but it must have a minimum length of [3]."'
)
);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/1`)
.expect(400)
.then(({ body }) =>
expect(body.message).toMatchInlineSnapshot(
'"[request params.docId]: value has length [1] but it must have a minimum length of [3]."'
)
);
});

it('fails on unauthenticated users', async () => {
mockStartDeps = await createMockPluginStart(
{
licensing: {
...licensingMock.createStart(),
license$: new BehaviorSubject({ isActive: true, isAvailable: true, type: 'gold' }),
it('fails on unauthenticated users', async () => {
mockStartDeps = await createMockPluginStart(
{
licensing: {
...licensingMock.createStart(),
license$: new BehaviorSubject({ isActive: true, isAvailable: true, type: 'gold' }),
},
security: { authc: { getCurrentUser: () => undefined } },
},
security: { authc: { getCurrentUser: () => undefined } },
},
mockConfigSchema
);
core = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps);
registerJobInfoRoutes(core);
mockConfigSchema
);
core = await createMockReportingCore(mockConfigSchema, mockSetupDeps, mockStartDeps);
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dope`)
.expect(401)
.then(({ body }) =>
expect(body.message).toMatchInlineSnapshot(`"Sorry, you aren't authenticated"`)
);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dope`)
.expect(401)
.then(({ body }) =>
expect(body.message).toMatchInlineSnapshot(`"Sorry, you aren't authenticated"`)
);
});

it('returns 404 if job not found', async () => {
mockEsClient.search.mockResponseOnce(getHits());
registerJobInfoRoutes(core);
it('returns 404 if job not found', async () => {
mockEsClient.search.mockResponseOnce(getHits());
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
.expect(404);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
.expect(404);
});

it('returns a 403 if not a valid job type', async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: 'invalidJobType',
payload: { title: 'invalid!' },
})
);
registerJobInfoRoutes(core);
it('returns a 403 if not a valid job type', async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: 'invalidJobType',
payload: { title: 'invalid!' },
})
);
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
.expect(403);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/poo`)
.expect(403);
});

it(`returns job's info`, async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: mockJobTypeBase64Encoded,
payload: {}, // payload is irrelevant
})
);
it(`returns job's info`, async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: mockJobTypeBase64Encoded,
payload: {}, // payload is irrelevant
})
);

registerJobInfoRoutes(core);
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
.expect(200);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
.expect(200);
});

it(`returns 403 if a user cannot view a job's info`, async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: 'customForbiddenJobType',
payload: {}, // payload is irrelevant
})
);
it(`returns 403 if a user cannot view a job's info`, async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: 'customForbiddenJobType',
payload: {}, // payload is irrelevant
})
);

registerJobInfoRoutes(core);
registerJobInfoRoutes(core);

await server.start();
await server.start();

await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
.expect(403);
});
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.INFO_PREFIX}/test`)
.expect(403);
});

it('when a job is incomplete', async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: mockJobTypeUnencoded,
status: 'pending',
payload: { title: 'incomplete!' },
})
);
registerJobInfoRoutes(core);

await server.start();
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
.expect(503)
.expect('Content-Type', 'text/plain; charset=utf-8')
.expect('Retry-After', '30')
.then(({ text }) => expect(text).toEqual('pending'));
});
it('when a job is incomplete', async () => {
mockEsClient.search.mockResponseOnce(
getHits({
jobtype: mockJobTypeUnencoded,
status: 'pending',
payload: { title: 'incomplete!' },
})
);
registerJobInfoRoutes(core);

it('when a job fails', async () => {
mockEsClient.search.mockResponse(
getHits({
jobtype: mockJobTypeUnencoded,
status: 'failed',
output: { content: 'job failure message' },
payload: { title: 'failing job!' },
})
);
registerJobInfoRoutes(core);

await server.start();
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
.expect(500)
.expect('Content-Type', 'application/json; charset=utf-8')
.then(({ body }) =>
expect(body.message).toEqual('Reporting generation failed: job failure message')
await server.start();
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
.expect(503)
.expect('Content-Type', 'text/plain; charset=utf-8')
.expect('Retry-After', '30')
.then(({ text }) => expect(text).toEqual('pending'));
});

it('when a job fails', async () => {
mockEsClient.search.mockResponse(
getHits({
jobtype: mockJobTypeUnencoded,
status: 'failed',
output: { content: 'job failure message' },
payload: { title: 'failing job!' },
})
);
});
registerJobInfoRoutes(core);

await server.start();
await supertest(httpSetup.server.listener)
.get(`${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}/dank`)
.expect(500)
.expect('Content-Type', 'application/json; charset=utf-8')
.then(({ body }) =>
expect(body.message).toEqual('Reporting generation failed: job failure message')
);
});

describe('successful downloads', () => {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changes above this line are to just restructure the tests

it('when a known job-type is complete', async () => {
mockEsClient.search.mockResponseOnce(getCompleteHits());
registerJobInfoRoutes(core);
Expand Down Expand Up @@ -483,4 +483,28 @@ describe(`GET ${INTERNAL_ROUTES.JOBS.DOWNLOAD_PREFIX}`, () => {
});
});
});

describe('delete report', () => {
it('handles content stream errors', async () => {
stream = new Readable({
read() {
this.push('test');
this.push(null);
},
}) as typeof stream;
stream.end = jest.fn().mockImplementation((_name, _encoding, callback) => {
callback(new Error('An error occurred in ending the content stream'));
});

(getContentStream as jest.MockedFunction<typeof getContentStream>).mockResolvedValue(stream);
mockEsClient.search.mockResponseOnce(getCompleteHits());
registerJobInfoRoutes(core);

await server.start();
await supertest(httpSetup.server.listener)
.delete(`${INTERNAL_ROUTES.JOBS.DELETE_PREFIX}/dank`)
.expect(500)
.expect('Content-Type', 'application/json; charset=utf-8');
});
});
});
Loading
Loading