Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(module): modularize converter #2728

Merged
merged 1 commit into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 12 additions & 17 deletions internal/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,19 @@
"github.com/lf-edge/ekuiper/pkg/ast"
"github.com/lf-edge/ekuiper/pkg/errorx"
"github.com/lf-edge/ekuiper/pkg/message"
"github.com/lf-edge/ekuiper/pkg/modules"
)

// Instantiator The format, schema information are passed in by stream options
// The columns information is defined in the source side, like file source
type Instantiator func(schemaFileName string, SchemaMessageName string, delimiter string) (message.Converter, error)

// init once and read only
var converters = map[string]Instantiator{
message.FormatJson: func(_ string, _ string, _ string) (message.Converter, error) {
func init() {
modules.RegisterConverter(message.FormatJson, func(_ string, _ string, _ string) (message.Converter, error) {
return json.GetConverter()
},
message.FormatBinary: func(_ string, _ string, _ string) (message.Converter, error) {
})

Check warning on line 33 in internal/converter/converter.go

View check run for this annotation

Codecov / codecov/patch

internal/converter/converter.go#L33

Added line #L33 was not covered by tests
modules.RegisterConverter(message.FormatBinary, func(_ string, _ string, _ string) (message.Converter, error) {
return binary.GetConverter()
},
message.FormatDelimited: func(_ string, _ string, delimiter string) (message.Converter, error) {
})
modules.RegisterConverter(message.FormatDelimited, func(_ string, _ string, delimiter string) (message.Converter, error) {
return delimited.NewConverter(delimiter)
},
})
}

func GetOrCreateConverter(options *ast.Options) (c message.Converter, err error) {
Expand All @@ -66,13 +62,12 @@
schemaName := options.SCHEMAID
if schemaName != "" {
r := strings.Split(schemaName, ".")
if len(r) != 2 {
return nil, fmt.Errorf("invalid schemaId: %s", schemaName)
}
schemaFile = r[0]
schemaName = r[1]
if len(r) >= 2 {
schemaName = r[1]
}
}
if c, ok := converters[t]; ok {
if c, ok := modules.Converters[t]; ok {
return c(schemaFile, schemaName, options.DELIMITER)
}
return nil, fmt.Errorf("format type %s not supported", t)
Expand Down
9 changes: 5 additions & 4 deletions internal/converter/ext_converter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2023 EMQ Technologies Co., Ltd.
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -22,15 +22,16 @@ import (
"github.com/lf-edge/ekuiper/internal/pkg/def"
"github.com/lf-edge/ekuiper/internal/schema"
"github.com/lf-edge/ekuiper/pkg/message"
"github.com/lf-edge/ekuiper/pkg/modules"
)

func init() {
converters[message.FormatProtobuf] = func(schemaFileName string, schemaMessageName string, _ string) (message.Converter, error) {
modules.RegisterConverter(message.FormatProtobuf, func(schemaFileName string, schemaMessageName string, _ string) (message.Converter, error) {
ffs, err := schema.GetSchemaFile(def.PROTOBUF, schemaFileName)
if err != nil {
return nil, err
}
return protobuf.NewConverter(ffs.SchemaFile, ffs.SoFile, schemaMessageName)
}
converters[message.FormatCustom] = custom.LoadConverter
})
modules.RegisterConverter(message.FormatCustom, custom.LoadConverter)
}
5 changes: 3 additions & 2 deletions internal/converter/ext_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@

"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/pkg/message"
"github.com/lf-edge/ekuiper/pkg/modules"
)

func init() {
converters["mock"] = func(_ string, _ string, _ string) (message.Converter, error) {
modules.RegisterConverter("mock", func(_ string, _ string, _ string) (message.Converter, error) {
return &MockConverter{}, nil
}
})

Check warning on line 31 in internal/converter/ext_mock.go

View check run for this annotation

Codecov / codecov/patch

internal/converter/ext_mock.go#L31

Added line #L31 was not covered by tests
}

// MockConverter mocks a slow converter for benchmark test
Expand Down
16 changes: 3 additions & 13 deletions internal/topo/operator/preprocessor_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,13 +15,10 @@
package operator

import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"log"
"os"
"path"
"reflect"
"testing"
"time"
Expand All @@ -32,6 +29,7 @@ import (
"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/converter"
"github.com/lf-edge/ekuiper/internal/topo/context"
"github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode"
"github.com/lf-edge/ekuiper/internal/xsql"
"github.com/lf-edge/ekuiper/pkg/ast"
"github.com/lf-edge/ekuiper/pkg/cast"
Expand Down Expand Up @@ -993,15 +991,7 @@ func TestPreprocessorError(t *testing.T) {
}

func TestPreprocessorForBinary(t *testing.T) {
docsFolder, err := conf.GetLoc("docs/")
if err != nil {
t.Errorf("Cannot find docs folder: %v", err)
}
image, err := os.ReadFile(path.Join(docsFolder, "cover.jpg"))
if err != nil {
t.Errorf("Cannot read image: %v", err)
}
b64img := base64.StdEncoding.EncodeToString(image)
image, b64img := mocknode.GetImg()
tests := []struct {
stmt *ast.StreamStmt
data []byte
Expand Down
20 changes: 5 additions & 15 deletions internal/topo/topotest/mocknode/mock_data.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -16,10 +16,7 @@ package mocknode

import (
"encoding/base64"
"os"
"path"

"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/xsql"
)

Expand Down Expand Up @@ -1187,17 +1184,10 @@ var TestData = map[string][]*xsql.Tuple{
},
}

var Image, _ = getImg()
var Image, _ = GetImg()

func getImg() ([]byte, string) {
docsFolder, err := conf.GetLoc("docs/")
if err != nil {
conf.Log.Fatalf("Cannot find docs folder: %v", err)
}
image, err := os.ReadFile(path.Join(docsFolder, "cover.jpg"))
if err != nil {
conf.Log.Fatalf("Cannot read image: %v", err)
}
b64img := base64.StdEncoding.EncodeToString(image)
func GetImg() ([]byte, string) {
b64img := "iVBORw0KGgoAAAANSUhEUgAAAAgAAAAICAIAAABLbSncAAAA2ElEQVR4nADIADf/BC0WH5nMXfXHJV5I6UzACsst5FWwnirr5ACEv1em5DJEytIpPnKs3g91xCvGpd4hE5gDZYFg109pkvLp4R9Dxcjo8hyb1QCEiMkUArLqIWvHEgIkyCl6Iefgs/I1EOoH2ch98gLoWT0RcAjxv4G05nKc5BDoz+xuQwM3490Ax3L9MTmNm7YCUfjfuW2T7loUJy/9QMN4BK765ATI+UPcFsvYLHyEvjGaqzUIQWkIpQRpQ/Ok1bjz57qgxd7NH+xIQnMxsPgKw4QBAAD//1xYZo1yq5mgAAAAAElFTkSuQmCC"
image, _ := base64.StdEncoding.DecodeString(b64img)
return image, b64img
}
5 changes: 3 additions & 2 deletions internal/xsql/parser.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2023 EMQ Technologies Co., Ltd.
// Copyright 2022-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@ import (
"github.com/lf-edge/ekuiper/internal/binder/function"
"github.com/lf-edge/ekuiper/pkg/ast"
"github.com/lf-edge/ekuiper/pkg/message"
"github.com/lf-edge/ekuiper/pkg/modules"
)

type Parser struct {
Expand Down Expand Up @@ -1193,7 +1194,7 @@ func validateStream(stmt *ast.StreamStmt) error {
return fmt.Errorf("'binary' format stream can have only one field")
}
default:
if !message.IsFormatSupported(lf) {
if !modules.IsFormatSupported(lf) {
return fmt.Errorf("option 'format=%s' is invalid", f)
}
}
Expand Down
19 changes: 8 additions & 11 deletions pkg/message/artifacts.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2022 EMQ Technologies Co., Ltd.
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,9 @@

package message

import "github.com/lf-edge/ekuiper/pkg/ast"
import (
"github.com/lf-edge/ekuiper/pkg/ast"
)

const (
FormatBinary = "binary"
Expand All @@ -27,21 +29,16 @@ const (
MetaKey = "__meta"
)

func IsFormatSupported(format string) bool {
switch format {
case FormatBinary, FormatJson, FormatProtobuf, FormatCustom, FormatDelimited:
return true
default:
return false
}
}

// Converter converts bytes & map or []map according to the schema
type Converter interface {
Encode(d interface{}) ([]byte, error)
Decode(b []byte) (interface{}, error)
}

// ConverterProvider The format, schema information are passed in by stream options
// The columns information is defined in the source side, like file source
type ConverterProvider func(schemaFileName string, SchemaMessageName string, delimiter string) (Converter, error)

type SchemaMergeAbleConverter interface {
MergeSchema(key, datasource string, newSchema map[string]*ast.JsonStreamField, isWildcard bool) error
DetachSchema(key string) error
Expand Down
30 changes: 11 additions & 19 deletions pkg/message/artifacts_test.go → pkg/modules/converter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 EMQ Technologies Co., Ltd.
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,26 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package message
package modules

import (
"testing"
import "github.com/lf-edge/ekuiper/pkg/message"

"github.com/stretchr/testify/assert"
)
var Converters = map[string]message.ConverterProvider{}

func TestIsFormatSupported(t *testing.T) {
formats := []string{
FormatBinary, FormatJson, FormatProtobuf, FormatDelimited, FormatCustom,
}
for _, format := range formats {
assert.True(t, IsFormatSupported(format))
}
// RegisterConverter registers a converter with the given name.
func RegisterConverter(name string, provider message.ConverterProvider) {
Converters[name] = provider
}

badFormats := []string{
"BINARY", "Json", "DIY",
}
for _, format := range badFormats {
assert.False(t, IsFormatSupported(format))
}
func IsFormatSupported(format string) bool {
_, ok := Converters[format]
return ok
}
Loading