Skip to content

Commit

Permalink
fix(kamelet): override data type action
Browse files Browse the repository at this point in the history
  • Loading branch information
squakez committed Jan 24, 2024
1 parent 169387b commit 93ffbf5
Show file tree
Hide file tree
Showing 8 changed files with 279 additions and 27 deletions.
4 changes: 3 additions & 1 deletion docs/modules/ROOT/pages/kamelets/kamelets-dev.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,9 @@ spec:
The very same concept of data types can also be used on Kamelet sinks and input data types.
As soon as the user chooses a specific input data type for a Kamelet the Pipe processing will try to resolve a matching transformer implementation and apply its logic.

You may also use a `data-type-action` Kamelet in your Pipe binding in order to apply a specific data type transformation at any step.
NOTE: by default, the operator will use a `data-type-action` Kamelet that has to be an available Kamelet in the catalog. This is provided out of the box installing bundled Apache Kamelet catalog. It will fail if the Kamelet is not available. You can also override the Kamelet action to use adding the `camel.apache.org/kamelet.data.type` annotation to the Pipe specification.

You may also use the `data-type-action` Kamelet in your Pipe binding in order to apply a specific data type transformation at any step.

.my-sample-source-binding.yaml
[source,yaml]
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/camel/v1/kamelet_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const (
KameletTypeLabel = "camel.apache.org/kamelet.type"
// KameletGroupLabel label used to group Kamelets.
KameletGroupLabel = "camel.apache.org/kamelet.group"
// KameletDataTypeLabel label used to override the default Kamelet action data type.
KameletDataTypeLabel = "camel.apache.org/kamelet.data.type"

// KameletTypeSink type Sink.
KameletTypeSink = "sink"
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/pipe/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var (
endpointTypeSinkContext = bindings.EndpointContext{Type: v1.EndpointTypeSink}
)

// CreateIntegrationFor creates and Integration from the a Pipe
func CreateIntegrationFor(ctx context.Context, c client.Client, binding *v1.Pipe) (*v1.Integration, error) {
controller := true
blockOwnerDeletion := true
Expand Down Expand Up @@ -102,6 +103,7 @@ func CreateIntegrationFor(ctx context.Context, c client.Client, binding *v1.Pipe
Client: c,
Namespace: it.Namespace,
Profile: profile,
Metadata: it.Annotations,
}

from, err := bindings.Translate(bindingContext, endpointTypeSourceContext, binding.Spec.Source)
Expand Down
138 changes: 138 additions & 0 deletions pkg/controller/pipe/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pipe

import (
"context"
"testing"

v1 "github.com/apache/camel-k/v2/pkg/apis/camel/v1"
"github.com/apache/camel-k/v2/pkg/util/dsl"
"github.com/apache/camel-k/v2/pkg/util/test"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
)

func TestCreateIntegrationForPipe(t *testing.T) {
client, err := test.NewFakeClient()
assert.NoError(t, err)

pipe := nominalPipe("my-pipe")
it, err := CreateIntegrationFor(context.TODO(), client, &pipe)
assert.Nil(t, err)
assert.Equal(t, "my-pipe", it.Name)
assert.Equal(t, "default", it.Namespace)
assert.Equal(t, map[string]string{
"my-annotation": "my-annotation-val",
}, it.Annotations)
assert.Equal(t, map[string]string{
"camel.apache.org/created.by.kind": "Pipe",
"camel.apache.org/created.by.name": "my-pipe",
"my-label": "my-label-val",
}, it.Labels)
assert.Equal(t, "camel.apache.org/v1", it.OwnerReferences[0].APIVersion)
assert.Equal(t, "Pipe", it.OwnerReferences[0].Kind)
assert.Equal(t, "my-pipe", it.OwnerReferences[0].Name)
dsl, err := dsl.ToYamlDSL(it.Spec.Flows)
assert.Nil(t, err)
assert.Equal(t, expectedNominalRoute(), string(dsl))
}

func TestCreateIntegrationForPipeDataType(t *testing.T) {
client, err := test.NewFakeClient()
assert.NoError(t, err)

pipe := nominalPipe("my-pipe-data-type")
pipe.Spec.Sink.DataTypes = map[v1.TypeSlot]v1.DataTypeReference{
v1.TypeSlotIn: v1.DataTypeReference{
Format: "string",
},
}
it, err := CreateIntegrationFor(context.TODO(), client, &pipe)
assert.Nil(t, err)
dsl, err := dsl.ToYamlDSL(it.Spec.Flows)
assert.Nil(t, err)
assert.Equal(t, expectedNominalRouteWithDataType("data-type-action"), string(dsl))
}

func TestCreateIntegrationForPipeDataTypeOverridden(t *testing.T) {
client, err := test.NewFakeClient()
assert.NoError(t, err)

pipe := nominalPipe("my-pipe-data-type")
pipe.Spec.Sink.DataTypes = map[v1.TypeSlot]v1.DataTypeReference{
v1.TypeSlotIn: v1.DataTypeReference{
Format: "string",
},
}
newDataTypeKameletAction := "data-type-action-v4-2"
pipe.Annotations[v1.KameletDataTypeLabel] = newDataTypeKameletAction
it, err := CreateIntegrationFor(context.TODO(), client, &pipe)
assert.Nil(t, err)
dsl, err := dsl.ToYamlDSL(it.Spec.Flows)
assert.Nil(t, err)
assert.Equal(t, expectedNominalRouteWithDataType(newDataTypeKameletAction), string(dsl))
}

func nominalPipe(name string) v1.Pipe {
pipe := v1.NewPipe("default", name)
pipe.Annotations = map[string]string{
"my-annotation": "my-annotation-val",
}
pipe.Labels = map[string]string{
"my-label": "my-label-val",
}
pipe.Spec.Source = v1.Endpoint{
Ref: &corev1.ObjectReference{
Kind: "Kamelet",
Name: "my-source",
APIVersion: "camel.apache.org/v1",
},
}
pipe.Spec.Sink = v1.Endpoint{
Ref: &corev1.ObjectReference{
Kind: "Kamelet",
Name: "my-sink",
APIVersion: "camel.apache.org/v1",
},
}
pipe.Status.Phase = v1.PipePhaseReady
return pipe
}

func expectedNominalRoute() string {
return `- route:
from:
steps:
- to: kamelet:my-sink/sink
uri: kamelet:my-source/source
id: binding
`
}

func expectedNominalRouteWithDataType(name string) string {
return `- route:
from:
steps:
- kamelet:
name: ` + name + `/sink-in
- to: kamelet:my-sink/sink
uri: kamelet:my-source/source
id: binding
`
}
1 change: 1 addition & 0 deletions pkg/util/bindings/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type BindingContext struct {
Client client.Client
Namespace string
Profile v1.TraitProfile
Metadata map[string]string
}

type EndpointContext struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/util/bindings/api_v1alpha1.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type V1alpha1BindingContext struct {
Client client.Client
Namespace string
Profile v1.TraitProfile
Metadata map[string]string
}

// V1alpha1EndpointContext --
Expand Down
44 changes: 27 additions & 17 deletions pkg/util/bindings/kamelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

const (
datTypeActionKamelet = "data-type-action"
defaultDataTypeActionKamelet = "data-type-action"
)

// BindingConverter converts a reference to a Kamelet into a Camel URI.
Expand Down Expand Up @@ -73,11 +73,16 @@ func (k BindingConverter) Translate(ctx BindingContext, endpointCtx EndpointCont
binding.ApplicationProperties[propKey] = v
}

dataTypeActionKamelet := ctx.Metadata[v1.KameletDataTypeLabel]
if dataTypeActionKamelet == "" {
dataTypeActionKamelet = defaultDataTypeActionKamelet
}

switch endpointCtx.Type {
case v1.EndpointTypeAction:
steps := make([]map[string]interface{}, 0)

if in, applicationProperties := k.DataTypeStep(e, id, v1.TypeSlotIn); in != nil {
if in, applicationProperties := k.DataTypeStep(e, id, v1.TypeSlotIn, dataTypeActionKamelet); in != nil {
steps = append(steps, in)
for k, v := range applicationProperties {
binding.ApplicationProperties[k] = v
Expand All @@ -90,7 +95,7 @@ func (k BindingConverter) Translate(ctx BindingContext, endpointCtx EndpointCont
},
})

if out, applicationProperties := k.DataTypeStep(e, id, v1.TypeSlotOut); out != nil {
if out, applicationProperties := k.DataTypeStep(e, id, v1.TypeSlotOut, dataTypeActionKamelet); out != nil {
steps = append(steps, out)
for k, v := range applicationProperties {
binding.ApplicationProperties[k] = v
Expand All @@ -109,7 +114,7 @@ func (k BindingConverter) Translate(ctx BindingContext, endpointCtx EndpointCont
binding.Step = steps[0]
}
case v1.EndpointTypeSource:
if out, applicationProperties := k.DataTypeStep(e, id, v1.TypeSlotOut); out != nil {
if out, applicationProperties := k.DataTypeStep(e, id, v1.TypeSlotOut, dataTypeActionKamelet); out != nil {
binding.Step = out
for k, v := range applicationProperties {
binding.ApplicationProperties[k] = v
Expand All @@ -118,7 +123,7 @@ func (k BindingConverter) Translate(ctx BindingContext, endpointCtx EndpointCont

binding.URI = fmt.Sprintf("kamelet:%s/%s", kameletName, url.PathEscape(id))
case v1.EndpointTypeSink:
if in, applicationProperties := k.DataTypeStep(e, id, v1.TypeSlotIn); in != nil {
if in, applicationProperties := k.DataTypeStep(e, id, v1.TypeSlotIn, dataTypeActionKamelet); in != nil {
binding.Step = in
for k, v := range applicationProperties {
binding.ApplicationProperties[k] = v
Expand All @@ -136,7 +141,7 @@ func (k BindingConverter) Translate(ctx BindingContext, endpointCtx EndpointCont
}

// DataTypeStep --.
func (k BindingConverter) DataTypeStep(e v1.Endpoint, id string, typeSlot v1.TypeSlot) (map[string]interface{}, map[string]string) {
func (k BindingConverter) DataTypeStep(e v1.Endpoint, id string, typeSlot v1.TypeSlot, dataTypeActionKamelet string) (map[string]interface{}, map[string]string) {
if e.DataTypes == nil {
return nil, nil
}
Expand All @@ -153,12 +158,12 @@ func (k BindingConverter) DataTypeStep(e v1.Endpoint, id string, typeSlot v1.Typ
}

props := make(map[string]string, 2)
props[fmt.Sprintf("camel.kamelet.%s.%s-%s.scheme", datTypeActionKamelet, id, typeSlot)] = scheme
props[fmt.Sprintf("camel.kamelet.%s.%s-%s.format", datTypeActionKamelet, id, typeSlot)] = format
props[fmt.Sprintf("camel.kamelet.%s.%s-%s.scheme", dataTypeActionKamelet, id, typeSlot)] = scheme
props[fmt.Sprintf("camel.kamelet.%s.%s-%s.format", dataTypeActionKamelet, id, typeSlot)] = format

stepDsl := map[string]interface{}{
"kamelet": map[string]interface{}{
"name": fmt.Sprintf("%s/%s-%s", datTypeActionKamelet, url.PathEscape(id), typeSlot),
"name": fmt.Sprintf("%s/%s-%s", dataTypeActionKamelet, url.PathEscape(id), typeSlot),
},
}

Expand Down Expand Up @@ -217,11 +222,16 @@ func (k V1alpha1BindingConverter) Translate(ctx V1alpha1BindingContext, endpoint
binding.ApplicationProperties[propKey] = v
}

dataTypeActionKamelet := ctx.Metadata[v1.KameletDataTypeLabel]
if dataTypeActionKamelet == "" {
dataTypeActionKamelet = defaultDataTypeActionKamelet
}

switch endpointCtx.Type {
case v1alpha1.EndpointTypeAction:
steps := make([]map[string]interface{}, 0)

if in, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotIn); in != nil {
if in, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotIn, dataTypeActionKamelet); in != nil {
steps = append(steps, in)
for k, v := range applicationProperties {
binding.ApplicationProperties[k] = v
Expand All @@ -234,7 +244,7 @@ func (k V1alpha1BindingConverter) Translate(ctx V1alpha1BindingContext, endpoint
},
})

if out, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotOut); out != nil {
if out, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotOut, dataTypeActionKamelet); out != nil {
steps = append(steps, out)
for k, v := range applicationProperties {
binding.ApplicationProperties[k] = v
Expand All @@ -253,7 +263,7 @@ func (k V1alpha1BindingConverter) Translate(ctx V1alpha1BindingContext, endpoint
binding.Step = steps[0]
}
case v1alpha1.EndpointTypeSource:
if out, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotOut); out != nil {
if out, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotOut, dataTypeActionKamelet); out != nil {
binding.Step = out
for k, v := range applicationProperties {
binding.ApplicationProperties[k] = v
Expand All @@ -262,7 +272,7 @@ func (k V1alpha1BindingConverter) Translate(ctx V1alpha1BindingContext, endpoint

binding.URI = fmt.Sprintf("kamelet:%s/%s", kameletName, url.PathEscape(id))
case v1alpha1.EndpointTypeSink:
if in, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotIn); in != nil {
if in, applicationProperties := k.DataTypeStep(e, id, v1alpha1.TypeSlotIn, dataTypeActionKamelet); in != nil {
binding.Step = in
for k, v := range applicationProperties {
binding.ApplicationProperties[k] = v
Expand All @@ -281,7 +291,7 @@ func (k V1alpha1BindingConverter) Translate(ctx V1alpha1BindingContext, endpoint

// DataTypeStep -- .
// Deprecated.
func (k V1alpha1BindingConverter) DataTypeStep(e v1alpha1.Endpoint, id string, typeSlot v1alpha1.TypeSlot) (map[string]interface{}, map[string]string) {
func (k V1alpha1BindingConverter) DataTypeStep(e v1alpha1.Endpoint, id string, typeSlot v1alpha1.TypeSlot, dataTypeActionKamelet string) (map[string]interface{}, map[string]string) {
if e.DataTypes == nil {
return nil, nil
}
Expand All @@ -293,12 +303,12 @@ func (k V1alpha1BindingConverter) DataTypeStep(e v1alpha1.Endpoint, id string, t
}

props := make(map[string]string, 2)
props[fmt.Sprintf("camel.kamelet.%s.%s-%s.scheme", datTypeActionKamelet, id, typeSlot)] = scheme
props[fmt.Sprintf("camel.kamelet.%s.%s-%s.format", datTypeActionKamelet, id, typeSlot)] = inDataType.Format
props[fmt.Sprintf("camel.kamelet.%s.%s-%s.scheme", dataTypeActionKamelet, id, typeSlot)] = scheme
props[fmt.Sprintf("camel.kamelet.%s.%s-%s.format", dataTypeActionKamelet, id, typeSlot)] = inDataType.Format

stepDsl := map[string]interface{}{
"kamelet": map[string]interface{}{
"name": fmt.Sprintf("%s/%s-%s", datTypeActionKamelet, url.PathEscape(id), typeSlot),
"name": fmt.Sprintf("%s/%s-%s", dataTypeActionKamelet, url.PathEscape(id), typeSlot),
},
}

Expand Down
Loading

0 comments on commit 93ffbf5

Please sign in to comment.