Connection resets/broken pipes during batch flush are unrecoverable #1420

Closed
rgettys-tesla opened this issue Oct 10, 2024 · 1 comment · Fixed by #1423

rgettys-tesla commented Oct 10, 2024

Observed

  1. Prepared a batch statement with driver.WithCloseOnFlush()
  2. Appended very large (or very many) rows to the statement
  3. Flushed
  4. Restarted the ClickHouse server while the flush was still in progress
  5. Received a broken pipe or connection reset error
  6. Retried the flush indefinitely with the same result, since the broken connection is never released.

Expected behaviour

Blocks sent on a pool containing connections that are no longer valid should (eventually) become flushable once those connections are cycled out of the pool. The only current client-side alternative is to retain a second buffer so that the statement can be completely re-prepared and re-appended, which requires double the memory.
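
For illustration, the second-buffer workaround described above could look roughly like the sketch below. It is not code from clickhouse-go: `rowBatch`, `prepareFunc`, `sendWithReplay`, and `flakyBatch` are hypothetical names, and `rowBatch` only mirrors the `Append` and `Send` methods of a prepared batch. The `rows` slice is the extra copy that doubles memory use.

```go
package main

import (
	"errors"
	"fmt"
)

// rowBatch is a hypothetical, minimal stand-in for a prepared batch.
type rowBatch interface {
	Append(v ...any) error
	Send() error
}

// prepareFunc is a hypothetical factory that prepares a fresh batch,
// and with it acquires a fresh connection from the pool.
type prepareFunc func() (rowBatch, error)

// sendWithReplay keeps the caller's rows (the "second buffer") and, on a
// failed send, re-prepares the batch and re-appends every row.
func sendWithReplay(prepare prepareFunc, rows [][]any, maxAttempts int) error {
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		b, err := prepare()
		if err != nil {
			lastErr = err
			continue
		}
		for _, row := range rows {
			if err := b.Append(row...); err != nil {
				return err // bad row data will not improve with a retry
			}
		}
		if lastErr = b.Send(); lastErr == nil {
			return nil
		}
	}
	return fmt.Errorf("send failed after %d attempts: %w", maxAttempts, lastErr)
}

// flakyBatch is a fake used only for the demonstration in main.
type flakyBatch struct {
	rows int
	fail bool
}

func (f *flakyBatch) Append(v ...any) error { f.rows++; return nil }

func (f *flakyBatch) Send() error {
	if f.fail {
		return errors.New("write: broken pipe")
	}
	return nil
}

func main() {
	attempts := 0
	prepare := func() (rowBatch, error) {
		attempts++
		// Only the first prepared batch sits on a "broken" connection.
		return &flakyBatch{fail: attempts == 1}, nil
	}
	rows := [][]any{{"bar", "bar"}, {"bar2", "bar2"}}
	err := sendWithReplay(prepare, rows, 3)
	fmt.Println(attempts, err) // prints "2 <nil>"
}
```

The cost is visible in the signature: the caller must hold `rows` until the send succeeds, in addition to whatever the batch has already buffered internally.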

Code example

// The following test case relies on timing, and on batches large enough to trigger that timing. It is more illustrative of the problem than building mock pool/statement/connection structs in the clickhouse package.

package tests

import (
	"context"
	"errors"
	"os"
	"syscall"
	"testing"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
	"github.com/stretchr/testify/require"
	"github.com/testcontainers/testcontainers-go"
)

//goland:noinspection ALL
const insertQry = "INSERT INTO test (foo, foo2)"

func TestBatchFlushBrokenConn(t *testing.T) {
	env, err := GetNativeTestEnvironment()
	require.NoError(t, err)
	require.NotNil(t, env)
	ctx := context.Background()
	client, err := testcontainers.NewDockerClientWithOpts(ctx)
	require.NoError(t, err)
	chClient, err := getConnection(env, env.Database, clickhouse.Settings{}, nil, &clickhouse.Compression{
		Method: clickhouse.CompressionLZ4,
	})
	require.NoError(t, err)

	err = chClient.Exec(ctx, "CREATE TABLE test (foo String, foo2 String) ENGINE = MergeTree ORDER BY (foo)")
	require.NoError(t, err)
	batch, err := chClient.PrepareBatch(ctx, insertQry, driver.WithCloseOnFlush())
	require.NoError(t, err)

	// two small flushes succeed while the server is healthy
	err = batch.Append("bar", "bar")
	require.NoError(t, err)
	err = batch.Flush()
	require.NoError(t, err)
	err = batch.Append("bar2", "bar2")
	require.NoError(t, err)
	err = batch.Flush()
	require.NoError(t, err)

	// append a very large row so the next flush is still running when the server is killed
	err = batch.Append(RandAsciiString(200000000), RandAsciiString(20000000))
	require.NoError(t, err)
	ch := make(chan struct{})
	go func() {
		err = batch.Flush()
		close(ch)
	}()
	// kill the server while the flush is in flight, then wait for the flush to return
	err2 := client.ContainerKill(ctx, env.ContainerID, "KILL")
	<-ch
	require.NoError(t, err2)
	require.True(t, errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET))

	// unbreak the test environment
	env, err = CreateClickHouseTestEnvironment("native")
	require.NoError(t, err)
	SetTestEnvironment("native", env)
	if err := CreateDatabase("native"); err != nil {
		panic(err)
	}

	err = batch.Flush()
	// retry after server is up should have either no error, or a reconnect error (for example because the mapped port
	// changed on container startup)
	require.True(t, err == nil || errors.Is(err, syscall.ECONNREFUSED) || os.IsTimeout(err), err)
}


## Error log
write tcp 127.0.0.1:60823->127.0.0.1:60787: write: broken pipe
write
github.com/ClickHouse/clickhouse-go/v2.(*connect).flush
	/Users/rgettys/git/clickhouse-go-upstream/conn.go:363
github.com/ClickHouse/clickhouse-go/v2.(*connect).sendData
	/Users/rgettys/git/clickhouse-go-upstream/conn.go:283
github.com/ClickHouse/clickhouse-go/v2.(*batch).Flush
	/Users/rgettys/git/clickhouse-go-upstream/conn_batch.go:289
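
A caller that wants to recognise this class of failure can match on the underlying errno rather than on the message text, which is what the test above does with `errors.Is`. The sketch below is a minimal, standard-library-only illustration: `isBrokenConn` and `sampleBrokenPipe` are hypothetical helper names, and the sample error is constructed by hand to resemble a failed TCP write. It assumes the driver's returned error still wraps the original `syscall.Errno`.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
	"syscall"
)

// isBrokenConn reports whether err wraps a broken pipe or a connection reset.
func isBrokenConn(err error) bool {
	return errors.Is(err, syscall.EPIPE) || errors.Is(err, syscall.ECONNRESET)
}

// sampleBrokenPipe builds an error shaped like the one a failed TCP write
// returns: an errno inside a SyscallError inside a net.OpError, wrapped once
// more the way calling code typically adds context.
func sampleBrokenPipe() error {
	opErr := &net.OpError{
		Op:  "write",
		Net: "tcp",
		Err: os.NewSyscallError("write", syscall.EPIPE),
	}
	return fmt.Errorf("flush: %w", opErr)
}

func main() {
	fmt.Println(isBrokenConn(sampleBrokenPipe()))        // true
	fmt.Println(isBrokenConn(errors.New("other error"))) // false
}
```

Matching on the errno keeps the check independent of the addresses and ports that appear in the message.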

## Details

### Environment
* [X] `clickhouse-go` version: v2.29.0
* [X] Interface: ClickHouse API 
* [X] Go version: 1.22
* [X] Operating system: Ubuntu 22.04, kernel 6.8
* [X] ClickHouse version: 24.9
* [X] Is it a ClickHouse Cloud? Yes, sort of. Private SMT binary.
* [X] ClickHouse Server non-default settings, if any: 
* [X] `CREATE TABLE` statements for tables involved: In the test code
* [X] Sample data for all these tables, use [clickhouse-obfuscator](https://github.com/ClickHouse/ClickHouse/blob/master/programs/obfuscator/Obfuscator.cpp#L42-L80) if necessary (see test)
@rgettys-tesla

PR will be forthcoming.
