diff --git a/dev_docs/tutorials/testing_plugins.mdx b/dev_docs/tutorials/testing_plugins.mdx index 84e43310e46c8..e4a1d3e12aeb7 100644 --- a/dev_docs/tutorials/testing_plugins.mdx +++ b/dev_docs/tutorials/testing_plugins.mdx @@ -479,6 +479,7 @@ The more interesting logic is in `renderApp`: /** public/application.ts */ import React from 'react'; import ReactDOM from 'react-dom'; +import { switchMap } from 'rxjs'; import { AppMountParameters, CoreStart } from 'src/core/public'; import { AppRoot } from './components/app_root'; @@ -493,10 +494,10 @@ export const renderApp = ( // uiSettings subscription const uiSettingsClient = core.uiSettings.client; - const pollingSubscription = uiSettingClient.get$('mysetting1').subscribe(async mySetting1 => { + const pollingSubscription = uiSettingClient.get$('mysetting1').pipe(switchMap(async (mySetting1) => { const value = core.http.fetch(/** use `mySetting1` in request **/); // ... - }); + })).subscribe(); // Render app ReactDOM.render( diff --git a/src/plugins/kibana_usage_collection/server/collectors/event_loop_delays/track_delays.ts b/src/plugins/kibana_usage_collection/server/collectors/event_loop_delays/track_delays.ts index 2910c8343c811..6516565c8fa5a 100644 --- a/src/plugins/kibana_usage_collection/server/collectors/event_loop_delays/track_delays.ts +++ b/src/plugins/kibana_usage_collection/server/collectors/event_loop_delays/track_delays.ts @@ -7,7 +7,7 @@ */ import { takeUntil, finalize, map } from 'rxjs/operators'; -import { Observable, timer } from 'rxjs'; +import { Observable, timer, switchMap } from 'rxjs'; import type { ISavedObjectsRepository } from '@kbn/core/server'; import type { IEventLoopDelaysMonitor, IntervalHistogram } from '@kbn/core/server'; import { @@ -46,17 +46,18 @@ export function startTrackingEventLoopDelaysUsage( .pipe( map((i) => (i + 1) % resetOnCount === 0), takeUntil(stopMonitoringEventLoop$), - finalize(() => eventLoopDelaysMonitor.stop()) + finalize(() => eventLoopDelaysMonitor.stop()), + switchMap(async (shouldReset) => { + const histogram = eventLoopDelaysMonitor.collect(); + if (shouldReset) { + eventLoopDelaysMonitor.reset(); + } + try { + await storeHistogram(histogram, internalRepository, instanceUuid); + } catch (e) { + // do not crash if cannot store a histogram. + } + }) ) - .subscribe(async (shouldReset) => { - const histogram = eventLoopDelaysMonitor.collect(); - if (shouldReset) { - eventLoopDelaysMonitor.reset(); - } - try { - await storeHistogram(histogram, internalRepository, instanceUuid); - } catch (e) { - // do not crash if cannot store a histogram. - } - }); + .subscribe(); } diff --git a/src/plugins/kibana_usage_collection/server/collectors/event_loop_delays/track_threshold.ts b/src/plugins/kibana_usage_collection/server/collectors/event_loop_delays/track_threshold.ts index 9d650bdc4fc18..dbfc670ffdbbd 100644 --- a/src/plugins/kibana_usage_collection/server/collectors/event_loop_delays/track_threshold.ts +++ b/src/plugins/kibana_usage_collection/server/collectors/event_loop_delays/track_threshold.ts @@ -45,7 +45,7 @@ export function startTrackingEventLoopDelaysThreshold( takeUntil(stopMonitoringEventLoop$), finalize(() => eventLoopDelaysMonitor.stop()) ) - .subscribe(async () => { + .subscribe(() => { const { mean: meanMS } = eventLoopDelaysMonitor.collect(); if (meanMS > warnThreshold) { diff --git a/src/plugins/links/public/embeddable/links_embeddable.tsx b/src/plugins/links/public/embeddable/links_embeddable.tsx index 863bda323c39b..a29df393dd54b 100644 --- a/src/plugins/links/public/embeddable/links_embeddable.tsx +++ b/src/plugins/links/public/embeddable/links_embeddable.tsx @@ -7,7 +7,7 @@ */ import React, { createContext, useContext } from 'react'; -import { Subscription, distinctUntilChanged, skip } from 'rxjs'; +import { Subscription, distinctUntilChanged, skip, switchMap } from 'rxjs'; import deepEqual from 'fast-deep-equal'; import { @@ -104,8 +104,12 @@ export class LinksEmbeddable // By-value panels should update the componentState when input changes this.subscriptions.add( this.getInput$() - .pipe(distinctUntilChanged(deepEqual), skip(1)) - .subscribe(async () => await this.initializeSavedLinks()) + .pipe( + distinctUntilChanged(deepEqual), + skip(1), + switchMap(async () => await this.initializeSavedLinks()) + ) + .subscribe() ); } diff --git a/src/plugins/telemetry/public/plugin.ts b/src/plugins/telemetry/public/plugin.ts index 6300a56486169..59ff2d3478c1d 100644 --- a/src/plugins/telemetry/public/plugin.ts +++ b/src/plugins/telemetry/public/plugin.ts @@ -24,7 +24,7 @@ import type { import type { HomePublicPluginSetup } from '@kbn/home-plugin/public'; import { ElasticV3BrowserShipper } from '@kbn/analytics-shippers-elastic-v3-browser'; -import { BehaviorSubject, map, tap } from 'rxjs'; +import { BehaviorSubject, map, switchMap, tap } from 'rxjs'; import type { TelemetryConfigLabels } from '../server/config'; import { FetchTelemetryConfigRoute, INTERNAL_VERSION } from '../common/routes'; import type { v2 } from '../common/types'; @@ -246,26 +246,32 @@ export class TelemetryPlugin }); this.telemetryNotifications = telemetryNotifications; - application.currentAppId$.subscribe(async () => { - // Refresh and get telemetry config - const updatedConfig = await this.refreshConfig(http); - - analytics.optIn({ - global: { enabled: this.telemetryService!.isOptedIn && !screenshotMode.isScreenshotMode() }, - }); - - const isUnauthenticated = this.getIsUnauthenticated(http); - if (isUnauthenticated) { - return; - } - - const telemetryBanner = updatedConfig?.banner; - - this.maybeStartTelemetryPoller(); - if (telemetryBanner) { - this.maybeShowOptedInNotificationBanner(); - } - }); + application.currentAppId$ + .pipe( + switchMap(async () => { + // Refresh and get telemetry config + const updatedConfig = await this.refreshConfig(http); + + analytics.optIn({ + global: { + enabled: this.telemetryService!.isOptedIn && !screenshotMode.isScreenshotMode(), + }, + }); + + const isUnauthenticated = this.getIsUnauthenticated(http); + if (isUnauthenticated) { + return; + } + + const telemetryBanner = updatedConfig?.banner; + + this.maybeStartTelemetryPoller(); + if (telemetryBanner) { + this.maybeShowOptedInNotificationBanner(); + } + }) + ) + .subscribe(); return { telemetryService: this.getTelemetryServicePublicApis(), diff --git a/x-pack/plugins/aiops/server/plugin.ts b/x-pack/plugins/aiops/server/plugin.ts index 9eb613d7e3524..5cb23cb6ce57d 100755 --- a/x-pack/plugins/aiops/server/plugin.ts +++ b/x-pack/plugins/aiops/server/plugin.ts @@ -46,7 +46,7 @@ export class AiopsPlugin // This way we can pass on license changes to the route factory having always // the current license because it's stored in a mutable attribute. const aiopsLicense: AiopsLicense = { isActivePlatinumLicense: false }; - this.licenseSubscription = plugins.licensing.license$.subscribe(async (license) => { + this.licenseSubscription = plugins.licensing.license$.subscribe((license) => { aiopsLicense.isActivePlatinumLicense = isActiveLicense('platinum', license); if (aiopsLicense.isActivePlatinumLicense) { diff --git a/x-pack/plugins/security/server/authorization/authorization_service.tsx b/x-pack/plugins/security/server/authorization/authorization_service.tsx index 6e0fda1fa3d8e..a2b32a0a6b13e 100644 --- a/x-pack/plugins/security/server/authorization/authorization_service.tsx +++ b/x-pack/plugins/security/server/authorization/authorization_service.tsx @@ -9,6 +9,7 @@ import querystring from 'querystring'; import React from 'react'; import { renderToString } from 'react-dom/server'; import type { Observable, Subscription } from 'rxjs'; +import { switchMap } from 'rxjs'; import type { CapabilitiesSetup, @@ -209,18 +210,22 @@ export class AuthorizationService { validateFeaturePrivileges(allFeatures); validateReservedPrivileges(allFeatures); - this.statusSubscription = online$.subscribe(async ({ scheduleRetry }) => { - try { - await registerPrivilegesWithCluster( - this.logger, - this.privileges, - this.applicationName, - clusterClient - ); - } catch (err) { - scheduleRetry(); - } - }); + this.statusSubscription = online$ + .pipe( + switchMap(async ({ scheduleRetry }) => { + try { + await registerPrivilegesWithCluster( + this.logger, + this.privileges, + this.applicationName, + clusterClient + ); + } catch (err) { + scheduleRetry(); + } + }) + ) + .subscribe(); } stop() { diff --git a/x-pack/plugins/security/server/session_management/session_management_service.ts b/x-pack/plugins/security/server/session_management/session_management_service.ts index 10395999f502b..4c3298f69bca2 100644 --- a/x-pack/plugins/security/server/session_management/session_management_service.ts +++ b/x-pack/plugins/security/server/session_management/session_management_service.ts @@ -6,6 +6,7 @@ */ import type { Observable, Subscription } from 'rxjs'; +import { switchMap } from 'rxjs'; import type { ElasticsearchClient, HttpServiceSetup, Logger } from '@kbn/core/server'; import { SavedObjectsErrorHelpers } from '@kbn/core/server'; @@ -90,13 +91,20 @@ export class SessionManagementService { auditLogger: audit.withoutRequest, }); - this.statusSubscription = online$.subscribe(async ({ scheduleRetry }) => { - try { - await Promise.all([this.sessionIndex.initialize(), this.scheduleCleanupTask(taskManager)]); - } catch (err) { - scheduleRetry(); - } - }); + this.statusSubscription = online$ + .pipe( + switchMap(async ({ scheduleRetry }) => { + try { + await Promise.all([ + this.sessionIndex.initialize(), + this.scheduleCleanupTask(taskManager), + ]); + } catch (err) { + scheduleRetry(); + } + }) + ) + .subscribe(); return { session: new Session({ diff --git a/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.ts b/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.ts index 561a0e5a90e4c..da0434201eaa2 100644 --- a/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.ts +++ b/x-pack/plugins/synthetics/server/synthetics_service/synthetics_service.ts @@ -14,7 +14,7 @@ import { TaskManagerSetupContract, TaskManagerStartContract, } from '@kbn/task-manager-plugin/server'; -import { Subject } from 'rxjs'; +import { concatMap, Subject } from 'rxjs'; import { EncryptedSavedObjectsClient } from '@kbn/encrypted-saved-objects-plugin/server'; import pMap from 'p-map'; import { DEFAULT_SPACE_ID } from '@kbn/spaces-plugin/common'; @@ -377,45 +377,49 @@ export class SyntheticsService { let output: ServiceData['output'] | null = null; - subject.subscribe(async (monitors) => { - try { - if (monitors.length === 0 || !this.config.manifestUrl) { - return; - } + subject + .pipe( + concatMap(async (monitors) => { + try { + if (monitors.length === 0 || !this.config.manifestUrl) { + return; + } - if (!output) { - output = await this.getOutput(); + if (!output) { + output = await this.getOutput(); + + if (!output) { + sendErrorTelemetryEvents(service.logger, service.server.telemetry, { + reason: 'API key is not valid.', + message: 'Failed to push configs. API key is not valid.', + type: 'invalidApiKey', + stackVersion: service.server.stackVersion, + }); + return; + } + } - if (!output) { + this.logger.debug(`${monitors.length} monitors will be pushed to synthetics service.`); + + service.syncErrors = await this.apiClient.syncMonitors({ + monitors, + output, + license, + }); + } catch (e) { sendErrorTelemetryEvents(service.logger, service.server.telemetry, { - reason: 'API key is not valid.', - message: 'Failed to push configs. API key is not valid.', - type: 'invalidApiKey', + reason: 'Failed to push configs to service', + message: e?.message, + type: 'pushConfigsError', + code: e?.code, + status: e.status, stackVersion: service.server.stackVersion, }); - return; + this.logger.error(e); } - } - - this.logger.debug(`${monitors.length} monitors will be pushed to synthetics service.`); - - service.syncErrors = await this.apiClient.syncMonitors({ - monitors, - output, - license, - }); - } catch (e) { - sendErrorTelemetryEvents(service.logger, service.server.telemetry, { - reason: 'Failed to push configs to service', - message: e?.message, - type: 'pushConfigsError', - code: e?.code, - status: e.status, - stackVersion: service.server.stackVersion, - }); - this.logger.error(e); - } - }); + }) + ) + .subscribe(); await this.getMonitorConfigs(subject); } @@ -479,25 +483,29 @@ export class SyntheticsService { const license = await this.getLicense(); const subject = new Subject(); - subject.subscribe(async (monitors) => { - const hasPublicLocations = monitors.some((config) => - config.locations.some(({ isServiceManaged }) => isServiceManaged) - ); - - if (hasPublicLocations) { - const output = await this.getOutput(); - if (!output) { - return; - } - - const data = { - output, - monitors, - license, - }; - return await this.apiClient.delete(data); - } - }); + subject + .pipe( + concatMap(async (monitors) => { + const hasPublicLocations = monitors.some((config) => + config.locations.some(({ isServiceManaged }) => isServiceManaged) + ); + + if (hasPublicLocations) { + const output = await this.getOutput(); + if (!output) { + return; + } + + const data = { + output, + monitors, + license, + }; + return await this.apiClient.delete(data); + } + }) + ) + .subscribe(); await this.getMonitorConfigs(subject); } diff --git a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.ts b/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.ts index 16d8cd796809d..e5ca18f39c0b4 100644 --- a/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.ts @@ -85,7 +85,7 @@ export class EphemeralTaskLifecycle { ); }) ) - .subscribe(async (e) => { + .subscribe((e) => { let overallCapacity = this.getCapacity(); const capacityByType = new Map(); const tasksWithinCapacity = [...this.ephemeralTaskQueue]