From 89106b16936dd52d853ef66edb0ea6b17dcb4e47 Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Tue, 5 Nov 2024 16:32:19 +0100 Subject: [PATCH] feat: make condition field avilable along with other rc config opts Signed-off-by: Bence Csati --- api/telemetry/v1alpha1/subscription_types.go | 25 +++++- api/telemetry/v1alpha1/tenant_types.go | 24 ++++-- .../v1alpha1/zz_generated.deepcopy.go | 79 ++++++++++++++++--- ...emetry.kube-logging.dev_subscriptions.yaml | 31 +++++++- .../telemetry.kube-logging.dev_tenants.yaml | 27 ++++++- ...emetry.kube-logging.dev_subscriptions.yaml | 31 +++++++- .../telemetry.kube-logging.dev_tenants.yaml | 27 ++++++- docs/demos/loki/manifests.yaml | 8 +- docs/demos/openobserve/demo.yaml | 8 +- .../fluent-forward/telemetry-controller.yaml | 4 +- .../simple-demo-with-secretref/pipeline.yaml | 4 +- .../one_tenant_two_subscriptions.yaml | 8 +- .../tenant-to-tenant-routing/pipeline.yaml | 12 ++- .../two_tenants_one_subscription_each.yaml | 8 +- .../one_tenant_two_subscriptions.yaml | 8 +- .../telemetry/controller_integration_test.go | 12 ++- .../otel_col_conf_test_fixtures/complex.yaml | 2 +- .../telemetry/otel_conf_gen/otel_conf_gen.go | 6 +- .../otel_conf_gen/otel_conf_gen_test.go | 68 ++++++++++++---- .../components/connector/routing_connector.go | 73 +++++++++++++---- .../processor/transform_processor.go | 2 +- 21 files changed, 381 insertions(+), 86 deletions(-) diff --git a/api/telemetry/v1alpha1/subscription_types.go b/api/telemetry/v1alpha1/subscription_types.go index aa69318c..bf1b64bd 100644 --- a/api/telemetry/v1alpha1/subscription_types.go +++ b/api/telemetry/v1alpha1/subscription_types.go @@ -18,10 +18,31 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// Routing represents a Routing connector statement, which routes logs, +// metrics or traces based on resource attributes to specific pipelines +// using OTTL statements as routing conditions. +// ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/connector/routingconnector +type Routing struct { + // +kubebuilder:validation:Enum:=resource;log;request + + // Context specifies the context in which the statement will be evaluated + // valid options are: resource, log, request; (default: resource) + Context string `json:"context,omitempty"` + + // Only one of Condition or Statement should be set + Condition string `json:"condition,omitempty"` + Statement string `json:"statement,omitempty"` +} + +// RouteOpts defines the routing options for a subscription +type RouteOpts struct { + Routing Routing `json:"routing,omitempty"` +} + // SubscriptionSpec defines the desired state of Subscription type SubscriptionSpec struct { - Outputs []NamespacedName `json:"outputs,omitempty"` - OTTL string `json:"ottl,omitempty"` + RouteOpts RouteOpts `json:"routeOpts"` + Outputs []NamespacedName `json:"outputs,omitempty"` } // SubscriptionStatus defines the observed state of Subscription diff --git a/api/telemetry/v1alpha1/tenant_types.go b/api/telemetry/v1alpha1/tenant_types.go index 8fa5280c..d5d92cf8 100644 --- a/api/telemetry/v1alpha1/tenant_types.go +++ b/api/telemetry/v1alpha1/tenant_types.go @@ -18,9 +18,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -// Statement represents a single statement in a Transform processor +// TransformStatement represents a single statement in a Transform processor // ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor -type Statement struct { +type TransformStatement struct { // +kubebuilder:validation:Enum:=resource;scope;span;spanevent;metric;datapoint;log Context string `json:"context,omitempty"` Conditions []string `json:"conditions,omitempty"` @@ -38,9 +38,22 @@ type Transform struct { // vaid options are: ignore, silent, propagate; (default: propagate) ErrorMode string `json:"errorMode,omitempty"` - TraceStatements []Statement `json:"traceStatements,omitempty"` - MetricStatements []Statement `json:"metricStatements,omitempty"` - LogStatements []Statement `json:"logStatements,omitempty"` + TraceStatements []TransformStatement `json:"traceStatements,omitempty"` + MetricStatements []TransformStatement `json:"metricStatements,omitempty"` + LogStatements []TransformStatement `json:"logStatements,omitempty"` +} + +// RouteConfig defines the routing configuration for a tenant +// it will be used to generate routing connectors +type RouteConfig struct { + DefaultPipelines []string `json:"defaultPipelines,omitempty"` // TODO: Provide users with a guide to determine generated pipeline names + + // +kubebuilder:validation:Enum:=ignore;silent;propagate + + // ErrorMode specifies how errors are handled while processing a statement + // vaid options are: ignore, silent, propagate; (default: propagate) + ErrorMode string `json:"errorMode,omitempty"` + MatchOnce bool `json:"matchOnce,omitempty"` } // TenantSpec defines the desired state of Tenant @@ -48,6 +61,7 @@ type TenantSpec struct { SubscriptionNamespaceSelectors []metav1.LabelSelector `json:"subscriptionNamespaceSelectors,omitempty"` LogSourceNamespaceSelectors []metav1.LabelSelector `json:"logSourceNamespaceSelectors,omitempty"` Transform Transform `json:"transform,omitempty"` + RouteConfig RouteConfig `json:"routeConfig,omitempty"` } // TenantStatus defines the observed state of Tenant diff --git a/api/telemetry/v1alpha1/zz_generated.deepcopy.go b/api/telemetry/v1alpha1/zz_generated.deepcopy.go index e7665846..de7dc133 100644 --- a/api/telemetry/v1alpha1/zz_generated.deepcopy.go +++ b/api/telemetry/v1alpha1/zz_generated.deepcopy.go @@ -645,26 +645,52 @@ func (in *QueueSettings) DeepCopy() *QueueSettings { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *Statement) DeepCopyInto(out *Statement) { +func (in *RouteConfig) DeepCopyInto(out *RouteConfig) { *out = *in - if in.Conditions != nil { - in, out := &in.Conditions, &out.Conditions + if in.DefaultPipelines != nil { + in, out := &in.DefaultPipelines, &out.DefaultPipelines *out = make([]string, len(*in)) copy(*out, *in) } - if in.Statements != nil { - in, out := &in.Statements, &out.Statements - *out = make([]string, len(*in)) - copy(*out, *in) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RouteConfig. +func (in *RouteConfig) DeepCopy() *RouteConfig { + if in == nil { + return nil + } + out := new(RouteConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RouteOpts) DeepCopyInto(out *RouteOpts) { + *out = *in + out.Routing = in.Routing +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RouteOpts. +func (in *RouteOpts) DeepCopy() *RouteOpts { + if in == nil { + return nil } + out := new(RouteOpts) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Routing) DeepCopyInto(out *Routing) { + *out = *in } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Statement. -func (in *Statement) DeepCopy() *Statement { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Routing. +func (in *Routing) DeepCopy() *Routing { if in == nil { return nil } - out := new(Statement) + out := new(Routing) in.DeepCopyInto(out) return out } @@ -731,6 +757,7 @@ func (in *SubscriptionList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SubscriptionSpec) DeepCopyInto(out *SubscriptionSpec) { *out = *in + out.RouteOpts = in.RouteOpts if in.Outputs != nil { in, out := &in.Outputs, &out.Outputs *out = make([]NamespacedName, len(*in)) @@ -892,6 +919,7 @@ func (in *TenantSpec) DeepCopyInto(out *TenantSpec) { } } in.Transform.DeepCopyInto(&out.Transform) + in.RouteConfig.DeepCopyInto(&out.RouteConfig) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TenantSpec. @@ -954,21 +982,21 @@ func (in *Transform) DeepCopyInto(out *Transform) { *out = *in if in.TraceStatements != nil { in, out := &in.TraceStatements, &out.TraceStatements - *out = make([]Statement, len(*in)) + *out = make([]TransformStatement, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } } if in.MetricStatements != nil { in, out := &in.MetricStatements, &out.MetricStatements - *out = make([]Statement, len(*in)) + *out = make([]TransformStatement, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } } if in.LogStatements != nil { in, out := &in.LogStatements, &out.LogStatements - *out = make([]Statement, len(*in)) + *out = make([]TransformStatement, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -984,3 +1012,28 @@ func (in *Transform) DeepCopy() *Transform { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TransformStatement) DeepCopyInto(out *TransformStatement) { + *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.Statements != nil { + in, out := &in.Statements, &out.Statements + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TransformStatement. +func (in *TransformStatement) DeepCopy() *TransformStatement { + if in == nil { + return nil + } + out := new(TransformStatement) + in.DeepCopyInto(out) + return out +} diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml index 42b0dda0..b4c4012c 100644 --- a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml @@ -51,8 +51,6 @@ spec: spec: description: SubscriptionSpec defines the desired state of Subscription properties: - ottl: - type: string outputs: items: properties: @@ -65,6 +63,35 @@ spec: - namespace type: object type: array + routeOpts: + description: RouteOpts defines the routing options for a subscription + properties: + routing: + description: |- + Routing represents a Routing connector statement, which routes logs, + metrics or traces based on resource attributes to specific pipelines + using OTTL statements as routing conditions. + ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/connector/routingconnector + properties: + condition: + description: Only one of Condition or Statement should be + set + type: string + context: + description: |- + Context specifies the context in which the statement will be evaluated + valid options are: resource, log, request; (default: resource) + enum: + - resource + - log + - request + type: string + statement: + type: string + type: object + type: object + required: + - routeOpts type: object status: description: SubscriptionStatus defines the observed state of Subscription diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml index f61b34bd..ab253d49 100644 --- a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml @@ -105,6 +105,27 @@ spec: type: object x-kubernetes-map-type: atomic type: array + routeConfig: + description: |- + RouteConfig defines the routing configuration for a tenant + it will be used to generate routing connectors + properties: + defaultPipelines: + items: + type: string + type: array + errorMode: + description: |- + ErrorMode specifies how errors are handled while processing a statement + vaid options are: ignore, silent, propagate; (default: propagate) + enum: + - ignore + - silent + - propagate + type: string + matchOnce: + type: boolean + type: object subscriptionNamespaceSelectors: items: description: |- @@ -172,7 +193,7 @@ spec: logStatements: items: description: |- - Statement represents a single statement in a Transform processor + TransformStatement represents a single statement in a Transform processor ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor properties: conditions: @@ -198,7 +219,7 @@ spec: metricStatements: items: description: |- - Statement represents a single statement in a Transform processor + TransformStatement represents a single statement in a Transform processor ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor properties: conditions: @@ -227,7 +248,7 @@ spec: traceStatements: items: description: |- - Statement represents a single statement in a Transform processor + TransformStatement represents a single statement in a Transform processor ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor properties: conditions: diff --git a/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml b/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml index 42b0dda0..b4c4012c 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml @@ -51,8 +51,6 @@ spec: spec: description: SubscriptionSpec defines the desired state of Subscription properties: - ottl: - type: string outputs: items: properties: @@ -65,6 +63,35 @@ spec: - namespace type: object type: array + routeOpts: + description: RouteOpts defines the routing options for a subscription + properties: + routing: + description: |- + Routing represents a Routing connector statement, which routes logs, + metrics or traces based on resource attributes to specific pipelines + using OTTL statements as routing conditions. + ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/connector/routingconnector + properties: + condition: + description: Only one of Condition or Statement should be + set + type: string + context: + description: |- + Context specifies the context in which the statement will be evaluated + valid options are: resource, log, request; (default: resource) + enum: + - resource + - log + - request + type: string + statement: + type: string + type: object + type: object + required: + - routeOpts type: object status: description: SubscriptionStatus defines the observed state of Subscription diff --git a/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml b/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml index f61b34bd..ab253d49 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml @@ -105,6 +105,27 @@ spec: type: object x-kubernetes-map-type: atomic type: array + routeConfig: + description: |- + RouteConfig defines the routing configuration for a tenant + it will be used to generate routing connectors + properties: + defaultPipelines: + items: + type: string + type: array + errorMode: + description: |- + ErrorMode specifies how errors are handled while processing a statement + vaid options are: ignore, silent, propagate; (default: propagate) + enum: + - ignore + - silent + - propagate + type: string + matchOnce: + type: boolean + type: object subscriptionNamespaceSelectors: items: description: |- @@ -172,7 +193,7 @@ spec: logStatements: items: description: |- - Statement represents a single statement in a Transform processor + TransformStatement represents a single statement in a Transform processor ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor properties: conditions: @@ -198,7 +219,7 @@ spec: metricStatements: items: description: |- - Statement represents a single statement in a Transform processor + TransformStatement represents a single statement in a Transform processor ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor properties: conditions: @@ -227,7 +248,7 @@ spec: traceStatements: items: description: |- - Statement represents a single statement in a Transform processor + TransformStatement represents a single statement in a Transform processor ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor properties: conditions: diff --git a/docs/demos/loki/manifests.yaml b/docs/demos/loki/manifests.yaml index 45ac39ed..99147534 100644 --- a/docs/demos/loki/manifests.yaml +++ b/docs/demos/loki/manifests.yaml @@ -59,7 +59,9 @@ metadata: name: all-logs namespace: tenant-demo-1 spec: - ottl: 'route()' + routeOpts: + routing: + statement: route() outputs: - name: loki namespace: collector @@ -86,7 +88,9 @@ metadata: name: all-logs namespace: tenant-demo-2 spec: - ottl: 'route()' + routeOpts: + routing: + statement: route() outputs: - name: loki namespace: collector diff --git a/docs/demos/openobserve/demo.yaml b/docs/demos/openobserve/demo.yaml index 23cb08ec..7defbb3b 100644 --- a/docs/demos/openobserve/demo.yaml +++ b/docs/demos/openobserve/demo.yaml @@ -40,7 +40,9 @@ metadata: name: subscription-sample-1 namespace: example-tenant-ns spec: - ottl: 'route()' + routeOpts: + routing: + statement: route() outputs: - name: otlp-openobserve namespace: collector @@ -51,7 +53,9 @@ metadata: name: subscription-sample-2 namespace: example-tenant-ns spec: - ottl: 'route()' + routeOpts: + routing: + statement: route() outputs: - name: otlp-openobserve namespace: collector diff --git a/docs/examples/fluent-forward/telemetry-controller.yaml b/docs/examples/fluent-forward/telemetry-controller.yaml index 9f5338cb..887fd06a 100644 --- a/docs/examples/fluent-forward/telemetry-controller.yaml +++ b/docs/examples/fluent-forward/telemetry-controller.yaml @@ -29,7 +29,9 @@ metadata: name: all-logs namespace: default spec: - ottl: 'route()' + routeOpts: + routing: + statement: route() outputs: - name: fluent namespace: default diff --git a/docs/examples/simple-demo-with-secretref/pipeline.yaml b/docs/examples/simple-demo-with-secretref/pipeline.yaml index 2cc20426..fe760e06 100644 --- a/docs/examples/simple-demo-with-secretref/pipeline.yaml +++ b/docs/examples/simple-demo-with-secretref/pipeline.yaml @@ -41,7 +41,9 @@ metadata: name: subscription-sample-1 namespace: example-tenant-ns spec: - ottl: "route()" + routeOpts: + routing: + statement: route() outputs: - name: otlp-test-output-1 namespace: collector diff --git a/docs/examples/simple-demo/one_tenant_two_subscriptions.yaml b/docs/examples/simple-demo/one_tenant_two_subscriptions.yaml index d78b78a1..5dc66da2 100644 --- a/docs/examples/simple-demo/one_tenant_two_subscriptions.yaml +++ b/docs/examples/simple-demo/one_tenant_two_subscriptions.yaml @@ -40,7 +40,9 @@ metadata: name: subscription-sample-1 namespace: example-tenant-ns spec: - ottl: "route()" + routeOpts: + routing: + statement: route() outputs: - name: otlp-test-output-1 namespace: collector @@ -51,7 +53,9 @@ metadata: name: subscription-sample-2 namespace: example-tenant-ns spec: - ottl: "route()" + routeOpts: + routing: + statement: route() outputs: - name: otlp-test-output-2 namespace: collector diff --git a/docs/examples/tenant-to-tenant-routing/pipeline.yaml b/docs/examples/tenant-to-tenant-routing/pipeline.yaml index e89a36b0..77de5168 100644 --- a/docs/examples/tenant-to-tenant-routing/pipeline.yaml +++ b/docs/examples/tenant-to-tenant-routing/pipeline.yaml @@ -34,7 +34,9 @@ metadata: name: shared namespace: shared spec: - ottl: route() + routeOpts: + routing: + statement: route() outputs: - name: openobserve-shared namespace: shared @@ -84,7 +86,9 @@ metadata: name: database namespace: database spec: - ottl: route() + routeOpts: + routing: + statement: route() outputs: - name: openobserve-database namespace: database @@ -134,7 +138,9 @@ metadata: name: web namespace: web spec: - ottl: route() + routeOpts: + routing: + statement: route() outputs: - name: openobserve-web namespace: web diff --git a/docs/examples/two_tenants_one_subscription_each.yaml b/docs/examples/two_tenants_one_subscription_each.yaml index 44bb7dfd..8b565d8b 100644 --- a/docs/examples/two_tenants_one_subscription_each.yaml +++ b/docs/examples/two_tenants_one_subscription_each.yaml @@ -71,7 +71,9 @@ metadata: name: subscription-sample-1 namespace: example-tenant-ns-1 spec: - ottl: 'route()' + routeOpts: + routing: + statement: route() outputs: - name: otlp-test-output-1 namespace: collector @@ -82,7 +84,9 @@ metadata: name: subscription-sample-2 namespace: example-tenant-ns-2 spec: - ottl: 'route()' + routeOpts: + routing: + statement: route() outputs: - name: otlp-test-output-2 namespace: collector diff --git a/e2e/testdata/one_tenant_two_subscriptions/one_tenant_two_subscriptions.yaml b/e2e/testdata/one_tenant_two_subscriptions/one_tenant_two_subscriptions.yaml index 86d5fa68..61cf0f38 100644 --- a/e2e/testdata/one_tenant_two_subscriptions/one_tenant_two_subscriptions.yaml +++ b/e2e/testdata/one_tenant_two_subscriptions/one_tenant_two_subscriptions.yaml @@ -40,7 +40,9 @@ metadata: name: subscription-sample-1 namespace: example-tenant-ns spec: - ottl: 'route()' + routeOpts: + routing: + statement: route() outputs: - name: otlp-test-output namespace: collector @@ -51,7 +53,9 @@ metadata: name: subscription-sample-2 namespace: example-tenant-ns spec: - ottl: 'route()' + routeOpts: + routing: + statement: route() outputs: - name: otlp-test-output-2 namespace: collector diff --git a/internal/controller/telemetry/controller_integration_test.go b/internal/controller/telemetry/controller_integration_test.go index 2e0ecc85..c9d74b7e 100644 --- a/internal/controller/telemetry/controller_integration_test.go +++ b/internal/controller/telemetry/controller_integration_test.go @@ -76,13 +76,17 @@ var _ = Describe("Telemetry controller integration test", func() { Namespace: "tenant-1-ctrl", }, Spec: v1alpha1.SubscriptionSpec{ - OTTL: "route()", Outputs: []v1alpha1.NamespacedName{ { Name: "otlp-test-output-1", Namespace: "collector", }, }, + RouteOpts: v1alpha1.RouteOpts{ + Routing: v1alpha1.Routing{ + Statement: "route()", + }, + }, }, }, { @@ -91,13 +95,17 @@ var _ = Describe("Telemetry controller integration test", func() { Namespace: "tenant-2-all", }, Spec: v1alpha1.SubscriptionSpec{ - OTTL: "route()", Outputs: []v1alpha1.NamespacedName{ { Name: "otlp-test-output-2", Namespace: "collector", }, }, + RouteOpts: v1alpha1.RouteOpts{ + Routing: v1alpha1.Routing{ + Statement: "route()", + }, + }, }, }, } diff --git a/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml b/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml index b13610b9..c978b695 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml +++ b/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml @@ -51,7 +51,7 @@ connectors: statement: route() - pipelines: - logs/tenant_example-tenant-a_subscription_example-tenant-a-ns_subscription-example-2 - statement: 'route() ' + statement: route() routing/tenant_example-tenant-b_subscriptions: table: - pipelines: diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go index 507bc0f3..f67085cd 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go @@ -116,12 +116,12 @@ func (cfgInput *OtelColConfigInput) generateReceivers() map[string]any { return receivers } -func (cfgInput *OtelColConfigInput) generateConnectors() map[string]any { +func (cfgInput *OtelColConfigInput) generateConnectors(ctx context.Context) map[string]any { connectors := make(map[string]any) maps.Copy(connectors, connector.GenerateCountConnectors()) for _, tenant := range cfgInput.Tenants { - rc := connector.GenerateRoutingConnectorForTenantsSubscriptions(tenant.Name, cfgInput.TenantSubscriptionMap[tenant.Name], cfgInput.Subscriptions) + rc := connector.GenerateRoutingConnectorForTenantsSubscriptions(ctx, tenant.Name, tenant.Spec.RouteConfig, cfgInput.TenantSubscriptionMap[tenant.Name], cfgInput.Subscriptions) connectors[rc.Name] = rc } @@ -210,7 +210,7 @@ func (cfgInput *OtelColConfigInput) AssembleConfig(ctx context.Context) otelv1be receivers := cfgInput.generateReceivers() - connectors := cfgInput.generateConnectors() + connectors := cfgInput.generateConnectors(ctx) pipelines := cfgInput.generateNamedPipelines() diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go index 57fc4c12..27fe63d9 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go @@ -49,7 +49,6 @@ func TestOtelColConfComplex(t *testing.T) { Namespace: "example-tenant-a-ns", }, Spec: v1alpha1.SubscriptionSpec{ - OTTL: "route()", Outputs: []v1alpha1.NamespacedName{ { Name: "otlp-test-output", @@ -60,6 +59,11 @@ func TestOtelColConfComplex(t *testing.T) { Namespace: "collector", }, }, + RouteOpts: v1alpha1.RouteOpts{ + Routing: v1alpha1.Routing{ + Statement: "route()", + }, + }, }, }, {Name: "subscription-example-2", Namespace: "example-tenant-a-ns"}: { @@ -68,13 +72,17 @@ func TestOtelColConfComplex(t *testing.T) { Namespace: "example-tenant-a-ns", }, Spec: v1alpha1.SubscriptionSpec{ - OTTL: "route()", Outputs: []v1alpha1.NamespacedName{ { Name: "otlp-test-output-2", Namespace: "collector", }, }, + RouteOpts: v1alpha1.RouteOpts{ + Routing: v1alpha1.Routing{ + Statement: "route()", + }, + }, }, }, {Name: "subscription-example-3", Namespace: "example-tenant-b-ns"}: { @@ -83,7 +91,6 @@ func TestOtelColConfComplex(t *testing.T) { Namespace: "example-tenant-b-ns", }, Spec: v1alpha1.SubscriptionSpec{ - OTTL: "route()", Outputs: []v1alpha1.NamespacedName{ { Name: "otlp-test-output-2", @@ -94,6 +101,11 @@ func TestOtelColConfComplex(t *testing.T) { Namespace: "collector", }, }, + RouteOpts: v1alpha1.RouteOpts{ + Routing: v1alpha1.Routing{ + Statement: "route()", + }, + }, }, }, } @@ -372,12 +384,19 @@ func TestOtelColConfigInput_generateRoutingConnectorForTenantsSubscription(t *te Name: "subsA", Namespace: "nsA", }, - Spec: v1alpha1.SubscriptionSpec{Outputs: []v1alpha1.NamespacedName{ - { - Namespace: "xy", - Name: "zq", + Spec: v1alpha1.SubscriptionSpec{ + Outputs: []v1alpha1.NamespacedName{ + { + Namespace: "xy", + Name: "zq", + }, + }, + RouteOpts: v1alpha1.RouteOpts{ + Routing: v1alpha1.Routing{ + Statement: `set(attributes["subscription"], "subscriptionA")`, + }, }, - }, OTTL: `set(attributes["subscription"], "subscriptionA")`}, + }, }, { Name: "subsB", @@ -387,12 +406,19 @@ func TestOtelColConfigInput_generateRoutingConnectorForTenantsSubscription(t *te Name: "subsB", Namespace: "nsA", }, - Spec: v1alpha1.SubscriptionSpec{Outputs: []v1alpha1.NamespacedName{ - { - Namespace: "xy", - Name: "zq", + Spec: v1alpha1.SubscriptionSpec{ + Outputs: []v1alpha1.NamespacedName{ + { + Namespace: "xy", + Name: "zq", + }, + }, + RouteOpts: v1alpha1.RouteOpts{ + Routing: v1alpha1.Routing{ + Statement: `set(attributes["subscription"], "subscriptionB") `, + }, }, - }, OTTL: `set(attributes["subscription"], "subscriptionB")`}, + }, }, }, }, @@ -430,7 +456,7 @@ func TestOtelColConfigInput_generateRoutingConnectorForTenantsSubscription(t *te cfgInput := &OtelColConfigInput{ Subscriptions: tt.fields.Subscriptions, } - got := connector.GenerateRoutingConnectorForTenantsSubscriptions(tt.args.tenantName, tt.args.subscriptionNames, cfgInput.Subscriptions) + got := connector.GenerateRoutingConnectorForTenantsSubscriptions(context.TODO(), tt.args.tenantName, v1alpha1.RouteConfig{}, tt.args.subscriptionNames, cfgInput.Subscriptions) assert.Equal(t, got, tt.want) }) } @@ -493,7 +519,7 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { Spec: v1alpha1.TenantSpec{ Transform: v1alpha1.Transform{ Name: "transform1", - LogStatements: []v1alpha1.Statement{ + LogStatements: []v1alpha1.TransformStatement{ { Statements: []string{`set(resource.attributes["parsed"], ExtractPatterns(body, "(?P(GET|PUT))"))`}, }, @@ -538,8 +564,12 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { Namespace: "ns1", }, Spec: v1alpha1.SubscriptionSpec{ - OTTL: "route()", Outputs: []v1alpha1.NamespacedName{}, + RouteOpts: v1alpha1.RouteOpts{ + Routing: v1alpha1.Routing{ + Statement: "route()", + }, + }, }, }, { @@ -551,8 +581,12 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { Namespace: "ns2", }, Spec: v1alpha1.SubscriptionSpec{ - OTTL: "route()", Outputs: []v1alpha1.NamespacedName{}, + RouteOpts: v1alpha1.RouteOpts{ + Routing: v1alpha1.Routing{ + Statement: "route()", + }, + }, }, }, }, diff --git a/internal/controller/telemetry/pipeline/components/connector/routing_connector.go b/internal/controller/telemetry/pipeline/components/connector/routing_connector.go index f98547e8..b8c50477 100644 --- a/internal/controller/telemetry/pipeline/components/connector/routing_connector.go +++ b/internal/controller/telemetry/pipeline/components/connector/routing_connector.go @@ -15,6 +15,8 @@ package connector import ( + "context" + "errors" "fmt" "strings" @@ -22,11 +24,22 @@ import ( "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components" "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/utils" otelv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +type RCContext string + +const ( + RCcontextResource RCContext = "resource" + RCcontextLog RCContext = "log" + RCcontextRequest RCContext = "request" ) type RoutingConnectorTableItem struct { - Statement string `json:"statement"` - Pipelines []string `json:"pipelines"` + Context RCContext `json:"context,omitempty"` + Statement string `json:"statement,omitempty"` + Condition string `json:"condition,omitempty"` + Pipelines []string `json:"pipelines,omitempty"` } type RoutingConnector struct { @@ -41,29 +54,55 @@ func (rc *RoutingConnector) AddRoutingConnectorTableElem(newTableItem RoutingCon rc.Table = append(rc.Table, newTableItem) } -func newRoutingConnector(name string) RoutingConnector { - result := RoutingConnector{} - result.Name = name - - return result +func newRoutingConnector(name string, tenantRouteConfig v1alpha1.RouteConfig) RoutingConnector { + return RoutingConnector{ + Name: name, + DefaultPipelines: tenantRouteConfig.DefaultPipelines, + ErrorMode: components.ErrorMode(tenantRouteConfig.ErrorMode), + MatchOnce: tenantRouteConfig.MatchOnce, + } } -func buildRoutingTableItemForSubscription(tenantName string, subscription v1alpha1.Subscription, index int) RoutingConnectorTableItem { +func buildRoutingTableItemForSubscription(tenantName string, subscription v1alpha1.Subscription, index int) (RoutingConnectorTableItem, error) { + if subscription.Spec.RouteOpts.Routing.Statement == "" && subscription.Spec.RouteOpts.Routing.Condition == "" { + return RoutingConnectorTableItem{}, errors.New("subscription must have either a statement or a condition specified") + } + if subscription.Spec.RouteOpts.Routing.Statement != "" && subscription.Spec.RouteOpts.Routing.Condition != "" { + return RoutingConnectorTableItem{}, errors.New("subscription must have either a statement or a condition, not both") + } + pipelineName := fmt.Sprintf("logs/tenant_%s_subscription_%s_%s", tenantName, subscription.Namespace, subscription.Name) - newItem := RoutingConnectorTableItem{ - Statement: fmt.Sprintf("%s%s", subscription.Spec.OTTL, strings.Repeat(" ", index)), - Pipelines: []string{pipelineName}, + var tableItem RoutingConnectorTableItem + if subscription.Spec.RouteOpts.Routing.Condition == "" && subscription.Spec.RouteOpts.Routing.Statement != "" { + tableItem = RoutingConnectorTableItem{ + Context: RCContext(subscription.Spec.RouteOpts.Routing.Context), + Statement: fmt.Sprintf("%s%s", subscription.Spec.RouteOpts.Routing.Statement, strings.Repeat(" ", index)), + Pipelines: []string{pipelineName}, + } + } + if subscription.Spec.RouteOpts.Routing.Condition != "" && subscription.Spec.RouteOpts.Routing.Statement == "" { + tableItem = RoutingConnectorTableItem{ + Context: RCContext(subscription.Spec.RouteOpts.Routing.Context), + Condition: fmt.Sprintf("%s%s", subscription.Spec.RouteOpts.Routing.Condition, strings.Repeat(" ", index)), + Pipelines: []string{pipelineName}, + } } - return newItem + return tableItem, nil } -func GenerateRoutingConnectorForTenantsSubscriptions(tenantName string, subscriptionNames []v1alpha1.NamespacedName, subscriptions map[v1alpha1.NamespacedName]v1alpha1.Subscription) RoutingConnector { - rc := newRoutingConnector(fmt.Sprintf("routing/tenant_%s_subscriptions", tenantName)) +func GenerateRoutingConnectorForTenantsSubscriptions(ctx context.Context, tenantName string, tenantRouteConfig v1alpha1.RouteConfig, subscriptionNames []v1alpha1.NamespacedName, subscriptions map[v1alpha1.NamespacedName]v1alpha1.Subscription) RoutingConnector { + logger := log.FromContext(ctx) + + rc := newRoutingConnector(fmt.Sprintf("routing/tenant_%s_subscriptions", tenantName), tenantRouteConfig) utils.SortNamespacedNames(subscriptionNames) for index, subscriptionRef := range subscriptionNames { subscription := subscriptions[subscriptionRef] - tableItem := buildRoutingTableItemForSubscription(tenantName, subscription, index) + tableItem, err := buildRoutingTableItemForSubscription(tenantName, subscription, index) + if err != nil { + logger.Error(err, "failed to build routing table item for subscription", "subscription", subscription.NamespacedName().String()) + } + rc.AddRoutingConnectorTableElem(tableItem) } @@ -71,7 +110,7 @@ func GenerateRoutingConnectorForTenantsSubscriptions(tenantName string, subscrip } func GenerateRoutingConnectorForSubscriptionsOutputs(subscriptionRef v1alpha1.NamespacedName, outputNames []v1alpha1.NamespacedName) RoutingConnector { - rc := newRoutingConnector(fmt.Sprintf("routing/subscription_%s_%s_outputs", subscriptionRef.Namespace, subscriptionRef.Name)) + rc := newRoutingConnector(fmt.Sprintf("routing/subscription_%s_%s_outputs", subscriptionRef.Namespace, subscriptionRef.Name), v1alpha1.RouteConfig{}) utils.SortNamespacedNames(outputNames) pipelines := []string{} for _, outputRef := range outputNames { @@ -88,7 +127,7 @@ func GenerateRoutingConnectorForSubscriptionsOutputs(subscriptionRef v1alpha1.Na } func GenerateRoutingConnectorForBridge(bridge v1alpha1.Bridge) RoutingConnector { - rc := newRoutingConnector(fmt.Sprintf("routing/bridge_%s", bridge.Name)) + rc := newRoutingConnector(fmt.Sprintf("routing/bridge_%s", bridge.Name), v1alpha1.RouteConfig{}) tableItem := RoutingConnectorTableItem{ Statement: bridge.Spec.OTTL, diff --git a/internal/controller/telemetry/pipeline/components/processor/transform_processor.go b/internal/controller/telemetry/pipeline/components/processor/transform_processor.go index 1a554af8..23d52a68 100644 --- a/internal/controller/telemetry/pipeline/components/processor/transform_processor.go +++ b/internal/controller/telemetry/pipeline/components/processor/transform_processor.go @@ -52,7 +52,7 @@ func GenerateTransformProcessorForTenantPipeline(tenantName string, pipeline *ot } } -func convertAPIStatements(APIStatements []v1alpha1.Statement) []TransformProcessorStatement { +func convertAPIStatements(APIStatements []v1alpha1.TransformStatement) []TransformProcessorStatement { statements := make([]TransformProcessorStatement, len(APIStatements)) for i, statement := range APIStatements { statements[i] = TransformProcessorStatement{