Skip to content

Commit

Permalink
[8.11] Fix some unhandledRejections (#168009) (#168208)
Browse files Browse the repository at this point in the history
# Backport

This will backport the following commits from `main` to `8.11`:
- [Fix some unhandledRejections
(#168009)](#168009)

<!--- Backport version: 8.9.7 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Alejandro Fernández
Haro","email":"[email protected]"},"sourceCommit":{"committedDate":"2023-10-06T11:44:52Z","message":"Fix
some unhandledRejections
(#168009)","sha":"baff0eb32c894eed537bb563068c9743e98e2412","branchLabelMapping":{"^v8.12.0$":"main","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["Team:Core","performance","Team:Security",":ml","Team:Presentation","Team:uptime","release_note:skip","Team:ResponseOps","Team:ML","Team:SharedUX","backport:prev-minor","Team:Performance","v8.12.0"],"number":168009,"url":"https://github.com/elastic/kibana/pull/168009","mergeCommit":{"message":"Fix
some unhandledRejections
(#168009)","sha":"baff0eb32c894eed537bb563068c9743e98e2412"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v8.12.0","labelRegex":"^v8.12.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/168009","number":168009,"mergeCommit":{"message":"Fix
some unhandledRejections
(#168009)","sha":"baff0eb32c894eed537bb563068c9743e98e2412"}}]}]
BACKPORT-->

Co-authored-by: Alejandro Fernández Haro <[email protected]>
  • Loading branch information
kibanamachine and afharo authored Oct 6, 2023
1 parent 79224cc commit 467f42b
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 114 deletions.
5 changes: 3 additions & 2 deletions dev_docs/tutorials/testing_plugins.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ The more interesting logic is in `renderApp`:
/** public/application.ts */
import React from 'react';
import ReactDOM from 'react-dom';
import { switchMap } from 'rxjs';

import { AppMountParameters, CoreStart } from 'src/core/public';
import { AppRoot } from './components/app_root';
Expand All @@ -493,10 +494,10 @@ export const renderApp = (

// uiSettings subscription
const uiSettingsClient = core.uiSettings.client;
const pollingSubscription = uiSettingClient.get$('mysetting1').subscribe(async mySetting1 => {
const pollingSubscription = uiSettingClient.get$('mysetting1').pipe(switchMap(async (mySetting1) => {
const value = core.http.fetch(/** use `mySetting1` in request **/);
// ...
});
})).subscribe();

// Render app
ReactDOM.render(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*/

import { takeUntil, finalize, map } from 'rxjs/operators';
import { Observable, timer } from 'rxjs';
import { Observable, timer, switchMap } from 'rxjs';
import type { ISavedObjectsRepository } from '@kbn/core/server';
import type { IEventLoopDelaysMonitor, IntervalHistogram } from '@kbn/core/server';
import {
Expand Down Expand Up @@ -46,17 +46,18 @@ export function startTrackingEventLoopDelaysUsage(
.pipe(
map((i) => (i + 1) % resetOnCount === 0),
takeUntil(stopMonitoringEventLoop$),
finalize(() => eventLoopDelaysMonitor.stop())
finalize(() => eventLoopDelaysMonitor.stop()),
switchMap(async (shouldReset) => {
const histogram = eventLoopDelaysMonitor.collect();
if (shouldReset) {
eventLoopDelaysMonitor.reset();
}
try {
await storeHistogram(histogram, internalRepository, instanceUuid);
} catch (e) {
// do not crash if cannot store a histogram.
}
})
)
.subscribe(async (shouldReset) => {
const histogram = eventLoopDelaysMonitor.collect();
if (shouldReset) {
eventLoopDelaysMonitor.reset();
}
try {
await storeHistogram(histogram, internalRepository, instanceUuid);
} catch (e) {
// do not crash if cannot store a histogram.
}
});
.subscribe();
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export function startTrackingEventLoopDelaysThreshold(
takeUntil(stopMonitoringEventLoop$),
finalize(() => eventLoopDelaysMonitor.stop())
)
.subscribe(async () => {
.subscribe(() => {
const { mean: meanMS } = eventLoopDelaysMonitor.collect();

if (meanMS > warnThreshold) {
Expand Down
10 changes: 7 additions & 3 deletions src/plugins/links/public/embeddable/links_embeddable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*/

import React, { createContext, useContext } from 'react';
import { Subscription, distinctUntilChanged, skip } from 'rxjs';
import { Subscription, distinctUntilChanged, skip, switchMap } from 'rxjs';
import deepEqual from 'fast-deep-equal';

import {
Expand Down Expand Up @@ -104,8 +104,12 @@ export class LinksEmbeddable
// By-value panels should update the componentState when input changes
this.subscriptions.add(
this.getInput$()
.pipe(distinctUntilChanged(deepEqual), skip(1))
.subscribe(async () => await this.initializeSavedLinks())
.pipe(
distinctUntilChanged(deepEqual),
skip(1),
switchMap(async () => await this.initializeSavedLinks())
)
.subscribe()
);
}

Expand Down
48 changes: 27 additions & 21 deletions src/plugins/telemetry/public/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import type {
import type { HomePublicPluginSetup } from '@kbn/home-plugin/public';
import { ElasticV3BrowserShipper } from '@kbn/analytics-shippers-elastic-v3-browser';

import { BehaviorSubject, map, tap } from 'rxjs';
import { BehaviorSubject, map, switchMap, tap } from 'rxjs';
import type { TelemetryConfigLabels } from '../server/config';
import { FetchTelemetryConfigRoute, INTERNAL_VERSION } from '../common/routes';
import type { v2 } from '../common/types';
Expand Down Expand Up @@ -246,26 +246,32 @@ export class TelemetryPlugin
});
this.telemetryNotifications = telemetryNotifications;

application.currentAppId$.subscribe(async () => {
// Refresh and get telemetry config
const updatedConfig = await this.refreshConfig(http);

analytics.optIn({
global: { enabled: this.telemetryService!.isOptedIn && !screenshotMode.isScreenshotMode() },
});

const isUnauthenticated = this.getIsUnauthenticated(http);
if (isUnauthenticated) {
return;
}

const telemetryBanner = updatedConfig?.banner;

this.maybeStartTelemetryPoller();
if (telemetryBanner) {
this.maybeShowOptedInNotificationBanner();
}
});
application.currentAppId$
.pipe(
switchMap(async () => {
// Refresh and get telemetry config
const updatedConfig = await this.refreshConfig(http);

analytics.optIn({
global: {
enabled: this.telemetryService!.isOptedIn && !screenshotMode.isScreenshotMode(),
},
});

const isUnauthenticated = this.getIsUnauthenticated(http);
if (isUnauthenticated) {
return;
}

const telemetryBanner = updatedConfig?.banner;

this.maybeStartTelemetryPoller();
if (telemetryBanner) {
this.maybeShowOptedInNotificationBanner();
}
})
)
.subscribe();

return {
telemetryService: this.getTelemetryServicePublicApis(),
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/aiops/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export class AiopsPlugin
// This way we can pass on license changes to the route factory having always
// the current license because it's stored in a mutable attribute.
const aiopsLicense: AiopsLicense = { isActivePlatinumLicense: false };
this.licenseSubscription = plugins.licensing.license$.subscribe(async (license) => {
this.licenseSubscription = plugins.licensing.license$.subscribe((license) => {
aiopsLicense.isActivePlatinumLicense = isActiveLicense('platinum', license);

if (aiopsLicense.isActivePlatinumLicense) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import querystring from 'querystring';
import React from 'react';
import { renderToString } from 'react-dom/server';
import type { Observable, Subscription } from 'rxjs';
import { switchMap } from 'rxjs';

import type {
CapabilitiesSetup,
Expand Down Expand Up @@ -209,18 +210,22 @@ export class AuthorizationService {
validateFeaturePrivileges(allFeatures);
validateReservedPrivileges(allFeatures);

this.statusSubscription = online$.subscribe(async ({ scheduleRetry }) => {
try {
await registerPrivilegesWithCluster(
this.logger,
this.privileges,
this.applicationName,
clusterClient
);
} catch (err) {
scheduleRetry();
}
});
this.statusSubscription = online$
.pipe(
switchMap(async ({ scheduleRetry }) => {
try {
await registerPrivilegesWithCluster(
this.logger,
this.privileges,
this.applicationName,
clusterClient
);
} catch (err) {
scheduleRetry();
}
})
)
.subscribe();
}

stop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import type { Observable, Subscription } from 'rxjs';
import { switchMap } from 'rxjs';

import type { ElasticsearchClient, HttpServiceSetup, Logger } from '@kbn/core/server';
import { SavedObjectsErrorHelpers } from '@kbn/core/server';
Expand Down Expand Up @@ -90,13 +91,20 @@ export class SessionManagementService {
auditLogger: audit.withoutRequest,
});

this.statusSubscription = online$.subscribe(async ({ scheduleRetry }) => {
try {
await Promise.all([this.sessionIndex.initialize(), this.scheduleCleanupTask(taskManager)]);
} catch (err) {
scheduleRetry();
}
});
this.statusSubscription = online$
.pipe(
switchMap(async ({ scheduleRetry }) => {
try {
await Promise.all([
this.sessionIndex.initialize(),
this.scheduleCleanupTask(taskManager),
]);
} catch (err) {
scheduleRetry();
}
})
)
.subscribe();

return {
session: new Session({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
TaskManagerSetupContract,
TaskManagerStartContract,
} from '@kbn/task-manager-plugin/server';
import { Subject } from 'rxjs';
import { concatMap, Subject } from 'rxjs';
import { EncryptedSavedObjectsClient } from '@kbn/encrypted-saved-objects-plugin/server';
import pMap from 'p-map';
import { DEFAULT_SPACE_ID } from '@kbn/spaces-plugin/common';
Expand Down Expand Up @@ -377,45 +377,49 @@ export class SyntheticsService {

let output: ServiceData['output'] | null = null;

subject.subscribe(async (monitors) => {
try {
if (monitors.length === 0 || !this.config.manifestUrl) {
return;
}
subject
.pipe(
concatMap(async (monitors) => {
try {
if (monitors.length === 0 || !this.config.manifestUrl) {
return;
}

if (!output) {
output = await this.getOutput();
if (!output) {
output = await this.getOutput();

if (!output) {
sendErrorTelemetryEvents(service.logger, service.server.telemetry, {
reason: 'API key is not valid.',
message: 'Failed to push configs. API key is not valid.',
type: 'invalidApiKey',
stackVersion: service.server.stackVersion,
});
return;
}
}

if (!output) {
this.logger.debug(`${monitors.length} monitors will be pushed to synthetics service.`);

service.syncErrors = await this.apiClient.syncMonitors({
monitors,
output,
license,
});
} catch (e) {
sendErrorTelemetryEvents(service.logger, service.server.telemetry, {
reason: 'API key is not valid.',
message: 'Failed to push configs. API key is not valid.',
type: 'invalidApiKey',
reason: 'Failed to push configs to service',
message: e?.message,
type: 'pushConfigsError',
code: e?.code,
status: e.status,
stackVersion: service.server.stackVersion,
});
return;
this.logger.error(e);
}
}

this.logger.debug(`${monitors.length} monitors will be pushed to synthetics service.`);

service.syncErrors = await this.apiClient.syncMonitors({
monitors,
output,
license,
});
} catch (e) {
sendErrorTelemetryEvents(service.logger, service.server.telemetry, {
reason: 'Failed to push configs to service',
message: e?.message,
type: 'pushConfigsError',
code: e?.code,
status: e.status,
stackVersion: service.server.stackVersion,
});
this.logger.error(e);
}
});
})
)
.subscribe();

await this.getMonitorConfigs(subject);
}
Expand Down Expand Up @@ -479,25 +483,29 @@ export class SyntheticsService {
const license = await this.getLicense();
const subject = new Subject<MonitorFields[]>();

subject.subscribe(async (monitors) => {
const hasPublicLocations = monitors.some((config) =>
config.locations.some(({ isServiceManaged }) => isServiceManaged)
);

if (hasPublicLocations) {
const output = await this.getOutput();
if (!output) {
return;
}

const data = {
output,
monitors,
license,
};
return await this.apiClient.delete(data);
}
});
subject
.pipe(
concatMap(async (monitors) => {
const hasPublicLocations = monitors.some((config) =>
config.locations.some(({ isServiceManaged }) => isServiceManaged)
);

if (hasPublicLocations) {
const output = await this.getOutput();
if (!output) {
return;
}

const data = {
output,
monitors,
license,
};
return await this.apiClient.delete(data);
}
})
)
.subscribe();

await this.getMonitorConfigs(subject);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ export class EphemeralTaskLifecycle {
);
})
)
.subscribe(async (e) => {
.subscribe((e) => {
let overallCapacity = this.getCapacity();
const capacityByType = new Map<string, number>();
const tasksWithinCapacity = [...this.ephemeralTaskQueue]
Expand Down

0 comments on commit 467f42b

Please sign in to comment.