From f51a0cdabccf3f17f210391b6ee5b485b806bdd9 Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Tue, 5 Nov 2024 13:22:36 +0100 Subject: [PATCH 1/2] fix: minor fixes Signed-off-by: Bence Csati --- api/telemetry/v1alpha1/subscription_types.go | 1 - .../crds/telemetry.kube-logging.dev_subscriptions.yaml | 2 -- config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml | 2 -- docs/examples/fluent-forward/telemetry-controller.yaml | 1 - .../components/extension/bearer_token_auth_extension.go | 2 +- 5 files changed, 1 insertion(+), 7 deletions(-) diff --git a/api/telemetry/v1alpha1/subscription_types.go b/api/telemetry/v1alpha1/subscription_types.go index 80078fc1..aa69318c 100644 --- a/api/telemetry/v1alpha1/subscription_types.go +++ b/api/telemetry/v1alpha1/subscription_types.go @@ -22,7 +22,6 @@ import ( type SubscriptionSpec struct { Outputs []NamespacedName `json:"outputs,omitempty"` OTTL string `json:"ottl,omitempty"` - Debug bool `json:"debug,omitempty"` } // SubscriptionStatus defines the observed state of Subscription diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml index 58b26cb7..42b0dda0 100644 --- a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml @@ -51,8 +51,6 @@ spec: spec: description: SubscriptionSpec defines the desired state of Subscription properties: - debug: - type: boolean ottl: type: string outputs: diff --git a/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml b/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml index 58b26cb7..42b0dda0 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml @@ -51,8 +51,6 @@ spec: spec: description: SubscriptionSpec defines the desired state of Subscription properties: - debug: - type: boolean ottl: type: string outputs: diff --git a/docs/examples/fluent-forward/telemetry-controller.yaml b/docs/examples/fluent-forward/telemetry-controller.yaml index 58664935..9f5338cb 100644 --- a/docs/examples/fluent-forward/telemetry-controller.yaml +++ b/docs/examples/fluent-forward/telemetry-controller.yaml @@ -29,7 +29,6 @@ metadata: name: all-logs namespace: default spec: - debug: true ottl: 'route()' outputs: - name: fluent diff --git a/internal/controller/telemetry/pipeline/components/extension/bearer_token_auth_extension.go b/internal/controller/telemetry/pipeline/components/extension/bearer_token_auth_extension.go index aef7e687..fda82bff 100644 --- a/internal/controller/telemetry/pipeline/components/extension/bearer_token_auth_extension.go +++ b/internal/controller/telemetry/pipeline/components/extension/bearer_token_auth_extension.go @@ -25,7 +25,7 @@ type BearerTokenAuthExtensionConfig struct { func GenerateBearerAuthExtensionsForOutput(output components.OutputWithSecretData) BearerTokenAuthExtensionConfig { var effectiveTokenField string if output.Output.Spec.Authentication.BearerAuth.TokenField != "" { - effectiveTokenField = output.Output.Spec.Authentication.BasicAuth.UsernameField + effectiveTokenField = output.Output.Spec.Authentication.BearerAuth.TokenField } else { effectiveTokenField = defaultBearerAuthTokenField } From 6cd04720977e5d6e6ee6747f0fd980575a8bd804 Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Wed, 6 Nov 2024 17:15:10 +0100 Subject: [PATCH 2/2] feat: drop ottl fields for subs and bridges for condition Also added a new e2e test-case for bridges. Signed-off-by: Bence Csati --- api/telemetry/v1alpha1/bridge_types.go | 6 +- api/telemetry/v1alpha1/subscription_types.go | 4 +- api/telemetry/v1alpha1/tenant_types.go | 24 ++- .../v1alpha1/zz_generated.deepcopy.go | 49 +++-- .../telemetry.kube-logging.dev_bridges.yaml | 6 +- ...emetry.kube-logging.dev_subscriptions.yaml | 4 +- .../telemetry.kube-logging.dev_tenants.yaml | 27 ++- .../telemetry.kube-logging.dev_bridges.yaml | 6 +- ...emetry.kube-logging.dev_subscriptions.yaml | 4 +- .../telemetry.kube-logging.dev_tenants.yaml | 27 ++- config/samples/telemetry_v1alpha1_bridge.yaml | 2 +- .../telemetry_v1alpha1_subscription.yaml | 1 + docs/demos/loki/manifests.yaml | 4 +- docs/demos/openobserve/demo.yaml | 4 +- .../fluent-forward/telemetry-controller.yaml | 2 +- .../simple-demo-with-secretref/pipeline.yaml | 2 +- .../simple-demo-with-secretref/receiver.yaml | 1 + .../one_tenant_two_subscriptions.yaml | 4 +- .../tenant-to-tenant-routing/pipeline.yaml | 10 +- .../two_tenants_one_subscription_each.yaml | 4 +- e2e/e2e_test.sh | 103 ++++++++--- .../one_tenant_two_subscriptions.yaml | 4 +- .../receiver.yaml | 1 + .../tenants_with_bridges/receiver.yaml | 21 +++ .../tenants_with_bridges.yaml | 169 +++++++++++++++++ .../telemetry/controller_integration_test.go | 4 +- .../otel_col_conf_test_fixtures/complex.yaml | 22 ++- .../telemetry/otel_conf_gen/otel_conf_gen.go | 2 +- .../otel_conf_gen/otel_conf_gen_test.go | 54 +++--- .../components/connector/routing_connector.go | 70 ++++---- .../connector/routing_connector_test.go | 170 ++++++++++++++++++ .../processor/transform_processor.go | 2 +- 32 files changed, 657 insertions(+), 156 deletions(-) create mode 100644 e2e/testdata/tenants_with_bridges/receiver.yaml create mode 100644 e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml create mode 100644 internal/controller/telemetry/pipeline/components/connector/routing_connector_test.go diff --git a/api/telemetry/v1alpha1/bridge_types.go b/api/telemetry/v1alpha1/bridge_types.go index 76c8d824..ee061ae3 100644 --- a/api/telemetry/v1alpha1/bridge_types.go +++ b/api/telemetry/v1alpha1/bridge_types.go @@ -20,11 +20,11 @@ import ( // BridgeSpec defines the desired state of Bridge type BridgeSpec struct { - SourceTenant string `json:"sourceTenant,omitempty"` - TargetTenant string `json:"targetTenant,omitempty"` + SourceTenant string `json:"sourceTenant"` + TargetTenant string `json:"targetTenant"` // The OTTL condition which must be satisfied in order to forward telemetry // from the source tenant to the target tenant - OTTL string `json:"ottl,omitempty"` + Condition string `json:"condition"` } // BridgeStatus defines the observed state of Bridge diff --git a/api/telemetry/v1alpha1/subscription_types.go b/api/telemetry/v1alpha1/subscription_types.go index aa69318c..c8ebc2ce 100644 --- a/api/telemetry/v1alpha1/subscription_types.go +++ b/api/telemetry/v1alpha1/subscription_types.go @@ -20,8 +20,8 @@ import ( // SubscriptionSpec defines the desired state of Subscription type SubscriptionSpec struct { - Outputs []NamespacedName `json:"outputs,omitempty"` - OTTL string `json:"ottl,omitempty"` + Condition string `json:"condition"` + Outputs []NamespacedName `json:"outputs,omitempty"` } // SubscriptionStatus defines the observed state of Subscription diff --git a/api/telemetry/v1alpha1/tenant_types.go b/api/telemetry/v1alpha1/tenant_types.go index 8fa5280c..d5d92cf8 100644 --- a/api/telemetry/v1alpha1/tenant_types.go +++ b/api/telemetry/v1alpha1/tenant_types.go @@ -18,9 +18,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -// Statement represents a single statement in a Transform processor +// TransformStatement represents a single statement in a Transform processor // ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor -type Statement struct { +type TransformStatement struct { // +kubebuilder:validation:Enum:=resource;scope;span;spanevent;metric;datapoint;log Context string `json:"context,omitempty"` Conditions []string `json:"conditions,omitempty"` @@ -38,9 +38,22 @@ type Transform struct { // vaid options are: ignore, silent, propagate; (default: propagate) ErrorMode string `json:"errorMode,omitempty"` - TraceStatements []Statement `json:"traceStatements,omitempty"` - MetricStatements []Statement `json:"metricStatements,omitempty"` - LogStatements []Statement `json:"logStatements,omitempty"` + TraceStatements []TransformStatement `json:"traceStatements,omitempty"` + MetricStatements []TransformStatement `json:"metricStatements,omitempty"` + LogStatements []TransformStatement `json:"logStatements,omitempty"` +} + +// RouteConfig defines the routing configuration for a tenant +// it will be used to generate routing connectors +type RouteConfig struct { + DefaultPipelines []string `json:"defaultPipelines,omitempty"` // TODO: Provide users with a guide to determine generated pipeline names + + // +kubebuilder:validation:Enum:=ignore;silent;propagate + + // ErrorMode specifies how errors are handled while processing a statement + // vaid options are: ignore, silent, propagate; (default: propagate) + ErrorMode string `json:"errorMode,omitempty"` + MatchOnce bool `json:"matchOnce,omitempty"` } // TenantSpec defines the desired state of Tenant @@ -48,6 +61,7 @@ type TenantSpec struct { SubscriptionNamespaceSelectors []metav1.LabelSelector `json:"subscriptionNamespaceSelectors,omitempty"` LogSourceNamespaceSelectors []metav1.LabelSelector `json:"logSourceNamespaceSelectors,omitempty"` Transform Transform `json:"transform,omitempty"` + RouteConfig RouteConfig `json:"routeConfig,omitempty"` } // TenantStatus defines the observed state of Tenant diff --git a/api/telemetry/v1alpha1/zz_generated.deepcopy.go b/api/telemetry/v1alpha1/zz_generated.deepcopy.go index e7665846..c4c1d15a 100644 --- a/api/telemetry/v1alpha1/zz_generated.deepcopy.go +++ b/api/telemetry/v1alpha1/zz_generated.deepcopy.go @@ -645,26 +645,21 @@ func (in *QueueSettings) DeepCopy() *QueueSettings { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *Statement) DeepCopyInto(out *Statement) { +func (in *RouteConfig) DeepCopyInto(out *RouteConfig) { *out = *in - if in.Conditions != nil { - in, out := &in.Conditions, &out.Conditions - *out = make([]string, len(*in)) - copy(*out, *in) - } - if in.Statements != nil { - in, out := &in.Statements, &out.Statements + if in.DefaultPipelines != nil { + in, out := &in.DefaultPipelines, &out.DefaultPipelines *out = make([]string, len(*in)) copy(*out, *in) } } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Statement. -func (in *Statement) DeepCopy() *Statement { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RouteConfig. +func (in *RouteConfig) DeepCopy() *RouteConfig { if in == nil { return nil } - out := new(Statement) + out := new(RouteConfig) in.DeepCopyInto(out) return out } @@ -892,6 +887,7 @@ func (in *TenantSpec) DeepCopyInto(out *TenantSpec) { } } in.Transform.DeepCopyInto(&out.Transform) + in.RouteConfig.DeepCopyInto(&out.RouteConfig) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TenantSpec. @@ -954,21 +950,21 @@ func (in *Transform) DeepCopyInto(out *Transform) { *out = *in if in.TraceStatements != nil { in, out := &in.TraceStatements, &out.TraceStatements - *out = make([]Statement, len(*in)) + *out = make([]TransformStatement, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } } if in.MetricStatements != nil { in, out := &in.MetricStatements, &out.MetricStatements - *out = make([]Statement, len(*in)) + *out = make([]TransformStatement, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } } if in.LogStatements != nil { in, out := &in.LogStatements, &out.LogStatements - *out = make([]Statement, len(*in)) + *out = make([]TransformStatement, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -984,3 +980,28 @@ func (in *Transform) DeepCopy() *Transform { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TransformStatement) DeepCopyInto(out *TransformStatement) { + *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.Statements != nil { + in, out := &in.Statements, &out.Statements + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TransformStatement. +func (in *TransformStatement) DeepCopy() *TransformStatement { + if in == nil { + return nil + } + out := new(TransformStatement) + in.DeepCopyInto(out) + return out +} diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_bridges.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_bridges.yaml index 428c8d2a..0e12d330 100644 --- a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_bridges.yaml +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_bridges.yaml @@ -51,7 +51,7 @@ spec: spec: description: BridgeSpec defines the desired state of Bridge properties: - ottl: + condition: description: |- The OTTL condition which must be satisfied in order to forward telemetry from the source tenant to the target tenant @@ -60,6 +60,10 @@ spec: type: string targetTenant: type: string + required: + - condition + - sourceTenant + - targetTenant type: object status: description: BridgeStatus defines the observed state of Bridge diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml index 42b0dda0..2ff509f6 100644 --- a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_subscriptions.yaml @@ -51,7 +51,7 @@ spec: spec: description: SubscriptionSpec defines the desired state of Subscription properties: - ottl: + condition: type: string outputs: items: @@ -65,6 +65,8 @@ spec: - namespace type: object type: array + required: + - condition type: object status: description: SubscriptionStatus defines the observed state of Subscription diff --git a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml index f61b34bd..ab253d49 100644 --- a/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml +++ b/charts/telemetry-controller/crds/telemetry.kube-logging.dev_tenants.yaml @@ -105,6 +105,27 @@ spec: type: object x-kubernetes-map-type: atomic type: array + routeConfig: + description: |- + RouteConfig defines the routing configuration for a tenant + it will be used to generate routing connectors + properties: + defaultPipelines: + items: + type: string + type: array + errorMode: + description: |- + ErrorMode specifies how errors are handled while processing a statement + vaid options are: ignore, silent, propagate; (default: propagate) + enum: + - ignore + - silent + - propagate + type: string + matchOnce: + type: boolean + type: object subscriptionNamespaceSelectors: items: description: |- @@ -172,7 +193,7 @@ spec: logStatements: items: description: |- - Statement represents a single statement in a Transform processor + TransformStatement represents a single statement in a Transform processor ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor properties: conditions: @@ -198,7 +219,7 @@ spec: metricStatements: items: description: |- - Statement represents a single statement in a Transform processor + TransformStatement represents a single statement in a Transform processor ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor properties: conditions: @@ -227,7 +248,7 @@ spec: traceStatements: items: description: |- - Statement represents a single statement in a Transform processor + TransformStatement represents a single statement in a Transform processor ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor properties: conditions: diff --git a/config/crd/bases/telemetry.kube-logging.dev_bridges.yaml b/config/crd/bases/telemetry.kube-logging.dev_bridges.yaml index 428c8d2a..0e12d330 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_bridges.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_bridges.yaml @@ -51,7 +51,7 @@ spec: spec: description: BridgeSpec defines the desired state of Bridge properties: - ottl: + condition: description: |- The OTTL condition which must be satisfied in order to forward telemetry from the source tenant to the target tenant @@ -60,6 +60,10 @@ spec: type: string targetTenant: type: string + required: + - condition + - sourceTenant + - targetTenant type: object status: description: BridgeStatus defines the observed state of Bridge diff --git a/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml b/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml index 42b0dda0..2ff509f6 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_subscriptions.yaml @@ -51,7 +51,7 @@ spec: spec: description: SubscriptionSpec defines the desired state of Subscription properties: - ottl: + condition: type: string outputs: items: @@ -65,6 +65,8 @@ spec: - namespace type: object type: array + required: + - condition type: object status: description: SubscriptionStatus defines the observed state of Subscription diff --git a/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml b/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml index f61b34bd..ab253d49 100644 --- a/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml +++ b/config/crd/bases/telemetry.kube-logging.dev_tenants.yaml @@ -105,6 +105,27 @@ spec: type: object x-kubernetes-map-type: atomic type: array + routeConfig: + description: |- + RouteConfig defines the routing configuration for a tenant + it will be used to generate routing connectors + properties: + defaultPipelines: + items: + type: string + type: array + errorMode: + description: |- + ErrorMode specifies how errors are handled while processing a statement + vaid options are: ignore, silent, propagate; (default: propagate) + enum: + - ignore + - silent + - propagate + type: string + matchOnce: + type: boolean + type: object subscriptionNamespaceSelectors: items: description: |- @@ -172,7 +193,7 @@ spec: logStatements: items: description: |- - Statement represents a single statement in a Transform processor + TransformStatement represents a single statement in a Transform processor ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor properties: conditions: @@ -198,7 +219,7 @@ spec: metricStatements: items: description: |- - Statement represents a single statement in a Transform processor + TransformStatement represents a single statement in a Transform processor ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor properties: conditions: @@ -227,7 +248,7 @@ spec: traceStatements: items: description: |- - Statement represents a single statement in a Transform processor + TransformStatement represents a single statement in a Transform processor ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor properties: conditions: diff --git a/config/samples/telemetry_v1alpha1_bridge.yaml b/config/samples/telemetry_v1alpha1_bridge.yaml index 9bf42a8b..86c6bed3 100644 --- a/config/samples/telemetry_v1alpha1_bridge.yaml +++ b/config/samples/telemetry_v1alpha1_bridge.yaml @@ -11,4 +11,4 @@ metadata: spec: sourceTenant: shared destinationTenant: tenantA - ottl: 'route() where attributes["parsed"]["method"] == "GET"' + condition: 'attributes["parsed"]["method"] == "GET"' diff --git a/config/samples/telemetry_v1alpha1_subscription.yaml b/config/samples/telemetry_v1alpha1_subscription.yaml index 6c40b6f9..4c90968b 100644 --- a/config/samples/telemetry_v1alpha1_subscription.yaml +++ b/config/samples/telemetry_v1alpha1_subscription.yaml @@ -9,6 +9,7 @@ metadata: app.kubernetes.io/created-by: telemetry-controller name: subscription-sample spec: + condition: "true" outputs: - name: output-sample namespace: collector diff --git a/docs/demos/loki/manifests.yaml b/docs/demos/loki/manifests.yaml index 45ac39ed..0d66edff 100644 --- a/docs/demos/loki/manifests.yaml +++ b/docs/demos/loki/manifests.yaml @@ -59,7 +59,7 @@ metadata: name: all-logs namespace: tenant-demo-1 spec: - ottl: 'route()' + condition: "true" outputs: - name: loki namespace: collector @@ -86,7 +86,7 @@ metadata: name: all-logs namespace: tenant-demo-2 spec: - ottl: 'route()' + condition: "true" outputs: - name: loki namespace: collector diff --git a/docs/demos/openobserve/demo.yaml b/docs/demos/openobserve/demo.yaml index 23cb08ec..4c283562 100644 --- a/docs/demos/openobserve/demo.yaml +++ b/docs/demos/openobserve/demo.yaml @@ -40,7 +40,7 @@ metadata: name: subscription-sample-1 namespace: example-tenant-ns spec: - ottl: 'route()' + condition: "true" outputs: - name: otlp-openobserve namespace: collector @@ -51,7 +51,7 @@ metadata: name: subscription-sample-2 namespace: example-tenant-ns spec: - ottl: 'route()' + condition: "true" outputs: - name: otlp-openobserve namespace: collector diff --git a/docs/examples/fluent-forward/telemetry-controller.yaml b/docs/examples/fluent-forward/telemetry-controller.yaml index 9f5338cb..06a7f2b1 100644 --- a/docs/examples/fluent-forward/telemetry-controller.yaml +++ b/docs/examples/fluent-forward/telemetry-controller.yaml @@ -29,7 +29,7 @@ metadata: name: all-logs namespace: default spec: - ottl: 'route()' + condition: "true" outputs: - name: fluent namespace: default diff --git a/docs/examples/simple-demo-with-secretref/pipeline.yaml b/docs/examples/simple-demo-with-secretref/pipeline.yaml index 2cc20426..9dc457d4 100644 --- a/docs/examples/simple-demo-with-secretref/pipeline.yaml +++ b/docs/examples/simple-demo-with-secretref/pipeline.yaml @@ -41,7 +41,7 @@ metadata: name: subscription-sample-1 namespace: example-tenant-ns spec: - ottl: "route()" + condition: "true" outputs: - name: otlp-test-output-1 namespace: collector diff --git a/docs/examples/simple-demo-with-secretref/receiver.yaml b/docs/examples/simple-demo-with-secretref/receiver.yaml index 0fdf9333..9c2e5dec 100644 --- a/docs/examples/simple-demo-with-secretref/receiver.yaml +++ b/docs/examples/simple-demo-with-secretref/receiver.yaml @@ -4,6 +4,7 @@ metadata: name: receiver-otelcol namespace: receiver spec: + managementState: managed image: otel/opentelemetry-collector-contrib:0.112.0 volumes: - name: certs-volume diff --git a/docs/examples/simple-demo/one_tenant_two_subscriptions.yaml b/docs/examples/simple-demo/one_tenant_two_subscriptions.yaml index d78b78a1..c248017a 100644 --- a/docs/examples/simple-demo/one_tenant_two_subscriptions.yaml +++ b/docs/examples/simple-demo/one_tenant_two_subscriptions.yaml @@ -40,7 +40,7 @@ metadata: name: subscription-sample-1 namespace: example-tenant-ns spec: - ottl: "route()" + condition: "true" outputs: - name: otlp-test-output-1 namespace: collector @@ -51,7 +51,7 @@ metadata: name: subscription-sample-2 namespace: example-tenant-ns spec: - ottl: "route()" + condition: "true" outputs: - name: otlp-test-output-2 namespace: collector diff --git a/docs/examples/tenant-to-tenant-routing/pipeline.yaml b/docs/examples/tenant-to-tenant-routing/pipeline.yaml index e89a36b0..620a1367 100644 --- a/docs/examples/tenant-to-tenant-routing/pipeline.yaml +++ b/docs/examples/tenant-to-tenant-routing/pipeline.yaml @@ -34,7 +34,7 @@ metadata: name: shared namespace: shared spec: - ottl: route() + condition: "true" outputs: - name: openobserve-shared namespace: shared @@ -76,7 +76,7 @@ metadata: spec: sourceTenant: shared targetTenant: database - ottl: 'route() where attributes["parsed"]["method"] == "GET"' + condition: 'attributes["parsed"]["method"] == "GET"' --- apiVersion: telemetry.kube-logging.dev/v1alpha1 kind: Subscription @@ -84,7 +84,7 @@ metadata: name: database namespace: database spec: - ottl: route() + condition: "true" outputs: - name: openobserve-database namespace: database @@ -126,7 +126,7 @@ metadata: spec: sourceTenant: shared targetTenant: web - ottl: 'route() where attributes["parsed"]["method"] == "PUT"' + condition: 'attributes["parsed"]["method"] == "PUT"' --- apiVersion: telemetry.kube-logging.dev/v1alpha1 kind: Subscription @@ -134,7 +134,7 @@ metadata: name: web namespace: web spec: - ottl: route() + condition: "true" outputs: - name: openobserve-web namespace: web diff --git a/docs/examples/two_tenants_one_subscription_each.yaml b/docs/examples/two_tenants_one_subscription_each.yaml index 44bb7dfd..a0ec03c7 100644 --- a/docs/examples/two_tenants_one_subscription_each.yaml +++ b/docs/examples/two_tenants_one_subscription_each.yaml @@ -71,7 +71,7 @@ metadata: name: subscription-sample-1 namespace: example-tenant-ns-1 spec: - ottl: 'route()' + condition: "true" outputs: - name: otlp-test-output-1 namespace: collector @@ -82,7 +82,7 @@ metadata: name: subscription-sample-2 namespace: example-tenant-ns-2 spec: - ottl: 'route()' + condition: "true" outputs: - name: otlp-test-output-2 namespace: collector diff --git a/e2e/e2e_test.sh b/e2e/e2e_test.sh index ca063ae2..a90e9bf5 100755 --- a/e2e/e2e_test.sh +++ b/e2e/e2e_test.sh @@ -5,23 +5,22 @@ set -o xtrace function main() { - kubectl get namespace telemetry-controller-system || kubectl create namespace telemetry-controller-system - kubectl config set-context --current --namespace=telemetry-controller-system + setup - load_images + test_one_tenant_two_subscriptions + test_tenants_with_bridges - helm_deploy_telemetry_controller - - helm_deploy_log_generator + echo "E2E (helm) test: PASSED" +} - deploy_test_assets +function setup() +{ + kubectl get namespace telemetry-controller-system || kubectl create namespace telemetry-controller-system + kubectl config set-context --current --namespace=telemetry-controller-system - # Check for received messages - subscription-sample-1 - # NOTE: We should not use grep -q, because it causes a SIGPIPE for kubectl and we have -o pipefail - check_subscription_logs "subscription-sample-1" - check_subscription_logs "subscription-sample-2" + load_images - echo "E2E (helm) test: PASSED" + helm_install_telemetry_controller } function load_images() @@ -32,50 +31,98 @@ function load_images() done } -function helm_deploy_telemetry_controller() +function helm_install_telemetry_controller() { helm upgrade --install \ --debug \ --wait \ --create-namespace \ - -f e2e/values.yaml \ + -f "e2e/values.yaml" \ telemetry-controller \ "charts/telemetry-controller/" } -function helm_deploy_log_generator() +function test_one_tenant_two_subscriptions() +{ + helm_install_log_generator_to_ns "example-tenant-ns" + + deploy_test_assets "e2e/testdata/one_tenant_two_subscriptions/" + + # Check for received messages - subscription-sample-1 + # NOTE: We should not use grep -q, because it causes a SIGPIPE for kubectl and we have -o pipefail + check_logs_in_workload_with_regex "example-tenant-ns" "receiver-collector" "subscription-sample-1" + check_logs_in_workload_with_regex "example-tenant-ns" "receiver-collector" "subscription-sample-2" + + helm_uninstall_log_generator_from_ns "example-tenant-ns" + undeploy_test_assets "e2e/testdata/one_tenant_two_subscriptions/" +} + +function test_tenants_with_bridges() +{ + helm_install_log_generator_to_ns "shared" + + deploy_test_assets "e2e/testdata/tenants_with_bridges/" + + # NOTE: Since both database and web tenant is parsing logs from the shared tenant + # if we see logs having Attribute "subscription" with value "Str(database)" or "Str(web)" + # then it means the logs are being parsed by the respective tenants and the bridges are working as expected. + check_logs_in_workload_with_regex "telemetry-controller-system" "receiver-collector" "subscription: Str\(database\)" + check_logs_in_workload_with_regex "telemetry-controller-system" "receiver-collector" "subscription: Str\(web\)" + + helm_uninstall_log_generator_from_ns "shared" + undeploy_test_assets "e2e/testdata/tenants_with_bridges/" +} + +function helm_install_log_generator_to_ns() { + local namespace="$1" + helm install \ --wait \ --create-namespace \ - --namespace example-tenant-ns \ - --generate-name \ + --namespace "$namespace" \ + log-generator \ oci://ghcr.io/kube-logging/helm-charts/log-generator } +function helm_uninstall_log_generator_from_ns() +{ + local namespace="$1" + + helm uninstall --namespace "$namespace" log-generator +} + function deploy_test_assets() { - kubectl apply -f e2e/testdata/one_tenant_two_subscriptions/ + local manifests="$1" + + kubectl apply -f "${manifests}" sleep 5 +} - # Wait for the deployment to be ready - kubectl wait --for=condition=available --timeout=300s deployment/receiver-collector --namespace example-tenant-ns +function undeploy_test_assets() +{ + local manifests="$1" + + kubectl delete -f "${manifests}" + + sleep 5 } -function check_subscription_logs() +function check_logs_in_workload_with_regex() { - local subscription="$1" - local namespace="${2:-example-tenant-ns}" - local deployment="${3:-receiver-collector}" + local namespace="$1" + local deployment="$2" + local regex="$3" local max_duration=300 local start_time=$(date +%s) - echo "Checking for $subscription in $namespace/$deployment logs" + echo "Checking for logs in $namespace/$deployment with regex: $regex" while true; do - if kubectl logs --namespace "$namespace" "deployments/$deployment" | grep -E "$subscription"; then - echo "Found $subscription in logs" + if kubectl logs --namespace "$namespace" "deployments/$deployment" | grep -E "$regex"; then + echo "Logs with regex: $regex found in $namespace/$deployment." return 0 fi @@ -85,7 +132,7 @@ function check_subscription_logs() local elapsed_time=$((current_time - start_time)) if [ "$elapsed_time" -ge "$max_duration" ]; then - echo "ERROR: Subscription $subscription not found in logs after $max_duration seconds" + echo "ERROR: Logs with regex: $regex not found in $namespace/$deployment after $max_duration seconds." return 1 fi done diff --git a/e2e/testdata/one_tenant_two_subscriptions/one_tenant_two_subscriptions.yaml b/e2e/testdata/one_tenant_two_subscriptions/one_tenant_two_subscriptions.yaml index 86d5fa68..7f0d29f5 100644 --- a/e2e/testdata/one_tenant_two_subscriptions/one_tenant_two_subscriptions.yaml +++ b/e2e/testdata/one_tenant_two_subscriptions/one_tenant_two_subscriptions.yaml @@ -40,7 +40,7 @@ metadata: name: subscription-sample-1 namespace: example-tenant-ns spec: - ottl: 'route()' + condition: "true" outputs: - name: otlp-test-output namespace: collector @@ -51,7 +51,7 @@ metadata: name: subscription-sample-2 namespace: example-tenant-ns spec: - ottl: 'route()' + condition: "true" outputs: - name: otlp-test-output-2 namespace: collector diff --git a/e2e/testdata/one_tenant_two_subscriptions/receiver.yaml b/e2e/testdata/one_tenant_two_subscriptions/receiver.yaml index 5a374cbe..d202fa89 100644 --- a/e2e/testdata/one_tenant_two_subscriptions/receiver.yaml +++ b/e2e/testdata/one_tenant_two_subscriptions/receiver.yaml @@ -4,6 +4,7 @@ metadata: name: receiver namespace: example-tenant-ns spec: + managementState: managed config: | receivers: otlp: diff --git a/e2e/testdata/tenants_with_bridges/receiver.yaml b/e2e/testdata/tenants_with_bridges/receiver.yaml new file mode 100644 index 00000000..8ef2c9b9 --- /dev/null +++ b/e2e/testdata/tenants_with_bridges/receiver.yaml @@ -0,0 +1,21 @@ +apiVersion: opentelemetry.io/v1alpha1 +kind: OpenTelemetryCollector +metadata: + name: receiver + namespace: telemetry-controller-system +spec: + managementState: managed + config: | + receivers: + otlp: + protocols: + grpc: + + exporters: + debug: + verbosity: detailed + service: + pipelines: + logs: + receivers: [otlp] + exporters: [debug] diff --git a/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml b/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml new file mode 100644 index 00000000..595f83f7 --- /dev/null +++ b/e2e/testdata/tenants_with_bridges/tenants_with_bridges.yaml @@ -0,0 +1,169 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: collector +--- +apiVersion: v1 +kind: Namespace +metadata: + labels: + tenant: shared + name: shared +--- +apiVersion: v1 +kind: Namespace +metadata: + labels: + tenant: database + name: database +--- +apiVersion: v1 +kind: Namespace +metadata: + labels: + tenant: web + name: web +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Collector +metadata: + name: cluster +spec: + controlNamespace: collector + tenantSelector: + matchLabels: + collector: cluster +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Tenant +metadata: + labels: + collector: cluster + name: shared +spec: + transform: + name: parse-nginx + logStatements: + - context: log + statements: + - set(resource.attributes["parsed"], ExtractPatterns(body, "(?P(GET|PUT))")) + logSourceNamespaceSelectors: + - matchLabels: + tenant: shared + subscriptionNamespaceSelectors: + - matchLabels: + tenant: shared +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Subscription +metadata: + name: shared + namespace: shared +spec: + condition: "true" + outputs: + - name: otlp-test-output-shared + namespace: collector +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Output +metadata: + name: otlp-test-output-shared + namespace: collector +spec: + otlp: + endpoint: receiver-collector.telemetry-controller-system.svc.cluster.local:4317 + tls: + insecure: true +--- +# A tenant that consumes logs from the shared tenant using a bridge +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Tenant +metadata: + labels: + collector: cluster + name: database +spec: + logSourceNamespaceSelectors: + - matchLabels: + tenant: database + subscriptionNamespaceSelectors: + - matchLabels: + tenant: database +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Bridge +metadata: + name: shared-database +spec: + sourceTenant: shared + targetTenant: database + condition: 'attributes["parsed"]["method"] == "GET"' +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Subscription +metadata: + name: database + namespace: database +spec: + condition: "true" + outputs: + - name: otlp-test-output-database + namespace: collector +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Output +metadata: + name: otlp-test-output-database + namespace: collector +spec: + otlp: + endpoint: receiver-collector.telemetry-controller-system.svc.cluster.local:4317 + tls: + insecure: true +--- +# Another tenant that consumes logs from the shared tenant using a bridge +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Tenant +metadata: + labels: + collector: cluster + name: web +spec: + logSourceNamespaceSelectors: + - matchLabels: + tenant: web + subscriptionNamespaceSelectors: + - matchLabels: + tenant: web +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Bridge +metadata: + name: shared-web +spec: + sourceTenant: shared + targetTenant: web + condition: 'attributes["parsed"]["method"] == "PUT"' +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Subscription +metadata: + name: web + namespace: web +spec: + condition: "true" + outputs: + - name: otlp-test-output-web + namespace: collector +--- +apiVersion: telemetry.kube-logging.dev/v1alpha1 +kind: Output +metadata: + name: otlp-test-output-web + namespace: collector +spec: + otlp: + endpoint: receiver-collector.telemetry-controller-system.svc.cluster.local:4317 + tls: + insecure: true diff --git a/internal/controller/telemetry/controller_integration_test.go b/internal/controller/telemetry/controller_integration_test.go index 2e0ecc85..87be70f6 100644 --- a/internal/controller/telemetry/controller_integration_test.go +++ b/internal/controller/telemetry/controller_integration_test.go @@ -76,7 +76,7 @@ var _ = Describe("Telemetry controller integration test", func() { Namespace: "tenant-1-ctrl", }, Spec: v1alpha1.SubscriptionSpec{ - OTTL: "route()", + Condition: "true", Outputs: []v1alpha1.NamespacedName{ { Name: "otlp-test-output-1", @@ -91,7 +91,7 @@ var _ = Describe("Telemetry controller integration test", func() { Namespace: "tenant-2-all", }, Spec: v1alpha1.SubscriptionSpec{ - OTTL: "route()", + Condition: "true", Outputs: []v1alpha1.NamespacedName{ { Name: "otlp-test-output-2", diff --git a/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml b/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml index b13610b9..2bfb5eb0 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml +++ b/internal/controller/telemetry/otel_conf_gen/otel_col_conf_test_fixtures/complex.yaml @@ -29,34 +29,32 @@ connectors: - key: k8s.pod.labels.app routing/subscription_example-tenant-a-ns_subscription-example-1_outputs: table: - - pipelines: + - condition: "true" + pipelines: - logs/output_example-tenant-a-ns_subscription-example-1_collector_loki-test-output - logs/output_example-tenant-a-ns_subscription-example-1_collector_otlp-test-output - statement: route() routing/subscription_example-tenant-a-ns_subscription-example-2_outputs: table: - - pipelines: + - condition: "true" + pipelines: - logs/output_example-tenant-a-ns_subscription-example-2_collector_otlp-test-output-2 - statement: route() routing/subscription_example-tenant-b-ns_subscription-example-3_outputs: table: - - pipelines: + - condition: "true" + pipelines: - logs/output_example-tenant-b-ns_subscription-example-3_collector_fluentforward-test-output - logs/output_example-tenant-b-ns_subscription-example-3_collector_otlp-test-output-2 - statement: route() routing/tenant_example-tenant-a_subscriptions: table: - - pipelines: + - condition: "true" + pipelines: - logs/tenant_example-tenant-a_subscription_example-tenant-a-ns_subscription-example-1 - statement: route() - - pipelines: - logs/tenant_example-tenant-a_subscription_example-tenant-a-ns_subscription-example-2 - statement: 'route() ' routing/tenant_example-tenant-b_subscriptions: table: - - pipelines: + - condition: "true" + pipelines: - logs/tenant_example-tenant-b_subscription_example-tenant-b-ns_subscription-example-3 - statement: route() exporters: {} extensions: bearertokenauth/collector_otlp-test-output: diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go index 507bc0f3..525723d6 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen.go @@ -121,7 +121,7 @@ func (cfgInput *OtelColConfigInput) generateConnectors() map[string]any { maps.Copy(connectors, connector.GenerateCountConnectors()) for _, tenant := range cfgInput.Tenants { - rc := connector.GenerateRoutingConnectorForTenantsSubscriptions(tenant.Name, cfgInput.TenantSubscriptionMap[tenant.Name], cfgInput.Subscriptions) + rc := connector.GenerateRoutingConnectorForTenantsSubscriptions(tenant.Name, tenant.Spec.RouteConfig, cfgInput.TenantSubscriptionMap[tenant.Name], cfgInput.Subscriptions) connectors[rc.Name] = rc } diff --git a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go index 57fc4c12..5c875a08 100644 --- a/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go +++ b/internal/controller/telemetry/otel_conf_gen/otel_conf_gen_test.go @@ -49,7 +49,7 @@ func TestOtelColConfComplex(t *testing.T) { Namespace: "example-tenant-a-ns", }, Spec: v1alpha1.SubscriptionSpec{ - OTTL: "route()", + Condition: "true", Outputs: []v1alpha1.NamespacedName{ { Name: "otlp-test-output", @@ -68,7 +68,7 @@ func TestOtelColConfComplex(t *testing.T) { Namespace: "example-tenant-a-ns", }, Spec: v1alpha1.SubscriptionSpec{ - OTTL: "route()", + Condition: "true", Outputs: []v1alpha1.NamespacedName{ { Name: "otlp-test-output-2", @@ -83,7 +83,7 @@ func TestOtelColConfComplex(t *testing.T) { Namespace: "example-tenant-b-ns", }, Spec: v1alpha1.SubscriptionSpec{ - OTTL: "route()", + Condition: "true", Outputs: []v1alpha1.NamespacedName{ { Name: "otlp-test-output-2", @@ -372,12 +372,15 @@ func TestOtelColConfigInput_generateRoutingConnectorForTenantsSubscription(t *te Name: "subsA", Namespace: "nsA", }, - Spec: v1alpha1.SubscriptionSpec{Outputs: []v1alpha1.NamespacedName{ - { - Namespace: "xy", - Name: "zq", + Spec: v1alpha1.SubscriptionSpec{ + Condition: "true", + Outputs: []v1alpha1.NamespacedName{ + { + Namespace: "xy", + Name: "zq", + }, }, - }, OTTL: `set(attributes["subscription"], "subscriptionA")`}, + }, }, { Name: "subsB", @@ -387,12 +390,15 @@ func TestOtelColConfigInput_generateRoutingConnectorForTenantsSubscription(t *te Name: "subsB", Namespace: "nsA", }, - Spec: v1alpha1.SubscriptionSpec{Outputs: []v1alpha1.NamespacedName{ - { - Namespace: "xy", - Name: "zq", + Spec: v1alpha1.SubscriptionSpec{ + Condition: "true", + Outputs: []v1alpha1.NamespacedName{ + { + Namespace: "xy", + Name: "zq", + }, }, - }, OTTL: `set(attributes["subscription"], "subscriptionB")`}, + }, }, }, }, @@ -413,12 +419,8 @@ func TestOtelColConfigInput_generateRoutingConnectorForTenantsSubscription(t *te Name: "routing/tenant_tenantA_subscriptions", Table: []connector.RoutingConnectorTableItem{ { - Statement: `set(attributes["subscription"], "subscriptionA")`, - Pipelines: []string{"logs/tenant_tenantA_subscription_nsA_subsA"}, - }, - { - Statement: `set(attributes["subscription"], "subscriptionB") `, - Pipelines: []string{"logs/tenant_tenantA_subscription_nsA_subsB"}, + Condition: "true", + Pipelines: []string{"logs/tenant_tenantA_subscription_nsA_subsA", "logs/tenant_tenantA_subscription_nsA_subsB"}, }, }, }, @@ -430,7 +432,7 @@ func TestOtelColConfigInput_generateRoutingConnectorForTenantsSubscription(t *te cfgInput := &OtelColConfigInput{ Subscriptions: tt.fields.Subscriptions, } - got := connector.GenerateRoutingConnectorForTenantsSubscriptions(tt.args.tenantName, tt.args.subscriptionNames, cfgInput.Subscriptions) + got := connector.GenerateRoutingConnectorForTenantsSubscriptions(tt.args.tenantName, v1alpha1.RouteConfig{}, tt.args.subscriptionNames, cfgInput.Subscriptions) assert.Equal(t, got, tt.want) }) } @@ -493,7 +495,7 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { Spec: v1alpha1.TenantSpec{ Transform: v1alpha1.Transform{ Name: "transform1", - LogStatements: []v1alpha1.Statement{ + LogStatements: []v1alpha1.TransformStatement{ { Statements: []string{`set(resource.attributes["parsed"], ExtractPatterns(body, "(?P(GET|PUT))"))`}, }, @@ -538,8 +540,8 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { Namespace: "ns1", }, Spec: v1alpha1.SubscriptionSpec{ - OTTL: "route()", - Outputs: []v1alpha1.NamespacedName{}, + Condition: "true", + Outputs: []v1alpha1.NamespacedName{}, }, }, { @@ -551,8 +553,8 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { Namespace: "ns2", }, Spec: v1alpha1.SubscriptionSpec{ - OTTL: "route()", - Outputs: []v1alpha1.NamespacedName{}, + Condition: "true", + Outputs: []v1alpha1.NamespacedName{}, }, }, }, @@ -598,7 +600,7 @@ func TestOtelColConfigInput_generateNamedPipelines(t *testing.T) { Spec: v1alpha1.BridgeSpec{ SourceTenant: "tenant1", TargetTenant: "tenant2", - OTTL: "route()", + Condition: "true", }, }, }, diff --git a/internal/controller/telemetry/pipeline/components/connector/routing_connector.go b/internal/controller/telemetry/pipeline/components/connector/routing_connector.go index f98547e8..38f85df8 100644 --- a/internal/controller/telemetry/pipeline/components/connector/routing_connector.go +++ b/internal/controller/telemetry/pipeline/components/connector/routing_connector.go @@ -16,7 +16,6 @@ package connector import ( "fmt" - "strings" "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" "github.com/kube-logging/telemetry-controller/internal/controller/telemetry/pipeline/components" @@ -25,8 +24,8 @@ import ( ) type RoutingConnectorTableItem struct { - Statement string `json:"statement"` - Pipelines []string `json:"pipelines"` + Condition string `json:"condition,omitempty"` + Pipelines []string `json:"pipelines,omitempty"` } type RoutingConnector struct { @@ -37,41 +36,47 @@ type RoutingConnector struct { Table []RoutingConnectorTableItem `json:"table"` } -func (rc *RoutingConnector) AddRoutingConnectorTableElem(newTableItem RoutingConnectorTableItem) { - rc.Table = append(rc.Table, newTableItem) -} - -func newRoutingConnector(name string) RoutingConnector { - result := RoutingConnector{} - result.Name = name - - return result +func (rc *RoutingConnector) populateRoutingConnectorTable(seenConditionsPipelineMap map[string][]string) { + for condition, pipelines := range seenConditionsPipelineMap { + tableItem := RoutingConnectorTableItem{ + Condition: condition, + Pipelines: pipelines, + } + rc.Table = append(rc.Table, tableItem) + } } -func buildRoutingTableItemForSubscription(tenantName string, subscription v1alpha1.Subscription, index int) RoutingConnectorTableItem { - pipelineName := fmt.Sprintf("logs/tenant_%s_subscription_%s_%s", tenantName, subscription.Namespace, subscription.Name) - newItem := RoutingConnectorTableItem{ - Statement: fmt.Sprintf("%s%s", subscription.Spec.OTTL, strings.Repeat(" ", index)), - Pipelines: []string{pipelineName}, +func newRoutingConnector(name string, tenantRouteConfig v1alpha1.RouteConfig) RoutingConnector { + return RoutingConnector{ + Name: name, + DefaultPipelines: tenantRouteConfig.DefaultPipelines, + ErrorMode: components.ErrorMode(tenantRouteConfig.ErrorMode), + MatchOnce: tenantRouteConfig.MatchOnce, } - - return newItem } -func GenerateRoutingConnectorForTenantsSubscriptions(tenantName string, subscriptionNames []v1alpha1.NamespacedName, subscriptions map[v1alpha1.NamespacedName]v1alpha1.Subscription) RoutingConnector { - rc := newRoutingConnector(fmt.Sprintf("routing/tenant_%s_subscriptions", tenantName)) +func GenerateRoutingConnectorForTenantsSubscriptions(tenantName string, tenantRouteConfig v1alpha1.RouteConfig, subscriptionNames []v1alpha1.NamespacedName, subscriptions map[v1alpha1.NamespacedName]v1alpha1.Subscription) RoutingConnector { + rc := newRoutingConnector(fmt.Sprintf("routing/tenant_%s_subscriptions", tenantName), tenantRouteConfig) utils.SortNamespacedNames(subscriptionNames) - for index, subscriptionRef := range subscriptionNames { - subscription := subscriptions[subscriptionRef] - tableItem := buildRoutingTableItemForSubscription(tenantName, subscription, index) - rc.AddRoutingConnectorTableElem(tableItem) + seenConditionsPipelineMap := make(map[string][]string) + for _, subscriptionRef := range subscriptionNames { + subscription, ok := subscriptions[subscriptionRef] + if ok { + pipelineName := fmt.Sprintf("logs/tenant_%s_subscription_%s_%s", tenantName, subscription.Namespace, subscription.Name) + if _, ok := seenConditionsPipelineMap[subscription.Spec.Condition]; !ok { + seenConditionsPipelineMap[subscription.Spec.Condition] = []string{pipelineName} + } else { + seenConditionsPipelineMap[subscription.Spec.Condition] = append(seenConditionsPipelineMap[subscription.Spec.Condition], pipelineName) + } + } } + rc.populateRoutingConnectorTable(seenConditionsPipelineMap) return rc } func GenerateRoutingConnectorForSubscriptionsOutputs(subscriptionRef v1alpha1.NamespacedName, outputNames []v1alpha1.NamespacedName) RoutingConnector { - rc := newRoutingConnector(fmt.Sprintf("routing/subscription_%s_%s_outputs", subscriptionRef.Namespace, subscriptionRef.Name)) + rc := newRoutingConnector(fmt.Sprintf("routing/subscription_%s_%s_outputs", subscriptionRef.Namespace, subscriptionRef.Name), v1alpha1.RouteConfig{}) utils.SortNamespacedNames(outputNames) pipelines := []string{} for _, outputRef := range outputNames { @@ -79,22 +84,21 @@ func GenerateRoutingConnectorForSubscriptionsOutputs(subscriptionRef v1alpha1.Na } tableItem := RoutingConnectorTableItem{ - Statement: "route()", + Condition: "true", Pipelines: pipelines, } - rc.AddRoutingConnectorTableElem(tableItem) + rc.Table = append(rc.Table, tableItem) return rc } func GenerateRoutingConnectorForBridge(bridge v1alpha1.Bridge) RoutingConnector { - rc := newRoutingConnector(fmt.Sprintf("routing/bridge_%s", bridge.Name)) - + rc := newRoutingConnector(fmt.Sprintf("routing/bridge_%s", bridge.Name), v1alpha1.RouteConfig{}) tableItem := RoutingConnectorTableItem{ - Statement: bridge.Spec.OTTL, + Condition: bridge.Spec.Condition, Pipelines: []string{fmt.Sprintf("logs/tenant_%s", bridge.Spec.TargetTenant)}, } - rc.AddRoutingConnectorTableElem(tableItem) + rc.Table = append(rc.Table, tableItem) return rc } @@ -114,11 +118,9 @@ func checkBridgeConnectorForTenant(tenantName string, bridge v1alpha1.Bridge) (n func GenerateRoutingConnectorForBridgesTenantPipeline(tenantName string, pipeline *otelv1beta1.Pipeline, bridges []v1alpha1.Bridge) { for _, bridge := range bridges { needsReceiver, needsExporter, bridgeName := checkBridgeConnectorForTenant(tenantName, bridge) - if needsReceiver { pipeline.Receivers = append(pipeline.Receivers, fmt.Sprintf("routing/bridge_%s", bridgeName)) } - if needsExporter { pipeline.Exporters = append(pipeline.Exporters, fmt.Sprintf("routing/bridge_%s", bridgeName)) } diff --git a/internal/controller/telemetry/pipeline/components/connector/routing_connector_test.go b/internal/controller/telemetry/pipeline/components/connector/routing_connector_test.go new file mode 100644 index 00000000..019861d3 --- /dev/null +++ b/internal/controller/telemetry/pipeline/components/connector/routing_connector_test.go @@ -0,0 +1,170 @@ +// Copyright © 2024 Kube logging authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connector + +import ( + "testing" + + "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" + otelv1beta1 "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestGenerateRoutingConnectorForSubscriptionsOutputs(t *testing.T) { + tests := []struct { + name string + subscriptionRef v1alpha1.NamespacedName + outputNames []v1alpha1.NamespacedName + expectedRC RoutingConnector + }{ + { + name: "Single output", + subscriptionRef: v1alpha1.NamespacedName{ + Namespace: "ns1", + Name: "sub1", + }, + outputNames: []v1alpha1.NamespacedName{ + {Namespace: "ns1", Name: "out1"}, + }, + expectedRC: RoutingConnector{ + Name: "routing/subscription_ns1_sub1_outputs", + Table: []RoutingConnectorTableItem{ + { + Condition: "true", + Pipelines: []string{"logs/output_ns1_sub1_ns1_out1"}, + }, + }, + }, + }, + } + + for _, tt := range tests { + ttp := tt + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, ttp.expectedRC, GenerateRoutingConnectorForSubscriptionsOutputs(ttp.subscriptionRef, ttp.outputNames)) + }) + } +} + +func TestGenerateRoutingConnectorForBridge(t *testing.T) { + tests := []struct { + name string + bridge v1alpha1.Bridge + expectedRC RoutingConnector + }{ + { + name: "Valid bridge", + bridge: v1alpha1.Bridge{ + Spec: v1alpha1.BridgeSpec{ + Condition: `attributes["X-Tenant"] == "telemetry-controller-system"`, + TargetTenant: "tenant1", + }, + }, + expectedRC: RoutingConnector{ + Name: "routing/bridge_", + Table: []RoutingConnectorTableItem{ + { + Condition: `attributes["X-Tenant"] == "telemetry-controller-system"`, + Pipelines: []string{"logs/tenant_tenant1"}, + }, + }, + }, + }, + } + + for _, tt := range tests { + ttp := tt + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, ttp.expectedRC, GenerateRoutingConnectorForBridge(ttp.bridge)) + }) + } +} + +func TestGenerateRoutingConnectorForBridgesTenantPipeline(t *testing.T) { + tests := []struct { + name string + tenantName string + pipeline *otelv1beta1.Pipeline + bridges []v1alpha1.Bridge + expectedPipeline *otelv1beta1.Pipeline + }{ + { + name: "Empty bridges list", + tenantName: "tenant1", + pipeline: &otelv1beta1.Pipeline{ + Receivers: []string{"already-existing-receiver"}, + Exporters: []string{"already-existing-exporter"}, + }, + bridges: []v1alpha1.Bridge{}, + expectedPipeline: &otelv1beta1.Pipeline{ + Receivers: []string{"already-existing-receiver"}, + Exporters: []string{"already-existing-exporter"}, + }, + }, + { + name: "Bridge requiring exporter only", + tenantName: "tenant", + pipeline: &otelv1beta1.Pipeline{ + Receivers: []string{"already-existing-receiver"}, + Exporters: []string{"already-existing-exporter"}, + }, + bridges: []v1alpha1.Bridge{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "bridge1", + }, + Spec: v1alpha1.BridgeSpec{ + SourceTenant: "tenant", + }, + }, + }, + expectedPipeline: &otelv1beta1.Pipeline{ + Receivers: []string{"already-existing-receiver"}, + Exporters: []string{"already-existing-exporter", "routing/bridge_bridge1"}, + }, + }, + { + name: "Bridge requiring receiver only", + tenantName: "tenant", + pipeline: &otelv1beta1.Pipeline{ + Receivers: []string{"already-existing-receiver"}, + Exporters: []string{"already-existing-exporter"}, + }, + bridges: []v1alpha1.Bridge{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "bridge2", + }, + Spec: v1alpha1.BridgeSpec{ + TargetTenant: "tenant", + }, + }, + }, + expectedPipeline: &otelv1beta1.Pipeline{ + Receivers: []string{"already-existing-receiver", "routing/bridge_bridge2"}, + Exporters: []string{"already-existing-exporter"}, + }, + }, + } + + for _, tt := range tests { + ttp := tt + t.Run(tt.name, func(t *testing.T) { + GenerateRoutingConnectorForBridgesTenantPipeline(ttp.tenantName, ttp.pipeline, ttp.bridges) + assert.Equal(t, ttp.expectedPipeline, ttp.pipeline) + }) + } +} diff --git a/internal/controller/telemetry/pipeline/components/processor/transform_processor.go b/internal/controller/telemetry/pipeline/components/processor/transform_processor.go index 1a554af8..23d52a68 100644 --- a/internal/controller/telemetry/pipeline/components/processor/transform_processor.go +++ b/internal/controller/telemetry/pipeline/components/processor/transform_processor.go @@ -52,7 +52,7 @@ func GenerateTransformProcessorForTenantPipeline(tenantName string, pipeline *ot } } -func convertAPIStatements(APIStatements []v1alpha1.Statement) []TransformProcessorStatement { +func convertAPIStatements(APIStatements []v1alpha1.TransformStatement) []TransformProcessorStatement { statements := make([]TransformProcessorStatement, len(APIStatements)) for i, statement := range APIStatements { statements[i] = TransformProcessorStatement{