Skip to content

Commit

Permalink
[RAC] Rule monitoring: Event Log for Rule Registry (elastic#98353)
Browse files Browse the repository at this point in the history
**Needed for:** rule execution log for Security elastic#94143
**Related to:**

- alerts-as-data: elastic#93728, elastic#93729, elastic#93730
- RFC for index naming elastic#98912

## Summary

This PR adds a mechanism for writing to / reading from / bootstrapping indices for RAC project into the `rule_registry` plugin. Particularly, indices for alerts-as-data and rule execution events. This implementation is similar to existing implementations like `event_log` plugin (see elastic#98353 (comment) for historical perspective), but we're going to converge all of them into 1 or 2 implementations. At least we should have a single one in `rule_registry` itself.

In this PR I tried to incorporate most of the feedback received in the RFC (elastic#98912), but if you notice I missed/forgot something, please let me know in the comments.

Done in this PR:

- [x] Schema-agnostic APIs for working with Elasticsearch.
- [x] Schema-aware log definition and bootstrapping API (creating hierarchical logs).
- [x] Schema-aware write API (logging events).
- [x] Schema-aware read API (searching logs, filtering, sorting, pagination, aggregation).
- [x] Support for Kibana spaces, space-aware index bootstrapping (either at rule creation or rule execution time).

As for reviewing this PR, perhaps it might be easier to start with:

- checking description of elastic#98912
- checking usage examples https://github.com/elastic/kibana/pull/98353/files#diff-c049ff2198cc69bd50a69e92d29e88da7e10b9a152bdaceaf3d41826e712c12b
- checking public api https://github.com/elastic/kibana/pull/98353/files#diff-8e9ef0dbcbc60b1861d492a03865b2ae76a56ec38ada61898c991d3a74bd6268

## Next steps

Next steps towards rule execution log in Security (elastic#94143):

- define actual schema for rule execution events
- inject instance of rule execution log into Security rule executors and route handlers
- implement actual execution logging in rule executors
- update route handlers to start fetching execution events and metrics from the log instead of custom saved objects

Next steps in the context of RAC and unified implementation:

- converge this implementation with `RuleDataService` implementation
  - implement robust index bootstrapping
  - reconsider using FieldMap as a generic type parameter
  - implement validation for documents being indexed
- cover the final implementation with tests
- write comprehensive docs: update plugin README, add JSDoc comments to all public interfaces
  • Loading branch information
banderror authored May 27, 2021
1 parent c298efe commit 7fd6539
Show file tree
Hide file tree
Showing 41 changed files with 1,809 additions and 40 deletions.
19 changes: 9 additions & 10 deletions x-pack/plugins/apm/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,20 @@ export class APMPlugin

registerFeaturesUsage({ licensingPlugin: plugins.licensing });

const { ruleDataService } = plugins.ruleRegistry;
const getCoreStart = () =>
core.getStartServices().then(([coreStart]) => coreStart);

const ready = once(async () => {
const componentTemplateName = plugins.ruleRegistry.getFullAssetName(
const componentTemplateName = ruleDataService.getFullAssetName(
'apm-mappings'
);

if (!plugins.ruleRegistry.isWriteEnabled()) {
if (!ruleDataService.isWriteEnabled()) {
return;
}

await plugins.ruleRegistry.createOrUpdateComponentTemplate({
await ruleDataService.createOrUpdateComponentTemplate({
name: componentTemplateName,
body: {
template: {
Expand All @@ -167,16 +168,14 @@ export class APMPlugin
},
});

await plugins.ruleRegistry.createOrUpdateIndexTemplate({
name: plugins.ruleRegistry.getFullAssetName('apm-index-template'),
await ruleDataService.createOrUpdateIndexTemplate({
name: ruleDataService.getFullAssetName('apm-index-template'),
body: {
index_patterns: [
plugins.ruleRegistry.getFullAssetName('observability-apm*'),
ruleDataService.getFullAssetName('observability-apm*'),
],
composed_of: [
plugins.ruleRegistry.getFullAssetName(
TECHNICAL_COMPONENT_TEMPLATE_NAME
),
ruleDataService.getFullAssetName(TECHNICAL_COMPONENT_TEMPLATE_NAME),
componentTemplateName,
],
},
Expand All @@ -188,7 +187,7 @@ export class APMPlugin
});

const ruleDataClient = new RuleDataClient({
alias: plugins.ruleRegistry.getFullAssetName('observability-apm'),
alias: ruleDataService.getFullAssetName('observability-apm'),
getClusterClient: async () => {
const coreStart = await getCoreStart();
return coreStart.elasticsearch.client.asInternalUser;
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/observability/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export class ObservabilityPlugin implements Plugin<ObservabilityPluginSetup> {
return coreStart.elasticsearch.client.asInternalUser;
},
ready: () => Promise.resolve(),
alias: plugins.ruleRegistry.getFullAssetName(),
alias: plugins.ruleRegistry.ruleDataService.getFullAssetName(),
});

registerRoutes({
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/rule_registry/kibana.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
],
"requiredPlugins": [
"alerting",
"data",
"spaces",
"triggersActionsUi"
],
"server": true
Expand Down
20 changes: 20 additions & 0 deletions x-pack/plugins/rule_registry/server/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { schema, TypeOf } from '@kbn/config-schema';

export const config = {
schema: schema.object({
enabled: schema.boolean({ defaultValue: true }),
write: schema.object({
enabled: schema.boolean({ defaultValue: true }),
}),
index: schema.string({ defaultValue: '.alerts' }),
}),
};

export type RuleRegistryPluginConfig = TypeOf<typeof config.schema>;
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

export * from './index_bootstrapper';
export * from './index_management_gateway';
export * from './index_reader';
export * from './index_writer';
export * from './resources/ilm_policy';
export * from './resources/index_mappings';
export * from './resources/index_names';
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { PublicMethodsOf } from '@kbn/utility-types';
import { Logger } from 'src/core/server';

import { IndexNames } from './resources/index_names';
import { IndexMappings } from './resources/index_mappings';
import { createIndexTemplate } from './resources/index_template';
import { IlmPolicy, defaultIlmPolicy } from './resources/ilm_policy';
import { IIndexManagementGateway } from './index_management_gateway';

interface ConstructorParams {
gateway: IIndexManagementGateway;
logger: Logger;
}

export interface IndexSpecification {
indexNames: IndexNames;
indexMappings: IndexMappings;
ilmPolicy?: IlmPolicy;
}

export type IIndexBootstrapper = PublicMethodsOf<IndexBootstrapper>;

// TODO: Converge with the logic of .siem-signals index bootstrapping
// x-pack/plugins/security_solution/server/lib/detection_engine/routes/index/create_index_route.ts

// TODO: Handle race conditions and potential errors between multiple instances of Kibana
// trying to bootstrap the same index. Possible options:
// - robust idempotent logic with error handling
// - leveraging task_manager to make sure bootstrapping is run only once at a time
// - using some sort of distributed lock
// Maybe we can check how Saved Objects service bootstraps .kibana index

export class IndexBootstrapper {
private readonly gateway: IIndexManagementGateway;
private readonly logger: Logger;

constructor(params: ConstructorParams) {
this.gateway = params.gateway;
this.logger = params.logger.get('IndexBootstrapper');
}

public async run(indexSpec: IndexSpecification): Promise<boolean> {
this.logger.debug('bootstrapping elasticsearch resources starting');

try {
const { indexNames, indexMappings, ilmPolicy } = indexSpec;
await this.createIlmPolicyIfNotExists(indexNames, ilmPolicy);
await this.createIndexTemplateIfNotExists(indexNames, indexMappings);
await this.createInitialIndexIfNotExists(indexNames);
} catch (err) {
this.logger.error(`error bootstrapping elasticsearch resources: ${err.message}`);
return false;
}

this.logger.debug('bootstrapping elasticsearch resources complete');
return true;
}

private async createIlmPolicyIfNotExists(names: IndexNames, policy?: IlmPolicy): Promise<void> {
const { indexIlmPolicyName } = names;

const exists = await this.gateway.doesIlmPolicyExist(indexIlmPolicyName);
if (!exists) {
const ilmPolicy = policy ?? defaultIlmPolicy;
await this.gateway.createIlmPolicy(indexIlmPolicyName, ilmPolicy);
}
}

private async createIndexTemplateIfNotExists(
names: IndexNames,
mappings: IndexMappings
): Promise<void> {
const { indexTemplateName } = names;

const templateVersion = 1; // TODO: get from EventSchema definition
const template = createIndexTemplate(names, mappings, templateVersion);

const exists = await this.gateway.doesIndexTemplateExist(indexTemplateName);
if (!exists) {
await this.gateway.createIndexTemplate(indexTemplateName, template);
} else {
await this.gateway.updateIndexTemplate(indexTemplateName, template);
}
}

private async createInitialIndexIfNotExists(names: IndexNames): Promise<void> {
const { indexAliasName, indexInitialName } = names;

const exists = await this.gateway.doesAliasExist(indexAliasName);
if (!exists) {
await this.gateway.createIndex(indexInitialName, {
aliases: {
[indexAliasName]: {
is_write_index: true,
},
},
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { PublicMethodsOf } from '@kbn/utility-types';
import { ElasticsearchClient, Logger } from 'src/core/server';
import { IlmPolicy } from './resources/ilm_policy';
import { IndexTemplate } from './resources/index_template';

interface ConstructorParams {
elasticsearch: Promise<ElasticsearchClient>;
logger: Logger;
}

export type IIndexManagementGateway = PublicMethodsOf<IndexManagementGateway>;

export class IndexManagementGateway {
private readonly elasticsearch: Promise<ElasticsearchClient>;
private readonly logger: Logger;

constructor(params: ConstructorParams) {
this.elasticsearch = params.elasticsearch;
this.logger = params.logger.get('IndexManagementGateway');
}

public async doesIlmPolicyExist(policyName: string): Promise<boolean> {
this.logger.debug(`Checking if ILM policy exists; name="${policyName}"`);

try {
const es = await this.elasticsearch;
await es.transport.request({
method: 'GET',
path: `/_ilm/policy/${policyName}`,
});
} catch (e) {
if (e.statusCode === 404) return false;
throw new Error(`Error checking existence of ILM policy: ${e.message}`);
}
return true;
}

public async createIlmPolicy(policyName: string, policy: IlmPolicy): Promise<void> {
this.logger.debug(`Creating ILM policy; name="${policyName}"`);

try {
const es = await this.elasticsearch;
await es.transport.request({
method: 'PUT',
path: `/_ilm/policy/${policyName}`,
body: policy,
});
} catch (e) {
throw new Error(`Error creating ILM policy: ${e.message}`);
}
}

public async doesIndexTemplateExist(templateName: string): Promise<boolean> {
this.logger.debug(`Checking if index template exists; name="${templateName}"`);

try {
const es = await this.elasticsearch;
const { body } = await es.indices.existsTemplate({ name: templateName });
return body as boolean;
} catch (e) {
throw new Error(`Error checking existence of index template: ${e.message}`);
}
}

public async createIndexTemplate(templateName: string, template: IndexTemplate): Promise<void> {
this.logger.debug(`Creating index template; name="${templateName}"`);

try {
const es = await this.elasticsearch;
await es.indices.putTemplate({ create: true, name: templateName, body: template });
} catch (e) {
// The error message doesn't have a type attribute we can look to guarantee it's due
// to the template already existing (only long message) so we'll check ourselves to see
// if the template now exists. This scenario would happen if you startup multiple Kibana
// instances at the same time.
const existsNow = await this.doesIndexTemplateExist(templateName);
if (!existsNow) {
const error = new Error(`Error creating index template: ${e.message}`);
Object.assign(error, { wrapped: e });
throw error;
}
}
}

public async updateIndexTemplate(templateName: string, template: IndexTemplate): Promise<void> {
this.logger.debug(`Updating index template; name="${templateName}"`);

try {
const { settings, ...templateWithoutSettings } = template;

const es = await this.elasticsearch;
await es.indices.putTemplate({
create: false,
name: templateName,
body: templateWithoutSettings,
});
} catch (e) {
throw new Error(`Error updating index template: ${e.message}`);
}
}

public async doesAliasExist(aliasName: string): Promise<boolean> {
this.logger.debug(`Checking if index alias exists; name="${aliasName}"`);

try {
const es = await this.elasticsearch;
const { body } = await es.indices.existsAlias({ name: aliasName });
return body as boolean;
} catch (e) {
throw new Error(`Error checking existence of initial index: ${e.message}`);
}
}

public async createIndex(indexName: string, body: Record<string, unknown> = {}): Promise<void> {
this.logger.debug(`Creating index; name="${indexName}"`);
this.logger.debug(JSON.stringify(body, null, 2));

try {
const es = await this.elasticsearch;
await es.indices.create({
index: indexName,
body,
});
} catch (e) {
if (e.body?.error?.type !== 'resource_already_exists_exception') {
this.logger.error(e);
this.logger.error(JSON.stringify(e.meta, null, 2));
throw new Error(`Error creating initial index: ${e.message}`);
}
}
}
}
Loading

0 comments on commit 7fd6539

Please sign in to comment.