diff --git a/chcol.go b/chcol.go
index 7adbb8a1da..fff58c831f 100644
--- a/chcol.go
+++ b/chcol.go
@@ -24,6 +24,8 @@ import "github.com/ClickHouse/clickhouse-go/v2/lib/chcol"
 type (
 	Variant         = chcol.Variant
 	VariantWithType = chcol.VariantWithType
+	Dynamic         = chcol.Dynamic
+	DynamicWithType = chcol.DynamicWithType
 )
 
 // NewVariant creates a new Variant with the given value
@@ -35,3 +37,13 @@ func NewVariant(v any) Variant {
 func NewVariantWithType(v any, chType string) VariantWithType {
 	return chcol.NewVariantWithType(v, chType)
 }
+
+// NewDynamic creates a new Dynamic with the given value
+func NewDynamic(v any) Dynamic {
+	return chcol.NewDynamic(v)
+}
+
+// NewDynamicWithType creates a new DynamicWithType with the given value and ClickHouse type
+func NewDynamicWithType(v any, chType string) DynamicWithType {
+	return chcol.NewDynamicWithType(v, chType)
+}
diff --git a/examples/clickhouse_api/dynamic.go b/examples/clickhouse_api/dynamic.go
new file mode 100644
index 0000000000..ef354b769e
--- /dev/null
+++ b/examples/clickhouse_api/dynamic.go
@@ -0,0 +1,149 @@
+// Licensed to ClickHouse, Inc. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. ClickHouse, Inc. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package clickhouse_api
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/ClickHouse/clickhouse-go/v2"
+)
+
+// DynamicExample creates a table with a Dynamic column, inserts values of
+// several Go types into it, then reads the rows back twice: first switching
+// on the Go type of each value and then on its ClickHouse type name.
+func DynamicExample() error {
+	ctx := context.Background()
+
+	conn, err := GetNativeConnection(clickhouse.Settings{
+		"allow_experimental_dynamic_type": true,
+	}, nil, nil)
+	if err != nil {
+		return err
+	}
+
+	err = conn.Exec(ctx, "DROP TABLE IF EXISTS go_dynamic_example")
+	if err != nil {
+		return err
+	}
+
+	err = conn.Exec(ctx, `
+		CREATE TABLE go_dynamic_example (
+			c Dynamic
+		) ENGINE = Memory
+	`)
+	if err != nil {
+		return err
+	}
+
+	batch, err := conn.PrepareBatch(ctx, "INSERT INTO go_dynamic_example (c)")
+	if err != nil {
+		return err
+	}
+
+	if err = batch.Append(true); err != nil {
+		return err
+	}
+
+	if err = batch.Append(int64(42)); err != nil {
+		return err
+	}
+
+	if err = batch.Append("example"); err != nil {
+		return err
+	}
+
+	if err = batch.Append(clickhouse.NewDynamic("example dynamic")); err != nil {
+		return err
+	}
+
+	if err = batch.Append(clickhouse.NewDynamicWithType("example dynamic with specific type", "String")); err != nil {
+		return err
+	}
+
+	if err = batch.Append(nil); err != nil {
+		return err
+	}
+
+	if err = batch.Send(); err != nil {
+		return err
+	}
+
+	// Switch on Go Type
+
+	rows, err := conn.Query(ctx, "SELECT c FROM go_dynamic_example")
+	if err != nil {
+		return err
+	}
+
+	for i := 0; rows.Next(); i++ {
+		var row clickhouse.Dynamic
+		if err := rows.Scan(&row); err != nil {
+			_ = rows.Close() // best effort; the scan error is the one reported
+			return fmt.Errorf("failed to scan row index %d: %w", i, err)
+		}
+
+		switch row.Any().(type) {
+		case bool:
+			fmt.Printf("row at index %d is Bool: %v\n", i, row.Any())
+		case int64:
+			fmt.Printf("row at index %d is Int64: %v\n", i, row.Any())
+		case string:
+			fmt.Printf("row at index %d is String: %v\n", i, row.Any())
+		case nil:
+			fmt.Printf("row at index %d is NULL\n", i)
+		}
+	}
+	if err = rows.Close(); err != nil {
+		return err
+	}
+	if err = rows.Err(); err != nil {
+		return err
+	}
+
+	// Switch on ClickHouse Type
+
+	rows, err = conn.Query(ctx, "SELECT c FROM go_dynamic_example")
+	if err != nil {
+		return err
+	}
+
+	for i := 0; rows.Next(); i++ {
+		var row clickhouse.DynamicWithType
+		if err := rows.Scan(&row); err != nil {
+			_ = rows.Close() // best effort; the scan error is the one reported
+			return fmt.Errorf("failed to scan row index %d: %w", i, err)
+		}
+
+		switch row.Type() {
+		case "Bool":
+			fmt.Printf("row at index %d is bool: %v\n", i, row.Any())
+		case "Int64":
+			fmt.Printf("row at index %d is int64: %v\n", i, row.Any())
+		case "String":
+			fmt.Printf("row at index %d is string: %v\n", i, row.Any())
+		case "":
+			fmt.Printf("row at index %d is nil\n", i)
+		}
+	}
+	if err = rows.Close(); err != nil {
+		return err
+	}
+
+	return rows.Err()
+}
diff --git a/examples/clickhouse_api/main_test.go b/examples/clickhouse_api/main_test.go
index 40b59e36b2..d1b15637d9 100644
--- a/examples/clickhouse_api/main_test.go
+++ b/examples/clickhouse_api/main_test.go
@@ -213,3 +213,7 @@ func TestSSLNoVerify(t *testing.T) {
 func TestVariantExample(t *testing.T) {
 	require.NoError(t, VariantExample())
 }
+
+func TestDynamicExample(t *testing.T) {
+	require.NoError(t, DynamicExample())
+}
diff --git a/lib/chcol/dynamic.go b/lib/chcol/dynamic.go
new file mode 100644
index 0000000000..7e081376a7
--- /dev/null
+++ b/lib/chcol/dynamic.go
@@ -0,0 +1,35 @@
+// Licensed to ClickHouse, Inc. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. ClickHouse, Inc. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package chcol
+
+type Dynamic = Variant // alias: a Dynamic value is a Variant value
+
+// NewDynamic returns a Dynamic wrapping the given value
+func NewDynamic(v any) Dynamic {
+	return Dynamic{value: v}
+}
+
+type DynamicWithType = VariantWithType // alias: a DynamicWithType value is a VariantWithType value
+
+// NewDynamicWithType returns a DynamicWithType wrapping the given value together with its ClickHouse type name
+func NewDynamicWithType(v any, chType string) DynamicWithType {
+	return DynamicWithType{
+		Variant: Variant{value: v},
+		chType:  chType,
+	}
+}
diff --git a/lib/column/codegen/column.tpl b/lib/column/codegen/column.tpl
index 8697a67930..78eed58c56 100644
--- a/lib/column/codegen/column.tpl
+++ b/lib/column/codegen/column.tpl
@@ -122,6 +122,8 @@ func (t Type) Column(name string, tz *time.Location) (Interface, error) {
 		return &Point{name: name}, nil
 	case "String":
 		return &String{name: name, col: colStrProvider()}, nil
+	case "SharedVariant":
+		return &SharedVariant{name: name}, nil
 	case "Object('json')":
 		return &JSONObject{name: name, root: true, tz: tz}, nil
 	}
@@ -133,6 +135,8 @@ func (t Type) Column(name string, tz *time.Location) (Interface, error) {
 		return (&Tuple{name: name}).parse(t, tz)
 	case strings.HasPrefix(string(t), "Variant("):
 		return (&Variant{name: name}).parse(t, tz)
+	case strings.HasPrefix(string(t), "Dynamic"):
+		return (&Dynamic{name: name}).parse(t, tz)
 	case strings.HasPrefix(string(t), "Decimal("):
 		return (&Decimal{name: name}).parse(t)
 	case strings.HasPrefix(strType, "Nested("):
@@ -195,6 +199,7 @@ var (
 	scanTypeDecimal      = reflect.TypeOf(decimal.Decimal{})
 	scanTypeMultiPolygon = reflect.TypeOf(orb.MultiPolygon{})
 	scanTypeVariant      = reflect.TypeOf(chcol.Variant{})
+	scanTypeDynamic      = reflect.TypeOf(chcol.Dynamic{})
 )
 
 {{- range . }}
diff --git a/lib/column/codegen/dynamic.tpl b/lib/column/codegen/dynamic.tpl
new file mode 100644
index 0000000000..b725c069c5
--- /dev/null
+++ b/lib/column/codegen/dynamic.tpl
@@ -0,0 +1,60 @@
+// Licensed to ClickHouse, Inc. under one or more contributor
+// license agreements.
See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Code generated by make codegen DO NOT EDIT. +// source: lib/column/codegen/dynamic.tpl + +package column + +import ( + "database/sql" + "encoding/json" + "github.com/ClickHouse/ch-go/proto" + "github.com/google/uuid" + "github.com/paulmach/orb" + "time" +) + +// inferClickHouseTypeFromGoType takes a Go interface{} and converts it to a ClickHouse type. +// Returns empty string if type was not matched. +// This is best effort and does not work for all types. +// Optimally, users should provide a type using DynamicWithType. +func inferClickHouseTypeFromGoType(v any) string { + switch v.(type) { + {{- range . }} + case {{ .GoType }}: + return "{{ .ChType }}" + case *{{ .GoType }}: + return "{{ .ChType }}" + {{- end }} + {{- range . }} + {{- if .SkipArray }} + {{- else}} + case []{{ .GoType }}: + return "Array({{ .ChType }})" + {{- end}} + case []*{{ .GoType }}: + return "Array({{ .ChType }})" + {{- end }} + {{- range . 
}} + case map[string]{{ .GoType }}: + return "Map(String, {{ .ChType }})" + {{- end }} + default: + return "" + } +} diff --git a/lib/column/codegen/main.go b/lib/column/codegen/main.go index f427acdd54..46f8875040 100644 --- a/lib/column/codegen/main.go +++ b/lib/column/codegen/main.go @@ -34,16 +34,22 @@ var ( columnSrc string //go:embed array.tpl arraySrc string + //go:embed dynamic.tpl + dynamicSrc string ) var ( types []_type supportedGoTypes []string + dynamicTypes []_type ) type _type struct { - Size int + Size int + ChType string GoType string + + SkipArray bool } func init() { @@ -72,6 +78,7 @@ func init() { for _, typ := range types { supportedGoTypes = append(supportedGoTypes, typ.GoType) } + supportedGoTypes = append(supportedGoTypes, "string", "[]byte", "sql.NullString", "int", "uint", "big.Int", "decimal.Decimal", @@ -81,6 +88,37 @@ func init() { "netip.Addr", "net.IP", "proto.IPv6", "[16]byte", "orb.MultiPolygon", "orb.Point", "orb.Polygon", "orb.Ring", ) + + dynamicTypes = make([]_type, 0, len(types)) + for _, typ := range types { + + if typ.GoType == "uint8" { + // Prevent conflict with []byte and []uint8 + typ.SkipArray = true + dynamicTypes = append(dynamicTypes, typ) + continue + } + + dynamicTypes = append(dynamicTypes, typ) + } + + // Best-effort type matching for Dynamic inference + dynamicTypes = append(dynamicTypes, []_type{ + {ChType: "String", GoType: "string"}, + {ChType: "String", GoType: "json.RawMessage"}, + {ChType: "String", GoType: "sql.NullString"}, + {ChType: "Bool", GoType: "bool"}, + {ChType: "Bool", GoType: "sql.NullBool"}, + {ChType: "DateTime64(3)", GoType: "time.Time"}, + {ChType: "DateTime64(3)", GoType: "sql.NullTime"}, + {ChType: "UUID", GoType: "uuid.UUID"}, + {ChType: "IPv6", GoType: "proto.IPv6"}, + {ChType: "MultiPolygon", GoType: "orb.MultiPolygon"}, + {ChType: "Point", GoType: "orb.Point"}, + {ChType: "Polygon", GoType: "orb.Polygon"}, + {ChType: "Ring", GoType: "orb.Ring"}, + }...) 
+ } func write(name string, v any, t *template.Template) error { out := new(bytes.Buffer) @@ -107,8 +145,9 @@ func main() { template *template.Template args any }{ - "column_gen": {template.Must(template.New("column").Parse(columnSrc)), types}, - "array_gen": {template.Must(template.New("array").Parse(arraySrc)), supportedGoTypes}, + "column_gen": {template.Must(template.New("column").Parse(columnSrc)), types}, + "array_gen": {template.Must(template.New("array").Parse(arraySrc)), supportedGoTypes}, + "dynamic_gen": {template.Must(template.New("dynamic").Parse(dynamicSrc)), dynamicTypes}, } { if err := write(name, tpl.args, tpl.template); err != nil { log.Fatal(err) diff --git a/lib/column/column_gen.go b/lib/column/column_gen.go index 8367e3fe14..cc601212f5 100644 --- a/lib/column/column_gen.go +++ b/lib/column/column_gen.go @@ -138,6 +138,8 @@ func (t Type) Column(name string, tz *time.Location) (Interface, error) { return &Point{name: name}, nil case "String": return &String{name: name, col: colStrProvider()}, nil + case "SharedVariant": + return &SharedVariant{name: name}, nil case "Object('json')": return &JSONObject{name: name, root: true, tz: tz}, nil } @@ -149,6 +151,8 @@ func (t Type) Column(name string, tz *time.Location) (Interface, error) { return (&Tuple{name: name}).parse(t, tz) case strings.HasPrefix(string(t), "Variant("): return (&Variant{name: name}).parse(t, tz) + case strings.HasPrefix(string(t), "Dynamic"): + return (&Dynamic{name: name}).parse(t, tz) case strings.HasPrefix(string(t), "Decimal("): return (&Decimal{name: name}).parse(t) case strings.HasPrefix(strType, "Nested("): @@ -259,6 +263,7 @@ var ( scanTypeDecimal = reflect.TypeOf(decimal.Decimal{}) scanTypeMultiPolygon = reflect.TypeOf(orb.MultiPolygon{}) scanTypeVariant = reflect.TypeOf(chcol.Variant{}) + scanTypeDynamic = reflect.TypeOf(chcol.Dynamic{}) ) func (col *Float32) Name() string { diff --git a/lib/column/dynamic.go b/lib/column/dynamic.go new file mode 100644 index 
0000000000..2378df104b
--- /dev/null
+++ b/lib/column/dynamic.go
@@ -0,0 +1,495 @@
+// Licensed to ClickHouse, Inc. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. ClickHouse, Inc. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package column
+
+import (
+	"database/sql/driver"
+	"fmt"
+	"math"
+	"reflect"
+	"slices"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/ClickHouse/ch-go/proto"
+	"github.com/ClickHouse/clickhouse-go/v2/lib/chcol"
+)
+
+const (
+	// SupportedDynamicSerializationVersion is the Dynamic serialization version
+	// written by Encode and the only one accepted by Decode.
+	SupportedDynamicSerializationVersion = 1
+	// DefaultMaxDynamicTypes is the max_types value used when the column type
+	// string does not carry one.
+	DefaultMaxDynamicTypes = 32
+)
+
+// Dynamic is the column implementation for the ClickHouse Dynamic type.
+// Rows are stored in an underlying Variant column; member types are added to
+// it on demand as rows are appended, or rebuilt when a header is decoded.
+type Dynamic struct {
+	chType Type
+	tz     *time.Location
+
+	name string
+
+	maxTypes uint8
+	// totalTypes counts the member types, excluding the implicit SharedVariant.
+	totalTypes uint8
+	// typeNames lists the type name of every variant column, SharedVariant
+	// included, in the order the columns were added or last sorted.
+	typeNames      []string
+	typeNamesIndex map[string]int
+
+	variant Variant
+}
+
+// parse initialises the column from its type string, which must be either
+// "Dynamic" or "Dynamic(...)". A max_types parameter overrides the default;
+// a max_types value that is not an integer in the uint8 range is rejected
+// instead of being ignored or silently truncated.
+func (c *Dynamic) parse(t Type, tz *time.Location) (_ *Dynamic, err error) {
+	c.chType = t
+	c.tz = tz
+	tStr := string(t)
+
+	// SharedVariant is special, and does not count against totalTypes
+	c.typeNamesIndex = make(map[string]int)
+	c.variant.columnTypeIndex = make(map[string]uint8)
+	c.addColumn(&SharedVariant{})
+
+	c.maxTypes = DefaultMaxDynamicTypes
+	c.totalTypes = 0 // Reset to 0 after adding SharedVariant
+
+	if tStr == "Dynamic" {
+		return c, nil
+	}
+
+	if !strings.HasPrefix(tStr, "Dynamic(") || !strings.HasSuffix(tStr, ")") {
+		return nil, &UnsupportedColumnTypeError{t: t}
+	}
+
+	typeParamsStr := strings.TrimPrefix(tStr, "Dynamic(")
+	typeParamsStr = strings.TrimSuffix(typeParamsStr, ")")
+
+	if v, ok := strings.CutPrefix(typeParamsStr, "max_types="); ok {
+		maxTypes, parseErr := strconv.ParseUint(v, 10, 8)
+		if parseErr != nil {
+			return nil, &UnsupportedColumnTypeError{t: t}
+		}
+
+		c.maxTypes = uint8(maxTypes)
+	}
+
+	return c, nil
+}
+
+// addColumn registers col as a member type of the underlying variant and
+// records its type name. It also increments totalTypes, so the caller that
+// adds the implicit SharedVariant resets the counter afterwards.
+func (c *Dynamic) addColumn(col Interface) {
+	typeName := string(col.Type())
+	c.typeNames = append(c.typeNames, typeName)
+	c.typeNamesIndex[typeName] = len(c.typeNames) - 1
+	c.totalTypes++
+	c.variant.addColumn(col)
+}
+
+// Name returns the column name.
+func (c *Dynamic) Name() string {
+	return c.name
+}
+
+// Type returns the ClickHouse type string the column was parsed from.
+func (c *Dynamic) Type() Type {
+	return c.chType
+}
+
+// Rows returns the number of rows held by the underlying variant column.
+func (c *Dynamic) Rows() int {
+	return c.variant.Rows()
+}
+
+// rowValue returns the value stored at row and the ClickHouse type name of
+// the variant column holding it. NULL rows yield a nil value and an empty
+// type name.
+func (c *Dynamic) rowValue(row int, ptr bool) (value any, chType string) {
+	typeIndex := c.variant.discriminators[row]
+	if typeIndex == NullVariantDiscriminator {
+		return nil, ""
+	}
+
+	col := c.variant.columns[typeIndex]
+	return col.Row(c.variant.offsets[row], ptr), string(col.Type())
+}
+
+// Row returns the value at row i wrapped in a chcol.DynamicWithType, or a
+// pointer to that wrapper when ptr is true.
+func (c *Dynamic) Row(i int, ptr bool) any {
+	value, chType := c.rowValue(i, ptr)
+	dyn := chcol.NewDynamicWithType(value, chType)
+	if ptr {
+		return &dyn
+	}
+
+	return dyn
+}
+
+// ScanRow copies the value at row into dest. A *chcol.Dynamic or
+// *chcol.DynamicWithType (or a pointer to such a pointer) receives the
+// wrapped value; any other destination is passed to the variant column that
+// holds the row and is left untouched when the row is NULL.
+func (c *Dynamic) ScanRow(dest any, row int) error {
+	value, chType := c.rowValue(row, false)
+
+	switch v := dest.(type) {
+	case *chcol.Dynamic:
+		*v = chcol.NewDynamic(value)
+	case **chcol.Dynamic:
+		// Allocate when the caller's pointer is nil; otherwise keep writing
+		// through the pointer it supplied.
+		if *v == nil {
+			*v = new(chcol.Dynamic)
+		}
+		**v = chcol.NewDynamic(value)
+	case *chcol.DynamicWithType:
+		*v = chcol.NewDynamicWithType(value, chType)
+	case **chcol.DynamicWithType:
+		if *v == nil {
+			*v = new(chcol.DynamicWithType)
+		}
+		**v = chcol.NewDynamicWithType(value, chType)
+	default:
+		typeIndex := c.variant.discriminators[row]
+		if typeIndex == NullVariantDiscriminator {
+			return nil
+		}
+
+		if err := c.variant.columns[typeIndex].ScanRow(dest, c.variant.offsets[row]); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// appendDynamicRows appends every element of rows through AppendRow and stops
+// at the first failure, reporting the index of the offending element.
+func appendDynamicRows[T any](c *Dynamic, rows []T) error {
+	for i, row := range rows {
+		if err := c.AppendRow(row); err != nil {
+			return fmt.Errorf("failed to AppendRow at index %d: %w", i, err)
+		}
+	}
+
+	return nil
+}
+
+// Append adds every element of v to the column. v must be a slice of
+// chcol.Dynamic or chcol.DynamicWithType values (or of pointers to them), or
+// a driver.Valuer whose value is one; anything else yields a
+// ColumnConverterError. The returned nulls slice is always nil.
+func (c *Dynamic) Append(v any) (nulls []uint8, err error) {
+	switch rows := v.(type) {
+	case []chcol.Dynamic:
+		return nil, appendDynamicRows(c, rows)
+	case []*chcol.Dynamic:
+		return nil, appendDynamicRows(c, rows)
+	case []chcol.DynamicWithType:
+		return nil, appendDynamicRows(c, rows)
+	case []*chcol.DynamicWithType:
+		return nil, appendDynamicRows(c, rows)
+	}
+
+	valuer, ok := v.(driver.Valuer)
+	if !ok {
+		return nil, &ColumnConverterError{
+			Op:   "Append",
+			To:   string(c.chType),
+			From: fmt.Sprintf("%T", v),
+		}
+	}
+
+	val, err := valuer.Value()
+	if err != nil {
+		return nil, &ColumnConverterError{
+			Op:   "Append",
+			To:   string(c.chType),
+			From: fmt.Sprintf("%T", v),
+			Hint: "could not get driver.Valuer value",
+		}
+	}
+
+	return c.Append(val)
+}
+
+// AppendRow adds a single value to the column. nil, a nil wrapper pointer
+// and a wrapper holding nil are stored as NULL. A chcol.DynamicWithType with
+// a non-empty type is stored under that type; any other value is offered to
+// each existing member column in turn and, if none accepts it, to a type
+// inferred from its Go type.
+func (c *Dynamic) AppendRow(v any) error {
+	var requestedType string
+	switch wrapped := v.(type) {
+	case chcol.Dynamic:
+		v = wrapped.Any()
+	case *chcol.Dynamic:
+		// A typed nil pointer carries no value; treat it as NULL instead of
+		// dereferencing it.
+		v = nil
+		if wrapped != nil {
+			v = wrapped.Any()
+		}
+	case chcol.DynamicWithType:
+		requestedType = wrapped.Type()
+		v = wrapped.Any()
+	case *chcol.DynamicWithType:
+		v = nil
+		if wrapped != nil {
+			requestedType = wrapped.Type()
+			v = wrapped.Any()
+		}
+	}
+
+	if v == nil {
+		c.variant.appendNullRow()
+		return nil
+	}
+
+	if requestedType != "" {
+		return c.appendRowWithType(v, requestedType)
+	}
+
+	// If preferred type wasn't provided, try each column
+	for i, col := range c.variant.columns {
+		if c.typeNames[i] == "SharedVariant" {
+			// Do not try to fit into SharedVariant
+			continue
+		}
+
+		if err := col.AppendRow(v); err == nil {
+			c.variant.appendDiscriminatorRow(uint8(i))
+			return nil
+		}
+	}
+
+	// If no existing columns match, try matching a ClickHouse type from common Go types
+	inferredTypeName := inferClickHouseTypeFromGoType(v)
+	if inferredTypeName != "" {
+		return c.AppendRow(chcol.NewDynamicWithType(v, inferredTypeName))
+	}
+
+	return fmt.Errorf("value \"%v\" cannot be stored in dynamic column: no compatible types. hint: use clickhouse.DynamicWithType to wrap the value", v)
+}
+
+// appendRowWithType stores v in the member column for requestedType,
+// registering a new column for that type first when none exists yet.
+func (c *Dynamic) appendRowWithType(v any, requestedType string) error {
+	var col Interface
+	colIndex, ok := c.typeNamesIndex[requestedType]
+	if ok {
+		col = c.variant.columns[colIndex]
+	} else {
+		// Discriminators are written as uint8, so the new column's index must
+		// fit in one and must not collide with the NULL marker.
+		colIndex = len(c.typeNames)
+		if colIndex > math.MaxUint8 || uint8(colIndex) == NullVariantDiscriminator {
+			return fmt.Errorf("value \"%v\" cannot be stored in dynamic column %s with requested type %s: too many types", v, c.chType, requestedType)
+		}
+
+		newCol, err := Type(requestedType).Column("", c.tz)
+		if err != nil {
+			return fmt.Errorf("value \"%v\" cannot be stored in dynamic column %s with requested type %s: unable to append type: %w", v, c.chType, requestedType, err)
+		}
+
+		c.addColumn(newCol)
+		col = newCol
+	}
+
+	if err := col.AppendRow(v); err != nil {
+		return fmt.Errorf("value \"%v\" cannot be stored in dynamic column %s with requested type %s: %w", v, c.chType, requestedType, err)
+	}
+
+	c.variant.appendDiscriminatorRow(uint8(colIndex))
+	return nil
+}
+
+// sortColumnsForEncoding reorders the type names and their variant columns
+// into ascending type-name order and rewrites the discriminators already
+// recorded so that each row keeps pointing at the same column.
+func (c *Dynamic) sortColumnsForEncoding() {
+	previousTypeNames := slices.Clone(c.typeNames)
+	slices.Sort(c.typeNames)
+
+	for i, typeName := range c.typeNames {
+		c.typeNamesIndex[typeName] = i
+		c.variant.columnTypeIndex[typeName] = uint8(i)
+	}
+
+	sortedDiscriminatorMap := make([]uint8, len(c.variant.columns))
+	sortedColumns := make([]Interface, len(c.variant.columns))
+	for i, typeName := range previousTypeNames {
+		correctIndex := c.typeNamesIndex[typeName]
+
+		sortedDiscriminatorMap[i] = uint8(correctIndex)
+		sortedColumns[correctIndex] = c.variant.columns[i]
+	}
+	c.variant.columns = sortedColumns
+
+	for i := range c.variant.discriminators {
+		if c.variant.discriminators[i] == NullVariantDiscriminator {
+			continue
+		}
+
+		c.variant.discriminators[i] = sortedDiscriminatorMap[c.variant.discriminators[i]]
+	}
+}
+
+// encodeHeader sorts the member columns and writes the Dynamic header: the
+// serialization version, max_types, the number of member types and their
+// names, followed by the header of the underlying variant.
+func (c *Dynamic) encodeHeader(buffer *proto.Buffer) {
+	c.sortColumnsForEncoding()
+
+	buffer.PutUInt64(SupportedDynamicSerializationVersion)
+	buffer.PutUVarInt(uint64(c.maxTypes))
+	buffer.PutUVarInt(uint64(c.totalTypes))
+
+	for _, typeName := range c.typeNames {
+		if typeName == "SharedVariant" {
+			// SharedVariant is implicitly present in Dynamic, do not append to type names
+			continue
+		}
+
+		buffer.PutString(typeName)
+	}
+
+	c.variant.encodeHeader(buffer)
+}
+
+// encodeData writes the row data of the underlying variant.
+func (c *Dynamic) encodeData(buffer *proto.Buffer) {
+	c.variant.encodeData(buffer)
+}
+
+// Encode writes the column header followed by its row data to buffer.
+func (c *Dynamic) Encode(buffer *proto.Buffer) {
+	c.encodeHeader(buffer)
+	c.encodeData(buffer)
+}
+
+// ScanType returns the reflect.Type of chcol.Dynamic.
+func (c *Dynamic) ScanType() reflect.Type {
+	return scanTypeDynamic
+}
+
+// Reset resets the underlying variant column.
+func (c *Dynamic) Reset() {
+	c.variant.Reset()
+}
+
+// decodeHeader reads the Dynamic header from reader, rebuilds the member
+// columns from the type names it lists (plus the implicit SharedVariant) and
+// then decodes the header of the underlying variant.
+func (c *Dynamic) decodeHeader(reader *proto.Reader) error {
+	dynamicSerializationVersion, err := reader.UInt64()
+	if err != nil {
+		return fmt.Errorf("failed to read dynamic serialization version: %w", err)
+	} else if dynamicSerializationVersion != SupportedDynamicSerializationVersion {
+		return fmt.Errorf("unsupported dynamic serialization version: %d", dynamicSerializationVersion)
+	}
+
+	maxTypes, err := reader.UVarInt()
+	if err != nil {
+		return fmt.Errorf("failed to read max types for dynamic column: %w", err)
+	} else if maxTypes > math.MaxUint8 {
+		return fmt.Errorf("unsupported max types for dynamic column: %d", maxTypes)
+	}
+	c.maxTypes = uint8(maxTypes)
+
+	totalTypes, err := reader.UVarInt()
+	if err != nil {
+		return fmt.Errorf("failed to read total types for dynamic column: %w", err)
+	} else if totalTypes > math.MaxUint8 {
+		// Reject before allocating: the count is stored as a uint8 and is
+		// used to size the type name slice.
+		return fmt.Errorf("unsupported total types for dynamic column: %d", totalTypes)
+	}
+
+	sortedTypeNames := make([]string, 0, totalTypes+1)
+	for i := uint64(0); i < totalTypes; i++ {
+		typeName, err := reader.Str()
+		if err != nil {
+			return fmt.Errorf("failed to read type name at index %d for dynamic column: %w", i, err)
+		}
+
+		sortedTypeNames = append(sortedTypeNames, typeName)
+	}
+
+	sortedTypeNames = append(sortedTypeNames, "SharedVariant")
+	slices.Sort(sortedTypeNames) // Re-sort after adding SharedVariant
+
+	c.typeNames = make([]string, 0, len(sortedTypeNames))
+	c.typeNamesIndex = make(map[string]int, len(sortedTypeNames))
+	c.variant.columns = make([]Interface, 0, len(sortedTypeNames))
+	c.variant.columnTypeIndex = make(map[string]uint8, len(sortedTypeNames))
+
+	for _, typeName := range sortedTypeNames {
+		col, err := Type(typeName).Column("", c.tz)
+		if err != nil {
+			return fmt.Errorf("failed to add dynamic column with type %s: %w", typeName, err)
+		}
+
+		c.addColumn(col)
+	}
+
+	c.totalTypes = uint8(totalTypes) // Reset to server's totalTypes
+
+	err = c.variant.decodeHeader(reader)
+	if err != nil {
+		return fmt.Errorf("failed to decode variant header: %w", err)
+	}
+
+	return nil
+}
+
+// decodeData reads the given number of rows into the underlying variant.
+func (c *Dynamic) decodeData(reader *proto.Reader, rows int) error {
+	err := c.variant.decodeData(reader, rows)
+	if err != nil {
+		return fmt.Errorf("failed to decode variant data: %w", err)
+	}
+
+	return nil
+}
+
+// Decode reads the column header and then the given number of rows from reader.
+func (c *Dynamic) Decode(reader *proto.Reader, rows int) error {
+	err := c.decodeHeader(reader)
+	if err != nil {
+		return fmt.Errorf("failed to decode dynamic header: %w", err)
+	}
+
+	err = c.decodeData(reader, rows)
+	if err != nil {
+		return fmt.Errorf("failed to decode dynamic data: %w", err)
+	}
+
+	return nil
+}
diff --git a/lib/column/dynamic_gen.go b/lib/column/dynamic_gen.go
new file
mode 100644 index 0000000000..425b14892d --- /dev/null +++ b/lib/column/dynamic_gen.go @@ -0,0 +1,269 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Code generated by make codegen DO NOT EDIT. +// source: lib/column/codegen/dynamic.tpl + +package column + +import ( + "database/sql" + "encoding/json" + "github.com/ClickHouse/ch-go/proto" + "github.com/google/uuid" + "github.com/paulmach/orb" + "time" +) + +// inferClickHouseTypeFromGoType takes a Go interface{} and converts it to a ClickHouse type. +// Returns empty string if type was not matched. +// This is best effort and does not work for all types. +// Optimally, users should provide a type using DynamicWithType. 
+func inferClickHouseTypeFromGoType(v any) string { + switch v.(type) { + case float32: + return "Float32" + case *float32: + return "Float32" + case float64: + return "Float64" + case *float64: + return "Float64" + case int8: + return "Int8" + case *int8: + return "Int8" + case int16: + return "Int16" + case *int16: + return "Int16" + case int32: + return "Int32" + case *int32: + return "Int32" + case int64: + return "Int64" + case *int64: + return "Int64" + case uint8: + return "UInt8" + case *uint8: + return "UInt8" + case uint16: + return "UInt16" + case *uint16: + return "UInt16" + case uint32: + return "UInt32" + case *uint32: + return "UInt32" + case uint64: + return "UInt64" + case *uint64: + return "UInt64" + case string: + return "String" + case *string: + return "String" + case json.RawMessage: + return "String" + case *json.RawMessage: + return "String" + case sql.NullString: + return "String" + case *sql.NullString: + return "String" + case bool: + return "Bool" + case *bool: + return "Bool" + case sql.NullBool: + return "Bool" + case *sql.NullBool: + return "Bool" + case time.Time: + return "DateTime64(3)" + case *time.Time: + return "DateTime64(3)" + case sql.NullTime: + return "DateTime64(3)" + case *sql.NullTime: + return "DateTime64(3)" + case uuid.UUID: + return "UUID" + case *uuid.UUID: + return "UUID" + case proto.IPv6: + return "IPv6" + case *proto.IPv6: + return "IPv6" + case orb.MultiPolygon: + return "MultiPolygon" + case *orb.MultiPolygon: + return "MultiPolygon" + case orb.Point: + return "Point" + case *orb.Point: + return "Point" + case orb.Polygon: + return "Polygon" + case *orb.Polygon: + return "Polygon" + case orb.Ring: + return "Ring" + case *orb.Ring: + return "Ring" + case []float32: + return "Array(Float32)" + case []*float32: + return "Array(Float32)" + case []float64: + return "Array(Float64)" + case []*float64: + return "Array(Float64)" + case []int8: + return "Array(Int8)" + case []*int8: + return "Array(Int8)" + case 
[]int16: + return "Array(Int16)" + case []*int16: + return "Array(Int16)" + case []int32: + return "Array(Int32)" + case []*int32: + return "Array(Int32)" + case []int64: + return "Array(Int64)" + case []*int64: + return "Array(Int64)" + case []*uint8: + return "Array(UInt8)" + case []uint16: + return "Array(UInt16)" + case []*uint16: + return "Array(UInt16)" + case []uint32: + return "Array(UInt32)" + case []*uint32: + return "Array(UInt32)" + case []uint64: + return "Array(UInt64)" + case []*uint64: + return "Array(UInt64)" + case []string: + return "Array(String)" + case []*string: + return "Array(String)" + case []json.RawMessage: + return "Array(String)" + case []*json.RawMessage: + return "Array(String)" + case []sql.NullString: + return "Array(String)" + case []*sql.NullString: + return "Array(String)" + case []bool: + return "Array(Bool)" + case []*bool: + return "Array(Bool)" + case []sql.NullBool: + return "Array(Bool)" + case []*sql.NullBool: + return "Array(Bool)" + case []time.Time: + return "Array(DateTime64(3))" + case []*time.Time: + return "Array(DateTime64(3))" + case []sql.NullTime: + return "Array(DateTime64(3))" + case []*sql.NullTime: + return "Array(DateTime64(3))" + case []uuid.UUID: + return "Array(UUID)" + case []*uuid.UUID: + return "Array(UUID)" + case []proto.IPv6: + return "Array(IPv6)" + case []*proto.IPv6: + return "Array(IPv6)" + case []orb.MultiPolygon: + return "Array(MultiPolygon)" + case []*orb.MultiPolygon: + return "Array(MultiPolygon)" + case []orb.Point: + return "Array(Point)" + case []*orb.Point: + return "Array(Point)" + case []orb.Polygon: + return "Array(Polygon)" + case []*orb.Polygon: + return "Array(Polygon)" + case []orb.Ring: + return "Array(Ring)" + case []*orb.Ring: + return "Array(Ring)" + case map[string]float32: + return "Map(String, Float32)" + case map[string]float64: + return "Map(String, Float64)" + case map[string]int8: + return "Map(String, Int8)" + case map[string]int16: + return "Map(String, Int16)" + 
case map[string]int32: + return "Map(String, Int32)" + case map[string]int64: + return "Map(String, Int64)" + case map[string]uint8: + return "Map(String, UInt8)" + case map[string]uint16: + return "Map(String, UInt16)" + case map[string]uint32: + return "Map(String, UInt32)" + case map[string]uint64: + return "Map(String, UInt64)" + case map[string]string: + return "Map(String, String)" + case map[string]json.RawMessage: + return "Map(String, String)" + case map[string]sql.NullString: + return "Map(String, String)" + case map[string]bool: + return "Map(String, Bool)" + case map[string]sql.NullBool: + return "Map(String, Bool)" + case map[string]time.Time: + return "Map(String, DateTime64(3))" + case map[string]sql.NullTime: + return "Map(String, DateTime64(3))" + case map[string]uuid.UUID: + return "Map(String, UUID)" + case map[string]proto.IPv6: + return "Map(String, IPv6)" + case map[string]orb.MultiPolygon: + return "Map(String, MultiPolygon)" + case map[string]orb.Point: + return "Map(String, Point)" + case map[string]orb.Polygon: + return "Map(String, Polygon)" + case map[string]orb.Ring: + return "Map(String, Ring)" + default: + return "" + } +} diff --git a/lib/column/sharedvariant.go b/lib/column/sharedvariant.go new file mode 100644 index 0000000000..b356f1ac61 --- /dev/null +++ b/lib/column/sharedvariant.go @@ -0,0 +1,72 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package column + +import ( + "github.com/ClickHouse/ch-go/proto" + "reflect" +) + +type SharedVariant struct { + name string + stringData String +} + +func (c *SharedVariant) Name() string { + return c.name +} + +func (c *SharedVariant) Type() Type { + return "SharedVariant" +} + +func (c *SharedVariant) Rows() int { + return c.stringData.Rows() +} + +func (c *SharedVariant) Row(i int, ptr bool) any { + return c.stringData.Row(i, ptr) +} + +func (c *SharedVariant) ScanRow(dest any, row int) error { + return c.stringData.ScanRow(dest, row) +} + +func (c *SharedVariant) Append(v any) (nulls []uint8, err error) { + return c.stringData.Append(v) +} + +func (c *SharedVariant) AppendRow(v any) error { + return c.stringData.AppendRow(v) +} + +func (c *SharedVariant) Decode(reader *proto.Reader, rows int) error { + return c.stringData.Decode(reader, rows) +} + +func (c *SharedVariant) Encode(buffer *proto.Buffer) { + c.stringData.Encode(buffer) +} + +func (c *SharedVariant) ScanType() reflect.Type { + return c.stringData.ScanType() +} + +func (c *SharedVariant) Reset() { + c.stringData.Reset() +} diff --git a/tests/dynamic_test.go b/tests/dynamic_test.go new file mode 100644 index 0000000000..6e98eda085 --- /dev/null +++ b/tests/dynamic_test.go @@ -0,0 +1,233 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. 
licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tests + +import ( + "context" + "fmt" + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/chcol" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +var dynamicTestDate, _ = time.Parse(time.RFC3339, "2024-12-13T02:09:30.123Z") + +func setupDynamicTest(t *testing.T) driver.Conn { + conn, err := GetNativeConnection(clickhouse.Settings{ + "max_execution_time": 60, + "allow_experimental_dynamic_type": true, + }, nil, &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }) + require.NoError(t, err) + + if !CheckMinServerServerVersion(conn, 24, 1, 0) { + t.Skip(fmt.Errorf("unsupported clickhouse version for Dynamic type")) + return nil + } + + return conn +} + +func TestDynamic(t *testing.T) { + ctx := context.Background() + conn := setupDynamicTest(t) + + const ddl = ` + CREATE TABLE IF NOT EXISTS test_dynamic ( + c Dynamic + ) Engine = MergeTree() ORDER BY tuple() + ` + require.NoError(t, conn.Exec(ctx, ddl)) + defer func() { + require.NoError(t, conn.Exec(ctx, "DROP TABLE IF EXISTS test_dynamic")) + }() + + batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_dynamic (c)") + require.NoError(t, err) + + require.NoError(t, batch.Append(clickhouse.NewDynamicWithType(true, "Bool"))) + colInt64 := int64(42) + require.NoError(t, 
batch.Append(clickhouse.NewDynamicWithType(colInt64, "Int64"))) + colString := "test" + require.NoError(t, batch.Append(clickhouse.NewDynamicWithType(colString, "String"))) + require.NoError(t, batch.Append(clickhouse.NewDynamicWithType(dynamicTestDate, "DateTime64(3)"))) + var colNil any = nil + require.NoError(t, batch.Append(colNil)) + colSliceUInt8 := []uint8{0xA, 0xB, 0xC} + require.NoError(t, batch.Append(clickhouse.NewDynamicWithType(colSliceUInt8, "Array(UInt8)"))) + colSliceMapStringString := []map[string]string{{"key1": "value1", "key2": "value2"}, {"key3": "value3"}} + require.NoError(t, batch.Append(clickhouse.NewDynamicWithType(colSliceMapStringString, "Array(Map(String, String))"))) + colMapStringString := map[string]string{"key1": "value1", "key2": "value2"} + require.NoError(t, batch.Append(clickhouse.NewDynamicWithType(colMapStringString, "Map(String, String)"))) + colMapStringInt64 := map[string]int64{"key1": 42, "key2": 84} + require.NoError(t, batch.Append(clickhouse.NewDynamicWithType(colMapStringInt64, "Map(String, Int64)"))) + require.NoError(t, batch.Send()) + + rows, err := conn.Query(ctx, "SELECT c FROM test_dynamic") + require.NoError(t, err) + + var row chcol.Dynamic + + require.True(t, rows.Next()) + err = rows.Scan(&row) + require.NoError(t, err) + require.Equal(t, true, row.Any()) + + require.True(t, rows.Next()) + err = rows.Scan(&row) + require.NoError(t, err) + require.Equal(t, colInt64, row.Any()) + + require.True(t, rows.Next()) + err = rows.Scan(&row) + require.NoError(t, err) + require.Equal(t, colString, row.Any()) + + require.True(t, rows.Next()) + err = rows.Scan(&row) + require.NoError(t, err) + require.Equal(t, dynamicTestDate, row.Any()) + + require.True(t, rows.Next()) + err = rows.Scan(&row) + require.NoError(t, err) + require.Equal(t, colNil, row.Any()) + + require.True(t, rows.Next()) + err = rows.Scan(&row) + require.NoError(t, err) + require.Equal(t, colSliceUInt8, row.Any()) + + require.True(t, rows.Next()) + err = 
rows.Scan(&row) + require.NoError(t, err) + require.Equal(t, colSliceMapStringString, row.Any()) + + require.True(t, rows.Next()) + err = rows.Scan(&row) + require.NoError(t, err) + require.Equal(t, colMapStringString, row.Any()) + + require.True(t, rows.Next()) + err = rows.Scan(&row) + require.NoError(t, err) + require.Equal(t, colMapStringInt64, row.Any()) +} + +func TestDynamic_ScanWithType(t *testing.T) { + ctx := context.Background() + conn := setupDynamicTest(t) + + const ddl = ` + CREATE TABLE IF NOT EXISTS test_dynamic ( + c Dynamic + ) Engine = MergeTree() ORDER BY tuple() + ` + require.NoError(t, conn.Exec(ctx, ddl)) + defer func() { + require.NoError(t, conn.Exec(ctx, "DROP TABLE IF EXISTS test_dynamic")) + }() + + batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_dynamic (c)") + require.NoError(t, err) + + require.NoError(t, batch.Append(clickhouse.NewDynamicWithType(true, "Bool"))) + require.NoError(t, batch.Append(clickhouse.NewDynamicWithType(int64(42), "Int64"))) + require.NoError(t, batch.Append(nil)) + require.NoError(t, batch.Send()) + + rows, err := conn.Query(ctx, "SELECT c FROM test_dynamic") + require.NoError(t, err) + + var row chcol.DynamicWithType + + require.True(t, rows.Next()) + err = rows.Scan(&row) + require.NoError(t, err) + require.Equal(t, true, row.Any()) + require.Equal(t, "Bool", row.Type()) + + require.True(t, rows.Next()) + err = rows.Scan(&row) + require.NoError(t, err) + require.Equal(t, int64(42), row.Any()) + require.Equal(t, "Int64", row.Type()) + + require.True(t, rows.Next()) + err = rows.Scan(&row) + require.NoError(t, err) + require.Equal(t, nil, row.Any()) + require.Equal(t, "", row.Type()) +} + +func TestDynamic_BatchFlush(t *testing.T) { + t.Skip(fmt.Errorf("server-side Dynamic bug")) + + ctx := context.Background() + conn := setupDynamicTest(t) + + const ddl = ` + CREATE TABLE IF NOT EXISTS test_dynamic ( + c Dynamic + ) Engine = MergeTree() ORDER BY tuple() + ` + require.NoError(t, conn.Exec(ctx, ddl)) + 
defer func() { + require.NoError(t, conn.Exec(ctx, "DROP TABLE IF EXISTS test_dynamic")) + }() + + batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_dynamic (c)") + require.NoError(t, err) + + vals := make([]clickhouse.DynamicWithType, 0, 1000) + for i := 0; i < 1000; i++ { + if i%2 == 0 { + vals = append(vals, clickhouse.NewDynamicWithType(int64(i), "Int64")) + } else { + vals = append(vals, clickhouse.NewDynamicWithType(i%5 == 0, "Bool")) + } + + require.NoError(t, batch.Append(vals[i])) + require.NoError(t, batch.Flush()) + } + require.NoError(t, batch.Send()) + + rows, err := conn.Query(ctx, "SELECT c FROM test_dynamic") + require.NoError(t, err) + + i := 0 + for rows.Next() { + var row clickhouse.DynamicWithType + err = rows.Scan(&row) + require.NoError(t, err) + + if i%2 == 0 { + require.Equal(t, int64(i), row.Any()) + require.Equal(t, "Int64", row.Type()) + } else { + require.Equal(t, i%5 == 0, row.Any()) + require.Equal(t, "Bool", row.Type()) + } + + i++ + } +}