Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add error code #2598

Merged
merged 7 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions internal/converter/binary/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package binary
import (
"fmt"

"github.com/lf-edge/ekuiper/pkg/errorx"
"github.com/lf-edge/ekuiper/pkg/message"
)

Expand All @@ -28,11 +29,17 @@ func GetConverter() (message.Converter, error) {
return converter, nil
}

func (c *Converter) Encode(d interface{}) ([]byte, error) {
func (c *Converter) Encode(d interface{}) (b []byte, err error) {
defer func() {
if err != nil {
err = errorx.NewWithCode(errorx.CovnerterErr, err.Error())
}
}()

return nil, fmt.Errorf("not supported")
}

func (c *Converter) Decode(b []byte) (interface{}, error) {
func (c *Converter) Decode(b []byte) (m interface{}, err error) {
result := make(map[string]interface{})
result[message.DefaultField] = b
return result, nil
Expand Down
12 changes: 12 additions & 0 deletions internal/converter/binary/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ import (
"path"
"reflect"
"testing"

"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/pkg/errorx"
)

func TestMessageDecode(t *testing.T) {
Expand Down Expand Up @@ -48,3 +52,11 @@ func TestMessageDecode(t *testing.T) {
}
}
}

func TestError(t *testing.T) {
_, err := converter.Encode(nil)
require.Error(t, err)
errWithCode, ok := err.(errorx.ErrorWithCode)
require.True(t, ok)
require.Equal(t, errorx.CovnerterErr, errWithCode.Code())
}
9 changes: 8 additions & 1 deletion internal/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/lf-edge/ekuiper/internal/converter/delimited"
"github.com/lf-edge/ekuiper/internal/converter/json"
"github.com/lf-edge/ekuiper/pkg/ast"
"github.com/lf-edge/ekuiper/pkg/errorx"
"github.com/lf-edge/ekuiper/pkg/message"
)

Expand All @@ -42,7 +43,13 @@ var converters = map[string]Instantiator{
},
}

func GetOrCreateConverter(options *ast.Options) (message.Converter, error) {
func GetOrCreateConverter(options *ast.Options) (c message.Converter, err error) {
defer func() {
if err != nil {
err = errorx.NewWithCode(errorx.CovnerterErr, err.Error())
}
}()

t := strings.ToLower(options.FORMAT)
if t == "" {
t = message.FormatJson
Expand Down
10 changes: 8 additions & 2 deletions internal/converter/delimited/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strconv"
"strings"

"github.com/lf-edge/ekuiper/pkg/errorx"
"github.com/lf-edge/ekuiper/pkg/message"
)

Expand All @@ -41,7 +42,12 @@ func (c *Converter) SetColumns(cols []string) {
}

// Encode If no columns defined, the default order is sort by key
func (c *Converter) Encode(d interface{}) ([]byte, error) {
func (c *Converter) Encode(d interface{}) (b []byte, err error) {
defer func() {
if err != nil {
err = errorx.NewWithCode(errorx.CovnerterErr, err.Error())
}
}()
switch m := d.(type) {
case map[string]interface{}:
sb := &bytes.Buffer{}
Expand All @@ -68,7 +74,7 @@ func (c *Converter) Encode(d interface{}) ([]byte, error) {

// Decode If the cols is not set, the default key name is col1, col2, col3...
// The return value is always a map
func (c *Converter) Decode(b []byte) (interface{}, error) {
func (c *Converter) Decode(b []byte) (ma interface{}, err error) {
tokens := strings.Split(string(b), c.delimiter)
m := make(map[string]interface{})
if len(c.cols) == 0 {
Expand Down
13 changes: 13 additions & 0 deletions internal/converter/delimited/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (
"reflect"
"testing"

"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/internal/testx"
"github.com/lf-edge/ekuiper/pkg/errorx"
)

func TestEncode(t *testing.T) {
Expand Down Expand Up @@ -114,3 +117,13 @@ func TestDecode(t *testing.T) {
}
}
}

func TestError(t *testing.T) {
converter, err := NewConverter(",")
require.NoError(t, err)
_, err = converter.Encode(nil)
require.Error(t, err)
errWithCode, ok := err.(errorx.ErrorWithCode)
require.True(t, ok)
require.Equal(t, errorx.CovnerterErr, errWithCode.Code())
}
21 changes: 16 additions & 5 deletions internal/converter/json/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/lf-edge/ekuiper/internal/converter/merge"
"github.com/lf-edge/ekuiper/pkg/ast"
"github.com/lf-edge/ekuiper/pkg/cast"
"github.com/lf-edge/ekuiper/pkg/errorx"
"github.com/lf-edge/ekuiper/pkg/message"
)

Expand All @@ -35,13 +36,18 @@ func GetConverter() (message.Converter, error) {
return converter, nil
}

func (c *Converter) Encode(d interface{}) ([]byte, error) {
func (c *Converter) Encode(d interface{}) (b []byte, err error) {
return json.Marshal(d)
}

func (c *Converter) Decode(b []byte) (interface{}, error) {
func (c *Converter) Decode(b []byte) (m interface{}, err error) {
defer func() {
if err != nil {
err = errorx.NewWithCode(errorx.CovnerterErr, err.Error())
}
}()
var r0 interface{}
err := json.Unmarshal(b, &r0)
err = json.Unmarshal(b, &r0)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -133,11 +139,16 @@ func mergeSchema(originSchema, newSchema map[string]*ast.JsonStreamField) (map[s
return merge.MergeSchema(originSchema, newSchema)
}

func (c *FastJsonConverter) Encode(d interface{}) ([]byte, error) {
func (c *FastJsonConverter) Encode(d interface{}) (b []byte, err error) {
return json.Marshal(d)
}

func (c *FastJsonConverter) Decode(b []byte) (interface{}, error) {
func (c *FastJsonConverter) Decode(b []byte) (m interface{}, err error) {
defer func() {
if err != nil {
err = errorx.NewWithCode(errorx.CovnerterErr, err.Error())
}
}()
c.RLock()
defer c.RUnlock()
if len(c.wildcardMap) > 0 {
Expand Down
18 changes: 17 additions & 1 deletion internal/converter/json/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/pkg/ast"
"github.com/lf-edge/ekuiper/pkg/errorx"
)

func TestMessageDecode(t *testing.T) {
Expand Down Expand Up @@ -387,7 +388,7 @@ func TestFastJsonConverterWithSchemaError(t *testing.T) {
f := NewFastJsonConverter("", "", tc.schema, false)
_, err := f.Decode(tc.payload)
require.Error(t, err)
require.Equal(t, err, tc.err)
require.Equal(t, err.Error(), tc.err.Error())
}
}

Expand Down Expand Up @@ -906,3 +907,18 @@ func TestSchemaless(t *testing.T) {
require.Equal(t, tc.expect, v)
}
}

func TestJsonError(t *testing.T) {
_, err := converter.Decode(nil)
require.Error(t, err)
errWithCode, ok := err.(errorx.ErrorWithCode)
require.True(t, ok)
require.Equal(t, errorx.CovnerterErr, errWithCode.Code())
// fastjson
c := NewFastJsonConverter("", "", nil, true)
_, err = c.Decode(nil)
require.Error(t, err)
errWithCode, ok = err.(errorx.ErrorWithCode)
require.True(t, ok)
require.Equal(t, errorx.CovnerterErr, errWithCode.Code())
}
17 changes: 14 additions & 3 deletions internal/converter/protobuf/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

kconf "github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/converter/static"
"github.com/lf-edge/ekuiper/pkg/errorx"
"github.com/lf-edge/ekuiper/pkg/message"
)

Expand Down Expand Up @@ -57,7 +58,12 @@ func NewConverter(schemaFile string, soFile string, messageName string) (message
}
}

func (c *Converter) Encode(d interface{}) ([]byte, error) {
func (c *Converter) Encode(d interface{}) (b []byte, err error) {
defer func() {
if err != nil {
err = errorx.NewWithCode(errorx.CovnerterErr, err.Error())
}
}()
switch m := d.(type) {
case map[string]interface{}:
msg, err := c.fc.encodeMap(c.descriptor, m)
Expand All @@ -70,9 +76,14 @@ func (c *Converter) Encode(d interface{}) ([]byte, error) {
}
}

func (c *Converter) Decode(b []byte) (interface{}, error) {
func (c *Converter) Decode(b []byte) (m interface{}, err error) {
defer func() {
if err != nil {
err = errorx.NewWithCode(errorx.CovnerterErr, err.Error())
}
}()
result := mf.NewDynamicMessage(c.descriptor)
err := result.Unmarshal(b)
err = result.Unmarshal(b)
if err != nil {
return nil, err
}
Expand Down
17 changes: 17 additions & 0 deletions internal/converter/protobuf/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/schema"
"github.com/lf-edge/ekuiper/internal/testx"
"github.com/lf-edge/ekuiper/pkg/errorx"
)

func TestEncode(t *testing.T) {
Expand Down Expand Up @@ -328,3 +330,18 @@ func TestEncodeDecodeForAllTypes(t *testing.T) {
})
}
}

func TestErr(t *testing.T) {
c, err := NewConverter("../../schema/test/test1.proto", "", "Person")
require.NoError(t, err)
_, err = c.Encode("123")
require.Error(t, err)
errWithCode, ok := err.(errorx.ErrorWithCode)
require.True(t, ok)
require.Equal(t, errorx.CovnerterErr, errWithCode.Code())
_, err = c.Decode(nil)
require.Error(t, err)
errWithCode, ok = err.(errorx.ErrorWithCode)
require.True(t, ok)
require.Equal(t, errorx.CovnerterErr, errWithCode.Code())
}
Loading
Loading