Skip to content

Commit

Permalink
VStreamer: improve representation of integers in json data types (vit…
Browse files Browse the repository at this point in the history
…essio#12630)

* Add signed and unsigned integer types to the forked ajson library to fix the issue of integers being parsed as float64 by the source binlog parser. That parsing caused larger integers to be stored as floats on the target and sent in scientific notation in vstream events.

Signed-off-by: Rohit Nayak <[email protected]>

* Use go.mod reference to the ajson module to make it easier to switch to the upstream module once changes are made there

Signed-off-by: Rohit Nayak <[email protected]>

---------

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps authored Mar 16, 2023
1 parent 0489fdb commit 2b43fd7
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 79 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ require (
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.15.0
github.com/spyzhov/ajson v0.7.2
github.com/stretchr/testify v1.8.1
github.com/tchap/go-patricia v2.3.0+incompatible
github.com/tidwall/gjson v1.12.1
Expand Down Expand Up @@ -112,6 +111,7 @@ require (
github.com/kr/pretty v0.3.1
github.com/kr/text v0.2.0
github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249
github.com/spyzhov/ajson v0.8.0
golang.org/x/exp v0.0.0-20230131160201-f062dba9d201
modernc.org/sqlite v1.20.3
)
Expand Down Expand Up @@ -216,3 +216,5 @@ require (
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
)

replace github.com/spyzhov/ajson v0.8.0 => github.com/rohit-nayak-ps/ajson v0.7.2-0.20230316112806-97deb03d883c
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,8 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rohit-nayak-ps/ajson v0.7.2-0.20230316112806-97deb03d883c h1:Y/4qcogoZA2WUtLWMk/yXfJSpaIG3mK3r9Lw4kaARL4=
github.com/rohit-nayak-ps/ajson v0.7.2-0.20230316112806-97deb03d883c/go.mod h1:63V+CGM6f1Bu/p4nLIN8885ojBdt88TbLoSFzyqMuVA=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand Down Expand Up @@ -734,8 +736,6 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU=
github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA=
github.com/spyzhov/ajson v0.7.2 h1:kyl+ovUoId/RSBbSbCm31xyQvPixA6Sxgvb0eWyt1Ko=
github.com/spyzhov/ajson v0.7.2/go.mod h1:63V+CGM6f1Bu/p4nLIN8885ojBdt88TbLoSFzyqMuVA=
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
196 changes: 147 additions & 49 deletions go/mysql/binlog_event_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (jh *BinlogJSON) register(typ jsonDataType, Plugin jsonPlugin) {
func (jh *BinlogJSON) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
Plugin := jh.plugins[typ]
if Plugin == nil {
return nil, fmt.Errorf("Plugin not found for type %d", typ)
return nil, fmt.Errorf("plugin not found for type %d", typ)
}
return Plugin.getNode(typ, data, pos)
}
Expand Down Expand Up @@ -316,59 +316,157 @@ type intPlugin struct {

var _ jsonPlugin = (*intPlugin)(nil)

func (ih intPlugin) getVal(typ jsonDataType, data []byte, pos int) (value float64) {
func (ipl intPlugin) getVal(typ jsonDataType, data []byte, pos int) (value int64) {
var val uint64
var val2 float64
size := ih.sizes[typ]
var val2 int64
size := ipl.sizes[typ]
for i := 0; i < size; i++ {
val = val + uint64(data[pos+i])<<(8*i)
}
switch typ {
case jsonInt16:
val2 = float64(int16(val))
case jsonUint16:
val2 = float64(uint16(val))
val2 = int64(int16(val))
case jsonInt32:
val2 = float64(int32(val))
case jsonUint32:
val2 = float64(uint32(val))
val2 = int64(int32(val))
case jsonInt64:
val2 = float64(int64(val))
case jsonUint64:
val2 = float64(val)
case jsonDouble:
val2 = math.Float64frombits(val)
val2 = int64(val)
}
return val2
}

func (ih intPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
val := ih.getVal(typ, data, pos)
node = ajson.NumericNode("", val)
func (ipl intPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
val := ipl.getVal(typ, data, pos)
node = ajson.IntegerNode("", val)
return node, nil
}

func newIntPlugin() *intPlugin {
ih := &intPlugin{
ipl := &intPlugin{
info: &jsonPluginInfo{
name: "Int",
types: []jsonDataType{jsonInt64, jsonInt32, jsonInt16, jsonUint16, jsonUint32, jsonUint64, jsonDouble},
types: []jsonDataType{jsonInt64, jsonInt32, jsonInt16},
},
sizes: make(map[jsonDataType]int),
}
ipl.sizes = map[jsonDataType]int{
jsonInt64: 8,
jsonInt32: 4,
jsonInt16: 2,
}
for _, typ := range ipl.info.types {
binlogJSON.register(typ, ipl)
}
return ipl
}

//endregion

//region uint plugin

// init builds the unsigned-integer plugin when the package is loaded.
// newUintPlugin registers the plugin with binlogJSON for every data type it
// handles, so the returned value is intentionally discarded here.
func init() {
newUintPlugin()
}

// uintPlugin is the jsonPlugin that decodes the unsigned integer JSON data
// types (jsonUint16, jsonUint32 and jsonUint64) into ajson nodes.
type uintPlugin struct {
// info carries the plugin's name and the list of data types it handles.
info *jsonPluginInfo
// sizes maps each handled data type to the number of bytes its value
// occupies in the encoded data.
sizes map[jsonDataType]int
}

// Compile-time check that *uintPlugin satisfies the jsonPlugin interface.
var _ jsonPlugin = (*uintPlugin)(nil)

// getVal decodes the unsigned integer of type typ stored in data at pos.
//
// The value is read least-significant byte first, using the byte width that
// upl.sizes records for typ. Types other than jsonUint16, jsonUint32 and
// jsonUint64 yield 0. The caller must make sure data holds enough bytes at
// pos: reading past the end of data panics.
func (upl uintPlugin) getVal(typ jsonDataType, data []byte, pos int) (value uint64) {
	// Fold the bytes together; each successive byte lands 8 bits higher.
	var raw uint64
	width := upl.sizes[typ]
	for shift, off := 0, pos; off < pos+width; shift, off = shift+8, off+1 {
		raw |= uint64(data[off]) << shift
	}

	// Narrow to the declared width before widening back to uint64.
	switch typ {
	case jsonUint64:
		return raw
	case jsonUint32:
		return uint64(uint32(raw))
	case jsonUint16:
		return uint64(uint16(raw))
	default:
		return 0
	}
}

// getNode decodes the unsigned integer of type typ stored in data at pos and
// wraps it in an unnamed ajson unsigned-integer node.
//
// It returns an error, rather than letting getVal panic with an out-of-range
// index, when data does not hold the full value at pos (for example when the
// payload is truncated or corrupt).
func (upl uintPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
	// Only validate when bytes will actually be read; a zero size (unknown
	// type) reads nothing and keeps yielding a zero-valued node as before.
	if size := upl.sizes[typ]; size > 0 && (pos < 0 || pos > len(data)-size) {
		return nil, fmt.Errorf("not enough data for json type %d: need %d bytes at position %d, data length is %d",
			typ, size, pos, len(data))
	}
	val := upl.getVal(typ, data, pos)
	node = ajson.UnsignedIntegerNode("", val)
	return node, nil
}

func newUintPlugin() *uintPlugin {
upl := &uintPlugin{
info: &jsonPluginInfo{
name: "Uint",
types: []jsonDataType{jsonUint16, jsonUint32, jsonUint64},
},
sizes: make(map[jsonDataType]int),
}
ih.sizes = map[jsonDataType]int{
upl.sizes = map[jsonDataType]int{
jsonUint64: 8,
jsonInt64: 8,
jsonUint32: 4,
jsonInt32: 4,
jsonUint16: 2,
jsonInt16: 2,
}
for _, typ := range upl.info.types {
binlogJSON.register(typ, upl)
}
return upl
}

//endregion

//region float plugin

// init builds the float plugin when the package is loaded. newFloatPlugin
// registers the plugin with binlogJSON for every data type it handles, so the
// returned value is intentionally discarded here.
func init() {
newFloatPlugin()
}

// floatPlugin is the jsonPlugin that decodes the jsonDouble data type into
// ajson numeric nodes.
type floatPlugin struct {
// info carries the plugin's name and the list of data types it handles.
info *jsonPluginInfo
// sizes maps each handled data type to the number of bytes its value
// occupies in the encoded data.
sizes map[jsonDataType]int
}

// Compile-time check that *floatPlugin satisfies the jsonPlugin interface.
var _ jsonPlugin = (*floatPlugin)(nil)

// getVal decodes the floating point value of type typ stored in data at pos.
//
// The raw bits are read least-significant byte first, using the byte width
// that flp.sizes records for typ, and reinterpreted as an IEEE 754 double.
// Any type other than jsonDouble yields 0. The caller must make sure data
// holds enough bytes at pos: reading past the end of data panics.
func (flp floatPlugin) getVal(typ jsonDataType, data []byte, pos int) (value float64) {
	var bits uint64
	for i, n := 0, flp.sizes[typ]; i < n; i++ {
		bits |= uint64(data[pos+i]) << (8 * i)
	}
	if typ != jsonDouble {
		return 0
	}
	return math.Float64frombits(bits)
}

// getNode decodes the floating point value of type typ stored in data at pos
// and wraps it in an unnamed ajson numeric node.
//
// It returns an error, rather than letting getVal panic with an out-of-range
// index, when data does not hold the full value at pos (for example when the
// payload is truncated or corrupt).
func (flp floatPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
	// Only validate when bytes will actually be read; a zero size (unknown
	// type) reads nothing and keeps yielding a zero-valued node as before.
	if size := flp.sizes[typ]; size > 0 && (pos < 0 || pos > len(data)-size) {
		return nil, fmt.Errorf("not enough data for json type %d: need %d bytes at position %d, data length is %d",
			typ, size, pos, len(data))
	}
	val := flp.getVal(typ, data, pos)
	node = ajson.NumericNode("", val)
	return node, nil
}

func newFloatPlugin() *floatPlugin {
fp := &floatPlugin{
info: &jsonPluginInfo{
name: "Float",
types: []jsonDataType{jsonDouble},
},
sizes: make(map[jsonDataType]int),
}
fp.sizes = map[jsonDataType]int{
jsonDouble: 8,
}
for _, typ := range ih.info.types {
binlogJSON.register(typ, ih)
for _, typ := range fp.info.types {
binlogJSON.register(typ, fp)
}
return ih
return fp
}

//endregion
Expand All @@ -385,7 +483,7 @@ type literalPlugin struct {

var _ jsonPlugin = (*literalPlugin)(nil)

func (lh literalPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
func (lpl literalPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
val := jsonDataLiteral(data[pos])
switch val {
case jsonNullLiteral:
Expand All @@ -401,14 +499,14 @@ func (lh literalPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *a
}

func newLiteralPlugin() *literalPlugin {
lh := &literalPlugin{
lpl := &literalPlugin{
info: &jsonPluginInfo{
name: "Literal",
types: []jsonDataType{jsonLiteral},
},
}
binlogJSON.register(jsonLiteral, lh)
return lh
binlogJSON.register(jsonLiteral, lpl)
return lpl
}

//endregion
Expand All @@ -427,7 +525,7 @@ var _ jsonPlugin = (*opaquePlugin)(nil)

// other types are stored as catch-all opaque types: documentation on these is scarce.
// we currently know about (and support) date/time/datetime/decimal.
func (oh opaquePlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
func (opl opaquePlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
dataType := data[pos]
start := 3 // account for length of stored value
end := start + 8 // all currently supported opaque data types are 8 bytes in size
Expand Down Expand Up @@ -484,14 +582,14 @@ func (oh opaquePlugin) getNode(typ jsonDataType, data []byte, pos int) (node *aj
}

func newOpaquePlugin() *opaquePlugin {
oh := &opaquePlugin{
opl := &opaquePlugin{
info: &jsonPluginInfo{
name: "Opaque",
types: []jsonDataType{jsonOpaque},
},
}
binlogJSON.register(jsonOpaque, oh)
return oh
binlogJSON.register(jsonOpaque, opl)
return opl
}

//endregion
Expand All @@ -508,22 +606,22 @@ type stringPlugin struct {

var _ jsonPlugin = (*stringPlugin)(nil)

func (sh stringPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
func (spl stringPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
size, pos := readVariableLength(data, pos)
node = ajson.StringNode("", string(data[pos:pos+size]))

return node, nil
}

func newStringPlugin() *stringPlugin {
sh := &stringPlugin{
spl := &stringPlugin{
info: &jsonPluginInfo{
name: "String",
types: []jsonDataType{jsonString},
},
}
binlogJSON.register(jsonString, sh)
return sh
binlogJSON.register(jsonString, spl)
return spl
}

//endregion
Expand All @@ -542,7 +640,7 @@ var _ jsonPlugin = (*arrayPlugin)(nil)

// arrays are stored thus:
// | type_identifier(one of [2,3]) | elem count | obj size | list of offsets+lengths of values | actual values |
func (ah arrayPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
func (apl arrayPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
jlog("JSON Array %s, len %d", jsonDataTypeToString(uint(typ)), len(data))
var nodes []*ajson.Node
var elem *ajson.Node
Expand All @@ -565,15 +663,15 @@ func (ah arrayPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajs
}

func newArrayPlugin() *arrayPlugin {
ah := &arrayPlugin{
apl := &arrayPlugin{
info: &jsonPluginInfo{
name: "Array",
types: []jsonDataType{jsonSmallArray, jsonLargeArray},
},
}
binlogJSON.register(jsonSmallArray, ah)
binlogJSON.register(jsonLargeArray, ah)
return ah
binlogJSON.register(jsonSmallArray, apl)
binlogJSON.register(jsonLargeArray, apl)
return apl
}

//endregion
Expand All @@ -592,7 +690,7 @@ var _ jsonPlugin = (*objectPlugin)(nil)

// objects are stored thus:
// | type_identifier(0/1) | elem count | obj size | list of offsets+lengths of keys | list of offsets+lengths of values | actual keys | actual values |
func (oh objectPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
func (opl objectPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
jlog("JSON Type is %s, len %d", jsonDataTypeToString(uint(typ)), len(data))

// "large" decides number of bytes used to specify element count and total object size: 4 bytes for large, 2 for small
Expand Down Expand Up @@ -640,15 +738,15 @@ func (oh objectPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *aj
}

func newObjectPlugin() *objectPlugin {
oh := &objectPlugin{
opl := &objectPlugin{
info: &jsonPluginInfo{
name: "Object",
types: []jsonDataType{jsonSmallObject, jsonLargeObject},
},
}
binlogJSON.register(jsonSmallObject, oh)
binlogJSON.register(jsonLargeObject, oh)
return oh
binlogJSON.register(jsonSmallObject, opl)
binlogJSON.register(jsonLargeObject, opl)
return opl
}

//endregion
Loading

0 comments on commit 2b43fd7

Please sign in to comment.