Skip to content

Commit

Permalink
[APM] Ensure rolled up data is excluded by default (#148510)
Browse files Browse the repository at this point in the history
Closes #148507
  • Loading branch information
dgieselaar authored Jan 30, 2023
1 parent 85023d1 commit c224972
Show file tree
Hide file tree
Showing 24 changed files with 317 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ const KEY_FIELDS: Array<keyof ApmFields> = [
'agent.name',
'service.environment',
'service.name',
'service.node.name',
'transaction.type',
'service.language.name',
];

export function createServiceMetricsAggregator(flushInterval: string) {
return createApmMetricAggregator(
{
filter: (event) => true,
filter: (event) => event['processor.event'] === 'transaction',
getAggregateKey: (event) => {
// see https://github.com/elastic/apm-server/blob/main/x-pack/apm-server/aggregation/txmetrics/aggregator.go
return hashKeysOf(event, KEY_FIELDS);
Expand All @@ -32,44 +32,39 @@ export function createServiceMetricsAggregator(flushInterval: string) {

return {
...set,
'metricset.name': 'service',
'metricset.name': 'service_transaction',
'metricset.interval': flushInterval,
'processor.event': 'metric',
'processor.name': 'metric',
'transaction.duration.histogram': createLosslessHistogram(),
'transaction.duration.summary': {
min: 0,
max: 0,
value_count: 0,
sum: 0,
},
'event.outcome_numeric': {
'event.success_count': {
sum: 0,
value_count: 0,
},
};
},
},
(metric, event) => {
if (event['processor.event'] === 'transaction') {
const duration = event['transaction.duration.us']!;
const duration = event['transaction.duration.us']!;

metric['transaction.duration.histogram'].record(duration);
metric['transaction.duration.histogram'].record(duration);

if (event['event.outcome'] === 'success' || event['event.outcome'] === 'failure') {
metric['event.outcome_numeric'].value_count += 1;
}
if (event['event.outcome'] === 'success' || event['event.outcome'] === 'failure') {
metric['event.success_count'].value_count += 1;
}

if (event['event.outcome'] === 'success') {
metric['event.outcome_numeric'].sum += 1;
}
if (event['event.outcome'] === 'success') {
metric['event.success_count'].sum += 1;
}

const summary = metric['transaction.duration.summary'];
const summary = metric['transaction.duration.summary'];

summary.min = Math.min(duration, metric['transaction.duration.summary'].min);
summary.max = Math.max(duration, metric['transaction.duration.summary'].max);
summary.sum += duration;
summary.value_count += 1;
}
summary.sum += duration;
summary.value_count += 1;
},
(metric) => {
const serialized = metric['transaction.duration.histogram'].serialize();
Expand All @@ -80,6 +75,7 @@ export function createServiceMetricsAggregator(flushInterval: string) {
};
// @ts-expect-error
metric._doc_count = serialized.total;

return metric;
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export function createSpanMetricsAggregator(flushInterval: string) {
return {
...set,
'metricset.name': 'service_destination',
'metricset.interval': flushInterval,
'processor.event': 'metric',
'processor.name': 'metric',
'span.destination.service.response_time.count': 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,38 @@ export function createTransactionMetricsAggregator(flushInterval: string) {
return {
...set,
'metricset.name': 'transaction',
'metricset.interval': flushInterval,
'processor.event': 'metric',
'processor.name': 'metric',
'transaction.root': !event['parent.id'],
'transaction.duration.histogram': createLosslessHistogram(),
'transaction.duration.summary': {
value_count: 0,
sum: 0,
},
'event.success_count': {
sum: 0,
value_count: 0,
},
};
},
},
(metric, event) => {
metric['transaction.duration.histogram'].record(event['transaction.duration.us']!);
const duration = event['transaction.duration.us']!;
metric['transaction.duration.histogram'].record(duration);

if (event['event.outcome'] === 'success' || event['event.outcome'] === 'failure') {
metric['event.success_count'].value_count += 1;
}

if (event['event.outcome'] === 'success') {
metric['event.success_count'].sum += 1;
}

const summary = metric['transaction.duration.summary'];

summary.sum += duration;
summary.value_count += 1;
},
(metric) => {
const serialized = metric['transaction.duration.histogram'].serialize();
Expand All @@ -86,6 +109,7 @@ export function createTransactionMetricsAggregator(flushInterval: string) {
};
// @ts-expect-error
metric._doc_count = serialized.total;

return metric;
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ export function getRoutingTransform() {

if (metricsetName === 'app') {
index = `metrics-apm.app.${document['service.name']}-default`;
} else if (
metricsetName === 'transaction' ||
metricsetName === 'service_transaction' ||
metricsetName === 'service_destination'
) {
index = `metrics-apm.${metricsetName}.${document['metricset.interval']!}-default`;
} else {
index = `metrics-apm.internal-default`;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { ValuesType } from 'utility-types';
import { Logger } from '../../../utils/create_logger';
import { fork, sequential } from '../../../utils/stream_utils';
import { createBreakdownMetricsAggregator } from '../../aggregators/create_breakdown_metrics_aggregator';
import { createServiceMetricsAggregator } from '../../aggregators/create_service_metrics_aggregator';
import { createSpanMetricsAggregator } from '../../aggregators/create_span_metrics_aggregator';
import { createTransactionMetricsAggregator } from '../../aggregators/create_transaction_metrics_aggregator';
import { getApmServerMetadataTransform } from './get_apm_server_metadata_transform';
Expand Down Expand Up @@ -69,15 +70,12 @@ export class ApmSynthtraceEsClient {
}

async clean() {
this.logger.info(`Cleaning APM data streams ${DATA_STREAMS.join(', ')}`);

for (const name of DATA_STREAMS) {
const dataStream = await this.client.indices.getDataStream({ name }, { ignore: [404] });
if (dataStream.data_streams && dataStream.data_streams.length > 0) {
this.logger.debug(`Deleting datastream: ${name}`);
await this.client.indices.deleteDataStream({ name });
}
}
this.logger.info(`Cleaning APM data streams ${DATA_STREAMS.join(',')}`);

await this.client.indices.deleteDataStream({
name: DATA_STREAMS.join(','),
expand_wildcards: ['open', 'hidden'],
});
}

async updateComponentTemplate(
Expand All @@ -92,14 +90,16 @@ export class ApmSynthtraceEsClient {
name,
});

const template = response.component_templates[0];

await this.client.cluster.putComponentTemplate({
name,
template: {
...modify(template.component_template.template),
},
});
await Promise.all(
response.component_templates.map((template) => {
return this.client.cluster.putComponentTemplate({
name: template.name,
template: {
...modify(template.component_template.template),
},
});
})
);

this.logger.info(`Updated component template: ${name}`);
}
Expand All @@ -118,7 +118,14 @@ export class ApmSynthtraceEsClient {
return (base: Readable) => {
const aggregators = [
createTransactionMetricsAggregator('1m'),
createTransactionMetricsAggregator('10m'),
createTransactionMetricsAggregator('60m'),
createServiceMetricsAggregator('1m'),
createServiceMetricsAggregator('10m'),
createServiceMetricsAggregator('60m'),
createSpanMetricsAggregator('1m'),
createSpanMetricsAggregator('10m'),
createSpanMetricsAggregator('60m'),
];

const serializationTransform = includeSerialization ? [getSerializeTransform()] : [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export class ApmSynthtraceKibanaClient {

async fetchLatestApmPackageVersion() {
this.logger.debug(`Fetching latest APM package version`);
const fleetPackageApiUrl = `${this.target}/api/fleet/epm/packages/apm`;
const fleetPackageApiUrl = `${this.target}/api/fleet/epm/packages/apm?prerelease=true`;
const response = await fetch(fleetPackageApiUrl, {
method: 'GET',
headers: kibanaHeaders(),
Expand All @@ -35,6 +35,7 @@ export class ApmSynthtraceKibanaClient {
}

const { latestVersion } = responseJson.item;

return latestVersion as string;
}

Expand Down
94 changes: 1 addition & 93 deletions packages/kbn-apm-synthtrace/src/scenarios/continuous_rollups.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,10 @@
* Side Public License, v 1.
*/

import { apm, ApmFields } from '@kbn/apm-synthtrace-client';
import { merge, range as lodashRange } from 'lodash';
import { PassThrough, pipeline, Transform } from 'stream';
import { apm, ApmFields, ESDocumentWithOperation } from '@kbn/apm-synthtrace-client';
import { Scenario } from '../cli/scenario';
import { createServiceMetricsAggregator } from '../lib/apm/aggregators/create_service_metrics_aggregator';
import { createTransactionMetricsAggregator } from '../lib/apm/aggregators/create_transaction_metrics_aggregator';
import { ComponentTemplateName } from '../lib/apm/client/apm_synthtrace_es_client';
import { getApmServerMetadataTransform } from '../lib/apm/client/apm_synthtrace_es_client/get_apm_server_metadata_transform';
import { getDedotTransform } from '../lib/apm/client/apm_synthtrace_es_client/get_dedot_transform';
import { getIntakeDefaultsTransform } from '../lib/apm/client/apm_synthtrace_es_client/get_intake_defaults_transform';
import { getSerializeTransform } from '../lib/apm/client/apm_synthtrace_es_client/get_serialize_transform';
import { fork } from '../lib/utils/stream_utils';

const scenario: Scenario<ApmFields> = async ({ logger, scenarioOpts }) => {
const {
Expand All @@ -37,95 +29,11 @@ const scenario: Scenario<ApmFields> = async ({ logger, scenarioOpts }) => {
number_of_shards: 8,
},
},
mappings: {
properties: {
event: {
properties: {
outcome_numeric: {
type: 'aggregate_metric_double',
metrics: ['sum', 'value_count'],
default_metric: 'sum',
},
},
},
transaction: {
properties: {
duration: {
properties: {
summary: {
type: 'aggregate_metric_double',
metrics: ['min', 'max', 'sum', 'value_count'],
default_metric: 'max',
},
},
},
},
},
metricset: {
properties: {
interval: {
type: 'constant_keyword' as const,
},
},
},
},
},
};

return merge({}, template, next);
}
);

function withInterval(cb: (flushInterval: string) => Transform, flushInterval: string) {
const aggregator = cb(flushInterval);

aggregator.pipe(
new PassThrough({
objectMode: true,
write(metric: ApmFields, encoding, callback) {
metric['metricset.interval'] = flushInterval;
callback();
},
})
);

return aggregator;
}

const aggregators = [
withInterval(createServiceMetricsAggregator, '1m'),
withInterval(createServiceMetricsAggregator, '10m'),
withInterval(createServiceMetricsAggregator, '60m'),
withInterval(createTransactionMetricsAggregator, '1m'),
withInterval(createTransactionMetricsAggregator, '10m'),
withInterval(createTransactionMetricsAggregator, '60m'),
];

apmEsClient.pipeline((base) => {
return pipeline(
base,
getSerializeTransform(),
getIntakeDefaultsTransform(),
fork(...aggregators),
new Transform({
objectMode: true,
transform(event: ESDocumentWithOperation<ApmFields>, encoding, callback) {
const index = `metrics-apm.internal-${
event['metricset.name'] === 'transaction' ? 'transaction' : 'service'
}.${event['metricset.interval']}`;
event._index = index;
callback(null, event);
},
}),
getApmServerMetadataTransform(apmEsClient.getVersion()),
getDedotTransform(),
(err) => {
if (err) {
logger.error(err);
}
}
);
});
},
generate: ({ range }) => {
const TRANSACTION_TYPES = ['request', 'custom'];
Expand Down
Loading

0 comments on commit c224972

Please sign in to comment.