Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix aborted$ event and add completed$ event to KibanaRequest #73898

Merged
merged 1 commit into from
Jul 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->

[Home](./index.md) &gt; [kibana-plugin-core-server](./kibana-plugin-core-server.md) &gt; [KibanaRequestEvents](./kibana-plugin-core-server.kibanarequestevents.md) &gt; [completed$](./kibana-plugin-core-server.kibanarequestevents.completed_.md)

## KibanaRequestEvents.completed$ property

Observable that emits once if and when the request has been completely handled.

<b>Signature:</b>

```typescript
completed$: Observable<void>;
```

## Remarks

The request may be considered completed if: - A response has been sent to the client; or - The request was aborted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ export interface KibanaRequestEvents
| Property | Type | Description |
| --- | --- | --- |
| [aborted$](./kibana-plugin-core-server.kibanarequestevents.aborted_.md) | <code>Observable&lt;void&gt;</code> | Observable that emits once if and when the request has been aborted. |
| [completed$](./kibana-plugin-core-server.kibanarequestevents.completed_.md) | <code>Observable&lt;void&gt;</code> | Observable that emits once if and when the request has been completely handled. |

91 changes: 91 additions & 0 deletions src/core/server/http/integration_tests/request.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { HttpService } from '../http_service';
import { contextServiceMock } from '../../context/context_service.mock';
import { loggingSystemMock } from '../../logging/logging_system.mock';
import { createHttpServer } from '../test_utils';
import { schema } from '@kbn/config-schema';

let server: HttpService;

Expand Down Expand Up @@ -195,6 +196,96 @@ describe('KibanaRequest', () => {
expect(nextSpy).toHaveBeenCalledTimes(0);
expect(completeSpy).toHaveBeenCalledTimes(1);
});

it('does not complete before response has been sent', async () => {
const { server: innerServer, createRouter, registerOnPreAuth } = await server.setup(
setupDeps
);
const router = createRouter('/');

const nextSpy = jest.fn();
const completeSpy = jest.fn();

registerOnPreAuth((req, res, toolkit) => {
req.events.aborted$.subscribe({
next: nextSpy,
complete: completeSpy,
});
return toolkit.next();
});

router.post(
{ path: '/', validate: { body: schema.any() } },
async (context, request, res) => {
expect(completeSpy).not.toHaveBeenCalled();
return res.ok({ body: 'ok' });
}
);

await server.start();

await supertest(innerServer.listener).post('/').send({ data: 'test' }).expect(200);

expect(nextSpy).toHaveBeenCalledTimes(0);
expect(completeSpy).toHaveBeenCalledTimes(1);
});
});

describe('completed$', () => {
it('emits once and completes when response is sent', async () => {
const { server: innerServer, createRouter } = await server.setup(setupDeps);
const router = createRouter('/');

const nextSpy = jest.fn();
const completeSpy = jest.fn();

router.get({ path: '/', validate: false }, async (context, req, res) => {
req.events.completed$.subscribe({
next: nextSpy,
complete: completeSpy,
});

expect(nextSpy).not.toHaveBeenCalled();
expect(completeSpy).not.toHaveBeenCalled();
return res.ok({ body: 'ok' });
});

await server.start();

await supertest(innerServer.listener).get('/').expect(200);
expect(nextSpy).toHaveBeenCalledTimes(1);
expect(completeSpy).toHaveBeenCalledTimes(1);
});

it('emits once and completes when response is aborted', async (done) => {
expect.assertions(2);
const { server: innerServer, createRouter } = await server.setup(setupDeps);
const router = createRouter('/');

const nextSpy = jest.fn();

router.get({ path: '/', validate: false }, async (context, req, res) => {
req.events.completed$.subscribe({
next: nextSpy,
complete: () => {
expect(nextSpy).toHaveBeenCalledTimes(1);
done();
},
});

expect(nextSpy).not.toHaveBeenCalled();
await delay(30000);
return res.ok({ body: 'ok' });
});

await server.start();

const incomingRequest = supertest(innerServer.listener)
.get('/')
// end required to send request
.end();
setTimeout(() => incomingRequest.abort(), 50);
});
});
});
});
19 changes: 17 additions & 2 deletions src/core/server/http/router/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ export interface KibanaRequestEvents {
* Observable that emits once if and when the request has been aborted.
*/
aborted$: Observable<void>;

/**
* Observable that emits once if and when the request has been completely handled.
*
* @remarks
* The request may be considered completed if:
* - A response has been sent to the client; or
* - The request was aborted.
*/
completed$: Observable<void>;
}

/**
Expand Down Expand Up @@ -186,11 +196,16 @@ export class KibanaRequest<

private getEvents(request: Request): KibanaRequestEvents {
const finish$ = merge(
fromEvent(request.raw.req, 'end'), // all data consumed
Copy link
Contributor

@mshustov mshustov Jul 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initial implementation was based on nodejs/node#21705 discussion and asserted by test #55061 (comment), showing that Request.end happens after Response.finish, even though it shouldn't. I mistakenly assumed the fact that this behavior is consistent across all requests.
2020-07-31_09-02-11

fromEvent(request.raw.res, 'finish'), // Response has been sent
fromEvent(request.raw.req, 'close') // connection was closed
).pipe(shareReplay(1), first());

const aborted$ = fromEvent<void>(request.raw.req, 'aborted').pipe(first(), takeUntil(finish$));
const completed$ = merge<void, void>(finish$, aborted$).pipe(shareReplay(1), first());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's becoming complicated 😓

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Open to any ideas on how to improve, but this seemed the most clear for now.


return {
aborted$: fromEvent<void>(request.raw.req, 'aborted').pipe(first(), takeUntil(finish$)),
aborted$,
completed$,
} as const;
}

Expand Down
1 change: 1 addition & 0 deletions src/core/server/server.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,7 @@ export class KibanaRequest<Params = unknown, Query = unknown, Body = unknown, Me
// @public
export interface KibanaRequestEvents {
aborted$: Observable<void>;
completed$: Observable<void>;
}

// @public
Expand Down