diff --git a/docs/modules/ROOT/pages/kamelets/kamelets-dev.adoc b/docs/modules/ROOT/pages/kamelets/kamelets-dev.adoc index 23e90e59a7..ab5e533e62 100644 --- a/docs/modules/ROOT/pages/kamelets/kamelets-dev.adoc +++ b/docs/modules/ROOT/pages/kamelets/kamelets-dev.adoc @@ -419,7 +419,9 @@ spec: The very same concept of data types can also be used on Kamelet sinks and input data types. As soon as the user chooses a specific input data type for a Kamelet the Pipe processing will try to resolve a matching transformer implementation and apply its logic. -You may also use a `data-type-action` Kamelet in your Pipe binding in order to apply a specific data type transformation at any step. +NOTE: by default, the operator will use a `data-type-action` Kamelet that has to be an available Kamelet in the catalog. This is provided out of the box installing bundled Apache Kamelet catalog. It will fail if the Kamelet is not available. You can also override the Kamelet action to use adding the `camel.apache.org/kamelet.data.type` annotation to the Pipe specification. + +You may also use the `data-type-action` Kamelet in your Pipe binding in order to apply a specific data type transformation at any step. .my-sample-source-binding.yaml [source,yaml] diff --git a/pkg/apis/camel/v1/kamelet_types.go b/pkg/apis/camel/v1/kamelet_types.go index d45ad4a3c8..0c09263560 100644 --- a/pkg/apis/camel/v1/kamelet_types.go +++ b/pkg/apis/camel/v1/kamelet_types.go @@ -33,6 +33,8 @@ const ( KameletTypeLabel = "camel.apache.org/kamelet.type" // KameletGroupLabel label used to group Kamelets. KameletGroupLabel = "camel.apache.org/kamelet.group" + // KameletDataTypeLabel label used to override the default Kamelet action data type. + KameletDataTypeLabel = "camel.apache.org/kamelet.data.type" // KameletTypeSink type Sink. KameletTypeSink = "sink" diff --git a/pkg/controller/pipe/integration.go b/pkg/controller/pipe/integration.go index 7effc4869f..2348529508 100644 --- a/pkg/controller/pipe/integration.go +++ b/pkg/controller/pipe/integration.go @@ -42,6 +42,7 @@ var ( endpointTypeSinkContext = bindings.EndpointContext{Type: v1.EndpointTypeSink} ) +// CreateIntegrationFor creates and Integration from the a Pipe func CreateIntegrationFor(ctx context.Context, c client.Client, binding *v1.Pipe) (*v1.Integration, error) { controller := true blockOwnerDeletion := true @@ -102,6 +103,7 @@ func CreateIntegrationFor(ctx context.Context, c client.Client, binding *v1.Pipe Client: c, Namespace: it.Namespace, Profile: profile, + Metadata: it.Annotations, } from, err := bindings.Translate(bindingContext, endpointTypeSourceContext, binding.Spec.Source) diff --git a/pkg/controller/pipe/integration_test.go b/pkg/controller/pipe/integration_test.go new file mode 100644 index 0000000000..a59e89bbfa --- /dev/null +++ b/pkg/controller/pipe/integration_test.go @@ -0,0 +1,138 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pipe + +import ( + "context" + "testing" + + v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1" + "github.com/apache/camel-k/v2/pkg/util/dsl" + "github.com/apache/camel-k/v2/pkg/util/test" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" +) + +func TestCreateIntegrationForPipe(t *testing.T) { + client, err := test.NewFakeClient() + assert.NoError(t, err) + + pipe := nominalPipe("my-pipe") + it, err := CreateIntegrationFor(context.TODO(), client, &pipe) + assert.Nil(t, err) + assert.Equal(t, "my-pipe", it.Name) + assert.Equal(t, "default", it.Namespace) + assert.Equal(t, map[string]string{ + "my-annotation": "my-annotation-val", + }, it.Annotations) + assert.Equal(t, map[string]string{ + "camel.apache.org/created.by.kind": "Pipe", + "camel.apache.org/created.by.name": "my-pipe", + "my-label": "my-label-val", + }, it.Labels) + assert.Equal(t, "camel.apache.org/v1", it.OwnerReferences[0].APIVersion) + assert.Equal(t, "Pipe", it.OwnerReferences[0].Kind) + assert.Equal(t, "my-pipe", it.OwnerReferences[0].Name) + dsl, err := dsl.ToYamlDSL(it.Spec.Flows) + assert.Nil(t, err) + assert.Equal(t, expectedNominalRoute(), string(dsl)) +} + +func TestCreateIntegrationForPipeDataType(t *testing.T) { + client, err := test.NewFakeClient() + assert.NoError(t, err) + + pipe := nominalPipe("my-pipe-data-type") + pipe.Spec.Sink.DataTypes = map[v1.TypeSlot]v1.DataTypeReference{ + v1.TypeSlotIn: v1.DataTypeReference{ + Format: "string", + }, + } + it, err := CreateIntegrationFor(context.TODO(), client, &pipe) + assert.Nil(t, err) + dsl, err := dsl.ToYamlDSL(it.Spec.Flows) + assert.Nil(t, err) + assert.Equal(t, expectedNominalRouteWithDataType("data-type-action"), string(dsl)) +} + +func TestCreateIntegrationForPipeDataTypeOverridden(t *testing.T) { + client, err := test.NewFakeClient() + assert.NoError(t, err) + + pipe := nominalPipe("my-pipe-data-type") + pipe.Spec.Sink.DataTypes = map[v1.TypeSlot]v1.DataTypeReference{ + v1.TypeSlotIn: v1.DataTypeReference{ + Format: "string", + }, + } + newDataTypeKameletAction := "data-type-action-v4-2" + pipe.Annotations[v1.KameletDataTypeLabel] = newDataTypeKameletAction + it, err := CreateIntegrationFor(context.TODO(), client, &pipe) + assert.Nil(t, err) + dsl, err := dsl.ToYamlDSL(it.Spec.Flows) + assert.Nil(t, err) + assert.Equal(t, expectedNominalRouteWithDataType(newDataTypeKameletAction), string(dsl)) +} + +func nominalPipe(name string) v1.Pipe { + pipe := v1.NewPipe("default", name) + pipe.Annotations = map[string]string{ + "my-annotation": "my-annotation-val", + } + pipe.Labels = map[string]string{ + "my-label": "my-label-val", + } + pipe.Spec.Source = v1.Endpoint{ + Ref: &corev1.ObjectReference{ + Kind: "Kamelet", + Name: "my-source", + APIVersion: "camel.apache.org/v1", + }, + } + pipe.Spec.Sink = v1.Endpoint{ + Ref: &corev1.ObjectReference{ + Kind: "Kamelet", + Name: "my-sink", + APIVersion: "camel.apache.org/v1", + }, + } + pipe.Status.Phase = v1.PipePhaseReady + return pipe +} + +func expectedNominalRoute() string { + return `- route: + from: + steps: + - to: kamelet:my-sink/sink + uri: kamelet:my-source/source + id: binding +` +} + +func expectedNominalRouteWithDataType(name string) string { + return `- route: + from: + steps: + - kamelet: + name: ` + name + `/sink-in + - to: kamelet:my-sink/sink + uri: kamelet:my-source/source + id: binding +` +} diff --git a/pkg/util/bindings/api.go b/pkg/util/bindings/api.go index 93b2130d2e..d724af753e 100644 --- a/pkg/util/bindings/api.go +++ b/pkg/util/bindings/api.go @@ -60,6 +60,7 @@ type BindingContext struct { Client client.Client Namespace string Profile v1.TraitProfile + Metadata map[string]string } type EndpointContext struct { diff --git a/pkg/util/bindings/api_v1alpha1.go b/pkg/util/bindings/api_v1alpha1.go index 8d97e80233..dddf28d0ea 100644 --- a/pkg/util/bindings/api_v1alpha1.go +++ b/pkg/util/bindings/api_v1alpha1.go @@ -45,6 +45,7 @@ type V1alpha1BindingContext struct { Client client.Client Namespace string Profile v1.TraitProfile + Metadata map[string]string } // V1alpha1EndpointContext -- diff --git a/pkg/util/bindings/kamelet.go b/pkg/util/bindings/kamelet.go index cb05d96462..c651773341 100644 --- a/pkg/util/bindings/kamelet.go +++ b/pkg/util/bindings/kamelet.go @@ -29,7 +29,7 @@ import ( ) const ( - datTypeActionKamelet = "data-type-action" + defaultDataTypeActionKamelet = "data-type-action" ) // BindingConverter converts a reference to a Kamelet into a Camel URI. @@ -73,11 +73,16 @@ func (k BindingConverter) Translate(ctx BindingContext, endpointCtx EndpointCont binding.ApplicationProperties[propKey] = v } + dataTypeActionKamelet := ctx.Metadata[v1.KameletDataTypeLabel] + if dataTypeActionKamelet == "" { + dataTypeActionKamelet = defaultDataTypeActionKamelet + } + switch endpointCtx.Type { case v1.EndpointTypeAction: steps := make([]map[string]interface{}, 0) - if in, applicationProperties := k.DataTypeStep(e, id, v1.TypeSlotIn); in != nil { + if in, applicationProperties := k.DataTypeStep(e, id, v1.TypeSlotIn, dataTypeActionKamelet); in != nil { steps = append(steps, in) for k, v := range applicationProperties { binding.ApplicationProperties[k] = v @@ -90,7 +95,7 @@ func (k BindingConverter) Translate(ctx BindingContext, endpointCtx EndpointCont }, }) - if out, applicationProperties := k.DataTypeStep(e, id, v1.TypeSlotOut); out != nil { + if out, applicationProperties := k.DataTypeStep(e, id, v1.TypeSlotOut, dataTypeActionKamelet); out != nil { steps = append(steps, out) for k, v := range applicationProperties { binding.ApplicationProperties[k] = v @@ -109,7 +114,7 @@ func (k BindingConverter) Translate(ctx BindingContext, endpointCtx EndpointCont binding.Step = steps[0] } case v1.EndpointTypeSource: - if out, applicationProperties := k.DataTypeStep(e, id, v1.TypeSlotOut); out != nil { + if out, applicationProperties := k.DataTypeStep(e, id, v1.TypeSlotOut, dataTypeActionKamelet); out != nil { binding.Step = out for k, v := range applicationProperties { binding.ApplicationProperties[k] = v @@ -118,7 +123,7 @@ func (k BindingConverter) Translate(ctx BindingContext, endpointCtx EndpointCont binding.URI = fmt.Sprintf("kamelet:%s/%s", kameletName, url.PathEscape(id)) case v1.EndpointTypeSink: - if in, applicationProperties := k.DataTypeStep(e, id, v1.TypeSlotIn); in != nil { + if in, applicationProperties := k.DataTypeStep(e, id, v1.TypeSlotIn, dataTypeActionKamelet); in != nil { binding.Step = in for k, v := range applicationProperties { binding.ApplicationProperties[k] = v @@ -136,7 +141,7 @@ func (k BindingConverter) Translate(ctx BindingContext, endpointCtx EndpointCont } // DataTypeStep --. -func (k BindingConverter) DataTypeStep(e v1.Endpoint, id string, typeSlot v1.TypeSlot) (map[string]interface{}, map[string]string) { +func (k BindingConverter) DataTypeStep(e v1.Endpoint, id string, typeSlot v1.TypeSlot, dataTypeActionKamelet string) (map[string]interface{}, map[string]string) { if e.DataTypes == nil { return nil, nil } @@ -153,12 +158,12 @@ func (k BindingConverter) DataTypeStep(e v1.Endpoint, id string, typeSlot v1.Typ } props := make(map[string]string, 2) - props[fmt.Sprintf("camel.kamelet.%s.%s-%s.scheme", datTypeActionKamelet, id, typeSlot)] = scheme - props[fmt.Sprintf("camel.kamelet.%s.%s-%s.format", datTypeActionKamelet, id, typeSlot)] = format + props[fmt.Sprintf("camel.kamelet.%s.%s-%s.scheme", dataTypeActionKamelet, id, typeSlot)] = scheme + props[fmt.Sprintf("camel.kamelet.%s.%s-%s.format", dataTypeActionKamelet, id, typeSlot)] = format stepDsl := map[string]interface{}{ "kamelet": map[string]interface{}{ - "name": fmt.Sprintf("%s/%s-%s", datTypeActionKamelet, url.PathEscape(id), typeSlot), + "name": fmt.Sprintf("%s/%s-%s", dataTypeActionKamelet, url.PathEscape(id), typeSlot), }, } @@ -217,11 +222,16 @@ func (k V1alpha1BindingConverter) Translate(ctx V1alpha1BindingContext, endpoint binding.ApplicationProperties[propKey] = v } + dataTypeActionKamelet := ctx.Metadata[v1.KameletDataTypeLabel] + if dataTypeActionKamelet == "" { + dataTypeActionKamelet = defaultDataTypeActionKamelet + } + switch endpointCtx.Type { case v1alpha1.EndpointTypeAction: steps := make([]map[string]interface{}, 0) - if in, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotIn); in != nil { + if in, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotIn, dataTypeActionKamelet); in != nil { steps = append(steps, in) for k, v := range applicationProperties { binding.ApplicationProperties[k] = v @@ -234,7 +244,7 @@ func (k V1alpha1BindingConverter) Translate(ctx V1alpha1BindingContext, endpoint }, }) - if out, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotOut); out != nil { + if out, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotOut, dataTypeActionKamelet); out != nil { steps = append(steps, out) for k, v := range applicationProperties { binding.ApplicationProperties[k] = v @@ -253,7 +263,7 @@ func (k V1alpha1BindingConverter) Translate(ctx V1alpha1BindingContext, endpoint binding.Step = steps[0] } case v1alpha1.EndpointTypeSource: - if out, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotOut); out != nil { + if out, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotOut, dataTypeActionKamelet); out != nil { binding.Step = out for k, v := range applicationProperties { binding.ApplicationProperties[k] = v @@ -262,7 +272,7 @@ func (k V1alpha1BindingConverter) Translate(ctx V1alpha1BindingContext, endpoint binding.URI = fmt.Sprintf("kamelet:%s/%s", kameletName, url.PathEscape(id)) case v1alpha1.EndpointTypeSink: - if in, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotIn); in != nil { + if in, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotIn, dataTypeActionKamelet); in != nil { binding.Step = in for k, v := range applicationProperties { binding.ApplicationProperties[k] = v @@ -281,7 +291,7 @@ func (k V1alpha1BindingConverter) Translate(ctx V1alpha1BindingContext, endpoint // DataTypeStep -- . // Deprecated. -func (k V1alpha1BindingConverter) DataTypeStep(e v1alpha1.Endpoint, id string, typeSlot v1alpha1.TypeSlot) (map[string]interface{}, map[string]string) { +func (k V1alpha1BindingConverter) DataTypeStep(e v1alpha1.Endpoint, id string, typeSlot v1alpha1.TypeSlot, dataTypeActionKamelet string) (map[string]interface{}, map[string]string) { if e.DataTypes == nil { return nil, nil } @@ -293,12 +303,12 @@ func (k V1alpha1BindingConverter) DataTypeStep(e v1alpha1.Endpoint, id string, t } props := make(map[string]string, 2) - props[fmt.Sprintf("camel.kamelet.%s.%s-%s.scheme", datTypeActionKamelet, id, typeSlot)] = scheme - props[fmt.Sprintf("camel.kamelet.%s.%s-%s.format", datTypeActionKamelet, id, typeSlot)] = inDataType.Format + props[fmt.Sprintf("camel.kamelet.%s.%s-%s.scheme", dataTypeActionKamelet, id, typeSlot)] = scheme + props[fmt.Sprintf("camel.kamelet.%s.%s-%s.format", dataTypeActionKamelet, id, typeSlot)] = inDataType.Format stepDsl := map[string]interface{}{ "kamelet": map[string]interface{}{ - "name": fmt.Sprintf("%s/%s-%s", datTypeActionKamelet, url.PathEscape(id), typeSlot), + "name": fmt.Sprintf("%s/%s-%s", dataTypeActionKamelet, url.PathEscape(id), typeSlot), }, } diff --git a/pkg/util/bindings/kamelet_test.go b/pkg/util/bindings/kamelet_test.go index 3721c3392e..d8826efdfa 100644 --- a/pkg/util/bindings/kamelet_test.go +++ b/pkg/util/bindings/kamelet_test.go @@ -139,7 +139,7 @@ func TestBindingConverterWithDataTypes(t *testing.T) { name: "action-input", endpointType: v1.EndpointTypeAction, uri: "", - step: getExpectedStep(true, false), + step: getExpectedStep(true, false, defaultDataTypeActionKamelet), endpointProperties: map[string]string{ "foo": "bar", }, @@ -154,7 +154,7 @@ func TestBindingConverterWithDataTypes(t *testing.T) { name: "action-input-scheme-prefix", endpointType: v1.EndpointTypeAction, uri: "", - step: getExpectedStep(true, false), + step: getExpectedStep(true, false, defaultDataTypeActionKamelet), endpointProperties: map[string]string{ "foo": "bar", }, @@ -169,7 +169,7 @@ func TestBindingConverterWithDataTypes(t *testing.T) { name: "action-output", endpointType: v1.EndpointTypeAction, uri: "", - step: getExpectedStep(false, true), + step: getExpectedStep(false, true, defaultDataTypeActionKamelet), endpointProperties: map[string]string{ "foo": "bar", }, @@ -184,7 +184,7 @@ func TestBindingConverterWithDataTypes(t *testing.T) { name: "action-output-scheme-prefix", endpointType: v1.EndpointTypeAction, uri: "", - step: getExpectedStep(false, true), + step: getExpectedStep(false, true, defaultDataTypeActionKamelet), endpointProperties: map[string]string{ "foo": "bar", }, @@ -199,7 +199,7 @@ func TestBindingConverterWithDataTypes(t *testing.T) { name: "action-input-output", endpointType: v1.EndpointTypeAction, uri: "", - step: getExpectedStep(true, true), + step: getExpectedStep(true, true, defaultDataTypeActionKamelet), endpointProperties: map[string]string{ "foo": "bar", }, @@ -219,7 +219,7 @@ func TestBindingConverterWithDataTypes(t *testing.T) { name: "action-input-output-schema-and-prefix", endpointType: v1.EndpointTypeAction, uri: "", - step: getExpectedStep(true, true), + step: getExpectedStep(true, true, defaultDataTypeActionKamelet), endpointProperties: map[string]string{ "foo": "bar", }, @@ -300,13 +300,109 @@ func TestBindingConverterWithDataTypes(t *testing.T) { } } -func getExpectedStep(withIn bool, withOut bool) map[string]interface{} { +func TestBindingConverterWithDataTypesOverridden(t *testing.T) { + testcases := []struct { + name string + endpointType v1.EndpointType + uri string + step map[string]interface{} + endpointProperties map[string]string + applicationProperties map[string]string + inputScheme string + inputFormat string + outputScheme string + outputFormat string + }{ + { + name: "action-input", + endpointType: v1.EndpointTypeAction, + uri: "", + step: getExpectedStep(true, false, "data-type-action-v2"), + endpointProperties: map[string]string{ + "foo": "bar", + }, + applicationProperties: map[string]string{ + "camel.kamelet.mykamelet.action-0.foo": "bar", + "camel.kamelet.data-type-action-v2.action-0-in.scheme": "camel", + "camel.kamelet.data-type-action-v2.action-0-in.format": "string", + }, + inputFormat: "string", + }, + } + + for i, tc := range testcases { + t.Run(fmt.Sprintf("test-%d-%s", i, tc.name), func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client, err := test.NewFakeClient() + assert.NoError(t, err) + + endpoint := v1.Endpoint{ + Ref: &corev1.ObjectReference{ + Kind: "Kamelet", + APIVersion: "camel.apache.org/v1any1", + Name: "mykamelet", + }, + } + + if len(tc.endpointProperties) > 0 { + endpoint.Properties = asEndpointProperties(tc.endpointProperties) + } + + endpoint.DataTypes = make(map[v1.TypeSlot]v1.DataTypeReference) + if tc.inputFormat != "" { + endpoint.DataTypes[v1.TypeSlotIn] = v1.DataTypeReference{ + Scheme: tc.inputScheme, + Format: tc.inputFormat, + } + } + + if tc.outputFormat != "" { + endpoint.DataTypes[v1.TypeSlotOut] = v1.DataTypeReference{ + Scheme: tc.outputScheme, + Format: tc.outputFormat, + } + } + + pos := 0 + binding, err := BindingConverter{}.Translate( + BindingContext{ + Ctx: ctx, + Client: client, + Namespace: "test", + Profile: v1.TraitProfileKubernetes, + Metadata: map[string]string{ + v1.KameletDataTypeLabel: "data-type-action-v2", + }, + }, + EndpointContext{ + Type: tc.endpointType, + Position: &pos, + }, + endpoint) + + assert.NoError(t, err) + assert.NotNil(t, binding) + assert.Equal(t, tc.step, binding.Step) + assert.Equal(t, tc.uri, binding.URI) + + if len(tc.applicationProperties) > 0 { + assert.Equal(t, tc.applicationProperties, binding.ApplicationProperties) + } else { + assert.True(t, len(binding.ApplicationProperties) == 0) + } + }) + } +} + +func getExpectedStep(withIn bool, withOut bool, dataTypeActionKamelet string) map[string]interface{} { var steps []map[string]interface{} if withIn { steps = append(steps, map[string]interface{}{ "kamelet": map[string]interface{}{ - "name": "data-type-action/action-0-in", + "name": dataTypeActionKamelet + "/action-0-in", }, }) } @@ -320,7 +416,7 @@ func getExpectedStep(withIn bool, withOut bool) map[string]interface{} { if withOut { steps = append(steps, map[string]interface{}{ "kamelet": map[string]interface{}{ - "name": "data-type-action/action-0-out", + "name": dataTypeActionKamelet + "/action-0-out", }, }) }