Skip to content

Latest commit

 

History

History
772 lines (551 loc) · 40.1 KB

File metadata and controls

772 lines (551 loc) · 40.1 KB

Transform Processor

Status
Stability alpha: traces, metrics, logs
Distributions contrib, k8s
Warnings Unsound Transformations, Identity Conflict, Orphaned Telemetry, Other
Issues Open issues Closed issues
Code Owners @TylerHelmuth, @kentquirk, @bogdandrutu, @evan-bradley

The transform processor modifies telemetry based on configuration using the OpenTelemetry Transformation Language.

For each signal type, the processor takes a list of conditions and statements associated to a Context type and executes the conditions and statements against the incoming telemetry in the order specified in the config. Each condition and statement can access and transform telemetry using functions and allow the use of a condition to help decide whether the function should be executed.

Config

The transform processor allows configuring multiple context statements for traces, metrics, and logs. The value of context specifies which OTTL Context to use when interpreting the associated statements. The global conditions and statement strings, which must be OTTL compatible, will be passed to OTTL and interpreted using the associated context. The condition string should contain a Where clause body without the where keyword at the beginning.

Each context will be processed in the order specified. Within a context, each global condition is checked and if any evaluates to true, the statements are executed in order. If a context doesn't meet any of the conditions, then the associated statement will be skipped.

Each statement may have a Where clause that acts as an additional check for whether to execute the statement.

The transform processor also allows configuring an optional field, error_mode, which will determine how the processor reacts to errors that occur while processing a statement.

error_mode description
ignore The processor ignores errors returned by statements, logs the error, and continues on to the next statement. This is the recommended mode.
silent The processor ignores errors returned by statements, does not log the error, and continues on to the next statement.
propagate The processor returns the error up the pipeline. This will result in the payload being dropped from the collector.

If not specified, propagate will be used.

transform:
  error_mode: ignore
  <trace|metric|log>_statements:
    - context: string
      conditions: 
        - string
        - string
      statements:
        - string
        - string
        - string
    - context: string
      statements:
        - string
        - string
        - string

Proper use of contexts will provide increased performance and capabilities. See Contexts for more details.

Valid values for context are:

Signal Context Values
trace_statements resource, scope, span, and spanevent
metric_statements resource, scope, metric, and datapoint
log_statements resource, scope, and log

conditions is a list comprised of multiple where clauses, which will be processed as global conditions for the accompanying set of statements. The conditions are ORed together, which means only one condition needs to evaluate to true in order for the statements (including their individual Where clauses) to be executed.

transform:
  error_mode: ignore
  metric_statements:
    - context: metric
      conditions: 
        - type == METRIC_DATA_TYPE_SUM
      statements:
        - set(description, "Sum")

  log_statements:
    - context: log
      conditions:
      - IsMap(body) and body["object"] != nil
      statements:
      - set(body, attributes["http.route"])

Example

The example takes advantage of context efficiency by grouping transformations with the context which it intends to transform. See Contexts for more details.

Example configuration:

transform:
  error_mode: ignore
  trace_statements:
    - context: resource
      statements:
        - keep_keys(attributes, ["service.name", "service.namespace", "cloud.region", "process.command_line"])
        - replace_pattern(attributes["process.command_line"], "password\\=[^\\s]*(\\s?)", "password=***")
        - limit(attributes, 100, [])
        - truncate_all(attributes, 4096)
    - context: span
      statements:
        - set(status.code, 1) where attributes["http.path"] == "/health"
        - set(name, attributes["http.route"])
        - replace_match(attributes["http.target"], "/user/*/list/*", "/user/{userId}/list/{listId}")
        - limit(attributes, 100, [])
        - truncate_all(attributes, 4096)

  metric_statements:
    - context: resource
      statements:
      - keep_keys(attributes, ["host.name"])
      - truncate_all(attributes, 4096)
    - context: metric
      statements:
        - set(description, "Sum") where type == "Sum"
        - convert_sum_to_gauge() where name == "system.processes.count"
        - convert_gauge_to_sum("cumulative", false) where name == "prometheus_metric"
    - context: datapoint
      statements:
        - limit(attributes, 100, ["host.name"])
        - truncate_all(attributes, 4096)
        
  log_statements:
    - context: resource
      statements:
        - keep_keys(attributes, ["service.name", "service.namespace", "cloud.region"])
    - context: log
      statements:
        - set(severity_text, "FAIL") where body == "request failed"
        - replace_all_matches(attributes, "/user/*/list/*", "/user/{userId}/list/{listId}")
        - replace_all_patterns(attributes, "value", "/account/\\d{4}", "/account/{accountId}")
        - set(body, attributes["http.route"])

Grammar

You can learn more in-depth details on the capabilities and limitations of the OpenTelemetry Transformation Language used by the transform processor by reading about its grammar.

Contexts

The transform processor utilizes the OTTL's contexts to transform Resource, Scope, Span, SpanEvent, Metric, DataPoint, and Log telemetry. The contexts allow the OTTL to interact with the underlying telemetry data in its pdata form.

Each context allows transformation of its type of telemetry.
For example, statements associated to a resource context will be able to transform the resource's attributes and dropped_attributes_count.

Contexts NEVER supply access to individual items "lower" in the protobuf definition.

  • This means statements associated to a resource WILL NOT be able to access the underlying instrumentation scopes.
  • This means statements associated to a scope WILL NOT be able to access the underlying telemetry slices (spans, metrics, or logs).
  • Similarly, statements associated to a metric WILL NOT be able to access individual datapoints, but can access the entire datapoints slice.
  • Similarly, statements associated to a span WILL NOT be able to access individual SpanEvents, but can access the entire SpanEvents slice.

For practical purposes, this means that a context cannot make decisions on its telemetry based on telemetry "lower" in the structure. For example, the following context statement is not possible because it attempts to use individual datapoint attributes in the condition of a statements that is associated to a metric

metric_statements:
- context: metric
  statements:
  - set(description, "test passed") where datapoints.attributes["test"] == "pass"

Context ALWAYS supply access to the items "higher" in the protobuf definition that are associated to the telemetry being transformed.

  • This means that statements associated to a datapoint have access to a datapoint's metric, instrumentation scope, and resource.
  • This means that statements associated to a spanevent have access to a spanevent's span, instrumentation scope, and resource.
  • This means that statements associated to a span/metric/log have access to the telemetry's instrumentation scope, and resource.
  • This means that statements associated to a scope have access to the scope's resource.

For example, the following context statement is possible because datapoint statements can access the datapoint's metric.

metric_statements:
- context: datapoint
  statements:
    - set(metric.description, "test passed") where attributes["test"] == "pass"

Whenever possible, associate your statements to the context that the statement intend to transform. Although you can modify resource attributes associated to a span using the span context, it is more efficient to use the resource context. This is because contexts are nested: the efficiency comes because higher-level contexts can avoid iterating through any of the contexts at a lower level.

Supported functions:

Since the transform processor utilizes the OTTL's contexts for Traces, Metrics, and Logs, it is able to utilize functions that expect pdata in addition to any common functions. These common functions can be used for any signal.

In addition to OTTL functions, the processor defines its own functions to help with transformations specific to this processor:

Metrics only functions

convert_sum_to_gauge

convert_sum_to_gauge()

Converts incoming metrics of type "Sum" to type "Gauge", retaining the metric's datapoints. Noop for metrics that are not of type "Sum".

NOTE: This function may cause a metric to break semantics for Gauge metrics. Use at your own risk.

Examples:

  • convert_sum_to_gauge()

convert_gauge_to_sum

convert_gauge_to_sum(aggregation_temporality, is_monotonic)

Converts incoming metrics of type "Gauge" to type "Sum", retaining the metric's datapoints and setting its aggregation temporality and monotonicity accordingly. Noop for metrics that are not of type "Gauge".

aggregation_temporality is a string ("cumulative" or "delta") that specifies the resultant metric's aggregation temporality. is_monotonic is a boolean that specifies the resultant metric's monotonicity.

NOTE: This function may cause a metric to break semantics for Sum metrics. Use at your own risk.

Examples:

  • convert_gauge_to_sum("cumulative", false)

  • convert_gauge_to_sum("delta", true)

extract_count_metric

Note

This function supports Histograms, ExponentialHistograms and Summaries.

extract_count_metric(is_monotonic)

The extract_count_metric function creates a new Sum metric from a Histogram, ExponentialHistogram or Summary's count value. A metric will only be created if there is at least one data point.

is_monotonic is a boolean representing the monotonicity of the new metric.

The name for the new metric will be <original metric name>_count. The fields that are copied are: timestamp, starttimestamp, attibutes, description, and aggregation_temporality. As metrics of type Summary don't have an aggregation_temporality field, this field will be set to AGGREGATION_TEMPORALITY_CUMULATIVE for those metrics.

The new metric that is created will be passed to all subsequent statements in the metrics statements list.

Warning

This function may cause a metric to break semantics for Sum metrics. Use only if you're confident you know what the resulting monotonicity should be.

Examples:

  • extract_count_metric(true)

  • extract_count_metric(false)

extract_sum_metric

Note

This function supports Histograms, ExponentialHistograms and Summaries.

extract_sum_metric(is_monotonic)

The extract_sum_metric function creates a new Sum metric from a Histogram, ExponentialHistogram or Summary's sum value. If the sum value of a Histogram or ExponentialHistogram data point is missing, no data point is added to the output metric. A metric will only be created if there is at least one data point.

is_monotonic is a boolean representing the monotonicity of the new metric.

The name for the new metric will be <original metric name>_sum. The fields that are copied are: timestamp, starttimestamp, attibutes, description, and aggregation_temporality. As metrics of type Summary don't have an aggregation_temporality field, this field will be set to AGGREGATION_TEMPORALITY_CUMULATIVE for those metrics.

The new metric that is created will be passed to all subsequent statements in the metrics statements list.

Warning

This function may cause a metric to break semantics for Sum metrics. Use only if you're confident you know what the resulting monotonicity should be.

Examples:

  • extract_sum_metric(true)

  • extract_sum_metric(false)

convert_summary_count_val_to_sum

convert_summary_count_val_to_sum(aggregation_temporality, is_monotonic)

The convert_summary_count_val_to_sum function creates a new Sum metric from a Summary's count value.

aggregation_temporality is a string ("cumulative" or "delta") representing the desired aggregation temporality of the new metric. is_monotonic is a boolean representing the monotonicity of the new metric.

The name for the new metric will be <summary metric name>_count. The fields that are copied are: timestamp, starttimestamp, attibutes, and description. The new metric that is created will be passed to all functions in the metrics statements list. Function conditions will apply.

NOTE: This function may cause a metric to break semantics for Sum metrics. Use at your own risk.

Examples:

  • convert_summary_count_val_to_sum("delta", true)

  • convert_summary_count_val_to_sum("cumulative", false)

convert_summary_sum_val_to_sum

convert_summary_sum_val_to_sum(aggregation_temporality, is_monotonic)

The convert_summary_sum_val_to_sum function creates a new Sum metric from a Summary's sum value.

aggregation_temporality is a string ("cumulative" or "delta") representing the desired aggregation temporality of the new metric. is_monotonic is a boolean representing the monotonicity of the new metric.

The name for the new metric will be <summary metric name>_sum. The fields that are copied are: timestamp, starttimestamp, attibutes, and description. The new metric that is created will be passed to all functions in the metrics statements list. Function conditions will apply.

NOTE: This function may cause a metric to break semantics for Sum metrics. Use at your own risk.

Examples:

  • convert_summary_sum_val_to_sum("delta", true)

  • convert_summary_sum_val_to_sum("cumulative", false)

copy_metric

copy_metric(Optional[name], Optional[description], Optional[unit])

The copy_metric function copies the current metric, adding it to the end of the metric slice.

name is an optional string. description is an optional string. unit is an optional string.

The new metric will be exactly the same as the current metric. You can use the optional parameters to set the new metric's name, description, and unit.

NOTE: The new metric is appended to the end of the metric slice and therefore will be included in all the metric statements. It is a best practice to ALWAYS include a Where clause when copying a metric that WILL NOT match the new metric.

Examples:

  • copy_metric(name="http.request.status_code", unit="s") where name == "http.status_code

  • copy_metric(desc="new desc") where description == "old desc"

convert_exponential_histogram_to_histogram

Warning: The approach used in this function to convert exponential histograms to explicit histograms is not part of the OpenTelemetry Specification.

convert_exponential_histogram_to_histogram(distribution, [ExplicitBounds])

The convert_exponential_histogram_to_histogram function converts an ExponentialHistogram to an Explicit (normal) Histogram.

This function requires 2 arguments:

  • distribution - This argument defines the distribution algorithm used to allocate the exponential histogram datapoints into a new Explicit Histogram. There are 4 options:

    • upper - This approach identifies the highest possible value of each exponential bucket (the upper bound) and uses it to distribute the datapoints by comparing the upper bound of each bucket with the ExplicitBounds provided. This approach works better for small/narrow exponential histograms where the difference between the upper bounds and lower bounds are small.

      For example, Given:

      1. count = 10
      2. Boundaries: [5, 10, 15, 20, 25]
      3. Upper Bound: 15 Process:
      4. Start with zeros: [0, 0, 0, 0, 0]
      5. Iterate the boundaries and compare $upper = 15$ with each boundary: - $15&gt;5$ (skip) - $15&gt;10$ (skip) - $15&lt;=15$ (allocate count to this boundary)
      6. Allocate count: [0, 0, 10, 0, 0]
      7. Final Counts: [0, 0, 10, 0, 0]
    • midpoint - This approach works in a similar way to the upper approach, but instead of using the upper bound, it uses the midpoint of each exponential bucket. The midpoint is identified by calculating the average of the upper and lower bounds. This approach also works better for small/narrow exponential histograms.

      The uniform and random distribution algorithms both utilise the concept of intersecting boundaries. Intersecting boundaries are any boundary in the boundaries array that falls between or on the lower and upper values of the Exponential Histogram boundaries. For Example: if you have an Exponential Histogram bucket with a lower bound of 10 and upper of 20, and your boundaries array is [5, 10, 15, 20, 25], the intersecting boundaries are 10, 15, and 20 because they lie within the range [10, 20].

    • uniform - This approach distributes the datapoints for each bucket uniformly across the intersecting ExplicitBounds. The algorithm works as follows:

      • If there are valid intersecting boundaries, the function evenly distributes the count across these boundaries.
      • Calculate the count to be allocated to each boundary.
      • If there is a remainder after dividing the count equally, it distributes the remainder by incrementing the count for some of the boundaries until the remainder is exhausted.

      For example Given:

      1. count = 10
      2. Exponential Histogram Bounds: [10, 20]
      3. Boundaries: [5, 10, 15, 20, 25]
      4. Intersecting Boundaries: [10, 15, 20]
      5. Number of Intersecting Boundaries: 3
      6. Using the formula: $count/numOfIntersections=10/3=3r1$

      Uniform Allocation:

      1. Start with zeros: [0, 0, 0, 0, 0]
      2. Allocate 3 to each: [0, 3, 3, 3, 0]
      3. Distribute remainder $r$ 1: [0, 4, 3, 3, 0]
      4. Final Counts: [0, 4, 3, 3, 0]
    • random - This approach distributes the datapoints for each bucket randomly across the intersecting ExplicitBounds. This approach works in a similar manner to the uniform distribution algorithm with the main difference being that points are distributed randomly instead of uniformly. This works as follows:

      • If there are valid intersecting boundaries, calculate the proportion of the count that should be allocated to each boundary based on the overlap of the boundary with the provided range (lower to upper).
      • For each boundary, a random fraction of the calculated proportion is allocated.
      • Any remaining count (due to rounding or random distribution) is then distributed randomly among the intersecting boundaries.
      • If the bucket range does not intersect with any boundaries, the entire count is assigned to the start boundary.
  • ExplicitBounds represents the list of bucket boundaries for the new histogram. This argument is required and cannot be empty.

WARNINGS:

  • The process of converting an ExponentialHistogram to an Explicit Histogram is not perfect and may result in a loss of precision. It is important to define an appropriate set of bucket boundaries and identify the best distribution approach for your data in order to minimize this loss.

    For example, selecting Boundaries that are too high or too low may result histogram buckets that are too wide or too narrow, respectively.

  • Negative Bucket Counts are not supported in Explicit Histograms, as such negative bucket counts are ignored.

  • ZeroCounts are only allocated if the ExplicitBounds array contains a zero boundary. That is, if the Explicit Boundaries that you provide does not start with 0, the function will not allocate any zero counts from the Exponential Histogram.

This function should only be used when Exponential Histograms are not suitable for the downstream consumers or if upstream metric sources are unable to generate Explicit Histograms.

Example:

  • convert_exponential_histogram_to_histogram("random", [0.0, 10.0, 100.0, 1000.0, 10000.0])

scale_metric

scale_metric(factor, Optional[unit])

The scale_metric function multiplies the values in the data points in the metric by the float value factor. If the optional string unit is provided, the metric's unit will be set to this value. The supported data types are:

Supported metric types are Gauge, Sum, Histogram, and Summary.

Examples:

  • scale_metric(0.1): Scale the metric by a factor of 0.1. The unit of the metric will not be modified.
  • scale_metric(10.0, "kWh"): Scale the metric by a factor of 10.0 and sets the unit to kWh.

aggregate_on_attributes

aggregate_on_attributes(function, Optional[attributes])

The aggregate_on_attributes function aggregates all datapoints in the metric based on the supplied attributes. function is a case-sensitive string that represents the aggregation function and attributes is an optional list of attribute keys of type string to aggregate upon.

aggregate_on_attributes function removes all attributes that are present in datapoints except the ones that are specified in the attributes parameter. If attributes parameter is not set, all attributes are removed from datapoints. Afterwards all datapoints are aggregated depending on the attributes left (none or the ones present in the list).

NOTE: This function is supported only in metric context.

The following metric types can be aggregated:

  • sum
  • gauge
  • histogram
  • exponential histogram

Supported aggregation functions are:

  • sum
  • max
  • min
  • mean
  • median
  • count

NOTE: Only the sum aggregation function is supported for histogram and exponential histogram datatypes.

Examples:

  • aggregate_on_attributes("sum", ["attr1", "attr2"]) where name == "system.memory.usage"
  • aggregate_on_attributes("max") where name == "system.memory.usage"

The aggregate_on_attributes function can also be used in conjunction with keep_matching_keys or delete_matching_keys.

For example, to remove attribute keys matching a regex and aggregate the metrics on the remaining attributes, you can perform the following statement sequence:

statements:
   - delete_matching_keys(attributes, "(?i).*myRegex.*") where name == "system.memory.usage"
   - aggregate_on_attributes("sum") where name == "system.memory.usage"

To aggregate only using a specified set of attributes, you can use keep_matching_keys.

aggregate_on_attribute_value

aggregate_on_attribute_value(function, attribute, values, newValue)

The aggregate_on_attribute_value function aggregates all datapoints in the metric containing the attribute attribute (type string) with one of the values present in the values parameter (list of strings) into a single datapoint where the attribute has the value newValue (type string). function is a case-sensitive string that represents the aggregation function.

NOTE: This function is supported only in metric context.

The following metric types can be aggregated:

  • sum
  • gauge
  • histogram
  • exponential histogram

Supported aggregation functions are:

  • sum
  • max
  • min
  • mean
  • median
  • count

NOTE: Only the sum agregation function is supported for histogram and exponential histogram datatypes.

Examples:

  • aggregate_on_attribute_value("sum", "attr1", ["val1", "val2"], "new_val") where name == "system.memory.usage"

The aggregate_on_attribute_value function can also be used in conjunction with keep_matching_keys or delete_matching_keys.

For example, to remove attribute keys matching a regex and aggregate the metrics on the remaining attributes, you can perform the following statement sequence:

statements:
   - delete_matching_keys(attributes, "(?i).*myRegex.*") where name == "system.memory.usage"
   - aggregate_on_attribute_value("sum", "attr1", ["val1", "val2"], "new_val") where name == "system.memory.usage"

To aggregate only using a specified set of attributes, you can use keep_matching_keys.

Examples

Perform transformation if field does not exist

Set attribute test to "pass" if the attribute test does not exist:

transform:
  error_mode: ignore
  trace_statements:
    - context: span
      statements:
        # accessing a map with a key that does not exist will return nil. 
        - set(attributes["test"], "pass") where attributes["test"] == nil

Rename attribute

There are 2 ways to rename an attribute key:

You can either set a new attribute and delete the old:

transform:
  error_mode: ignore
  trace_statements:
    - context: resource
      statements:
        - set(attributes["namespace"], attributes["k8s.namespace.name"])
        - delete_key(attributes, "k8s.namespace.name") 

Or you can update the key using regex:

transform:
  error_mode: ignore
  trace_statements:
    - context: resource
      statements:
        - replace_all_patterns(attributes, "key", "k8s\\.namespace\\.name", "namespace")

Move field to attribute

Set attribute body to the value of the log body:

transform:
  error_mode: ignore
  log_statements:
    - context: log
      statements: 
        - set(attributes["body"], body)

Combine two attributes

Set attribute test to the value of attributes "foo" and "bar" combined.

transform:
  error_mode: ignore
  trace_statements:
    - context: resource
      statements:
        # Use Concat function to combine any number of string, separated by a delimiter.
        - set(attributes["test"], Concat([attributes["foo"], attributes["bar"]], " "))

Parsing JSON logs

Given the following json body

{
  "name": "log",
  "attr1": "foo",
  "attr2": "bar",
  "nested": {
    "attr3": "example"
  }
}

add specific fields as attributes on the log:

transform:
  error_mode: ignore
  log_statements:
    - context: log
      statements:
        # Parse body as JSON and merge the resulting map with the cache map, ignoring non-json bodies.
        # cache is a field exposed by OTTL that is a temporary storage place for complex operations.
        - merge_maps(cache, ParseJSON(body), "upsert") where IsMatch(body, "^\\{") 
          
        # Set attributes using the values merged into cache.
        # If the attribute doesn't exist in cache then nothing happens.
        - set(attributes["attr1"], cache["attr1"])
        - set(attributes["attr2"], cache["attr2"])
        
        # To access nested maps you can chain index ([]) operations.
        # If nested or attr3 do no exist in cache then nothing happens.
        - set(attributes["nested.attr3"], cache["nested"]["attr3"])

Get Severity of an Unstructured Log Body

Given the following unstructured log body

[2023-09-22 07:38:22,570] INFO [Something]: some interesting log

You can find the severity using IsMatch:

transform:
  error_mode: ignore
  log_statements:
    - context: log
      statements:
        - set(severity_number, SEVERITY_NUMBER_INFO) where IsString(body) and IsMatch(body, "\\sINFO\\s")
        - set(severity_number, SEVERITY_NUMBER_WARN) where IsString(body) and IsMatch(body, "\\sWARN\\s")
        - set(severity_number, SEVERITY_NUMBER_ERROR) where IsString(body) and IsMatch(body, "\\sERROR\\s")

Copy attributes matching regular expression to a separate location

If you want to move resource attributes, which keys are matching the regular expression pod_labels_.* to a new attribute location kubernetes.labels, use the following configuration:

transform:
  error_mode: ignore
  trace_statements:
    - context: resource
      statements:
        - set(cache["attrs"], attributes)
        - keep_matching_keys(cache["attrs"], "pod_labels_.*")
        - set(attributes["kubernetes.labels"], cache["attrs"])

The configuration can be used also with delete_matching_keys() to copy the attributes that do not match the regular expression.

Troubleshooting

When using OTTL you can enable debug logging in the collector to print out useful information, such as the current Statement and the current TransformContext, to help you troubleshoot why a statement is not behaving as you expect. This feature is very verbose, but provides you an accurate view into how OTTL views the underlying data.

receivers:
  filelog:
    start_at: beginning
    include: [ test.log ]

processors:
  transform:
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          - set(resource.attributes["test"], "pass")
          - set(instrumentation_scope.attributes["test"], ["pass"])
          - set(attributes["test"], true)

exporters:
  debug:

service:
  telemetry:
    logs:
      level: debug
  pipelines:
    logs:
      receivers:
        - filelog
      processors:
        - transform
      exporters:
        - debug
2024-05-29T16:38:09.600-0600    debug   [email protected]/parser.go:265     initial TransformContext        {"kind": "processor", "name": "transform", "pipeline": "logs", "TransformContext": {"resource": {"attributes": {}, "dropped_attribute_count": 0}, "scope": {"attributes": {}, "dropped_attribute_count": 0, "name": "", "version": ""}, "log_record": {"attributes": {"log.file.name": "test.log"}, "body": "test", "dropped_attribute_count": 0, "flags": 0, "observed_time_unix_nano": 1717022289500721000, "severity_number": 0, "severity_text": "", "span_id": "", "time_unix_nano": 0, "trace_id": ""}, "cache": {}}}
2024-05-29T16:38:09.600-0600    debug   [email protected]/parser.go:268     TransformContext after statement execution      {"kind": "processor", "name": "transform", "pipeline": "logs", "statement": "set(resource.attributes[\"test\"], \"pass\")", "condition matched": true, "TransformContext": {"resource": {"attributes": {"test": "pass"}, "dropped_attribute_count": 0}, "scope": {"attributes": {}, "dropped_attribute_count": 0, "name": "", "version": ""}, "log_record": {"attributes": {"log.file.name": "test.log"}, "body": "test", "dropped_attribute_count": 0, "flags": 0, "observed_time_unix_nano": 1717022289500721000, "severity_number": 0, "severity_text": "", "span_id": "", "time_unix_nano": 0, "trace_id": ""}, "cache": {}}}
2024-05-29T16:38:09.600-0600    debug   [email protected]/parser.go:268     TransformContext after statement execution      {"kind": "processor", "name": "transform", "pipeline": "logs", "statement": "set(instrumentation_scope.attributes[\"test\"], [\"pass\"])", "condition matched": true, "TransformContext": {"resource": {"attributes": {"test": "pass"}, "dropped_attribute_count": 0}, "scope": {"attributes": {"test": ["pass"]}, "dropped_attribute_count": 0, "name": "", "version": ""}, "log_record": {"attributes": {"log.file.name": "test.log"}, "body": "test", "dropped_attribute_count": 0, "flags": 0, "observed_time_unix_nano": 1717022289500721000, "severity_number": 0, "severity_text": "", "span_id": "", "time_unix_nano": 0, "trace_id": ""}, "cache": {}}}
2024-05-29T16:38:09.601-0600    debug   [email protected]/parser.go:268     TransformContext after statement execution      {"kind": "processor", "name": "transform", "pipeline": "logs", "statement": "set(attributes[\"test\"], true)", "condition matched": true, "TransformContext": {"resource": {"attributes": {"test": "pass"}, "dropped_attribute_count": 0}, "scope": {"attributes": {"test": ["pass"]}, "dropped_attribute_count": 0, "name": "", "version": ""}, "log_record": {"attributes": {"log.file.name": "test.log", "test": true}, "body": "test", "dropped_attribute_count": 0, "flags": 0, "observed_time_unix_nano": 1717022289500721000, "severity_number": 0, "severity_text": "", "span_id": "", "time_unix_nano": 0, "trace_id": ""}, "cache": {}}}

Contributing

See CONTRIBUTING.md.

Warnings

The transform processor uses the OpenTelemetry Transformation Language (OTTL) which allows users to modify all aspects of their telemetry. Some specific risks are listed below, but this is not an exhaustive list. In general, understand your data before using the transform processor.

  • Unsound Transformations: Several Metric-only functions allow you to transform one metric data type to another or create new metrics from an existing metrics. Transformations between metric data types are not defined in the metrics data model. These functions have the expectation that you understand the incoming data and know that it can be meaningfully converted to a new metric data type or can meaningfully be used to create new metrics.
    • Although the OTTL allows the set function to be used with metric.data_type, its implementation in the transform processor is NOOP. To modify a data type you must use a function specific to that purpose.
  • Identity Conflict: Transformation of metrics have the potential to affect the identity of a metric leading to an Identity Crisis. Be especially cautious when transforming metric name and when reducing/changing existing attributes. Adding new attributes is safe.
  • Orphaned Telemetry: The processor allows you to modify span_id, trace_id, and parent_span_id for traces and span_id, and trace_id logs. Modifying these fields could lead to orphaned spans or logs.

Feature Gate

transform.flatten.logs

The transform.flatten.logs feature gate enables the flatten_data configuration option (default false). With flatten_data: true, the processor provides each log record with a distinct copy of its resource and scope. Then, after applying all transformations, the log records are regrouped by resource and scope.

This option is useful when applying transformations which alter the resource or scope. e.g. set(resource.attributes["to"], attributes["from"]), which may otherwise result in unexpected behavior. Using this option typically incurs a performance penalty as the processor must compute many hashes and create copies of resource and scope information for every log record.

The feature is currently only available for log processing.

Example Usage

config.yaml:

transform:
  flatten_data: true
  log_statements:
    - context: log
      statements:
        - set(resource.attributes["to"], attributes["from"])

Run collector: ./otelcol --config config.yaml --feature-gates=transform.flatten.logs