From 538862bb6bdb6d19b194010500e60cb20ff4e57a Mon Sep 17 00:00:00 2001 From: Jiyong Huang Date: Fri, 22 Mar 2024 10:34:04 +0800 Subject: [PATCH] feat(module): modularize converter Signed-off-by: Jiyong Huang --- internal/converter/converter.go | 29 ++++++++---------- internal/converter/ext_converter.go | 9 +++--- internal/converter/ext_mock.go | 5 ++-- internal/topo/operator/preprocessor_test.go | 16 ++-------- internal/topo/topotest/mocknode/mock_data.go | 20 ++++--------- internal/xsql/parser.go | 5 ++-- pkg/message/artifacts.go | 19 +++++------- .../converter.go} | 30 +++++++------------ 8 files changed, 50 insertions(+), 83 deletions(-) rename pkg/{message/artifacts_test.go => modules/converter.go} (53%) diff --git a/internal/converter/converter.go b/internal/converter/converter.go index b1183aecfa..cdd9a44b2d 100644 --- a/internal/converter/converter.go +++ b/internal/converter/converter.go @@ -24,23 +24,19 @@ import ( "github.com/lf-edge/ekuiper/pkg/ast" "github.com/lf-edge/ekuiper/pkg/errorx" "github.com/lf-edge/ekuiper/pkg/message" + "github.com/lf-edge/ekuiper/pkg/modules" ) -// Instantiator The format, schema information are passed in by stream options -// The columns information is defined in the source side, like file source -type Instantiator func(schemaFileName string, SchemaMessageName string, delimiter string) (message.Converter, error) - -// init once and read only -var converters = map[string]Instantiator{ - message.FormatJson: func(_ string, _ string, _ string) (message.Converter, error) { +func init() { + modules.RegisterConverter(message.FormatJson, func(_ string, _ string, _ string) (message.Converter, error) { return json.GetConverter() - }, - message.FormatBinary: func(_ string, _ string, _ string) (message.Converter, error) { + }) + modules.RegisterConverter(message.FormatBinary, func(_ string, _ string, _ string) (message.Converter, error) { return binary.GetConverter() - }, - message.FormatDelimited: func(_ string, _ string, delimiter string) (message.Converter, error) { + }) + modules.RegisterConverter(message.FormatDelimited, func(_ string, _ string, delimiter string) (message.Converter, error) { return delimited.NewConverter(delimiter) - }, + }) } func GetOrCreateConverter(options *ast.Options) (c message.Converter, err error) { @@ -66,13 +62,12 @@ func GetOrCreateConverter(options *ast.Options) (c message.Converter, err error) schemaName := options.SCHEMAID if schemaName != "" { r := strings.Split(schemaName, ".") - if len(r) != 2 { - return nil, fmt.Errorf("invalid schemaId: %s", schemaName) - } schemaFile = r[0] - schemaName = r[1] + if len(r) >= 2 { + schemaName = r[1] + } } - if c, ok := converters[t]; ok { + if c, ok := modules.Converters[t]; ok { return c(schemaFile, schemaName, options.DELIMITER) } return nil, fmt.Errorf("format type %s not supported", t) diff --git a/internal/converter/ext_converter.go b/internal/converter/ext_converter.go index efe7159a84..d402bc3b25 100644 --- a/internal/converter/ext_converter.go +++ b/internal/converter/ext_converter.go @@ -1,4 +1,4 @@ -// Copyright 2022-2023 EMQ Technologies Co., Ltd. +// Copyright 2022-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -22,15 +22,16 @@ import ( "github.com/lf-edge/ekuiper/internal/pkg/def" "github.com/lf-edge/ekuiper/internal/schema" "github.com/lf-edge/ekuiper/pkg/message" + "github.com/lf-edge/ekuiper/pkg/modules" ) func init() { - converters[message.FormatProtobuf] = func(schemaFileName string, schemaMessageName string, _ string) (message.Converter, error) { + modules.RegisterConverter(message.FormatProtobuf, func(schemaFileName string, schemaMessageName string, _ string) (message.Converter, error) { ffs, err := schema.GetSchemaFile(def.PROTOBUF, schemaFileName) if err != nil { return nil, err } return protobuf.NewConverter(ffs.SchemaFile, ffs.SoFile, schemaMessageName) - } - converters[message.FormatCustom] = custom.LoadConverter + }) + modules.RegisterConverter(message.FormatCustom, custom.LoadConverter) } diff --git a/internal/converter/ext_mock.go b/internal/converter/ext_mock.go index 352ff46136..639b4215cc 100644 --- a/internal/converter/ext_mock.go +++ b/internal/converter/ext_mock.go @@ -22,12 +22,13 @@ import ( "github.com/lf-edge/ekuiper/internal/conf" "github.com/lf-edge/ekuiper/pkg/message" + "github.com/lf-edge/ekuiper/pkg/modules" ) func init() { - converters["mock"] = func(_ string, _ string, _ string) (message.Converter, error) { + modules.RegisterConverter("mock", func(_ string, _ string, _ string) (message.Converter, error) { return &MockConverter{}, nil - } + }) } // MockConverter mocks a slow converter for benchmark test diff --git a/internal/topo/operator/preprocessor_test.go b/internal/topo/operator/preprocessor_test.go index 22d43e634f..c578c76183 100644 --- a/internal/topo/operator/preprocessor_test.go +++ b/internal/topo/operator/preprocessor_test.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 EMQ Technologies Co., Ltd. +// Copyright 2021-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,13 +15,10 @@ package operator import ( - "encoding/base64" "encoding/json" "errors" "fmt" "log" - "os" - "path" "reflect" "testing" "time" @@ -32,6 +29,7 @@ import ( "github.com/lf-edge/ekuiper/internal/conf" "github.com/lf-edge/ekuiper/internal/converter" "github.com/lf-edge/ekuiper/internal/topo/context" + "github.com/lf-edge/ekuiper/internal/topo/topotest/mocknode" "github.com/lf-edge/ekuiper/internal/xsql" "github.com/lf-edge/ekuiper/pkg/ast" "github.com/lf-edge/ekuiper/pkg/cast" @@ -993,15 +991,7 @@ func TestPreprocessorError(t *testing.T) { } func TestPreprocessorForBinary(t *testing.T) { - docsFolder, err := conf.GetLoc("docs/") - if err != nil { - t.Errorf("Cannot find docs folder: %v", err) - } - image, err := os.ReadFile(path.Join(docsFolder, "cover.jpg")) - if err != nil { - t.Errorf("Cannot read image: %v", err) - } - b64img := base64.StdEncoding.EncodeToString(image) + image, b64img := mocknode.GetImg() tests := []struct { stmt *ast.StreamStmt data []byte diff --git a/internal/topo/topotest/mocknode/mock_data.go b/internal/topo/topotest/mocknode/mock_data.go index 31790aef83..995d28d7ec 100644 --- a/internal/topo/topotest/mocknode/mock_data.go +++ b/internal/topo/topotest/mocknode/mock_data.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 EMQ Technologies Co., Ltd. +// Copyright 2021-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,10 +16,7 @@ package mocknode import ( "encoding/base64" - "os" - "path" - "github.com/lf-edge/ekuiper/internal/conf" "github.com/lf-edge/ekuiper/internal/xsql" ) @@ -1187,17 +1184,10 @@ var TestData = map[string][]*xsql.Tuple{ }, } -var Image, _ = getImg() +var Image, _ = GetImg() -func getImg() ([]byte, string) { - docsFolder, err := conf.GetLoc("docs/") - if err != nil { - conf.Log.Fatalf("Cannot find docs folder: %v", err) - } - image, err := os.ReadFile(path.Join(docsFolder, "cover.jpg")) - if err != nil { - conf.Log.Fatalf("Cannot read image: %v", err) - } - b64img := base64.StdEncoding.EncodeToString(image) +func GetImg() ([]byte, string) { + b64img := "iVBORw0KGgoAAAANSUhEUgAAAAgAAAAICAIAAABLbSncAAAA2ElEQVR4nADIADf/BC0WH5nMXfXHJV5I6UzACsst5FWwnirr5ACEv1em5DJEytIpPnKs3g91xCvGpd4hE5gDZYFg109pkvLp4R9Dxcjo8hyb1QCEiMkUArLqIWvHEgIkyCl6Iefgs/I1EOoH2ch98gLoWT0RcAjxv4G05nKc5BDoz+xuQwM3490Ax3L9MTmNm7YCUfjfuW2T7loUJy/9QMN4BK765ATI+UPcFsvYLHyEvjGaqzUIQWkIpQRpQ/Ok1bjz57qgxd7NH+xIQnMxsPgKw4QBAAD//1xYZo1yq5mgAAAAAElFTkSuQmCC" + image, _ := base64.StdEncoding.DecodeString(b64img) return image, b64img } diff --git a/internal/xsql/parser.go b/internal/xsql/parser.go index c0ad5b30de..6077511468 100644 --- a/internal/xsql/parser.go +++ b/internal/xsql/parser.go @@ -1,4 +1,4 @@ -// Copyright 2022-2023 EMQ Technologies Co., Ltd. +// Copyright 2022-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ import ( "github.com/lf-edge/ekuiper/internal/binder/function" "github.com/lf-edge/ekuiper/pkg/ast" "github.com/lf-edge/ekuiper/pkg/message" + "github.com/lf-edge/ekuiper/pkg/modules" ) type Parser struct { @@ -1193,7 +1194,7 @@ func validateStream(stmt *ast.StreamStmt) error { return fmt.Errorf("'binary' format stream can have only one field") } default: - if !message.IsFormatSupported(lf) { + if !modules.IsFormatSupported(lf) { return fmt.Errorf("option 'format=%s' is invalid", f) } } diff --git a/pkg/message/artifacts.go b/pkg/message/artifacts.go index 50f5ebc47f..9c441d246b 100644 --- a/pkg/message/artifacts.go +++ b/pkg/message/artifacts.go @@ -1,4 +1,4 @@ -// Copyright 2021-2022 EMQ Technologies Co., Ltd. +// Copyright 2021-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,7 +14,9 @@ package message -import "github.com/lf-edge/ekuiper/pkg/ast" +import ( + "github.com/lf-edge/ekuiper/pkg/ast" +) const ( FormatBinary = "binary" @@ -27,21 +29,16 @@ const ( MetaKey = "__meta" ) -func IsFormatSupported(format string) bool { - switch format { - case FormatBinary, FormatJson, FormatProtobuf, FormatCustom, FormatDelimited: - return true - default: - return false - } -} - // Converter converts bytes & map or []map according to the schema type Converter interface { Encode(d interface{}) ([]byte, error) Decode(b []byte) (interface{}, error) } +// ConverterProvider The format, schema information are passed in by stream options +// The columns information is defined in the source side, like file source +type ConverterProvider func(schemaFileName string, SchemaMessageName string, delimiter string) (Converter, error) + type SchemaMergeAbleConverter interface { MergeSchema(key, datasource string, newSchema map[string]*ast.JsonStreamField, isWildcard bool) error DetachSchema(key string) error diff --git a/pkg/message/artifacts_test.go b/pkg/modules/converter.go similarity index 53% rename from pkg/message/artifacts_test.go rename to pkg/modules/converter.go index f502adb90d..e11f724cea 100644 --- a/pkg/message/artifacts_test.go +++ b/pkg/modules/converter.go @@ -1,4 +1,4 @@ -// Copyright 2023 EMQ Technologies Co., Ltd. +// Copyright 2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,26 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -package message +package modules -import ( - "testing" +import "github.com/lf-edge/ekuiper/pkg/message" - "github.com/stretchr/testify/assert" -) +var Converters = map[string]message.ConverterProvider{} -func TestIsFormatSupported(t *testing.T) { - formats := []string{ - FormatBinary, FormatJson, FormatProtobuf, FormatDelimited, FormatCustom, - } - for _, format := range formats { - assert.True(t, IsFormatSupported(format)) - } +// RegisterConverter registers a converter with the given name. +func RegisterConverter(name string, provider message.ConverterProvider) { + Converters[name] = provider +} - badFormats := []string{ - "BINARY", "Json", "DIY", - } - for _, format := range badFormats { - assert.False(t, IsFormatSupported(format)) - } +func IsFormatSupported(format string) bool { + _, ok := Converters[format] + return ok }