diff --git a/src/core/public/core_system.ts b/src/core/public/core_system.ts index 2a9dca96062dc..63855adf39f09 100644 --- a/src/core/public/core_system.ts +++ b/src/core/public/core_system.ts @@ -26,6 +26,7 @@ import { ChromeService } from './chrome'; import { FatalErrorsService, FatalErrorsSetup } from './fatal_errors'; import { HttpService } from './http'; import { I18nService } from './i18n'; +import { PulseService } from './pulse'; import { InjectedMetadataParams, InjectedMetadataService, @@ -100,6 +101,7 @@ export class CoreSystem { private readonly rendering: RenderingService; private readonly context: ContextService; private readonly integrations: IntegrationsService; + private readonly pulse: PulseService; private readonly rootDomElement: HTMLElement; private readonly coreContext: CoreContext; @@ -137,6 +139,7 @@ export class CoreSystem { this.rendering = new RenderingService(); this.application = new ApplicationService(); this.integrations = new IntegrationsService(); + this.pulse = new PulseService(); this.coreContext = { coreId: Symbol('core'), env: injectedMetadata.env }; @@ -162,6 +165,7 @@ export class CoreSystem { const http = this.http.setup({ injectedMetadata, fatalErrors: this.fatalErrorsSetup }); const uiSettings = this.uiSettings.setup({ http, injectedMetadata }); const notifications = this.notifications.setup({ uiSettings }); + const pulse = await this.pulse.setup(); const pluginDependencies = this.plugins.getOpaqueIds(); const context = this.context.setup({ @@ -184,6 +188,7 @@ export class CoreSystem { injectedMetadata, notifications, uiSettings, + pulse, }; // Services that do not expose contracts at setup @@ -216,6 +221,7 @@ export class CoreSystem { const i18n = await this.i18n.start(); const application = await this.application.start({ http, injectedMetadata }); await this.integrations.start({ uiSettings }); + const pulse = await this.pulse.start(); const coreUiTargetDomElement = document.createElement('div'); coreUiTargetDomElement.id = 'kibana-body'; @@ -271,6 +277,7 @@ export class CoreSystem { notifications, overlays, uiSettings, + pulse, }; const plugins = await this.plugins.start(core); diff --git a/src/core/public/index.ts b/src/core/public/index.ts index 7488f9b973b71..01226825bbd2e 100644 --- a/src/core/public/index.ts +++ b/src/core/public/index.ts @@ -66,6 +66,7 @@ import { UiSettingsState, IUiSettingsClient } from './ui_settings'; import { ApplicationSetup, Capabilities, ApplicationStart } from './application'; import { DocLinksStart } from './doc_links'; import { SavedObjectsStart } from './saved_objects'; +import { PulseServiceSetup, PulseServiceStart } from './pulse'; export { PackageInfo, EnvironmentMode } from '../server/types'; import { IContextContainer, @@ -177,6 +178,7 @@ export interface CoreSetup { notifications: NotificationsSetup; /** {@link IUiSettingsClient} */ uiSettings: IUiSettingsClient; + pulse: PulseServiceSetup; /** * exposed temporarily until https://github.com/elastic/kibana/issues/41990 done * use *only* to retrieve config values. There is no way to set injected values @@ -219,6 +221,7 @@ export interface CoreStart { i18n: I18nStart; /** {@link NotificationsStart} */ notifications: NotificationsStart; + pulse: PulseServiceStart; /** {@link OverlayStart} */ overlays: OverlayStart; /** {@link IUiSettingsClient} */ @@ -306,4 +309,6 @@ export { PluginOpaqueId, IUiSettingsClient, UiSettingsState, + PulseServiceSetup, + PulseServiceStart, }; diff --git a/src/core/public/plugins/plugin_context.ts b/src/core/public/plugins/plugin_context.ts index 848f46605d4de..de0fe4fd60af9 100644 --- a/src/core/public/plugins/plugin_context.ts +++ b/src/core/public/plugins/plugin_context.ts @@ -104,6 +104,7 @@ export function createPluginSetupContext< http: deps.http, notifications: deps.notifications, uiSettings: deps.uiSettings, + pulse: deps.pulse, injectedMetadata: { getInjectedVar: deps.injectedMetadata.getInjectedVar, }, @@ -147,6 +148,7 @@ export function createPluginStartContext< overlays: deps.overlays, uiSettings: deps.uiSettings, savedObjects: deps.savedObjects, + pulse: deps.pulse, injectedMetadata: { getInjectedVar: deps.injectedMetadata.getInjectedVar, }, diff --git a/src/core/public/pulse/channel.ts b/src/core/public/pulse/channel.ts new file mode 100644 index 0000000000000..77b4a6d2d6416 --- /dev/null +++ b/src/core/public/pulse/channel.ts @@ -0,0 +1,21 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// eslint-disable-next-line @kbn/eslint/no-restricted-paths +export { PulseInstruction, PulseChannel } from '../../server/pulse/channel'; diff --git a/src/core/public/pulse/collectors/default.ts b/src/core/public/pulse/collectors/default.ts new file mode 100644 index 0000000000000..55b0e1b955304 --- /dev/null +++ b/src/core/public/pulse/collectors/default.ts @@ -0,0 +1,22 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export async function getRecords() { + return [{}]; +} diff --git a/src/core/public/pulse/collectors/errors.ts b/src/core/public/pulse/collectors/errors.ts new file mode 100644 index 0000000000000..55b0e1b955304 --- /dev/null +++ b/src/core/public/pulse/collectors/errors.ts @@ -0,0 +1,22 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export async function getRecords() { + return [{}]; +} diff --git a/src/core/public/pulse/collectors/notifications.ts b/src/core/public/pulse/collectors/notifications.ts new file mode 100644 index 0000000000000..55b0e1b955304 --- /dev/null +++ b/src/core/public/pulse/collectors/notifications.ts @@ -0,0 +1,22 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export async function getRecords() { + return [{}]; +} diff --git a/src/core/public/pulse/index.ts b/src/core/public/pulse/index.ts new file mode 100644 index 0000000000000..e236bf14da4b5 --- /dev/null +++ b/src/core/public/pulse/index.ts @@ -0,0 +1,153 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Subject } from 'rxjs'; + +// eslint-disable-next-line @kbn/eslint/no-restricted-paths +import { InstructionsResponse } from '../../server/pulse'; +// eslint-disable-next-line @kbn/eslint/no-restricted-paths +import { PulseChannel, PulseInstruction } from './channel'; +// eslint-disable-next-line @kbn/eslint/no-restricted-paths +import { Fetcher, sendPulse } from '../../server/pulse/send_pulse'; + +export interface PulseServiceSetup { + getChannel: (id: string) => PulseChannel; +} + +// eslint-disable-next-line @typescript-eslint/no-empty-interface +export interface PulseServiceStart {} + +const channelNames = ['default', 'notifications', 'errors']; + +export class PulseService { + private retriableErrors = 0; + private readonly channels: Map; + private readonly instructions: Map> = new Map(); + + constructor() { + this.channels = new Map( + channelNames.map((id): [string, PulseChannel] => { + const instructions$ = new Subject(); + this.instructions.set(id, instructions$); + const channel = new PulseChannel({ id, instructions$ }); + return [channel.id, channel]; + }) + ); + } + + public async setup(): Promise { + // poll for instructions every second for this deployment + setInterval(() => { + // eslint-disable-next-line no-console + this.loadInstructions().catch(err => console.error(err.stack)); + }, 10000); + + // eslint-disable-next-line no-console + console.log('Will attempt first telemetry collection in 5 seconds...'); + setTimeout(() => { + setInterval(() => { + // eslint-disable-next-line no-console + this.sendTelemetry().catch(err => console.error(err.stack)); + }, 5000); + }, 5000); + + return { + getChannel: (id: string) => { + const channel = this.channels.get(id); + if (!channel) { + throw new Error(`Unknown channel: ${id}`); + } + return channel; + }, + }; + } + + private async sendTelemetry() { + const fetcher: Fetcher = async (url, channels) => { + return await fetch(url, { + method: 'post', + + headers: { + 'content-type': 'application/json', + 'kbn-xsrf': 'true', + }, + body: JSON.stringify({ + channels, + }), + }); + }; + + return await sendPulse(this.channels, fetcher); + } + + private async loadInstructions() { + const url = 'http://localhost:5601/api/pulse_poc/instructions/123'; + let response: any; + try { + response = await fetch(url); + } catch (err) { + if (!err.message.includes('ECONNREFUSED')) { + throw err; + } + this.handleRetriableError(); + return; + } + if (response.status === 503) { + this.handleRetriableError(); + return; + } + + if (response.status !== 200) { + const responseBody = await response.text(); + throw new Error(`${response.status}: ${responseBody}`); + } + + const responseBody: InstructionsResponse = await response.json(); + + responseBody.channels.forEach(channel => { + const instructions$ = this.instructions.get(channel.id); + if (!instructions$) { + throw new Error( + `Channel (${channel.id}) from service has no corresponding channel handler in client` + ); + } + + channel.instructions.forEach(instruction => instructions$.next(instruction)); + }); + } + + private handleRetriableError() { + this.retriableErrors++; + if (this.retriableErrors === 1) { + // eslint-disable-next-line no-console + console.warn( + 'Kibana is not yet available at http://localhost:5601/api, will continue to check for the next 120 seconds...' + ); + } else if (this.retriableErrors > 120) { + this.retriableErrors = 0; + } + } + + async start(): Promise { + return {}; + } + public stop() { + // nothing to do here currently + } +} diff --git a/src/core/server/index.ts b/src/core/server/index.ts index 060265120b865..debf2abc0e465 100644 --- a/src/core/server/index.ts +++ b/src/core/server/index.ts @@ -47,6 +47,7 @@ import { IUiSettingsClient, UiSettingsServiceSetup, UiSettingsServiceStart } fro import { SavedObjectsClientContract } from './saved_objects/types'; import { SavedObjectsServiceSetup, SavedObjectsServiceStart } from './saved_objects'; import { CapabilitiesSetup, CapabilitiesStart } from './capabilities'; +import { PulseServiceSetup } from './pulse'; import { UuidServiceSetup } from './uuid'; export { bootstrap } from './bootstrap'; @@ -274,6 +275,7 @@ export interface CoreSetup { /** {@link UiSettingsServiceSetup} */ uiSettings: UiSettingsServiceSetup; /** {@link UuidServiceSetup} */ + pulse: PulseServiceSetup; uuid: UuidServiceSetup; } @@ -298,5 +300,6 @@ export { PluginsServiceSetup, PluginsServiceStart, PluginOpaqueId, + PulseServiceSetup, UuidServiceSetup, }; diff --git a/src/core/server/legacy/legacy_service.ts b/src/core/server/legacy/legacy_service.ts index 2e8a467eff995..c5e8f8dc16a80 100644 --- a/src/core/server/legacy/legacy_service.ts +++ b/src/core/server/legacy/legacy_service.ts @@ -281,6 +281,7 @@ export class LegacyService implements CoreService { dataClient$: setupDeps.core.elasticsearch.dataClient$, createClient: setupDeps.core.elasticsearch.createClient, }, + pulse: setupDeps.core.pulse, http: { createCookieSessionStorageFactory: setupDeps.core.http.createCookieSessionStorageFactory, registerRouteHandlerContext: setupDeps.core.http.registerRouteHandlerContext.bind( diff --git a/src/core/server/plugins/plugin_context.ts b/src/core/server/plugins/plugin_context.ts index 6e9a7967e9eca..8128113b427a1 100644 --- a/src/core/server/plugins/plugin_context.ts +++ b/src/core/server/plugins/plugin_context.ts @@ -137,6 +137,7 @@ export function createPluginSetupContext( plugin: PluginWrapper ): CoreSetup { return { + pulse: deps.pulse, capabilities: { registerProvider: deps.capabilities.registerProvider, registerSwitcher: deps.capabilities.registerSwitcher, diff --git a/src/core/server/pulse/channel.ts b/src/core/server/pulse/channel.ts index b9e2061afba9f..a37fba7792f62 100644 --- a/src/core/server/pulse/channel.ts +++ b/src/core/server/pulse/channel.ts @@ -18,7 +18,7 @@ */ import { Subject } from 'rxjs'; -import { IClusterClient } from '../elasticsearch'; +// import { IClusterClient } from '../elasticsearch'; export interface PulseInstruction { owner: string; @@ -32,16 +32,24 @@ interface ChannelConfig { } export class PulseChannel { - public readonly getRecords: (elasticsearch: IClusterClient) => Promise>; - + public readonly getRecords: () => Promise>; + private readonly collector: any; constructor(private readonly config: ChannelConfig) { - this.getRecords = require(`${__dirname}/collectors/${this.id}`).getRecords; + this.collector = require(`${__dirname}/collectors/${this.id}`); + this.getRecords = this.collector.getRecords; } public get id() { return this.config.id; } + public sendPulse(payload: T) { + if (!this.collector.putRecord) { + throw Error(`this.collector.putRecords not implemented for ${this.id}.`); + } + this.collector.putRecord(payload); + } + public instructions$() { return this.config.instructions$.asObservable(); } diff --git a/src/core/server/pulse/collectors/default.ts b/src/core/server/pulse/collectors/default.ts index e692558630d32..8032f118cb8e3 100644 --- a/src/core/server/pulse/collectors/default.ts +++ b/src/core/server/pulse/collectors/default.ts @@ -17,10 +17,13 @@ * under the License. */ -import { IClusterClient } from '../../elasticsearch'; +export async function getRecords() { + return []; +} +// import { IClusterClient } from '../../elasticsearch'; -export async function getRecords(elasticsearch: IClusterClient) { - const pingResult = await elasticsearch.callAsInternalUser('ping'); +// export async function getRecords(elasticsearch: IClusterClient) { +// const pingResult = await elasticsearch.callAsInternalUser('ping'); - return [{ ping_received: pingResult }]; -} +// return [{ ping_received: pingResult }]; +// } diff --git a/src/core/server/pulse/collectors/errors.ts b/src/core/server/pulse/collectors/errors.ts index e1463ac336466..d4bc1442e005b 100644 --- a/src/core/server/pulse/collectors/errors.ts +++ b/src/core/server/pulse/collectors/errors.ts @@ -17,22 +17,21 @@ * under the License. */ -import { IClusterClient } from '../../elasticsearch'; -/* queries the local error index for the current deployment - (is the info and config provided by using callAsInternalUser? It doesn't look like it) - returns array of documents in the local index - these documents may or may not have fixed-version annotations, - but will have an error recorded along with a signature for the error -*/ +// getRecords should return an array of one or more telemetry +// records for the errors channel. Each record will ultimately +// be stored as an individual document in the errors channel index +// by the service -export async function getRecords(elasticsearch: IClusterClient) { - // this is where I start getting confused. `callAsInternalUser` docs only gives the following info: - /* - Calls specified endpoint with provided clientParams on behalf of the Kibana internal user. See {@link APICaller}. - @param endpoint — String descriptor of the endpoint e.g. cluster.getSettings or ping. --> what does 'ping' give us? - @param clientParams — A dictionary of parameters that will be passed directly to the Elasticsearch JS client. - @param options — Options that affect the way we call the API and process the result. - */ - const errorsRecorded = await elasticsearch.callAsInternalUser('ping'); - return [{ errors_recorded: errorsRecorded }]; +export interface Payload { + errorId: string; +} + +const payloads: Payload[] = []; + +export async function putRecord(payload: Payload) { + payloads.push(payload); +} + +export async function getRecords() { + return payloads; } diff --git a/src/core/server/pulse/collectors/notifications.ts b/src/core/server/pulse/collectors/notifications.ts new file mode 100644 index 0000000000000..551dd94fba2fa --- /dev/null +++ b/src/core/server/pulse/collectors/notifications.ts @@ -0,0 +1,22 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export async function getRecords() { + return []; +} diff --git a/src/core/server/pulse/index.ts b/src/core/server/pulse/index.ts index e0ee333024d78..6599f7739a259 100644 --- a/src/core/server/pulse/index.ts +++ b/src/core/server/pulse/index.ts @@ -19,14 +19,15 @@ import { readdirSync } from 'fs'; import { resolve, parse } from 'path'; + import { Subject } from 'rxjs'; // @ts-ignore import fetch from 'node-fetch'; - import { CoreContext } from '../core_context'; import { Logger } from '../logging'; -import { ElasticsearchServiceSetup, IClusterClient } from '../elasticsearch'; +import { ElasticsearchServiceSetup } from '../elasticsearch'; import { PulseChannel, PulseInstruction } from './channel'; +import { sendPulse, Fetcher } from './send_pulse'; export interface InternalPulseService { getChannel: (id: string) => PulseChannel; @@ -36,6 +37,10 @@ export interface PulseSetupDeps { elasticsearch: ElasticsearchServiceSetup; } +export type PulseServiceSetup = InternalPulseService; +// eslint-disable-next-line @typescript-eslint/no-empty-interface +export interface PulseServiceStart {} + interface ChannelResponse { id: string; instructions: PulseInstruction[]; @@ -53,18 +58,19 @@ const channelNames = readdirSync(resolve(__dirname, 'collectors')) }); export class PulseService { + private retriableErrors = 0; private readonly log: Logger; private readonly channels: Map; - private readonly instructions: Map> = new Map(); - private readonly subscriptions: Set = new Set(); - private elasticsearch?: IClusterClient; + private readonly instructions$: Map> = new Map(); + // private readonly subscriptions: Set = new Set(); + // private elasticsearch?: IClusterClient; constructor(coreContext: CoreContext) { this.log = coreContext.logger.get('pulse-service'); this.channels = new Map( channelNames.map((id): [string, PulseChannel] => { const instructions$ = new Subject(); - this.instructions.set(id, instructions$); + this.instructions$.set(id, instructions$); const channel = new PulseChannel({ id, instructions$ }); return [channel.id, channel]; }) @@ -72,9 +78,22 @@ export class PulseService { } public async setup(deps: PulseSetupDeps): Promise { - this.log.info('Setting up service'); + this.log.debug('Setting up pulse service'); - this.elasticsearch = deps.elasticsearch.createClient('pulse-service'); + // poll for instructions every second for this deployment + setInterval(() => { + // eslint-disable-next-line no-console + this.loadInstructions().catch(err => console.error(err.stack)); + }, 1000); + + // eslint-disable-next-line no-console + console.log('Will attempt first telemetry collection in 5 seconds...'); + setTimeout(() => { + setInterval(() => { + // eslint-disable-next-line no-console + this.sendTelemetry().catch(err => console.error(err.stack)); + }, 5000); + }, 5000); return { getChannel: (id: string) => { @@ -87,35 +106,6 @@ export class PulseService { }; } - public async start() { - this.log.info('Starting service'); - if (!this.elasticsearch) { - throw Error(`The 'PulseService.setup' method needs to be called before the 'start' method`); - } - const elasticsearch = this.elasticsearch; - - // poll for instructions every second for this deployment - const loadInstructionSubcription = setInterval(() => { - this.loadInstructions().catch(err => this.log.error(err.stack)); - }, 1000); - this.subscriptions.add(loadInstructionSubcription); - - this.log.debug('Will attempt first telemetry collection in 5 seconds...'); - const sendTelemetrySubcription = setInterval(() => { - this.log.debug('sending telemetry data from pulse'); - this.sendTelemetry(elasticsearch).catch(err => this.log.error(err.stack)); - }, 5000); - this.subscriptions.add(sendTelemetrySubcription); - } - - public async stop() { - this.subscriptions.forEach(subscription => { - clearInterval(subscription); - this.subscriptions.delete(subscription); - }); - } - - private retriableErrors = 0; private async loadInstructions() { const url = 'http://localhost:5601/api/pulse_poc/instructions/123'; let response: any; @@ -141,7 +131,7 @@ export class PulseService { const responseBody: InstructionsResponse = await response.json(); responseBody.channels.forEach(channel => { - const instructions$ = this.instructions.get(channel.id); + const instructions$ = this.instructions$.get(channel.id); if (!instructions$) { throw new Error( `Channel (${channel.id}) from service has no corresponding channel handler in client` @@ -155,7 +145,8 @@ export class PulseService { private handleRetriableError() { this.retriableErrors++; if (this.retriableErrors === 1) { - this.log.warn( + // eslint-disable-next-line no-console + console.warn( 'Kibana is not yet available at http://localhost:5601/api, will continue to check for the next 120 seconds...' ); } else if (this.retriableErrors > 120) { @@ -163,24 +154,11 @@ export class PulseService { } } - private async sendTelemetry(elasticsearch: IClusterClient) { - this.log.debug('Sending telemetry'); - const url = 'http://localhost:5601/api/pulse_poc/intake/123'; - - const channels = []; - for (const channel of this.channels.values()) { - const records = await channel.getRecords(elasticsearch); - this.log.debug(`Channel "${channel.id}" returns the records ${JSON.stringify(records)}`); - channels.push({ - records, - channel_id: channel.id, - }); - } - - let response: any; - try { - response = await fetch(url, { + private async sendTelemetry() { + const fetcher: Fetcher = async (url, channels) => { + return await fetch(url, { method: 'post', + headers: { 'content-type': 'application/json', 'kbn-xsrf': 'true', @@ -189,21 +167,81 @@ export class PulseService { channels, }), }); - } catch (err) { - if (!err.message.includes('ECONNREFUSED')) { - throw err; - } - // the instructions polling should handle logging for this case, yay for POCs - return; - } - if (response.status === 503) { - // the instructions polling should handle logging for this case, yay for POCs - return; - } + }; - if (response.status !== 200) { - const responseBody = await response.text(); - throw new Error(`${response.status}: ${responseBody}`); - } + return await sendPulse(this.channels, fetcher); } } + +// public async start() { +// this.log.info('Starting service'); +// if (!this.elasticsearch) { +// throw Error(`The 'PulseService.setup' method needs to be called before the 'start' method`); +// } +// const elasticsearch = this.elasticsearch; + +// // poll for instructions every second for this deployment +// const loadInstructionSubcription = setInterval(() => { +// this.loadInstructions().catch(err => this.log.error(err.stack)); +// }, 1000); +// this.subscriptions.add(loadInstructionSubcription); + +// this.log.debug('Will attempt first telemetry collection in 5 seconds...'); +// const sendTelemetrySubcription = setInterval(() => { +// this.sendTelemetry(elasticsearch).catch(err => this.log.error(err.stack)); +// }, 5000); +// this.subscriptions.add(sendTelemetrySubcription); +// } + +// public async stop() { +// this.subscriptions.forEach(subscription => { +// clearInterval(subscription); +// this.subscriptions.delete(subscription); +// }); +// } + +// private retriableErrors = 0; + +// private async sendTelemetry(elasticsearch: IClusterClient) { +// this.log.debug('Sending telemetry'); +// const url = 'http://localhost:5601/api/pulse_poc/intake/123'; + +// const channels = []; +// for (const channel of this.channels.values()) { +// const records = await channel.getRecords(elasticsearch); +// this.log.debug(`Channel "${channel.id}" returns the records ${JSON.stringify(records)}`); +// channels.push({ +// records, +// channel_id: channel.id, +// }); +// } + +// let response: any; +// try { +// response = await fetch(url, { +// method: 'post', +// headers: { +// 'content-type': 'application/json', +// 'kbn-xsrf': 'true', +// }, +// body: JSON.stringify({ +// channels, +// }), +// }); +// } catch (err) { +// if (!err.message.includes('ECONNREFUSED')) { +// throw err; +// } +// // the instructions polling should handle logging for this case, yay for POCs +// return; +// } +// if (response.status === 503) { +// // the instructions polling should handle logging for this case, yay for POCs +// return; +// } + +// if (response.status !== 200) { +// const responseBody = await response.text(); +// throw new Error(`${response.status}: ${responseBody}`); +// } +// } diff --git a/src/core/server/pulse/send_pulse.ts b/src/core/server/pulse/send_pulse.ts new file mode 100644 index 0000000000000..2b58ef13d903d --- /dev/null +++ b/src/core/server/pulse/send_pulse.ts @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { PulseChannel } from './channel'; + +export const CLUSTER_UUID = '123'; +export const BASE_URL = 'http://localhost:5601/api/pulse_poc'; +export interface ChannelsToSend { + records: any; + channel_id: string; +} + +export type Fetcher = ( + url: string, + channelsToSend: ChannelsToSend[] +) => Promise; + +export async function sendPulse( + channels: Map, + fetcher: Fetcher +) { + const url = `${BASE_URL}/intake/${CLUSTER_UUID}`; + + const channelsToSend = []; + for (const channel of channels.values()) { + const records = await channel.getRecords(); + channelsToSend.push({ + records, + channel_id: channel.id, + }); + } + + let response: any; + try { + response = await fetcher(url, channelsToSend); + } catch (err) { + if (!err.message.includes('ECONNREFUSED')) { + throw err; + } + // the instructions polling should handle logging for this case, yay for POCs + return; + } + if (response.status === 503) { + // the instructions polling should handle logging for this case, yay for POCs + return; + } + if (response.status !== 200) { + const responseBody = await response.text(); + throw new Error(`${response.status}: ${responseBody}`); + } +} diff --git a/src/core/server/server.ts b/src/core/server/server.ts index e32f34adcdbea..14a4bbda0b37c 100644 --- a/src/core/server/server.ts +++ b/src/core/server/server.ts @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - import { filter, take } from 'rxjs/operators'; import { Type } from '@kbn/config-schema'; @@ -241,7 +240,7 @@ export class Server { await this.http.start(); // Starting it after http because it relies on the the HTTP service - await this.pulse.start(); + // await this.pulse.start(); return coreStart; } @@ -249,7 +248,7 @@ export class Server { public async stop() { this.log.debug('stopping server'); - await this.pulse.stop(); + // await this.pulse.stop(); await this.legacy.stop(); await this.plugins.stop(); await this.savedObjects.stop(); diff --git a/src/plugins/newsfeed/public/plugin.tsx b/src/plugins/newsfeed/public/plugin.tsx index 5ea5e5b324717..e43b7e59c2e99 100644 --- a/src/plugins/newsfeed/public/plugin.tsx +++ b/src/plugins/newsfeed/public/plugin.tsx @@ -38,7 +38,16 @@ export class NewsfeedPublicPlugin implements Plugin { this.kibanaVersion = initializerContext.env.packageInfo.version; } - public setup(core: CoreSetup): Setup {} + public setup(core: CoreSetup): Setup { + const instructions$ = core.pulse.getChannel('errors').instructions$(); + core.pulse.getChannel('errors').sendPulse({ + errorId: 'new_error', + }); + + instructions$.subscribe(instruction => { + // console.log('instruction::', instruction); + }); + } public start(core: CoreStart): Start { const api$ = this.fetchNewsfeed(core); diff --git a/x-pack/plugins/pulse_poc/server/channels/errors/check_receiving_errors.ts b/x-pack/plugins/pulse_poc/server/channels/errors/check_receiving_errors.ts new file mode 100644 index 0000000000000..f7d98e949ceb7 --- /dev/null +++ b/x-pack/plugins/pulse_poc/server/channels/errors/check_receiving_errors.ts @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { IScopedClusterClient } from 'src/core/server'; + +export async function check(es: IScopedClusterClient, deploymentId: string) { + // TODO: modify the search query for full text search + const response = await es.callAsInternalUser('search', { + index: 'pulse-poc-raw-errors', + size: 100, + allow_no_indices: true, + ignore_unavailable: true, + body: { + query: { + term: { + deployment_id: { + value: deploymentId, + }, + }, + }, + }, + }); + + if (response.hits.total.value === 0) { + // we haven't recorded any errors so there aren't instructions to send back + return undefined; + } else { + /* + we have recorded errors and need to send instructions back that will help + plugin owners resolve the errors + we'll need to parse the stack trace and get information from it regarding: + plugin name + error type (fatal, warning etc) + where it was first encountered + etc + TODO: see the logger for more info + */ + return [ + { + owner: 'core', + id: 'pulse_error', + value: { + error_id: '1', + fix_version: '7.7.0', + }, + }, + { + owner: 'core', + id: 'pulse_error', + value: { + error_id: '2', + fix_version: null, + }, + }, + ]; + } +} diff --git a/x-pack/plugins/pulse_poc/server/channels/notifications/check_notifications.ts b/x-pack/plugins/pulse_poc/server/channels/notifications/check_notifications.ts new file mode 100644 index 0000000000000..d5514124eef08 --- /dev/null +++ b/x-pack/plugins/pulse_poc/server/channels/notifications/check_notifications.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export function check() { + return { + owner: 'ml_plugin', + id: 'pulse_telemetry', + value: {}, + }; +} diff --git a/x-pack/plugins/pulse_poc/server/plugin.ts b/x-pack/plugins/pulse_poc/server/plugin.ts index 861779642ed91..8c3d5274d917c 100644 --- a/x-pack/plugins/pulse_poc/server/plugin.ts +++ b/x-pack/plugins/pulse_poc/server/plugin.ts @@ -116,9 +116,11 @@ export class PulsePocPlugin { async (context, request, response) => { const { deploymentId } = request.params; const es = context.core.elasticsearch.adminClient; - const allChannelCheckResults = this.channels.map(async channel => { - const channelChecks = channel.checks.map(check => check.check(es, deploymentId)); + // const indexName = `pulse-poc-raw-${channel.id}`; + const channelChecks = channel.checks.map(check => + check.check(es, deploymentId) + ); const checkResults = await Promise.all(channelChecks); const instructions = checkResults.filter((value: any) => Boolean(value)); return {