[Feature] Better processors #1378

Merged
merged 32 commits into from
Mar 7, 2024
Commits
7782b08
Refactor pkg/plugin and isolate connector plugin (#1302)
lovromazgon Jan 9, 2024
7b3087e
BP: WASM: Load available WASM processors (#1322)
hariso Jan 26, 2024
cf47e68
Refactor pkg/plugin and isolate connector plugin (#1302)
lovromazgon Jan 9, 2024
6807027
BP: WASM: Load available WASM processors (#1322)
hariso Jan 30, 2024
f3fc6de
Merge branch 'feature/better-processors' of github.com:ConduitIO/cond…
hariso Jan 30, 2024
4cf9959
Refactor processor service, processor and node (#1354)
hariso Feb 12, 2024
a2a4712
Merge branch 'main' into feature/better-processors
lovromazgon Feb 13, 2024
f52598a
linter
hariso Feb 14, 2024
97b0d1a
Run processor examples, gather specifications (#1384)
lovromazgon Feb 16, 2024
321937b
BP: Display processor example raw data as string (#1397)
lovromazgon Feb 29, 2024
5614c58
BP: add builtin processor part#1 (#1371)
maha-hajja Mar 1, 2024
99655a2
BP: Add unwrap.debezium (#1393)
hariso Mar 4, 2024
619238f
BP: Add unwrap.kafkaconnect (#1395)
hariso Mar 4, 2024
2b5dea4
BP: Add webhook.http (#1382)
hariso Mar 4, 2024
6a4ab4c
BP: Add unwrap.opencdc (#1387)
hariso Mar 5, 2024
c8eeaac
BP: Add encode.avro (#1401)
hariso Mar 5, 2024
4721a9b
BP: Add custom.javascript (#1374)
hariso Mar 6, 2024
004faa0
add json.decode processor (#1405)
maha-hajja Mar 6, 2024
86ff56e
BP: List processor plugins in API (#1406)
lovromazgon Mar 6, 2024
f3410c1
BP: add avro.decode (#1410)
hariso Mar 6, 2024
6877e25
BP: Base64 decode / encode processors (#1412)
lovromazgon Mar 6, 2024
95e1a55
add encodeJson processor (#1413)
maha-hajja Mar 7, 2024
d2c1b77
Builtin processors refactoring, housekeeping (#1402)
lovromazgon Mar 7, 2024
e3845c3
Merge branch 'main' into feature/better-processors
hariso Mar 7, 2024
09be701
fix linter
hariso Mar 7, 2024
28128ee
BP: call the conditional execution methods in the processors (#1411)
hariso Mar 7, 2024
60b2788
update examples and specs
lovromazgon Mar 7, 2024
107d104
disable goconst in processors (docs are repetitive)
lovromazgon Mar 7, 2024
16beec9
upgrade processor sdk
hariso Mar 7, 2024
7d09a57
Merge branch 'feature/better-processors' of github.com:ConduitIO/cond…
hariso Mar 7, 2024
769ee22
Merge branch 'main' into feature/better-processors
hariso Mar 7, 2024
68d0a26
add log when loading wasm processors, more docs updates
lovromazgon Mar 7, 2024
BP: add builtin processor part#1 (#1371)
* add processors field.set & field.subset.exclude

* delete extra test

* fix tests

* add field.rename processor

* add exclusion validation to field.rename processor

* refactor + fix error return in Process method

* add field.convert processor

* add filter processor

* only structured data

* address reviews part1

* use paramgen, update Configure method, update tests

* evaluate value for processor setField

* use a slice of referenceResolvers for processors with multiple fields params

* do not allow .Position to be set in setField processor

* update setField test, add non existent field test

* use the new Rename() method from referenceResolver

* address reviews

* address reviews2

* fix renameField bug + add New() method to each processor

* linter fix

* add processor examples

* processor json

* Update pkg/plugin/processor/builtin/convertField.go

Co-authored-by: Lovro Mažgon <[email protected]>

* address reviews

* setFeild example

* setFeild example

* make generate

* fix paramgen regex

* generate

* fix regex

* add paramgen to tools

---------

Co-authored-by: Lovro Mažgon <[email protected]>
maha-hajja and lovromazgon authored Mar 1, 2024

Verified

This commit was signed with the committer’s verified signature.
commit 5614c58696d0792206421e906157c644fc8b703c
2 changes: 1 addition & 1 deletion go.mod
@@ -17,7 +17,7 @@ require (
github.com/conduitio/conduit-connector-protocol v0.5.1-0.20240104160905-e9e61586fb8d
github.com/conduitio/conduit-connector-s3 v0.5.1
github.com/conduitio/conduit-connector-sdk v0.8.0
-github.com/conduitio/conduit-processor-sdk v0.0.0-20240216180055-cbdc5dcb5d31
+github.com/conduitio/conduit-processor-sdk v0.0.0-20240228181202-04383fd82d29
github.com/conduitio/yaml/v3 v3.3.0
github.com/dgraph-io/badger/v4 v4.2.0
github.com/gammazero/deque v0.2.1
4 changes: 2 additions & 2 deletions go.sum
@@ -1095,8 +1095,8 @@ github.com/conduitio/conduit-connector-s3 v0.5.1 h1:yRo8004ryCIZc/S3iWQ1rN6pm6bj
github.com/conduitio/conduit-connector-s3 v0.5.1/go.mod h1:nbxzsyS95gbFJ28Job9vFFB+byRFINSv70/13Yi4mKQ=
github.com/conduitio/conduit-connector-sdk v0.8.0 h1:gvchqoj5d3AQsBoIosx4i32L8Ex9+5BuAyHi/IM9VD4=
github.com/conduitio/conduit-connector-sdk v0.8.0/go.mod h1:nOz4K3X6fD8YMe5CPbULwSEE18Eu02ZrpT6o6KwQfxs=
-github.com/conduitio/conduit-processor-sdk v0.0.0-20240216180055-cbdc5dcb5d31 h1:a4x/bVFMZrPGnOM502FwbVrM0dfrttL1FZvKqTtYVP0=
-github.com/conduitio/conduit-processor-sdk v0.0.0-20240216180055-cbdc5dcb5d31/go.mod h1:F/tmVZiXzZY60bzUSqwStM1TPaGvmQ9b1n1LuVtAOio=
+github.com/conduitio/conduit-processor-sdk v0.0.0-20240228181202-04383fd82d29 h1:X6e4OnuJOHb3znmkvSPEf1hVJB7aWAOW7jZX+fMhLf8=
+github.com/conduitio/conduit-processor-sdk v0.0.0-20240228181202-04383fd82d29/go.mod h1:F/tmVZiXzZY60bzUSqwStM1TPaGvmQ9b1n1LuVtAOio=
github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI=
github.com/conduitio/yaml/v3 v3.3.0/go.mod h1:JNgFMOX1t8W4YJuRZOh6GggVtSMsgP9XgTw+7dIenpc=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
156 changes: 156 additions & 0 deletions pkg/plugin/processor/builtin/convertField.go
@@ -0,0 +1,156 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate paramgen -output=convertField_paramgen.go convertFieldConfig

package builtin

import (
	"context"
	"fmt"
	"strconv"

	"github.com/conduitio/conduit-commons/opencdc"
	sdk "github.com/conduitio/conduit-processor-sdk"
	"github.com/conduitio/conduit/pkg/foundation/cerrors"
)

type convertField struct {
	referenceResolver sdk.ReferenceResolver
	config            convertFieldConfig

	sdk.UnimplementedProcessor
}

func newConvertField() *convertField {
	return &convertField{}
}

type convertFieldConfig struct {
	// Field is the target field, as it would be addressed in a Go template (e.g. `.Payload.After.foo`).
	// Only fields under .Key and .Payload can be converted, and they must contain structured data.
	Field string `json:"field" validate:"required,regex=^\\.(Payload|Key).*"`
	// Type is the target field type after conversion. Available options are: string, int, float, bool.
	Type string `json:"type" validate:"required,inclusion=string|int|float|bool"`
}

func (p *convertField) Specification() (sdk.Specification, error) {
	return sdk.Specification{
		Name:    "field.convert",
		Summary: "Convert the type of a field.",
		Description: `Convert takes the field of one type and converts it into another type (e.g. string to integer).
The applicable types are string, int, float and bool. Converting can be done between any combination of types. Note that
booleans will be converted to numeric values 1 (true) and 0 (false). Processor is only applicable to .Key, .Payload.Before
and .Payload.After prefixes, and only applicable if said fields contain structured data.
If the record contains raw JSON data, then use the processor "decode.json" to parse it into structured data first.`,
		Version:    "v0.1.0",
		Author:     "Meroxa, Inc.",
		Parameters: convertFieldConfig{}.Parameters(),
	}, nil
}

func (p *convertField) Configure(ctx context.Context, m map[string]string) error {
	err := sdk.ParseConfig(ctx, m, &p.config, convertFieldConfig{}.Parameters())
	if err != nil {
		return cerrors.Errorf("failed to parse configuration: %w", err)
	}

	resolver, err := sdk.NewReferenceResolver(p.config.Field)
	if err != nil {
		return cerrors.Errorf("failed to parse the %q param: %w", "field", err)
	}
	p.referenceResolver = resolver
	return nil
}

func (p *convertField) Open(context.Context) error {
	return nil
}

// Process converts the configured field in each record. It stops at the first
// failing record and returns the records processed so far followed by an
// sdk.ErrorRecord.
func (p *convertField) Process(_ context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
	out := make([]sdk.ProcessedRecord, 0, len(records))
	for _, record := range records {
		rec := record
		ref, err := p.referenceResolver.Resolve(&rec)
		if err != nil {
			return append(out, sdk.ErrorRecord{Error: err})
		}
		// Convert via an intermediate string: value -> string -> target type.
		newVal, err := p.stringToType(p.toString(ref.Get()), p.config.Type)
		if err != nil {
			return append(out, sdk.ErrorRecord{Error: err})
		}
		err = ref.Set(newVal)
		if err != nil {
			return append(out, sdk.ErrorRecord{Error: err})
		}
		out = append(out, sdk.SingleRecord(rec))
	}
	return out
}

func (p *convertField) stringToType(value, typ string) (any, error) {
	switch typ {
	case "string":
		return value, nil
	case "int":
		newVal, err := strconv.Atoi(value)
		if err != nil {
			return nil, err
		}
		return newVal, nil
	case "float":
		newVal, err := strconv.ParseFloat(value, 64)
		if err != nil {
			return nil, err
		}
		return newVal, nil
	case "bool":
		newVal, err := strconv.ParseBool(value)
		if err != nil {
			return nil, err
		}
		return newVal, nil
	default:
		return nil, cerrors.Errorf("undefined type %q", typ)
	}
}

func (p *convertField) toString(value any) string {
	switch v := value.(type) {
	case string:
		return v
	case int:
		return strconv.Itoa(v)
	case float64:
		return strconv.FormatFloat(v, 'f', -1, 64)
	case bool:
		if p.config.Type == "int" || p.config.Type == "float" {
			return p.boolToStringNumber(v)
		}
		return strconv.FormatBool(v)
	default:
		return fmt.Sprintf("%v", value)
	}
}

func (p *convertField) boolToStringNumber(b bool) string {
	if b {
		return "1"
	}
	return "0"
}

func (p *convertField) Teardown(context.Context) error {
	return nil
}
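
The conversion in convertField.go goes through an intermediate string: the current value is rendered with toString and then parsed as the target type. The following is a minimal standalone sketch of that two-step approach, using only the standard library. The names `parseAs` and `convert` are hypothetical and not part of this commit, and the Conduit SDK types are deliberately left out. It illustrates that some combinations fail at the parsing step, for example a float with a fractional part converted to int.

```go
package main

import (
	"fmt"
	"strconv"
)

// toString renders a value as a string. Booleans become "1"/"0" when the
// target type is numeric, as described in the processor specification.
func toString(value any, target string) string {
	switch v := value.(type) {
	case string:
		return v
	case int:
		return strconv.Itoa(v)
	case float64:
		return strconv.FormatFloat(v, 'f', -1, 64)
	case bool:
		if target == "int" || target == "float" {
			if v {
				return "1"
			}
			return "0"
		}
		return strconv.FormatBool(v)
	default:
		return fmt.Sprintf("%v", value)
	}
}

// parseAs (hypothetical name) parses the intermediate string as the target type.
func parseAs(value, target string) (any, error) {
	switch target {
	case "string":
		return value, nil
	case "int":
		n, err := strconv.Atoi(value)
		if err != nil {
			return nil, err
		}
		return n, nil
	case "float":
		f, err := strconv.ParseFloat(value, 64)
		if err != nil {
			return nil, err
		}
		return f, nil
	case "bool":
		b, err := strconv.ParseBool(value)
		if err != nil {
			return nil, err
		}
		return b, nil
	default:
		return nil, fmt.Errorf("undefined type %q", target)
	}
}

// convert (hypothetical helper) chains both steps.
func convert(value any, target string) (any, error) {
	return parseAs(toString(value, target), target)
}

func main() {
	cases := []struct {
		value  any
		target string
	}{
		{"123", "int"},
		{true, "int"},
		{123.345, "string"},
		{123.345, "int"}, // fails: "123.345" is not a valid integer literal
		{2, "bool"},      // fails: strconv.ParseBool does not accept "2"
	}
	for _, c := range cases {
		got, err := convert(c.value, c.target)
		fmt.Printf("%#v -> %s: %#v (err: %v)\n", c.value, c.target, got, err)
	}
}
```

Going through a string keeps the number of conversion paths small (one renderer, one parser per target type), at the cost of rejecting inputs whose string form is not a valid literal of the target type.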
140 changes: 140 additions & 0 deletions pkg/plugin/processor/builtin/convertField_examples_test.go
@@ -0,0 +1,140 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package builtin

import (
	"github.com/conduitio/conduit-commons/opencdc"
	sdk "github.com/conduitio/conduit-processor-sdk"
)

//nolint:govet // a more descriptive example description
func ExampleConvertFieldProcessor_StringToInt() {
	p := newConvertField()

	RunExample(p, example{
		Description: `change .Key.id type to int`,
		Config:      map[string]string{"field": ".Key.id", "type": "int"},
		Have: opencdc.Record{
			Operation: opencdc.OperationUpdate,
			Key:       opencdc.StructuredData{"id": "123"},
			Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
		},
		Want: sdk.SingleRecord{
			Operation: opencdc.OperationUpdate,
			Key:       opencdc.StructuredData{"id": 123},
			Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
		}})

// Output:
// processor transformed record:
// --- before
// +++ after
// @@ -1,14 +1,14 @@
// {
// "position": null,
// "operation": "update",
// "metadata": null,
// "key": {
// - "id": "123"
// + "id": 123
// },
// "payload": {
// "before": null,
// "after": {
// "foo": "bar"
// }
// }
// }
}

//nolint:govet // a more descriptive example description
func ExampleConvertFieldProcessor_IntToBool() {
	p := newConvertField()

	RunExample(p, example{
		Description: `change .Payload.After.done type to bool`,
		Config:      map[string]string{"field": ".Payload.After.done", "type": "bool"},
		Have: opencdc.Record{
			Operation: opencdc.OperationUpdate,
			Key:       opencdc.StructuredData{"id": "123"},
			Payload:   opencdc.Change{After: opencdc.StructuredData{"done": "1"}},
		},
		Want: sdk.SingleRecord{
			Operation: opencdc.OperationUpdate,
			Key:       opencdc.StructuredData{"id": "123"},
			Payload:   opencdc.Change{After: opencdc.StructuredData{"done": true}},
		}})

// Output:
// processor transformed record:
// --- before
// +++ after
// @@ -1,14 +1,14 @@
// {
// "position": null,
// "operation": "update",
// "metadata": null,
// "key": {
// "id": "123"
// },
// "payload": {
// "before": null,
// "after": {
// - "done": "1"
// + "done": true
// }
// }
// }
}

//nolint:govet // a more descriptive example description
func ExampleConvertFieldProcessor_FloatToString() {
	p := newConvertField()

	RunExample(p, example{
		Description: `change .Key.id type to string`,
		Config:      map[string]string{"field": ".Key.id", "type": "string"},
		Have: opencdc.Record{
			Operation: opencdc.OperationUpdate,
			Key:       opencdc.StructuredData{"id": 123.345},
			Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
		},
		Want: sdk.SingleRecord{
			Operation: opencdc.OperationUpdate,
			Key:       opencdc.StructuredData{"id": "123.345"},
			Payload:   opencdc.Change{After: opencdc.StructuredData{"foo": "bar"}},
		}})

// Output:
// processor transformed record:
// --- before
// +++ after
// @@ -1,14 +1,14 @@
// {
// "position": null,
// "operation": "update",
// "metadata": null,
// "key": {
// - "id": 123.345
// + "id": "123.345"
// },
// "payload": {
// "before": null,
// "after": {
// "foo": "bar"
// }
// }
// }
}
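
The example configs above all pass the `validate` rules declared on convertFieldConfig (a required field matching `^\.(Payload|Key).*` and a type from a fixed set). The sketch below approximates those two rules by hand, to show which configs would be accepted. It is not how paramgen or sdk.ParseConfig implement validation; `checkConfig`, `fieldPattern` and `allowedTypes` are hypothetical names, and treating an empty string as "missing" is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// fieldPattern is the regex from the validate tag on convertFieldConfig.Field.
var fieldPattern = regexp.MustCompile(`^\.(Payload|Key).*`)

// allowedTypes holds the values from the inclusion rule on convertFieldConfig.Type.
var allowedTypes = map[string]bool{"string": true, "int": true, "float": true, "bool": true}

// checkConfig (hypothetical) approximates the "required", "regex" and
// "inclusion" rules. Assumption: an empty value counts as missing.
func checkConfig(cfg map[string]string) error {
	field := cfg["field"]
	if field == "" {
		return errors.New(`"field" is required`)
	}
	if !fieldPattern.MatchString(field) {
		return fmt.Errorf("field %q must start with .Payload or .Key", field)
	}
	typ := cfg["type"]
	if typ == "" {
		return errors.New(`"type" is required`)
	}
	if !allowedTypes[typ] {
		return fmt.Errorf("type %q must be one of string, int, float, bool", typ)
	}
	return nil
}

func main() {
	configs := []map[string]string{
		{"field": ".Key.id", "type": "int"},
		{"field": ".Payload.After.done", "type": "bool"},
		{"field": ".Metadata.foo", "type": "string"}, // rejected by the regex
		{"field": ".Key.id", "type": "uint"},         // rejected by the inclusion rule
		{"type": "int"},                              // rejected: field is missing
	}
	for _, cfg := range configs {
		fmt.Printf("%v: %v\n", cfg, checkConfig(cfg))
	}
}
```

Note that the regex only anchors the prefix, so a value such as `.Keys` also matches it. Whether such a reference is rejected later, when the reference resolver is built in Configure, is not shown in this commit.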
33 changes: 33 additions & 0 deletions pkg/plugin/processor/builtin/convertField_paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
