Skip to content

Commit

Permalink
[Streams] Introducing the new Streams plugin (elastic#198713)
Browse files Browse the repository at this point in the history
## Summary

This PR introduces the new experimental "Streams" plugin into the Kibana
project. The Streams project aims to simplify workflows around dealing
with messy logs in Elasticsearch. Our current offering is either
extremely opinionated with integrations or leaves the user alone with
the high flexibility of Elasticsearch concepts like index templates,
component templates and so on, which make it challenging to configure
everything correctly for good performance and controlling search speed
and cost.

### Scope of PR
- Provides an API for the user to "enable" the streams framework which
creates the "root" entity `logs` with all the backing Elasticsearch
assets
- Provides an API for the user to "fork" a stream
- Provides an API for the user to "read" a stream and all of it's
Elasticsearch assets.
- Provides an API for the user to upsert a stream (and implicitly child
streams that are mentioned)
- Part of this API is placing grok and disscect processing steps as well
as fields to the mapping
- Implements the Stream Naming Schema (SNS) which uses dots to express
the index patterns and stream IDs. Example: `logs.nginx.errors`
- The APIs will fully manage the `index_template`, `component_template`,
and `ingest_pipelines`.

### Out of scope
- Integration tests (coming in a follow-up)

### Reviewer Notes
- I haven't implemented tests beyond a unit test for converting the
filter conditions to Painless. I wanted to get a PR up so we can start
iterating on the interface and functionality before we invest in
testing.
- You might need to add `server.versioned.versionResolution: oldest` to
your `config/kibana.dev.yaml` to play with the requests below in the
Kibana "Dev console".

### Example API Calls

Enable the root stream (and set the mapping for the internal `.streams`
index)
```
POST kbn:/api/streams/_enable
```

Read the root entity "logs"
```
GET kbn:/api/streams/logs
```

Fork the "root" entity "logs" and create "logs.nginx" based on a
condition
```
POST kbn:/api/streams/logs/_fork
{
  "stream": {
    "id": "logs.nginx",
    "children": [],
    "processing": [],
    "fields": [],
  },
  "condition": {
    "field": "log.logger",
    "operator": "eq",
    "value": "nginx_proxy"
  }
}
```

Fork the entity "logs.nginx" and create "logs.nginx.errors" based on a
condition
```
POST kbn:/api/streams/logs.nginx/_fork
{
  "stream": {
    "id": "logs.nginx.error",
    "children": [],
   "processing": [],
   "fields": [],
  },
  "condition": {
    "or": [
      { "field": "log.level", "operator": "eq", "value": "error" },
      { "field": "log.level", "operator": "eq", "value": "ERROR" }
    ]
  }
}
```

Set some processing on a stream and map the generated field
```
PUT kbn:/api/streams/logs.nginx
{
    "children": [],
    "processing": [
       { "config": { "type": "grok", "patterns": ["^%{IP:ip} – –"], "field": "message" } }
    ],
    "fields": [
       { "name": "ip", "type": "ip" }
    ],
  }
}
```

Field definitions are checked for both descendants and ancestors for
incompatibilities to ensure they stay additive.

If children are defined in the `PUT /api/streams/<name>` API,
sub-streams are created implicitly. If a stream is `PUT`, it's added to
the parent as well with a condition that is never true (can be edited
subsequently).

`POST /api/streams/_resync` can be used to re-sync all streams from
their meta data in case the Elasticsearch objects got messed up by some
external change - not sure whether we want to keep that.


Follow-ups

* API integration tests 
* Check read permissions on data streams to determine whether a user is
allowed to read certain streams

---------

Co-authored-by: Joe Reuter <[email protected]>
Co-authored-by: kibanamachine <[email protected]>
  • Loading branch information
3 people authored Nov 13, 2024
1 parent fb71f4e commit b86dc81
Show file tree
Hide file tree
Showing 62 changed files with 2,419 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,7 @@ x-pack/plugins/snapshot_restore @elastic/kibana-management
x-pack/plugins/spaces @elastic/kibana-security
x-pack/plugins/stack_alerts @elastic/response-ops
x-pack/plugins/stack_connectors @elastic/response-ops
x-pack/plugins/streams @simianhacker @flash1293 @dgieselaar
x-pack/plugins/task_manager @elastic/response-ops
x-pack/plugins/telemetry_collection_xpack @elastic/kibana-core
x-pack/plugins/threat_intelligence @elastic/security-threat-hunting-investigations
Expand Down
4 changes: 4 additions & 0 deletions docs/developer/plugin-list.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,10 @@ routes, etc.
|The stack_connectors plugin provides connector types shipped with Kibana, built on top of the framework provided in the actions plugin.
|{kib-repo}blob/{branch}/x-pack/plugins/streams/README.md[streams]
|This plugin provides an interface to manage streams
|{kib-repo}blob/{branch}/x-pack/plugins/observability_solution/synthetics/README.md[synthetics]
|The purpose of this plugin is to provide users of Heartbeat more visibility of what's happening
in their infrastructure.
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,7 @@
"@kbn/status-plugin-a-plugin": "link:test/server_integration/plugins/status_plugin_a",
"@kbn/status-plugin-b-plugin": "link:test/server_integration/plugins/status_plugin_b",
"@kbn/std": "link:packages/kbn-std",
"@kbn/streams-plugin": "link:x-pack/plugins/streams",
"@kbn/synthetics-plugin": "link:x-pack/plugins/observability_solution/synthetics",
"@kbn/synthetics-private-location": "link:x-pack/packages/kbn-synthetics-private-location",
"@kbn/task-manager-fixture-plugin": "link:x-pack/test/alerting_api_integration/common/plugins/task_manager_fixture",
Expand Down
1 change: 1 addition & 0 deletions packages/kbn-optimizer/limits.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ pageLoadAssetSize:
spaces: 57868
stackAlerts: 58316
stackConnectors: 67227
streams: 16742
synthetics: 55971
telemetry: 51957
telemetryManagementSection: 38586
Expand Down
2 changes: 2 additions & 0 deletions tsconfig.base.json
Original file line number Diff line number Diff line change
Expand Up @@ -1828,6 +1828,8 @@
"@kbn/stdio-dev-helpers/*": ["packages/kbn-stdio-dev-helpers/*"],
"@kbn/storybook": ["packages/kbn-storybook"],
"@kbn/storybook/*": ["packages/kbn-storybook/*"],
"@kbn/streams-plugin": ["x-pack/plugins/streams"],
"@kbn/streams-plugin/*": ["x-pack/plugins/streams/*"],
"@kbn/synthetics-e2e": ["x-pack/plugins/observability_solution/synthetics/e2e"],
"@kbn/synthetics-e2e/*": ["x-pack/plugins/observability_solution/synthetics/e2e/*"],
"@kbn/synthetics-plugin": ["x-pack/plugins/observability_solution/synthetics"],
Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugins/streams/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Streams Plugin

This plugin provides an interface to manage streams
30 changes: 30 additions & 0 deletions x-pack/plugins/streams/common/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { schema, TypeOf } from '@kbn/config-schema';

export const configSchema = schema.object({});

export type StreamsConfig = TypeOf<typeof configSchema>;

/**
* The following map is passed to the server plugin setup under the
* exposeToBrowser: option, and controls which of the above config
* keys are allow-listed to be available in the browser config.
*
* NOTE: anything exposed here will be visible in the UI dev tools,
* and therefore MUST NOT be anything that is sensitive information!
*/
export const exposeToBrowserConfig = {} as const;

type ValidKeys = keyof {
[K in keyof typeof exposeToBrowserConfig as (typeof exposeToBrowserConfig)[K] extends true
? K
: never]: true;
};

export type StreamsPublicConfig = Pick<StreamsConfig, ValidKeys>;
9 changes: 9 additions & 0 deletions x-pack/plugins/streams/common/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

export const ASSET_VERSION = 1;
export const STREAMS_INDEX = '.kibana_streams';
91 changes: 91 additions & 0 deletions x-pack/plugins/streams/common/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { z } from '@kbn/zod';

const stringOrNumberOrBoolean = z.union([z.string(), z.number(), z.boolean()]);

export const filterConditionSchema = z.object({
field: z.string(),
operator: z.enum(['eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'contains', 'startsWith', 'endsWith']),
value: stringOrNumberOrBoolean,
});

export type FilterCondition = z.infer<typeof filterConditionSchema>;

export interface AndCondition {
and: Condition[];
}

export interface RerouteOrCondition {
or: Condition[];
}

export type Condition = FilterCondition | AndCondition | RerouteOrCondition | undefined;

export const conditionSchema: z.ZodType<Condition> = z.lazy(() =>
z.union([
filterConditionSchema,
z.object({ and: z.array(conditionSchema) }),
z.object({ or: z.array(conditionSchema) }),
])
);

export const grokProcessingDefinitionSchema = z.object({
type: z.literal('grok'),
field: z.string(),
patterns: z.array(z.string()),
pattern_definitions: z.optional(z.record(z.string())),
});

export const dissectProcessingDefinitionSchema = z.object({
type: z.literal('dissect'),
field: z.string(),
pattern: z.string(),
});

export const processingDefinitionSchema = z.object({
condition: z.optional(conditionSchema),
config: z.discriminatedUnion('type', [
grokProcessingDefinitionSchema,
dissectProcessingDefinitionSchema,
]),
});

export type ProcessingDefinition = z.infer<typeof processingDefinitionSchema>;

export const fieldDefinitionSchema = z.object({
name: z.string(),
type: z.enum(['keyword', 'match_only_text', 'long', 'double', 'date', 'boolean', 'ip']),
});

export type FieldDefinition = z.infer<typeof fieldDefinitionSchema>;

export const streamWithoutIdDefinitonSchema = z.object({
processing: z.array(processingDefinitionSchema).default([]),
fields: z.array(fieldDefinitionSchema).default([]),
children: z
.array(
z.object({
id: z.string(),
condition: conditionSchema,
})
)
.default([]),
});

export type StreamWithoutIdDefinition = z.infer<typeof streamDefinitonSchema>;

export const streamDefinitonSchema = streamWithoutIdDefinitonSchema.extend({
id: z.string(),
});

export type StreamDefinition = z.infer<typeof streamDefinitonSchema>;

export const streamDefinitonWithoutChildrenSchema = streamDefinitonSchema.omit({ children: true });

export type StreamWithoutChildrenDefinition = z.infer<typeof streamDefinitonWithoutChildrenSchema>;
15 changes: 15 additions & 0 deletions x-pack/plugins/streams/jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

module.exports = {
preset: '@kbn/test',
rootDir: '../../..',
roots: ['<rootDir>/x-pack/plugins/streams'],
coverageDirectory: '<rootDir>/target/kibana-coverage/jest/x-pack/plugins/streams',
coverageReporters: ['text', 'html'],
collectCoverageFrom: ['<rootDir>/x-pack/plugins/streams/{common,public,server}/**/*.{js,ts,tsx}'],
};
28 changes: 28 additions & 0 deletions x-pack/plugins/streams/kibana.jsonc
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"type": "plugin",
"id": "@kbn/streams-plugin",
"owner": "@simianhacker @flash1293 @dgieselaar",
"description": "A manager for Streams",
"group": "observability",
"visibility": "private",
"plugin": {
"id": "streams",
"configPath": ["xpack", "streams"],
"browser": true,
"server": true,
"requiredPlugins": [
"data",
"security",
"encryptedSavedObjects",
"usageCollection",
"licensing",
"taskManager"
],
"optionalPlugins": [
"cloud",
"serverless"
],
"requiredBundles": [
]
}
}
13 changes: 13 additions & 0 deletions x-pack/plugins/streams/public/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { PluginInitializer, PluginInitializerContext } from '@kbn/core/public';
import { Plugin } from './plugin';

export const plugin: PluginInitializer<{}, {}> = (context: PluginInitializerContext) => {
return new Plugin(context);
};
32 changes: 32 additions & 0 deletions x-pack/plugins/streams/public/plugin.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { CoreSetup, CoreStart, PluginInitializerContext } from '@kbn/core/public';
import { Logger } from '@kbn/logging';

import type { StreamsPublicConfig } from '../common/config';
import { StreamsPluginClass, StreamsPluginSetup, StreamsPluginStart } from './types';

export class Plugin implements StreamsPluginClass {
public config: StreamsPublicConfig;
public logger: Logger;

constructor(context: PluginInitializerContext<{}>) {
this.config = context.config.get();
this.logger = context.logger.get();
}

setup(core: CoreSetup<StreamsPluginStart>, pluginSetup: StreamsPluginSetup) {
return {};
}

start(core: CoreStart) {
return {};
}

stop() {}
}
16 changes: 16 additions & 0 deletions x-pack/plugins/streams/public/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { Plugin as PluginClass } from '@kbn/core/public';

// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface StreamsPluginSetup {}

// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface StreamsPluginStart {}

export type StreamsPluginClass = PluginClass<{}, {}, StreamsPluginSetup, StreamsPluginStart>;
19 changes: 19 additions & 0 deletions x-pack/plugins/streams/server/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { PluginInitializerContext } from '@kbn/core-plugins-server';
import { StreamsConfig } from '../common/config';
import { StreamsPluginSetup, StreamsPluginStart, config } from './plugin';
import { StreamsRouteRepository } from './routes';

export type { StreamsConfig, StreamsPluginSetup, StreamsPluginStart, StreamsRouteRepository };
export { config };

export const plugin = async (context: PluginInitializerContext<StreamsConfig>) => {
const { StreamsPlugin } = await import('./plugin');
return new StreamsPlugin(context);
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import {
ClusterPutComponentTemplateRequest,
MappingProperty,
} from '@elastic/elasticsearch/lib/api/types';
import { StreamDefinition } from '../../../../common/types';
import { ASSET_VERSION } from '../../../../common/constants';
import { logsSettings } from './logs_layer';
import { isRoot } from '../helpers/hierarchy';
import { getComponentTemplateName } from './name';

export function generateLayer(
id: string,
definition: StreamDefinition
): ClusterPutComponentTemplateRequest {
const properties: Record<string, MappingProperty> = {};
definition.fields.forEach((field) => {
properties[field.name] = {
type: field.type,
};
});
return {
name: getComponentTemplateName(id),
template: {
settings: isRoot(definition.id) ? logsSettings : {},
mappings: {
subobjects: false,
properties,
},
},
version: ASSET_VERSION,
_meta: {
managed: true,
description: `Default settings for the ${id} stream`,
},
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { IndicesIndexSettings } from '@elastic/elasticsearch/lib/api/types';

export const logsSettings: IndicesIndexSettings = {
index: {
lifecycle: {
name: 'logs',
},
codec: 'best_compression',
mapping: {
total_fields: {
ignore_dynamic_beyond_limit: true,
},
ignore_malformed: true,
},
},
};
Loading

0 comments on commit b86dc81

Please sign in to comment.