From cd37406d257c2bf3a96bfe63f497be2ae3e1704d Mon Sep 17 00:00:00 2001 From: hongker Date: Wed, 8 May 2024 15:50:30 +0800 Subject: [PATCH] Optional flag to close query with flush (#1276) * close query when use batch.Flush * modify message by author * Update 1271_test.go change name * Update options.go change WithCloseQuery to WithCloseOnFlush * modify property name * change property name * modify property name --------- Co-authored-by: colincchen Co-authored-by: hongker Co-authored-by: Kuba Kaflik --- conn_batch.go | 41 ++++++------ lib/driver/options.go | 8 +++ tests/issues/1271_test.go | 128 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 159 insertions(+), 18 deletions(-) create mode 100644 tests/issues/1271_test.go diff --git a/conn_batch.go b/conn_batch.go index 6d8b1ce5f0..26ad4ac024 100644 --- a/conn_batch.go +++ b/conn_batch.go @@ -83,14 +83,15 @@ func (c *connect) prepareBatch(ctx context.Context, query string, opts driver.Pr } b := &batch{ - ctx: ctx, - query: query, - conn: c, - block: block, - released: false, - connRelease: release, - connAcquire: acquire, - onProcess: onProcess, + ctx: ctx, + query: query, + conn: c, + block: block, + released: false, + connRelease: release, + connAcquire: acquire, + onProcess: onProcess, + closeOnFlush: opts.CloseOnFlush, } if opts.ReleaseConnection { @@ -101,16 +102,17 @@ func (c *connect) prepareBatch(ctx context.Context, query string, opts driver.Pr } type batch struct { - err error - ctx context.Context - query string - conn *connect - sent bool // sent signalize that batch is send to ClickHouse. - released bool // released signalize that conn was returned to pool and can't be used. - block *proto.Block - connRelease func(*connect, error) - connAcquire func(context.Context) (*connect, error) - onProcess *onProcess + err error + ctx context.Context + query string + conn *connect + sent bool // sent signalize that batch is send to ClickHouse. 
+ released bool // released signalize that conn was returned to pool and can't be used. + closeOnFlush bool // closeOnFlush signals that Flush should close the query and release the conn after sending data. + block *proto.Block + connRelease func(*connect, error) + connAcquire func(context.Context) (*connect, error) + onProcess *onProcess } func (b *batch) release(err error) { @@ -305,6 +307,9 @@ func (b *batch) Flush() error { if err := b.conn.sendData(b.block, ""); err != nil { return err } + if b.closeOnFlush { + b.release(b.closeQuery()) + } } b.block.Reset() return nil diff --git a/lib/driver/options.go b/lib/driver/options.go index d81760c987..c214c2c922 100644 --- a/lib/driver/options.go +++ b/lib/driver/options.go @@ -2,6 +2,7 @@ package driver type PrepareBatchOptions struct { ReleaseConnection bool + CloseOnFlush bool } type PrepareBatchOption func(options *PrepareBatchOptions) @@ -11,3 +12,10 @@ func WithReleaseConnection() PrepareBatchOption { options.ReleaseConnection = true } } + +// WithCloseOnFlush returns an option that sets CloseOnFlush, so the batch INSERT query is closed when Flush is executed. +func WithCloseOnFlush() PrepareBatchOption { + return func(options *PrepareBatchOptions) { + options.CloseOnFlush = true + } +} diff --git a/tests/issues/1271_test.go b/tests/issues/1271_test.go new file mode 100644 index 0000000000..14fed38bae --- /dev/null +++ b/tests/issues/1271_test.go @@ -0,0 +1,128 @@ +package issues + +import ( + "context" + "fmt" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/google/uuid" + "math/rand" + "runtime" + "strings" + "testing" + "time" + + "github.com/ClickHouse/clickhouse-go/v2" + clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests" + "github.com/stretchr/testify/require" +) + +// Test1271 is a test for https://github.com/ClickHouse/clickhouse-go/issues/1271 +func Test1271(t *testing.T) { + var ( + conn, err = clickhouse_tests.GetConnection("issues", clickhouse.Settings{ + "max_execution_time": 60, + "allow_experimental_object_type": true, + }, nil, 
&clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }) + ) + defer func() { + conn.Exec(context.Background(), "DROP TABLE flush_with_close_query_example") + conn.Close() + }() + conn.Exec(context.Background(), "DROP TABLE IF EXISTS flush_with_close_query_example") + ctx := context.Background() + err = conn.Exec(ctx, ` + CREATE TABLE IF NOT EXISTS flush_with_close_query_example ( + Col1 UInt64 + , Col2 String + , Col3 FixedString(3) + , Col4 UUID + , Col5 Map(String, UInt64) + , Col6 Array(String) + , Col7 Tuple(String, UInt64, Array(Map(String, UInt64))) + , Col8 DateTime + ) Engine = MergeTree() ORDER BY tuple() + `) + require.NoError(t, err) + + batch, err := conn.PrepareBatch(ctx, "INSERT INTO flush_with_close_query_example", driver.WithCloseOnFlush()) + require.NoError(t, err) + // append 100,000 rows, flushing after roughly every 10,000 appended rows + for i := 0; i < 100_000; i++ { + require.NoError(t, batch.Append( + uint64(i), + RandAsciiString(5), + RandAsciiString(3), + uuid.New(), + map[string]uint64{"key": uint64(i)}, // Map(String, UInt64) + []string{RandAsciiString(1), RandAsciiString(1), RandAsciiString(1), RandAsciiString(1), RandAsciiString(1), RandAsciiString(1)}, // Array(String) + []any{ // Tuple(String, UInt64, Array(Map(String, UInt64))) + RandAsciiString(10), uint64(i), []map[string]uint64{ + {"key": uint64(i)}, + {"key": uint64(i + 1)}, + {"key": uint64(i) + 2}, + }, + }, + time.Now().Add(time.Duration(i)*time.Second), + )) + if i > 0 && i%10000 == 0 { + require.NoError(t, batch.Flush()) + PrintMemUsage() + } + } + require.NoError(t, batch.Flush()) + // confirm we have the right count + var col1 uint64 + require.NoError(t, conn.QueryRow(ctx, "SELECT count() FROM flush_with_close_query_example").Scan(&col1)) + require.Equal(t, uint64(100_000), col1) +} + +const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" +const ( + letterIdxBits = 6 // 6 bits to represent a letter index + letterIdxMask = 1<= 0; { + if remain == 0 { + 
+ cache, remain = src.Int63(), letterIdxMax + } + if idx := int(cache & letterIdxMask); idx < len(bytes) { + sb.WriteByte(bytes[idx]) + i-- + } + cache >>= letterIdxBits + remain-- + } + + return sb.String() +} + +// PrintMemUsage outputs the current, total and OS memory being used, as well as the number +// of garbage collection cycles completed. +// thanks to https://golangcode.com/print-the-current-memory-usage/ +func PrintMemUsage() { + var m runtime.MemStats + runtime.ReadMemStats(&m) + // For info on each, see: https://golang.org/pkg/runtime/#MemStats + fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc)) + fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc)) + fmt.Printf("\tSys = %v MiB", bToMb(m.Sys)) + fmt.Printf("\tNumGC = %v\n", m.NumGC) +} + +func bToMb(b uint64) uint64 { + return b / 1024 / 1024 +}