From dcf2b3fd2877e482710b12f32a70fb6066459cfe Mon Sep 17 00:00:00 2001 From: Kuba Kaflik Date: Thu, 12 Oct 2023 11:28:17 +0200 Subject: [PATCH] Add context watchdog for batch send cancellation * batch: context watchdog * fix context cancellation --- conn_batch.go | 13 ++++++++++++ context_watchdog.go | 47 ++++++++++++++++++++++++++++++++++++++++ tests/batch_test.go | 52 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 112 insertions(+) create mode 100644 context_watchdog.go create mode 100644 tests/batch_test.go diff --git a/conn_batch.go b/conn_batch.go index a4081a7f7d..0bfbc64ae9 100644 --- a/conn_batch.go +++ b/conn_batch.go @@ -178,7 +178,14 @@ func (b *batch) Column(idx int) driver.BatchColumn { } func (b *batch) Send() (err error) { + stopCW := contextWatchdog(b.ctx, func() { + // close TCP connection on context cancel. There is no other way simple way to interrupt underlying operations. + // as verified in the test, this is safe to do and cleanups resources later on + _ = b.conn.conn.Close() + }) + defer func() { + stopCW() b.sent = true b.release(err) }() @@ -192,6 +199,12 @@ func (b *batch) Send() (err error) { } if b.block.Rows() != 0 { if err = b.conn.sendData(b.block, ""); err != nil { + // there might be an error caused by context cancellation + // in this case we should return context error instead of net.OpError + if ctxErr := b.ctx.Err(); ctxErr != nil { + return ctxErr + } + return err } } diff --git a/context_watchdog.go b/context_watchdog.go new file mode 100644 index 0000000000..8c2257e4b4 --- /dev/null +++ b/context_watchdog.go @@ -0,0 +1,47 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package clickhouse + +import "context" + +// contextWatchdog is a helper function to run a callback when the context is done. +// it has a cancellation function to prevent the callback from running. +// Useful for interrupting some logic when the context is done, +// but you want to not bother about context cancellation if your logic is already done. +// Example: +// stopCW := contextWatchdog(ctx, func() { /* do something */ }) +// // do something else +// defer stopCW() +func contextWatchdog(ctx context.Context, callback func()) (cancel func()) { + exit := make(chan struct{}) + + go func() { + for { + select { + case <-exit: + return + case <-ctx.Done(): + callback() + } + } + }() + + return func() { + exit <- struct{}{} + } +} diff --git a/tests/batch_test.go b/tests/batch_test.go new file mode 100644 index 0000000000..2f3f04bc9c --- /dev/null +++ b/tests/batch_test.go @@ -0,0 +1,52 @@ +// Licensed to ClickHouse, Inc. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. ClickHouse, Inc. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package tests + +import ( + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/stretchr/testify/require" + "golang.org/x/net/context" + "testing" + "time" +) + +func TestBatchContextCancellation(t *testing.T) { + te, err := GetTestEnvironment(testSet) + require.NoError(t, err) + opts := ClientOptionsFromEnv(te, clickhouse.Settings{}) + opts.MaxOpenConns = 1 + conn, err := GetConnectionWithOptions(&opts) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + require.NoError(t, conn.Exec(context.Background(), "create table if not exists test_batch_cancellation (x String) engine=Memory")) + defer conn.Exec(context.Background(), "drop table if exists test_batch_cancellation") + + b, err := conn.PrepareBatch(ctx, "insert into test_batch_cancellation") + require.NoError(t, err) + for i := 0; i < 1_000_000; i++ { + require.NoError(t, b.Append("value")) + } + + require.Equal(t, context.DeadlineExceeded, b.Send()) + + // assert if connection is properly released after context cancellation + require.NoError(t, conn.Exec(context.Background(), "SELECT 1")) +}