Skip to content

Commit

Permalink
[EEM] Add option to enable backfill transform (#188379)
Browse files Browse the repository at this point in the history
## Summary

This PR adds 2 new optional settings for the history section of the
entity definition to enable a backfill transform:

- `history.settings.backfillSyncDelay` – A duration format, i.e. `15m`,
that enables the backfill transform and sets the sync delay to whatever
duration the user has configured
- `history.settings.backfillLookbackPeriod` – Controls how far back the
transform will start processing documents.

The idea behind this transform is that it will run with a longer delay
than the default transform. If there are events that show up after the
default transform's checkpoint has moved on, the backfill transform will
make a second sweep to backfill any data the default transform had
missed.

### Testing

Save the following config to `fake_logs.delayed.yaml`
```YAML
---
elasticsearch:
  installKibanaUser: false

kibana:
  installAssets: true
  host: "http://localhost:5601/kibana"

indexing:
  dataset: "fake_logs"
  eventsPerCycle: 100
  artificialIndexDelay: 300000

schedule:
  - template: "good"
    start: "now-1d"
    end: false
    eventsPerCycle: 100
```

run `node x-pack/scripts/data_forge.js --config fake_logs.delayed.yaml`
then run the following in Kibana's "Dev Tools":

```JSON
POST kbn:/internal/api/entities/definition
{
  "id": "fake-logs-services-no-backfill",
  "name": "Services for Fake Logs",
  "type": "service",
  "version": "0.0.1",
  "indexPatterns": ["kbn-data-forge-fake_logs.*"],
  "history": {
    "timestampField": "@timestamp",
    "interval": "1m"
  },
  "identityFields": ["labels.groupId", "labels.eventId"],
  "displayNameTemplate": "{{labels.groupId}}:{{labels.eventId}}",
  "metadata": [
    "host.name"
  ],
  "metrics": [
    {
      "name": "latency", 
      "equation": "A",
      "metrics": [
        {
          "name": "A",
          "aggregation": "avg",
          "field": "event.duration"
        }
      ]
    },
    {
      "name": "logRate",
      "equation": "A", 
      "metrics": [
        {
          "name": "A",
          "aggregation": "doc_count",
          "filter": "log.level: *"
        }
      ]
    },
    {
      "name": "errorRate",
      "equation": "A", 
      "metrics": [
        {
          "name": "A",
          "aggregation": "doc_count",
          "filter": "log.level: \"error\""
        }
      ]
    }
  ]
}

POST kbn:/internal/api/entities/definition
{
  "id": "fake-logs-services-with-backfill",
  "name": "Services for Fake Logs",
  "type": "service",
  "version": "0.0.1",
  "indexPatterns": ["kbn-data-forge-fake_logs.*"],
  "history": {
    "timestampField": "@timestamp",
    "interval": "1m",
    "settings": {
      "backfillSyncDelay": "10m",
      "backfillLookbackPeriod": "24h"
    }
  },
  "identityFields": ["labels.groupId", "labels.eventId"],
  "displayNameTemplate": "{{labels.groupId}}:{{labels.eventId}}",
  "metadata": [
    "host.name"
  ],
  "metrics": [
    {
      "name": "latency", 
      "equation": "A",
      "metrics": [
        {
          "name": "A",
          "aggregation": "avg",
          "field": "event.duration"
        }
      ]
    },
    {
      "name": "logRate",
      "equation": "A", 
      "metrics": [
        {
          "name": "A",
          "aggregation": "doc_count",
          "filter": "log.level: *"
        }
      ]
    },
    {
      "name": "errorRate",
      "equation": "A", 
      "metrics": [
        {
          "name": "A",
          "aggregation": "doc_count",
          "filter": "log.level: \"error\""
        }
      ]
    }
  ]
}
```
The first transform should end up giving you history every 5 minutes,
the second will backfill and give you history every minute up until ~10
minutes. If you were to create a dashboard with the document counts for
the last hour, it would look like this:

![image](https://github.com/user-attachments/assets/2790c6a4-21c1-4258-a126-c12563e4b396)
  • Loading branch information
simianhacker authored Jul 24, 2024
1 parent f918fdc commit 6e09aef
Show file tree
Hide file tree
Showing 19 changed files with 615 additions and 10 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

106 changes: 106 additions & 0 deletions x-pack/packages/kbn-entities-schema/src/schema/common.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { SafeParseSuccess } from 'zod';
import { durationSchema, metadataSchema, semVerSchema } from './common';
import moment from 'moment';

// Unit tests for the shared zod schemas exported from ./common:
// metadataSchema, durationSchema and semVerSchema.
// Several cases use toMatchSnapshot(), so test titles and inputs double as snapshot keys.
describe('schemas', () => {
// metadataSchema is exercised with both a bare string and an object of
// { source, destination, limit }.
describe('metadataSchema', () => {
it('should error on empty string', () => {
const result = metadataSchema.safeParse('');
expect(result.success).toBeFalsy();
expect(result).toMatchSnapshot();
});
it('should error on empty string for source', () => {
const result = metadataSchema.safeParse({ source: '' });
expect(result.success).toBeFalsy();
expect(result).toMatchSnapshot();
});
it('should error on empty string for destination', () => {
const result = metadataSchema.safeParse({ source: 'host.name', destination: '', limit: 10 });
expect(result.success).toBeFalsy();
expect(result).toMatchSnapshot();
});
it('should error when limit is too low', () => {
// A limit of 0 is expected to be rejected.
const result = metadataSchema.safeParse({
source: 'host.name',
destination: 'host.name',
limit: 0,
});
expect(result.success).toBeFalsy();
expect(result).toMatchSnapshot();
});
it('should parse successfully with an valid string', () => {
const result = metadataSchema.safeParse('host.name');
expect(result.success).toBeTruthy();
expect(result).toMatchSnapshot();
});
it('should parse successfully with just a source', () => {
const result = metadataSchema.safeParse({ source: 'host.name' });
expect(result.success).toBeTruthy();
expect(result).toMatchSnapshot();
});
it('should parse successfully with a source and desitination', () => {
const result = metadataSchema.safeParse({ source: 'host.name', destination: 'hostName' });
expect(result.success).toBeTruthy();
expect(result).toMatchSnapshot();
});
it('should parse successfully with valid object', () => {
// NOTE(review): this passes `size`, while the other cases in this suite use `limit`.
// Presumably `limit` was intended here — confirm against the schema and the stored snapshot.
const result = metadataSchema.safeParse({
source: 'host.name',
destination: 'hostName',
size: 1,
});
expect(result.success).toBeTruthy();
expect(result).toMatchSnapshot();
});
});
// durationSchema: each accepted input is checked twice — toJSON() must give back the
// original string, and asSeconds() must equal the expected number of seconds.
describe('durationSchema', () => {
it('should work with 1m', () => {
const result = durationSchema.safeParse('1m');
expect(result.success).toBeTruthy();
expect((result as SafeParseSuccess<moment.Duration>).data.toJSON()).toBe('1m');
expect((result as SafeParseSuccess<moment.Duration>).data.asSeconds()).toEqual(60);
});
it('should work with 10s', () => {
const result = durationSchema.safeParse('10s');
expect(result.success).toBeTruthy();
expect((result as SafeParseSuccess<moment.Duration>).data.toJSON()).toBe('10s');
expect((result as SafeParseSuccess<moment.Duration>).data.asSeconds()).toEqual(10);
});
it('should work with 999h', () => {
const result = durationSchema.safeParse('999h');
expect(result.success).toBeTruthy();
expect((result as SafeParseSuccess<moment.Duration>).data.toJSON()).toBe('999h');
expect((result as SafeParseSuccess<moment.Duration>).data.asSeconds()).toEqual(999 * 60 * 60);
});
it('should work with 90d', () => {
const result = durationSchema.safeParse('90d');
expect(result.success).toBeTruthy();
expect((result as SafeParseSuccess<moment.Duration>).data.toJSON()).toBe('90d');
expect((result as SafeParseSuccess<moment.Duration>).data.asSeconds()).toEqual(
90 * 24 * 60 * 60
);
});
it('should not work with 1ms', () => {
// A two-letter unit suffix such as "ms" must be rejected.
const result = durationSchema.safeParse('1ms');
expect(result.success).toBeFalsy();
});
});
// semVerSchema: a three-part version is accepted, a two-part version is rejected.
describe('semVerSchema', () => {
it('should validate with 999.999.999', () => {
const result = semVerSchema.safeParse('999.999.999');
expect(result.success).toBeTruthy();
});
it('should not validate with 0.9', () => {
const result = semVerSchema.safeParse('0.9');
expect(result.success).toBeFalsy();
expect(result).toMatchSnapshot();
});
});
});
27 changes: 25 additions & 2 deletions x-pack/packages/kbn-entities-schema/src/schema/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export const docCountMetricSchema = z.object({

export const durationSchema = z
.string()
.regex(/\d+[m|d|s|h]/)
.regex(/^\d+[m|d|s|h]$/)
.transform((val: string) => {
const parts = val.match(/(\d+)([m|s|h|d])/);
if (parts === null) {
Expand Down Expand Up @@ -93,7 +93,30 @@ export const metadataSchema = z
destination: metadata.destination ?? metadata.source,
limit: metadata.limit ?? 1000,
}))
.or(z.string().transform((value) => ({ source: value, destination: value, limit: 1000 })));
.or(z.string().transform((value) => ({ source: value, destination: value, limit: 1000 })))
.superRefine((value, ctx) => {
if (value.limit < 1) {
ctx.addIssue({
path: ['limit'],
code: z.ZodIssueCode.custom,
message: 'limit should be greater than 1',
});
}
if (value.source.length === 0) {
ctx.addIssue({
path: ['source'],
code: z.ZodIssueCode.custom,
message: 'source should not be empty',
});
}
if (value.destination.length === 0) {
ctx.addIssue({
path: ['destination'],
code: z.ZodIssueCode.custom,
message: 'destination should not be empty',
});
}
});

export const identityFieldsSchema = z
.object({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ export const entityDefinitionSchema = z.object({
syncField: z.optional(z.string()),
syncDelay: z.optional(z.string()),
frequency: z.optional(z.string()),
backfillSyncDelay: z.optional(z.string()),
backfillLookbackPeriod: z.optional(durationSchema),
backfillFrequency: z.optional(z.string()),
})
),
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ export const ENTITY_HISTORY_BASE_COMPONENT_TEMPLATE_V1 =
`${ENTITY_BASE_PREFIX}_${ENTITY_SCHEMA_VERSION_V1}_${ENTITY_HISTORY}_base` as const;
// Dash-joined prefix built from the base prefix, the V1 schema version and the history segment.
export const ENTITY_HISTORY_PREFIX_V1 =
`${ENTITY_BASE_PREFIX}-${ENTITY_SCHEMA_VERSION_V1}-${ENTITY_HISTORY}` as const;
// Same dash-joined parts as ENTITY_HISTORY_PREFIX_V1 with a trailing "-backfill" segment.
export const ENTITY_HISTORY_BACKFILL_PREFIX_V1 =
`${ENTITY_BASE_PREFIX}-${ENTITY_SCHEMA_VERSION_V1}-${ENTITY_HISTORY}-backfill` as const;
// Dot-joined prefix built from the index prefix, the V1 schema version and the history segment.
export const ENTITY_HISTORY_INDEX_PREFIX_V1 =
`${ENTITY_INDEX_PREFIX}.${ENTITY_SCHEMA_VERSION_V1}.${ENTITY_HISTORY}` as const;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,24 @@ export async function createAndInstallHistoryTransform(
}
}

/**
 * Builds the history transform for `definition` with the second argument of
 * `generateHistoryTransform` set to `true` (presumably selecting the backfill
 * variant — confirm against that helper) and installs it through
 * `esClient.transform.putTransform`, wrapped in `retryTransientEsErrors`.
 *
 * @param esClient Elasticsearch client used to put the transform.
 * @param definition Entity definition the transform is generated from.
 * @param logger Logger passed to the retry helper and used to report failures.
 * @throws Rethrows any error raised while generating or installing the transform,
 * after logging an error message that names the definition id.
 */
export async function createAndInstallHistoryBackfillTransform(
  esClient: ElasticsearchClient,
  definition: EntityDefinition,
  logger: Logger
) {
  try {
    // Generate the request once so every retry attempt sends the same payload.
    const backfillTransformRequest = generateHistoryTransform(definition, true);
    const installTransform = () => esClient.transform.putTransform(backfillTransformRequest);
    await retryTransientEsErrors(installTransform, { logger });
  } catch (error) {
    const failureMessage = `Cannot create entity history backfill transform for [${definition.id}] entity definition`;
    logger.error(failureMessage);
    throw error;
  }
}

export async function createAndInstallLatestTransform(
esClient: ElasticsearchClient,
definition: EntityDefinition,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { entityDefinitionSchema } from '@kbn/entities-schema';
// Entity definition fixture that sets the three backfill-related history settings
// (backfillSyncDelay, backfillLookbackPeriod, backfillFrequency).
// The literal is passed through entityDefinitionSchema.parse, so the exported value is
// the schema's parsed output rather than the raw object below.
export const entityDefinitionWithBackfill = entityDefinitionSchema.parse({
id: 'admin-console-services',
version: '999.999.999',
name: 'Services for Admin Console',
type: 'service',
indexPatterns: ['kbn-data-forge-fake_stack.*'],
history: {
timestampField: '@timestamp',
interval: '1m',
// Backfill-specific settings; the rest of the fixture carries no backfill configuration.
settings: {
backfillSyncDelay: '15m',
backfillLookbackPeriod: '72h',
backfillFrequency: '5m',
},
},
// Identity fields mix the plain-string form with the { field, optional } object form.
identityFields: ['log.logger', { field: 'event.category', optional: true }],
displayNameTemplate: '{{log.logger}}{{#event.category}}:{{.}}{{/event.category}}',
// Metadata mixes plain-string entries with a { source, destination } object entry.
metadata: ['tags', 'host.name', 'host.os.name', { source: '_index', destination: 'sourceIndex' }],
metrics: [
{
name: 'logRate',
equation: 'A',
metrics: [
{
name: 'A',
aggregation: 'doc_count',
filter: 'log.level: *',
},
],
},
{
name: 'errorRate',
equation: 'A',
metrics: [
{
name: 'A',
aggregation: 'doc_count',
filter: 'log.level: "ERROR"',
},
],
},
],
});
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import { EntityDefinition } from '@kbn/entities-schema';
import {
ENTITY_HISTORY_BACKFILL_PREFIX_V1,
ENTITY_HISTORY_INDEX_PREFIX_V1,
ENTITY_HISTORY_PREFIX_V1,
ENTITY_LATEST_INDEX_PREFIX_V1,
Expand All @@ -18,6 +19,11 @@ function generateHistoryId(definition: EntityDefinition) {
return `${ENTITY_HISTORY_PREFIX_V1}-${definition.id}`;
}

// History Backfill
/**
 * Returns the ID of the history backfill transform for an entity definition:
 * ENTITY_HISTORY_BACKFILL_PREFIX_V1 followed by "-" and the definition id.
 */
export function generateHistoryBackfillTransformId(definition: EntityDefinition) {
  const { id: definitionId } = definition;
  return `${ENTITY_HISTORY_BACKFILL_PREFIX_V1}-${definitionId}`;
}

// The history transform and the history ingest pipeline share one ID scheme:
// both exports are aliases of generateHistoryId.
export const generateHistoryTransformId = generateHistoryId;
export const generateHistoryIngestPipelineId = generateHistoryId;

Expand Down
Loading

0 comments on commit 6e09aef

Please sign in to comment.