diff --git a/packages/analytics/shippers/elastic_v3/server/src/server_shipper.test.ts b/packages/analytics/shippers/elastic_v3/server/src/server_shipper.test.ts index d7e8ee379e528..668574dbb8f1d 100644 --- a/packages/analytics/shippers/elastic_v3/server/src/server_shipper.test.ts +++ b/packages/analytics/shippers/elastic_v3/server/src/server_shipper.test.ts @@ -47,7 +47,7 @@ describe('ElasticV3ServerShipper', () => { initContext ); // eslint-disable-next-line dot-notation - shipper['firstTimeOffline'] = null; + shipper['firstTimeOffline'] = null; // The tests think connectivity is OK initially for easier testing. }); afterEach(() => { @@ -57,7 +57,7 @@ describe('ElasticV3ServerShipper', () => { test('set optIn should update the isOptedIn$ observable', () => { // eslint-disable-next-line dot-notation - const getInternalOptIn = () => shipper['isOptedIn']; + const getInternalOptIn = () => shipper['isOptedIn$'].value; // Initially undefined expect(getInternalOptIn()).toBeUndefined(); @@ -342,97 +342,242 @@ describe('ElasticV3ServerShipper', () => { }) ); - test( - 'connectivity check is run after report failure', - fakeSchedulers(async (advance) => { - fetchMock.mockRejectedValueOnce(new Error('Failed to fetch')); - shipper.reportEvents(events); - shipper.optIn(true); - const counter = firstValueFrom(shipper.telemetryCounter$); - setLastBatchSent(Date.now() - 10 * MINUTES); - advance(10 * MINUTES); - expect(fetchMock).toHaveBeenNthCalledWith( - 1, - 'https://telemetry-staging.elastic.co/v3/send/test-channel', - { - body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n', - headers: { - 'content-type': 'application/x-ndjson', - 'x-elastic-cluster-id': 'UNKNOWN', - 'x-elastic-stack-version': '1.2.3', - }, - method: 'POST', - query: { debug: true }, - } + describe('Connectivity Checks', () => { + describe('connectivity check when connectivity is confirmed (firstTimeOffline === null)', () => { + test.each([undefined, 
false, true])('does not run for opt-in %p', (optInValue) => + fakeSchedulers(async (advance) => { + if (optInValue !== undefined) { + shipper.optIn(optInValue); + } + + // From the start, it doesn't check connectivity because already confirmed + expect(fetchMock).not.toHaveBeenCalledWith( + 'https://telemetry-staging.elastic.co/v3/send/test-channel', + { method: 'OPTIONS' } + ); + + // Wait a long time (1 minute should be enough, but for the sake of tests...) + advance(10 * MINUTES); + await nextTick(); + + expect(fetchMock).not.toHaveBeenCalledWith( + 'https://telemetry-staging.elastic.co/v3/send/test-channel', + { method: 'OPTIONS' } + ); + })() ); - await expect(counter).resolves.toMatchInlineSnapshot(` - Object { - "code": "Failed to fetch", - "count": 1, - "event_type": "test-event-type", - "source": "elastic_v3_server", - "type": "failed", - } - `); - fetchMock.mockRejectedValueOnce(new Error('Failed to fetch')); - advance(1 * MINUTES); - await nextTick(); - expect(fetchMock).toHaveBeenNthCalledWith( - 2, - 'https://telemetry-staging.elastic.co/v3/send/test-channel', - { method: 'OPTIONS' } + }); + + describe('connectivity check with initial unknown state of the connectivity', () => { + beforeEach(() => { + // eslint-disable-next-line dot-notation + shipper['firstTimeOffline'] = undefined; // Initial unknown state of the connectivity + }); + + test.each([undefined, false])('does not run for opt-in %p', (optInValue) => + fakeSchedulers(async (advance) => { + if (optInValue !== undefined) { + shipper.optIn(optInValue); + } + + // From the start, it doesn't check connectivity because opt-in is not true + expect(fetchMock).not.toHaveBeenCalledWith( + 'https://telemetry-staging.elastic.co/v3/send/test-channel', + { method: 'OPTIONS' } + ); + + // Wait a long time (1 minute should be enough, but for the sake of tests...) 
+ advance(10 * MINUTES); + await nextTick(); + + expect(fetchMock).not.toHaveBeenCalledWith( + 'https://telemetry-staging.elastic.co/v3/send/test-channel', + { method: 'OPTIONS' } + ); + })() ); - fetchMock.mockResolvedValueOnce({ ok: false }); - advance(2 * MINUTES); - await nextTick(); - expect(fetchMock).toHaveBeenNthCalledWith( - 3, - 'https://telemetry-staging.elastic.co/v3/send/test-channel', - { method: 'OPTIONS' } + + test('runs as soon as opt-in is set to true', () => { + shipper.optIn(true); + + // It checks connectivity right away because opt-in is now true and connectivity is not confirmed to be OK + expect(fetchMock).toHaveBeenNthCalledWith( + 1, + 'https://telemetry-staging.elastic.co/v3/send/test-channel', + { method: 'OPTIONS' } + ); + }); + }); + + describe('connectivity check with the connectivity confirmed to be faulty', () => { + beforeEach(() => { + // eslint-disable-next-line dot-notation + shipper['firstTimeOffline'] = 100; // Failed at some point + }); + + test.each([undefined, false])('does not run for opt-in %p', (optInValue) => + fakeSchedulers(async (advance) => { + if (optInValue !== undefined) { + shipper.optIn(optInValue); + } + + // From the start, it doesn't check connectivity because opt-in is not true + expect(fetchMock).not.toHaveBeenCalledWith( + 'https://telemetry-staging.elastic.co/v3/send/test-channel', + { method: 'OPTIONS' } + ); + + // Wait a long time (1 minute should be enough, but for the sake of tests...) 
+ advance(10 * MINUTES); + await nextTick(); + + expect(fetchMock).not.toHaveBeenCalledWith( + 'https://telemetry-staging.elastic.co/v3/send/test-channel', + { method: 'OPTIONS' } + ); + })() ); - // let's see the effect of after 24 hours: - shipper.reportEvents(events); - // eslint-disable-next-line dot-notation - expect(shipper['internalQueue'].length).toBe(1); - // eslint-disable-next-line dot-notation - shipper['firstTimeOffline'] = 100; + test('runs as soon as opt-in is set to true', () => { + shipper.optIn(true); - fetchMock.mockResolvedValueOnce({ ok: false }); - advance(4 * MINUTES); - await nextTick(); - expect(fetchMock).toHaveBeenNthCalledWith( - 4, - 'https://telemetry-staging.elastic.co/v3/send/test-channel', - { method: 'OPTIONS' } + // It checks connectivity right away because opt-in is now true and connectivity is not confirmed to be OK + expect(fetchMock).toHaveBeenNthCalledWith( + 1, + 'https://telemetry-staging.elastic.co/v3/send/test-channel', + { method: 'OPTIONS' } + ); + }); + }); + + describe('after report failure', () => { + // generate the report failure for each test + beforeEach( + fakeSchedulers(async (advance) => { + fetchMock.mockRejectedValueOnce(new Error('Failed to fetch')); + shipper.reportEvents(events); + shipper.optIn(true); + const counter = firstValueFrom(shipper.telemetryCounter$); + setLastBatchSent(Date.now() - 10 * MINUTES); + advance(10 * MINUTES); + expect(fetchMock).toHaveBeenNthCalledWith( + 1, + 'https://telemetry-staging.elastic.co/v3/send/test-channel', + { + body: '{"timestamp":"2020-01-01T00:00:00.000Z","event_type":"test-event-type","context":{},"properties":{}}\n', + headers: { + 'content-type': 'application/x-ndjson', + 'x-elastic-cluster-id': 'UNKNOWN', + 'x-elastic-stack-version': '1.2.3', + }, + method: 'POST', + query: { debug: true }, + } + ); + await expect(counter).resolves.toMatchInlineSnapshot(` + Object { + "code": "Failed to fetch", + "count": 1, + "event_type": "test-event-type", + "source": "elastic_v3_server", + "type": 
"failed", + } + `); + }) ); - // eslint-disable-next-line dot-notation - expect(shipper['internalQueue'].length).toBe(0); - // New events are not added to the queue because it's been offline for 24 hours. - shipper.reportEvents(events); - // eslint-disable-next-line dot-notation - expect(shipper['internalQueue'].length).toBe(0); - - // Regains connection - fetchMock.mockResolvedValueOnce({ ok: true }); - advance(8 * MINUTES); - await nextTick(); - expect(fetchMock).toHaveBeenNthCalledWith( - 5, - 'https://telemetry-staging.elastic.co/v3/send/test-channel', - { method: 'OPTIONS' } + test( + 'connectivity check runs periodically', + fakeSchedulers(async (advance) => { + fetchMock.mockRejectedValueOnce(new Error('Failed to fetch')); + advance(1 * MINUTES); + await nextTick(); + expect(fetchMock).toHaveBeenNthCalledWith( + 2, + 'https://telemetry-staging.elastic.co/v3/send/test-channel', + { method: 'OPTIONS' } + ); + fetchMock.mockResolvedValueOnce({ ok: false }); + advance(2 * MINUTES); + await nextTick(); + expect(fetchMock).toHaveBeenNthCalledWith( + 3, + 'https://telemetry-staging.elastic.co/v3/send/test-channel', + { method: 'OPTIONS' } + ); + }) ); - // eslint-disable-next-line dot-notation - expect(shipper['firstTimeOffline']).toBe(null); + }); + + describe('after being offline for longer than 24h', () => { + beforeEach(() => { + shipper.optIn(true); + shipper.reportEvents(events); + // eslint-disable-next-line dot-notation + expect(shipper['internalQueue'].length).toBe(1); + // eslint-disable-next-line dot-notation + shipper['firstTimeOffline'] = 100; + }); - advance(16 * MINUTES); - await nextTick(); - expect(fetchMock).not.toHaveBeenNthCalledWith( - 6, - 'https://telemetry-staging.elastic.co/v3/send/test-channel', - { method: 'OPTIONS' } + test( + 'the following connectivity check clears the queue', + fakeSchedulers(async (advance) => { + fetchMock.mockRejectedValueOnce(new Error('Failed to fetch')); + advance(1 * MINUTES); + await nextTick(); + 
expect(fetchMock).toHaveBeenNthCalledWith( + 1, + 'https://telemetry-staging.elastic.co/v3/send/test-channel', + { method: 'OPTIONS' } + ); + // eslint-disable-next-line dot-notation + expect(shipper['internalQueue'].length).toBe(0); + }) ); - }) - ); + + test( + 'new events are not added to the queue', + fakeSchedulers(async (advance) => { + fetchMock.mockRejectedValueOnce(new Error('Failed to fetch')); + advance(1 * MINUTES); + await nextTick(); + expect(fetchMock).toHaveBeenNthCalledWith( + 1, + 'https://telemetry-staging.elastic.co/v3/send/test-channel', + { method: 'OPTIONS' } + ); + // eslint-disable-next-line dot-notation + expect(shipper['internalQueue'].length).toBe(0); + + shipper.reportEvents(events); + // eslint-disable-next-line dot-notation + expect(shipper['internalQueue'].length).toBe(0); + }) + ); + + test( + 'regains the connection', + fakeSchedulers(async (advance) => { + fetchMock.mockResolvedValueOnce({ ok: true }); + advance(1 * MINUTES); + await nextTick(); + expect(fetchMock).toHaveBeenNthCalledWith( + 1, + 'https://telemetry-staging.elastic.co/v3/send/test-channel', + { method: 'OPTIONS' } + ); + // eslint-disable-next-line dot-notation + expect(shipper['firstTimeOffline']).toBe(null); + + advance(10 * MINUTES); + await nextTick(); + expect(fetchMock).not.toHaveBeenNthCalledWith( + 2, + 'https://telemetry-staging.elastic.co/v3/send/test-channel', + { method: 'OPTIONS' } + ); + }) + ); + }); + }); }); diff --git a/packages/analytics/shippers/elastic_v3/server/src/server_shipper.ts b/packages/analytics/shippers/elastic_v3/server/src/server_shipper.ts index 1a0b76f13c286..e8cb3c7bff5db 100644 --- a/packages/analytics/shippers/elastic_v3/server/src/server_shipper.ts +++ b/packages/analytics/shippers/elastic_v3/server/src/server_shipper.ts @@ -22,6 +22,8 @@ import { delayWhen, takeUntil, map, + BehaviorSubject, + exhaustMap, } from 'rxjs'; import type { AnalyticsClientInitContext, @@ -30,8 +32,8 @@ import type { IShipper, TelemetryCounter, } 
from '@kbn/analytics-client'; -import type { ElasticV3ShipperOptions } from '@kbn/analytics-shippers-elastic-v3-common'; import { + type ElasticV3ShipperOptions, buildHeaders, buildUrl, createTelemetryCounterHelper, @@ -62,6 +64,7 @@ export class ElasticV3ServerShipper implements IShipper { private readonly internalQueue: Event[] = []; private readonly shutdown$ = new ReplaySubject(1); + private readonly isOptedIn$ = new BehaviorSubject(undefined); private readonly url: string; @@ -69,7 +72,6 @@ export class ElasticV3ServerShipper implements IShipper { private clusterUuid: string = 'UNKNOWN'; private licenseId?: string; - private isOptedIn?: boolean; /** * Specifies when it went offline: @@ -116,7 +118,7 @@ export class ElasticV3ServerShipper implements IShipper { * @param isOptedIn `true` for resume sending events. `false` to stop. */ public optIn(isOptedIn: boolean) { - this.isOptedIn = isOptedIn; + this.isOptedIn$.next(isOptedIn); if (isOptedIn === false) { this.internalQueue.length = 0; @@ -129,7 +131,7 @@ export class ElasticV3ServerShipper implements IShipper { */ public reportEvents(events: Event[]) { if ( - this.isOptedIn === false || + this.isOptedIn$.value === false || (this.firstTimeOffline && Date.now() - this.firstTimeOffline > 24 * HOUR) ) { return; @@ -157,6 +159,7 @@ export class ElasticV3ServerShipper implements IShipper { public shutdown() { this.shutdown$.next(); this.shutdown$.complete(); + this.isOptedIn$.complete(); } /** @@ -169,11 +172,17 @@ export class ElasticV3ServerShipper implements IShipper { */ private checkConnectivity() { let backoff = 1 * MINUTE; - timer(0, 1 * MINUTE) + merge( + timer(0, 1 * MINUTE), + // Also react to opt-in changes to avoid being stalled for 1 minute for the first connectivity check. 
+ // More details in: https://github.com/elastic/kibana/issues/135647 + this.isOptedIn$ + ) .pipe( takeUntil(this.shutdown$), - filter(() => this.isOptedIn === true && this.firstTimeOffline !== null), - concatMap(async () => { + filter(() => this.isOptedIn$.value === true && this.firstTimeOffline !== null), + // Using exhaustMap here because one request at a time is enough to check the connectivity. + exhaustMap(async () => { const { ok } = await fetch(this.url, { method: 'OPTIONS', }); @@ -215,7 +224,7 @@ export class ElasticV3ServerShipper implements IShipper { ) .pipe( // Only move ahead if it's opted-in and online. - filter(() => this.isOptedIn === true && this.firstTimeOffline === null), + filter(() => this.isOptedIn$.value === true && this.firstTimeOffline === null), // Send the events now if (validations sorted from cheapest to most CPU expensive): // - We are shutting down. @@ -241,6 +250,7 @@ export class ElasticV3ServerShipper implements IShipper { // 2. Skip empty buffers filter((events) => events.length > 0), // 3. Actually send the events + // Using `concatMap` here because we want to send events whenever the emitter says so. Otherwise, it'd skip sending some events. concatMap(async (eventsToSend) => await this.sendEvents(eventsToSend)) ) .subscribe();