Skip to content

Commit

Permalink
Optional flag to close query with flush (#1276)
Browse files Browse the repository at this point in the history
* close query when use batch.Flush

* modify message by author

* Update 1271_test.go

change name

* Update options.go

change WithCloseQuery to WithCloseOnFlush

* modify property name

* change property name

* modify property name

---------

Co-authored-by: colincchen <[email protected]>
Co-authored-by: hongker <[email protected]>
Co-authored-by: Kuba Kaflik <[email protected]>
  • Loading branch information
4 people authored May 8, 2024
1 parent 1ae716e commit cd37406
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 18 deletions.
41 changes: 23 additions & 18 deletions conn_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,15 @@ func (c *connect) prepareBatch(ctx context.Context, query string, opts driver.Pr
}

b := &batch{
ctx: ctx,
query: query,
conn: c,
block: block,
released: false,
connRelease: release,
connAcquire: acquire,
onProcess: onProcess,
ctx: ctx,
query: query,
conn: c,
block: block,
released: false,
connRelease: release,
connAcquire: acquire,
onProcess: onProcess,
closeOnFlush: opts.CloseOnFlush,
}

if opts.ReleaseConnection {
Expand All @@ -101,16 +102,17 @@ func (c *connect) prepareBatch(ctx context.Context, query string, opts driver.Pr
}

type batch struct {
err error
ctx context.Context
query string
conn *connect
sent bool // sent signalize that batch is send to ClickHouse.
released bool // released signalize that conn was returned to pool and can't be used.
block *proto.Block
connRelease func(*connect, error)
connAcquire func(context.Context) (*connect, error)
onProcess *onProcess
err error
ctx context.Context
query string
conn *connect
sent bool // sent signalize that batch is send to ClickHouse.
released bool // released signalize that conn was returned to pool and can't be used.
closeOnFlush bool // closeOnFlush signalize that batch should close query and release conn when use Flush
block *proto.Block
connRelease func(*connect, error)
connAcquire func(context.Context) (*connect, error)
onProcess *onProcess
}

func (b *batch) release(err error) {
Expand Down Expand Up @@ -305,6 +307,9 @@ func (b *batch) Flush() error {
if err := b.conn.sendData(b.block, ""); err != nil {
return err
}
if b.closeOnFlush {
b.release(b.closeQuery())
}
}
b.block.Reset()
return nil
Expand Down
8 changes: 8 additions & 0 deletions lib/driver/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package driver

type PrepareBatchOptions struct {
ReleaseConnection bool
CloseOnFlush bool
}

type PrepareBatchOption func(options *PrepareBatchOptions)
Expand All @@ -11,3 +12,10 @@ func WithReleaseConnection() PrepareBatchOption {
options.ReleaseConnection = true
}
}

// WithCloseOnFlush closes batch INSERT query when Flush is executed
func WithCloseOnFlush() PrepareBatchOption {
return func(options *PrepareBatchOptions) {
options.CloseOnFlush = true
}
}
128 changes: 128 additions & 0 deletions tests/issues/1271_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package issues

import (
"context"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/google/uuid"
"math/rand"
"runtime"
"strings"
"testing"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
"github.com/stretchr/testify/require"
)

// test for https://github.com/ClickHouse/clickhouse-go/issues/1271
func Test1271(t *testing.T) {
var (
conn, err = clickhouse_tests.GetConnection("issues", clickhouse.Settings{
"max_execution_time": 60,
"allow_experimental_object_type": true,
}, nil, &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
})
)
defer func() {
conn.Exec(context.Background(), "DROP TABLE flush_with_close_query_example")
conn.Close()
}()
conn.Exec(context.Background(), "DROP TABLE IF EXISTS flush_with_close_query_example")
ctx := context.Background()
err = conn.Exec(ctx, `
CREATE TABLE IF NOT EXISTS flush_with_close_query_example (
Col1 UInt64
, Col2 String
, Col3 FixedString(3)
, Col4 UUID
, Col5 Map(String, UInt64)
, Col6 Array(String)
, Col7 Tuple(String, UInt64, Array(Map(String, UInt64)))
, Col8 DateTime
) Engine = MergeTree() ORDER BY tuple()
`)
require.NoError(t, err)

batch, err := conn.PrepareBatch(ctx, "INSERT INTO flush_with_close_query_example", driver.WithCloseOnFlush())
require.NoError(t, err)
// 1 million rows should only take < 1s on most desktops
for i := 0; i < 100_000; i++ {
require.NoError(t, batch.Append(
uint64(i),
RandAsciiString(5),
RandAsciiString(3),
uuid.New(),
map[string]uint64{"key": uint64(i)}, // Map(String, UInt64)
[]string{RandAsciiString(1), RandAsciiString(1), RandAsciiString(1), RandAsciiString(1), RandAsciiString(1), RandAsciiString(1)}, // Array(String)
[]any{ // Tuple(String, UInt64, Array(Map(String, UInt64)))
RandAsciiString(10), uint64(i), []map[string]uint64{
{"key": uint64(i)},
{"key": uint64(i + 1)},
{"key": uint64(i) + 2},
},
},
time.Now().Add(time.Duration(i)*time.Second),
))
if i > 0 && i%10000 == 0 {
require.NoError(t, batch.Flush())
PrintMemUsage()
}
}
require.NoError(t, batch.Flush())
// confirm we have the right count
var col1 uint64
require.NoError(t, conn.QueryRow(ctx, "SELECT count() FROM flush_with_close_query_example").Scan(&col1))
require.Equal(t, uint64(100_000), col1)
}

const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
const (
letterIdxBits = 6 // 6 bits to represent a letter index
letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
letterIdxMax = 63 / letterIdxBits // # of letter indices fitting in 63 bits
)

func RandAsciiString(n int) string {
return randString(n, letterBytes)
}

var src = rand.NewSource(time.Now().UnixNano())

func randString(n int, bytes string) string {
sb := strings.Builder{}
sb.Grow(n)
// A src.Int63() generates 63 random bits, enough for letterIdxMax characters!
for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; {
if remain == 0 {
cache, remain = src.Int63(), letterIdxMax
}
if idx := int(cache & letterIdxMask); idx < len(bytes) {
sb.WriteByte(bytes[idx])
i--
}
cache >>= letterIdxBits
remain--
}

return sb.String()
}

// PrintMemUsage outputs the current, total and OS memory being used. As well as the number
// of garage collection cycles completed.
// thanks to https://golangcode.com/print-the-current-memory-usage/
func PrintMemUsage() {
var m runtime.MemStats
runtime.ReadMemStats(&m)
// For info on each, see: https://golang.org/pkg/runtime/#MemStats
fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc))
fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc))
fmt.Printf("\tSys = %v MiB", bToMb(m.Sys))
fmt.Printf("\tNumGC = %v\n", m.NumGC)
}

func bToMb(b uint64) uint64 {
return b / 1024 / 1024
}

0 comments on commit cd37406

Please sign in to comment.