diff --git a/README.md b/README.md index bb1373020b..867737cc82 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,10 @@ Support for the ClickHouse protocol advanced features using `Context`: * Profile info * Profile events +## Documentation + +[https://clickhouse.com/docs/en/integrations/go](https://clickhouse.com/docs/en/integrations/go) + # `clickhouse` interface (formally `native` interface) ```go @@ -81,6 +85,7 @@ Support for the ClickHouse protocol advanced features using `Context`: MaxIdleConns: 5, ConnMaxLifetime: time.Duration(10) * time.Minute, ConnOpenStrategy: clickhouse.ConnOpenInOrder, + BlockBufferSize: 10, }) if err != nil { return err @@ -111,6 +116,7 @@ conn := clickhouse.OpenDB(&clickhouse.Options{ clickhouse.CompressionLZ4, }, Debug: true, + BlockBufferSize: 10, }) conn.SetMaxIdleConns(5) conn.SetMaxOpenConns(10) @@ -133,6 +139,7 @@ conn.SetConnMaxLifetime(time.Hour) - `deflate` - `-2` (Best Speed) to `9` (Best Compression) - `br` - `0` (Best Speed) to `11` (Best Compression) - `zstd`, `lz4` - ignored +* block_buffer_size - size of block buffer (default 2) SSL/TLS parameters: diff --git a/benchmark/v2/read-native/basic_test.go b/benchmark/v2/read-native/basic_test.go index ad68e56a5a..9005e3c3c6 100644 --- a/benchmark/v2/read-native/basic_test.go +++ b/benchmark/v2/read-native/basic_test.go @@ -22,6 +22,10 @@ func getConnection() clickhouse.Conn { MaxOpenConns: 10, MaxIdleConns: 5, ConnMaxLifetime: time.Hour, + Compression: &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }, + BlockBufferSize: 100, }) if err != nil { log.Fatal(err) diff --git a/clickhouse_options.go b/clickhouse_options.go index a701fd7876..cf412dd6c0 100644 --- a/clickhouse_options.go +++ b/clickhouse_options.go @@ -129,6 +129,7 @@ type Options struct { MaxIdleConns int // default 5 ConnMaxLifetime time.Duration // default 1 hour ConnOpenStrategy ConnOpenStrategy + BlockBufferSize uint8 // default 2 - can be overwritten on query scheme string ReadTimeout time.Duration @@ -185,6 +186,8 @@ func (o *Options) fromDSN(in string) error { } else { o.Compression.Level = int(level) } + } else { + return err } case "dial_timeout": duration, err := time.ParseDuration(params.Get(v)) @@ -192,6 +195,15 @@ func (o *Options) fromDSN(in string) error { return fmt.Errorf("clickhouse [dsn parse]: dial timeout: %s", err) } o.DialTimeout = duration + case "block_buffer_size": + if blockBufferSize, err := strconv.ParseUint(params.Get(v), 10, 8); err == nil { + if blockBufferSize <= 0 { + return fmt.Errorf("block_buffer_size must be greater than 0") + } + o.BlockBufferSize = uint8(blockBufferSize) + } else { + return err + } case "read_timeout": duration, err := time.ParseDuration(params.Get(v)) if err != nil { @@ -252,7 +264,7 @@ func (o *Options) fromDSN(in string) error { return nil } -// receive copy of Options so we don't modify original - so its reusable +// receive copy of Options, so we don't modify original - so its reusable func (o Options) setDefaults() *Options { if len(o.Auth.Database) == 0 { o.Auth.Database = "default" @@ -275,6 +287,9 @@ func (o Options) setDefaults() *Options { if o.ConnMaxLifetime == 0 { o.ConnMaxLifetime = time.Hour } + if o.BlockBufferSize <= 0 { + o.BlockBufferSize = 2 + } if o.Addr == nil || len(o.Addr) == 0 { switch o.Protocol { case Native: diff --git a/conn.go b/conn.go index 59b140f7b7..e32bf6cc32 100644 --- a/conn.go +++ b/conn.go @@ -70,17 +70,18 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er } var ( connect = &connect{ - opt: opt, - conn: conn, - debugf: debugf, - buffer: new(chproto.Buffer), - reader: chproto.NewReader(conn), - revision: proto.ClientTCPProtocolVersion, - structMap: &structMap{}, - compression: compression, - connectedAt: time.Now(), - compressor: compress.NewWriter(), - readTimeout: opt.ReadTimeout, + opt: opt, + conn: conn, + debugf: debugf, + buffer: new(chproto.Buffer), + reader: chproto.NewReader(conn), + revision: proto.ClientTCPProtocolVersion, + structMap: &structMap{}, + compression: compression, + connectedAt: time.Now(), + compressor: compress.NewWriter(), + readTimeout: opt.ReadTimeout, + blockBufferSize: opt.BlockBufferSize, } ) if err := connect.handshake(opt.Auth.Database, opt.Auth.Username, opt.Auth.Password); err != nil { @@ -91,20 +92,21 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er // https://github.com/ClickHouse/ClickHouse/blob/master/src/Client/Connection.cpp type connect struct { - opt *Options - conn net.Conn - debugf func(format string, v ...interface{}) - server ServerVersion - closed bool - buffer *chproto.Buffer - reader *chproto.Reader - released bool - revision uint64 - structMap *structMap - compression CompressionMethod - connectedAt time.Time - compressor *compress.Writer - readTimeout time.Duration + opt *Options + conn net.Conn + debugf func(format string, v ...interface{}) + server ServerVersion + closed bool + buffer *chproto.Buffer + reader *chproto.Reader + released bool + revision uint64 + structMap *structMap + compression CompressionMethod + connectedAt time.Time + compressor *compress.Writer + readTimeout time.Duration + blockBufferSize uint8 } func (c *connect) settings(querySettings Settings) []proto.Setting { diff --git a/conn_http.go b/conn_http.go index 64a165f151..da5a92e368 100644 --- a/conn_http.go +++ b/conn_http.go @@ -199,6 +199,7 @@ func dialHttp(ctx context.Context, addr string, num int, opt *Options) (*httpCon compression: opt.Compression.Method, blockCompressor: compress.NewWriter(), compressionPool: compressionPool, + blockBufferSize: opt.BlockBufferSize, } location, err := conn.readTimeZone(ctx) if err != nil { @@ -215,6 +216,7 @@ func dialHttp(ctx context.Context, addr string, num int, opt *Options) (*httpCon blockCompressor: compress.NewWriter(), compressionPool: compressionPool, location: location, + blockBufferSize: opt.BlockBufferSize, }, nil } @@ -226,6 +228,7 @@ type httpConnect struct { compression CompressionMethod blockCompressor *compress.Writer compressionPool Pool[HTTPReaderWriter] + blockBufferSize uint8 } func (h *httpConnect) isBad() bool { diff --git a/conn_http_query.go b/conn_http_query.go index 0096186842..a5c301f6ff 100644 --- a/conn_http_query.go +++ b/conn_http_query.go @@ -55,10 +55,14 @@ func (h *httpConnect) query(ctx context.Context, release func(*connect, error), rw := h.compressionPool.Get() body, err = rw.read(res) - + bufferSize := h.blockBufferSize + if options.blockBufferSize > 0 { + // allow block buffer sze to be overridden per query + bufferSize = options.blockBufferSize + } var ( errCh = make(chan error) - stream = make(chan *proto.Block, 2) + stream = make(chan *proto.Block, bufferSize) ) if len(body) == 0 { diff --git a/conn_query.go b/conn_query.go index 2a6ffdc2a5..9dfa9f7999 100644 --- a/conn_query.go +++ b/conn_query.go @@ -56,10 +56,14 @@ func (c *connect) query(ctx context.Context, release func(*connect, error), quer release(c, err) return nil, err } - + bufferSize := c.blockBufferSize + if options.blockBufferSize > 0 { + // allow block buffer sze to be overridden per query + bufferSize = options.blockBufferSize + } var ( errors = make(chan error) - stream = make(chan *proto.Block, 2) + stream = make(chan *proto.Block, bufferSize) ) go func() { diff --git a/context.go b/context.go index b20f8be159..588382edf9 100644 --- a/context.go +++ b/context.go @@ -48,8 +48,9 @@ type ( profileInfo func(*ProfileInfo) profileEvents func([]ProfileEvent) } - settings Settings - external []*ext.Table + settings Settings + external []*ext.Table + blockBufferSize uint8 } ) @@ -67,6 +68,13 @@ func WithQueryID(queryID string) QueryOption { } } +func WithBlockBufferSize(size uint8) QueryOption { + return func(o *QueryOptions) error { + o.blockBufferSize = size + return nil + } +} + func WithQuotaKey(quotaKey string) QueryOption { return func(o *QueryOptions) error { o.quotaKey = quotaKey diff --git a/examples/clickhouse_api/connect_settings.go b/examples/clickhouse_api/connect_settings.go index 6392d3246b..63f684a4c7 100644 --- a/examples/clickhouse_api/connect_settings.go +++ b/examples/clickhouse_api/connect_settings.go @@ -58,6 +58,7 @@ func PingWithSettings() error { MaxIdleConns: 5, ConnMaxLifetime: time.Duration(10) * time.Minute, ConnOpenStrategy: clickhouse.ConnOpenInOrder, + BlockBufferSize: 10, }) if err != nil { return err diff --git a/examples/clickhouse_api/context.go b/examples/clickhouse_api/context.go index 3c7a27ece2..dd83d7d78d 100644 --- a/examples/clickhouse_api/context.go +++ b/examples/clickhouse_api/context.go @@ -98,7 +98,7 @@ func UseContext() error { defer func() { conn.Exec(context.Background(), "DROP QUOTA IF EXISTS foobar") }() - ctx = clickhouse.Context(context.Background(), clickhouse.WithQuotaKey("abcde")) + ctx = clickhouse.Context(context.Background(), clickhouse.WithQuotaKey("abcde"), clickhouse.WithBlockBufferSize(100)) // set a quota key - first create the quota if err = conn.Exec(ctx, "CREATE QUOTA IF NOT EXISTS foobar KEYED BY client_key FOR INTERVAL 1 minute MAX queries = 5 TO default"); err != nil { return err diff --git a/go.mod b/go.mod index 71f21a2e8a..6e2974e319 100644 --- a/go.mod +++ b/go.mod @@ -31,14 +31,18 @@ require ( github.com/containerd/cgroups v1.0.4 // indirect github.com/containerd/containerd v1.6.8 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dmarkham/enumer v1.5.6 // indirect github.com/docker/distribution v2.8.1+incompatible // indirect github.com/go-faster/city v1.0.1 // indirect github.com/go-faster/errors v0.6.1 // indirect + github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/gorilla/websocket v1.4.2 // indirect + github.com/hashicorp/go-version v1.6.0 // indirect github.com/klauspost/compress v1.15.9 // indirect github.com/magiconair/properties v1.8.6 // indirect github.com/moby/sys/mount v0.3.3 // indirect @@ -48,6 +52,7 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect github.com/opencontainers/runc v1.1.3 // indirect + github.com/pascaldekloe/name v1.0.1 // indirect github.com/pierrec/lz4/v4 v4.1.16 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/segmentio/asm v1.2.0 // indirect @@ -57,8 +62,15 @@ require ( github.com/tklauser/numcpus v0.4.0 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect go.opencensus.io v0.23.0 // indirect + go.opentelemetry.io/otel/metric v0.31.0 // indirect + go.uber.org/atomic v1.10.0 // indirect + go.uber.org/multierr v1.8.0 // indirect + go.uber.org/zap v1.23.0 // indirect + golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect + golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab // indirect + golang.org/x/tools v0.1.12 // indirect google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad // indirect google.golang.org/grpc v1.47.0 // indirect google.golang.org/protobuf v1.28.0 // indirect diff --git a/go.sum b/go.sum index 6ce831d059..817f05a35a 100644 --- a/go.sum +++ b/go.sum @@ -320,6 +320,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= +github.com/dmarkham/enumer v1.5.6 h1:afhpzVOu8PoBL/+4J07PxVBf9cNnSawS/jAZK1snyLw= +github.com/dmarkham/enumer v1.5.6/go.mod h1:eAawajOQnFBxf0NndBKgbqJImkHytg3eFEngUovqgo8= github.com/dnaeon/go-vcr v1.0.1/go.mod h1:aBB1+wY4s93YsC3HHjMBMrwTj2R9FHDzUr9KyGc8n1E= github.com/dnephin/pflag v1.0.7/go.mod h1:uxE91IoWURlOiTUIA8Mq5ZZkAv3dPUfZNaT80Zm7OQE= github.com/docker/cli v0.0.0-20191017083524-a8ff7f821017/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= @@ -395,7 +397,10 @@ github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTg github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.1/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= +github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/stdr v1.2.0/go.mod h1:YkVgnZu1ZjjL7xTxrfm/LLZBfkhTqSR1ydtm6jTKKwI= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= @@ -554,6 +559,8 @@ github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerX github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= +github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -753,6 +760,8 @@ github.com/opencontainers/selinux v1.10.0/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuh github.com/opencontainers/selinux v1.10.1/go.mod h1:2i0OySw99QjzBBQByd1Gr9gSjvuho1lHsJxIJ3gGbJI= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= +github.com/pascaldekloe/name v1.0.1 h1:9lnXOHeqeHHnWLbKfH6X98+4+ETVqFqxN09UXSjcMb0= +github.com/pascaldekloe/name v1.0.1/go.mod h1:Z//MfYJnH4jVpQ9wkclwu2I2MkHmXTlT9wR5UZScttM= github.com/paulmach/orb v0.7.1 h1:Zha++Z5OX/l168sqHK3k4z18LDvr+YAO/VjK0ReQ9rU= github.com/paulmach/orb v0.7.1/go.mod h1:FWRlTgl88VI1RBx/MkrwWDRhQ96ctqMCh8boXhmqB/A= github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= @@ -959,6 +968,8 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.3.0/go.mod h1:hO1KLR7jcKaDD go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.3.0/go.mod h1:keUU7UfnwWTWpJ+FWnyqmogPa82nuU5VUANFq49hlMY= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.3.0/go.mod h1:QNX1aly8ehqqX1LEa6YniTU7VY9I6R3X/oPxhGdTceE= go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU= +go.opentelemetry.io/otel/metric v0.31.0 h1:6SiklT+gfWAwWUR0meEMxQBtihpiEs4c+vL9spDTqUs= +go.opentelemetry.io/otel/metric v0.31.0/go.mod h1:ohmwj9KTSIeBnDBm/ZwH2PSZxZzoOaG2xZeekTRzL5A= go.opentelemetry.io/otel/oteltest v0.20.0/go.mod h1:L7bgKf9ZB7qCwT9Up7i9/pn0PWIa9FqQ2IQ8LoxiGnw= go.opentelemetry.io/otel/sdk v0.20.0/go.mod h1:g/IcepuwNsoiX5Byy2nNV0ySUF1em498m7hBWC279Yc= go.opentelemetry.io/otel/sdk v1.3.0/go.mod h1:rIo4suHNhQwBIPg9axF8V9CA72Wz2mKF1teNrup8yzs= @@ -973,12 +984,18 @@ go.opentelemetry.io/proto/otlp v0.11.0/go.mod h1:QpEjXPrNQzrFDZgoTo49dgHR9RYRSrg go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= +go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= +go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo= +go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= +go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= golang.org/x/crypto v0.0.0-20171113213409-9f005a07e0d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181009213950-7c1a557ab941/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -1031,6 +1048,7 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1117,6 +1135,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1316,6 +1336,8 @@ golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.11/go.mod h1:SgwaegtQh8clINPpECJMqnxLv9I09HLqnW3RMqW0CA4= +golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/lib/proto/block.go b/lib/proto/block.go index 4d13375b23..3d93247244 100644 --- a/lib/proto/block.go +++ b/lib/proto/block.go @@ -128,10 +128,11 @@ func (b *Block) Decode(reader *proto.Reader, revision uint64) (err error) { if numRows > 1_000_000_000 { return &BlockError{ Op: "Decode", - Err: errors.New("more then 1 billion rows in block"), + Err: errors.New("more then 1 billion rows in block - suspiciously big - preventing OOM"), } } - b.Columns = make([]column.Interface, 0, numCols) + b.Columns = make([]column.Interface, numCols, numCols) + b.names = make([]string, numCols, numCols) for i := 0; i < int(numCols); i++ { var ( columnName string @@ -165,7 +166,8 @@ func (b *Block) Decode(reader *proto.Reader, revision uint64) (err error) { } } } - b.names, b.Columns = append(b.names, columnName), append(b.Columns, c) + b.names[i] = columnName + b.Columns[i] = c } return nil } diff --git a/tests/conn_test.go b/tests/conn_test.go index 0e231b9113..0422815ca3 100644 --- a/tests/conn_test.go +++ b/tests/conn_test.go @@ -206,3 +206,39 @@ func TestQueryDeadline(t *testing.T) { require.Error(t, err) assert.ErrorIs(t, err, os.ErrDeadlineExceeded) } + +func TestBlockBufferSize(t *testing.T) { + env, err := GetNativeTestEnvironment() + require.NoError(t, err) + useSSL, err := strconv.ParseBool(GetEnv("CLICKHOUSE_USE_SSL", "false")) + require.NoError(t, err) + port := env.Port + var tlsConfig *tls.Config + if useSSL { + port = env.SslPort + tlsConfig = &tls.Config{} + } + conn, err := GetConnectionWithOptions(&clickhouse.Options{ + Addr: []string{fmt.Sprintf("%s:%d", env.Host, port)}, + Auth: clickhouse.Auth{ + Database: "default", + Username: env.Username, + Password: env.Password, + }, + Compression: &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }, + TLS: tlsConfig, + BlockBufferSize: 100, + }) + require.NoError(t, err) + var count uint64 + rows, err := conn.Query(clickhouse.Context(context.Background(), clickhouse.WithBlockBufferSize(50)), "SELECT number FROM numbers(10000000)") + require.NoError(t, err) + i := 0 + for rows.Next() { + require.NoError(t, rows.Scan(&count)) + i++ + } + require.Equal(t, 10000000, i) +} diff --git a/tests/std/conn_test.go b/tests/std/conn_test.go index 7018c744d7..691506c938 100644 --- a/tests/std/conn_test.go +++ b/tests/std/conn_test.go @@ -226,3 +226,32 @@ func TestStdConnector(t *testing.T) { err = db.Ping() require.NoError(t, err) } + +func TestBlockBufferSize(t *testing.T) { + env, err := GetStdTestEnvironment() + require.NoError(t, err) + dsns := map[string]string{"Native": fmt.Sprintf("clickhouse://%s:%d?username=%s&password=%s", env.Host, env.Port, env.Username, env.Password), + "Http": fmt.Sprintf("http://%s:%d?username=%s&password=%s", env.Host, env.HttpPort, env.Username, env.Password)} + useSSL, err := strconv.ParseBool(clickhouse_tests.GetEnv("CLICKHOUSE_USE_SSL", "false")) + require.NoError(t, err) + if useSSL { + dsns = map[string]string{"Native": fmt.Sprintf("clickhouse://%s:%d?username=%s&password=%s&secure=true", env.Host, env.SslPort, env.Username, env.Password), + "Http": fmt.Sprintf("https://%s:%d?username=%s&password=%s&secure=true", env.Host, env.HttpsPort, env.Username, env.Password)} + } + for name, dsn := range dsns { + t.Run(fmt.Sprintf("%s Protocol", name), func(t *testing.T) { + dsn := fmt.Sprintf("%s&block_buffer_size=100", dsn) + conn, err := GetConnectionFromDSN(dsn) + require.NoError(t, err) + var count uint64 + rows, err := conn.Query("SELECT number FROM numbers(1000000)") + require.NoError(t, err) + i := 0 + for rows.Next() { + require.NoError(t, rows.Scan(&count)) + i++ + } + require.Equal(t, 1000000, i) + }) + } +}