Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into feature/cases-sever…
Browse files Browse the repository at this point in the history
…ity-field-create-form
  • Loading branch information
Esteban Beltran committed May 5, 2022
2 parents e3e0dca + 5ab0fa5 commit da31573
Show file tree
Hide file tree
Showing 84 changed files with 1,494 additions and 1,607 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
/src/apm.js @elastic/kibana-core @vigneshshanmugam
/packages/kbn-apm-config-loader/ @elastic/kibana-core @vigneshshanmugam
/src/core/types/elasticsearch @elastic/apm-ui
/packages/elastic-apm-synthtrace/ @elastic/apm-ui
#CC# /src/plugins/apm_oss/ @elastic/apm-ui
#CC# /x-pack/plugins/observability/ @elastic/apm-ui

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { random } from 'lodash';
import { Client } from '@elastic/elasticsearch';
import { ApmFields } from '../apm_fields';
import { Fields } from '../../entity';
import { StreamAggregator } from '../../stream_aggregator';

type LatencyState = {
count: number;
min: number;
max: number;
sum: number;
timestamp: number;
} & Pick<ApmFields, 'service.name' | 'service.environment' | 'transaction.type'>;

export type ServiceFields = Fields &
Pick<
ApmFields,
| 'timestamp.us'
| 'ecs.version'
| 'metricset.name'
| 'observer'
| 'processor.event'
| 'processor.name'
| 'service.name'
| 'service.version'
| 'service.environment'
| 'transaction.type'
> &
Partial<{
'transaction.duration.aggregate': {
min: number;
max: number;
sum: number;
value_count: number;
};
}>;

export class ServiceLatencyAggregator implements StreamAggregator<ApmFields> {
public readonly name;

constructor() {
this.name = 'service-latency';
}

getDataStreamName(): string {
return 'metrics-apm.service';
}

getMappings(): Record<string, any> {
return {
properties: {
'@timestamp': {
type: 'date',
format: 'date_optional_time||epoch_millis',
},
transaction: {
type: 'object',
properties: {
type: { type: 'keyword', time_series_dimension: true },
duration: {
type: 'object',
properties: {
aggregate: {
type: 'aggregate_metric_double',
metrics: ['min', 'max', 'sum', 'value_count'],
default_metric: 'sum',
time_series_metric: 'gauge',
},
},
},
},
},
service: {
type: 'object',
properties: {
name: { type: 'keyword', time_series_dimension: true },
environment: { type: 'keyword', time_series_dimension: true },
},
},
},
};
}

getDimensions(): string[] {
return ['service.name', 'service.environment', 'transaction.type'];
}

getWriteTarget(document: Record<string, any>): string | null {
const eventType = document.metricset?.name;
if (eventType === 'service') return 'metrics-apm.service-default';
return null;
}

private state: Record<string, LatencyState> = {};

private processedComponent: number = 0;

process(event: ApmFields): Fields[] | null {
if (!event['@timestamp']) return null;
const service = event['service.name']!;
const environment = event['service.environment'] ?? 'production';
const transactionType = event['transaction.type'] ?? 'request';
const key = `${service}-${environment}-${transactionType}`;
const addToState = (timestamp: number) => {
if (!this.state[key]) {
this.state[key] = {
timestamp,
count: 0,
min: 0,
max: 0,
sum: 0,
'service.name': service,
'service.environment': environment,
'transaction.type': transactionType,
};
}
const duration = Number(event['transaction.duration.us']);
if (duration >= 0) {
const state = this.state[key];

state.count++;
state.sum += duration;
if (duration > state.max) state.max = duration;
if (duration < state.min) state.min = Math.min(0, duration);
}
};

// ensure we flush current state first if event falls out of the current max window age
if (this.state[key]) {
const diff = Math.abs(event['@timestamp'] - this.state[key].timestamp);
if (diff >= 1000 * 60) {
const fields = this.createServiceFields(key);
delete this.state[key];
addToState(event['@timestamp']);
return [fields];
}
}

addToState(event['@timestamp']);
// if cardinality is too high force emit of current state
if (Object.keys(this.state).length === 1000) {
return this.flush();
}

return null;
}

flush(): Fields[] {
const fields = Object.keys(this.state).map((key) => this.createServiceFields(key));
this.state = {};
return fields;
}

private createServiceFields(key: string): ServiceFields {
this.processedComponent = ++this.processedComponent % 1000;
const component = Date.now() % 100;
const state = this.state[key];
return {
'@timestamp': state.timestamp + random(0, 100) + component + this.processedComponent,
'metricset.name': 'service',
'processor.event': 'metric',
'service.name': state['service.name'],
'service.environment': state['service.environment'],
'transaction.type': state['transaction.type'],
'transaction.duration.aggregate': {
min: state.min,
max: state.max,
sum: state.sum,
value_count: state.count,
},
};
}

async bootstrapElasticsearch(esClient: Client): Promise<void> {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/

import { Client } from '@elastic/elasticsearch';
import { IndicesIndexSettings } from '@elastic/elasticsearch/lib/api/types';
import { cleanWriteTargets } from '../../utils/clean_write_targets';
import { getApmWriteTargets } from '../utils/get_apm_write_targets';
import { Logger } from '../../utils/create_logger';
Expand All @@ -15,6 +16,7 @@ import { EntityIterable } from '../../entity_iterable';
import { StreamProcessor } from '../../stream_processor';
import { EntityStreams } from '../../entity_streams';
import { Fields } from '../../entity';
import { StreamAggregator } from '../../stream_aggregator';

export interface StreamToBulkOptions<TFields extends Fields = ApmFields> {
concurrency?: number;
Expand Down Expand Up @@ -57,7 +59,7 @@ export class ApmSynthtraceEsClient {
return info.version.number;
}

async clean() {
async clean(dataStreams?: string[]) {
return this.getWriteTargets().then(async (writeTargets) => {
const indices = Object.values(writeTargets);
this.logger.info(`Attempting to clean: ${indices}`);
Expand All @@ -68,7 +70,7 @@ export class ApmSynthtraceEsClient {
logger: this.logger,
});
}
for (const name of indices) {
for (const name of indices.concat(dataStreams ?? [])) {
const dataStream = await this.client.indices.getDataStream({ name }, { ignore: [404] });
if (dataStream.data_streams && dataStream.data_streams.length > 0) {
this.logger.debug(`Deleting datastream: ${name}`);
Expand Down Expand Up @@ -149,7 +151,6 @@ export class ApmSynthtraceEsClient {
streamProcessor?: StreamProcessor
) {
const dataStream = Array.isArray(events) ? new EntityStreams(events) : events;

const sp =
streamProcessor != null
? streamProcessor
Expand All @@ -165,7 +166,7 @@ export class ApmSynthtraceEsClient {
await this.logger.perf('enumerate_scenario', async () => {
// @ts-ignore
// We just want to enumerate
for await (item of sp.streamToDocumentAsync(sp.toDocument, dataStream)) {
for await (item of sp.streamToDocumentAsync((e) => sp.toDocument(e), dataStream)) {
if (yielded === 0) {
options.itemStartStopCallback?.apply(this, [item, false]);
yielded++;
Expand All @@ -185,7 +186,7 @@ export class ApmSynthtraceEsClient {
flushBytes: 500000,
// TODO https://github.com/elastic/elasticsearch-js/issues/1610
// having to map here is awkward, it'd be better to map just before serialization.
datasource: sp.streamToDocumentAsync(sp.toDocument, dataStream),
datasource: sp.streamToDocumentAsync((e) => sp.toDocument(e), dataStream),
onDrop: (doc) => {
this.logger.info(JSON.stringify(doc, null, 2));
},
Expand All @@ -197,11 +198,12 @@ export class ApmSynthtraceEsClient {
options?.itemStartStopCallback?.apply(this, [item, false]);
yielded++;
}
const index = options?.mapToIndex
? options?.mapToIndex(item)
: !this.forceLegacyIndices
? StreamProcessor.getDataStreamForEvent(item, writeTargets)
: StreamProcessor.getIndexForEvent(item, writeTargets);
let index = options?.mapToIndex ? options?.mapToIndex(item) : null;
if (!index) {
index = !this.forceLegacyIndices
? sp.getDataStreamForEvent(item, writeTargets)
: StreamProcessor.getIndexForEvent(item, writeTargets);
}
return { create: { _index: index } };
},
});
Expand All @@ -211,4 +213,53 @@ export class ApmSynthtraceEsClient {
await this.refresh();
}
}

async createDataStream(aggregator: StreamAggregator) {
const datastreamName = aggregator.getDataStreamName();
const mappings = aggregator.getMappings();
const dimensions = aggregator.getDimensions();

const indexSettings: IndicesIndexSettings = { lifecycle: { name: 'metrics' } };
if (dimensions.length > 0) {
indexSettings.mode = 'time_series';
indexSettings.routing_path = dimensions;
}

await this.client.cluster.putComponentTemplate({
name: `${datastreamName}-mappings`,
template: {
mappings,
},
_meta: {
description: `Mappings for ${datastreamName}-*`,
},
});
this.logger.info(`Created mapping component template for ${datastreamName}-*`);

await this.client.cluster.putComponentTemplate({
name: `${datastreamName}-settings`,
template: {
settings: {
index: indexSettings,
},
},
_meta: {
description: `Settings for ${datastreamName}-*`,
},
});
this.logger.info(`Created settings component template for ${datastreamName}-*`);

await this.client.indices.putIndexTemplate({
name: `${datastreamName}-index_template`,
index_patterns: [`${datastreamName}-*`],
data_stream: {},
composed_of: [`${datastreamName}-mappings`, `${datastreamName}-settings`],
priority: 500,
});
this.logger.info(`Created index template for ${datastreamName}-*`);

await this.client.indices.createDataStream({ name: datastreamName + '-default' });

await aggregator.bootstrapElasticsearch(this.client);
}
}
27 changes: 27 additions & 0 deletions packages/elastic-apm-synthtrace/src/lib/stream_aggregator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Client } from '@elastic/elasticsearch';
import { ApmFields, Fields } from '..';

export interface StreamAggregator<TFields extends Fields = ApmFields> {
name: string;

getWriteTarget(document: Record<string, any>): string | null;

process(event: TFields): Fields[] | null;

flush(): Fields[];

bootstrapElasticsearch(esClient: Client): Promise<void>;

getDataStreamName(): string;

getDimensions(): string[];

getMappings(): Record<string, any>;
}
Loading

0 comments on commit da31573

Please sign in to comment.