Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: streaming poc #679

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open

feat: streaming poc #679

wants to merge 10 commits into from

Conversation

kwasniew
Copy link
Contributor

@kwasniew kwasniew commented Dec 2, 2024

About the changes

This is a reference implementation for the streaming client using SSE (Server-Sent Events).

The changes include:

  • production code changes in the repository
  • example how to test the streaming API (we test the API, not the low-level details that are tested in the ES library itself)
  • example showing user facing property to enable streaming

Chosen library:

Main logic:

  • customers can specify mode (polling vs streaming) using experimentalMode property. When they opt into streaming we create EventSource and inject it to the repository
  • in the repository we always make the initial fetch with the HTTP fetch API same as before
  • however when streaming is enabled (= EventSource is injected to the repository) we never do setTimeout calls for the next fetch and we listen to event called 'unleash-updated' from the streaming API and update flags accordingly
  • so effectively our event stream is a flag stream and not just a notification stream that requires the client to fetch data on its own. This way we save a network roundtrip and piggyback on the broadcast message data

More comments with the important decisions inline

Important files

Discussion points

@coveralls
Copy link

coveralls commented Dec 2, 2024

Coverage Status

coverage: 90.385% (-0.2%) from 90.625%
when pulling c8fda8a on streaming-client
into 4115c39 on main.

},
streaming: true,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opt into streaming

@@ -13,6 +13,7 @@ import {
Segment,
StrategyTransportInterface,
} from '../strategy/strategy';
const EventSource = require('eventsource');
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

couldn't get it to work with import, only require worked in a quick PoC

this.eventSource = new EventSource(resolveUrl(this.url, './client/streaming'), {
headers: this.headers,
} as any);
this.eventSource?.addEventListener('message', (event) => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

required EventSource is seen as potentially undefined. Not sure why that's what ? is handling here

}

timedFetch(interval: number) {
if (interval > 0) {
if (interval > 0 && !this.streaming) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this line is all it takes to stop intervals/timeouts and use ES client instead

@kwasniew kwasniew marked this pull request as draft December 2, 2024 17:33
customHeaders: {
Authorization: '943ca9171e2c884c545c5d82417a655fb77cec970cc3b78a8ff87f4406b495d0',
},
experimentalMode: { type: 'streaming' },
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has to be provided explicitly to opt into streaming. The reason I model it as {type: 'polling'} | {type: 'streaming'} union is to add streaming config option later {type: 'streaming', config: {refresh, backoff}}

experimentalMode: { type: 'streaming' },
skipInstanceCountWarning: true,
});
client.on('changed', () => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changes are reflected instantly in the event emitter

@@ -13,6 +13,8 @@ import {
Segment,
StrategyTransportInterface,
} from '../strategy/strategy';
// @ts-expect-error
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this library does not provide any types so we have to do this for now until we provide a type ourselves

@@ -105,6 +110,7 @@ export default class Repository extends EventEmitter implements EventEmitter {
bootstrapProvider,
bootstrapOverride = true,
storageProvider,
eventSource,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we make EventSource injectable so that it's easy to swap in testing for a simple mock

this.emit(UnleashEvents.Error, err);
}
});
this.eventSource.addEventListener('error', (error: unknown) => {
Copy link
Contributor Author

@kwasniew kwasniew Dec 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for now I decided to translate event source errors to warnings in Unleash event API

}

timedFetch(interval: number) {
if (interval > 0) {
if (interval > 0 && !this.eventSource) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do intervals only when eventSource is not provided

@@ -398,6 +424,9 @@ Message: ${err.message}`,
clearTimeout(this.timer);
}
this.removeAllListeners();
if (this.eventSource) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have to remember about closing the event source when Unleash is closed

@@ -123,6 +126,17 @@ export class Unleash extends EventEmitter {
tags,
bootstrapProvider,
bootstrapOverride,
eventSource:
experimentalMode?.type === 'streaming'
? new EventSource(resolveUrl(unleashUrl, './client/streaming'), {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can expose the low level config options to the public API over time, but for now I set those defaults internally

@kwasniew kwasniew marked this pull request as ready for review December 5, 2024 10:03
],
};
const storageProvider: StorageProvider<ClientFeaturesResponse> = new InMemStorageProvider();
const eventSource = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in-memory mock is trivial to implement. It's just a thin layer on top of the EventEmitter to adjust the API semantic to SSE API

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: In Progress
Development

Successfully merging this pull request may close these issues.

3 participants