From 7d2211df1d772479aa4d230f8e12b00e38803cb6 Mon Sep 17 00:00:00 2001 From: Zhehao Wu Date: Tue, 3 Oct 2023 19:04:07 +0800 Subject: [PATCH] Fix: Block stream read process would be terminated by empty block with zero rows (#1104) * support skip empty block while reading rows from stream * remove unrelated comments --- clickhouse_rows.go | 2 +- clickhouse_rows_test.go | 116 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 1 deletion(-) create mode 100644 clickhouse_rows_test.go diff --git a/clickhouse_rows.go b/clickhouse_rows.go index 150e82bfb9..698905e7f0 100644 --- a/clickhouse_rows.go +++ b/clickhouse_rows.go @@ -52,7 +52,6 @@ next: r.err = err return false } - goto next case block := <-r.stream: if block == nil { return false @@ -63,6 +62,7 @@ next: } r.row, r.block = 0, block } + goto next } r.row++ return r.row <= r.block.Rows() diff --git a/clickhouse_rows_test.go b/clickhouse_rows_test.go new file mode 100644 index 0000000000..6de3b89e3f --- /dev/null +++ b/clickhouse_rows_test.go @@ -0,0 +1,116 @@ +package clickhouse + +import ( + "github.com/ClickHouse/clickhouse-go/v2/lib/proto" + "github.com/stretchr/testify/assert" + "strconv" + "testing" +) + +func TestReadWithEmptyBlock(t *testing.T) { + blockInitFunc := func() *proto.Block { + retVal := &proto.Block{ + Packet: 0, + Columns: nil, + Timezone: nil, + } + retVal.AddColumn("col1", ("Int64")) + retVal.AddColumn("col2", ("String")) + return retVal + } + + testCases := map[string]struct { + actual func() rows + expected int + }{ + "none empty": { + func() rows { + firstBlock := blockInitFunc() + firstBlock.Append(int64(0), strconv.Itoa(0)) + blockChan := make(chan *proto.Block) + go func() { + for i := 1; i < 10; i++ { + block := blockInitFunc() + block.Append(int64(i), strconv.Itoa(i)) + blockChan <- block + } + close(blockChan) + }() + return rows{ + err: nil, + row: 0, + block: firstBlock, + totals: nil, + errors: nil, + stream: blockChan, + columns: nil, + structMap: nil, + } + }, + 10, + }, + "all empty": { + func() rows { + firstBlock := blockInitFunc() + blockChan := make(chan *proto.Block) + go func() { + for i := 1; i < 10; i++ { + block := blockInitFunc() + blockChan <- block + } + close(blockChan) + }() + return rows{ + err: nil, + row: 0, + block: firstBlock, + totals: nil, + errors: nil, + stream: blockChan, + columns: nil, + structMap: nil, + } + }, + 0, + }, + "some empty": { + func() rows { + firstBlock := blockInitFunc() + blockChan := make(chan *proto.Block) + go func() { + for i := 1; i < 10; i++ { + block := blockInitFunc() + if i%2 == 0 { + block.Append(int64(i), strconv.Itoa(i)) + } + blockChan <- block + } + close(blockChan) + }() + return rows{ + err: nil, + row: 0, + block: firstBlock, + totals: nil, + errors: nil, + stream: blockChan, + columns: nil, + structMap: nil, + } + }, + 4, + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + actual := testCase.actual() + + rowCnt := 0 + for actual.Next() { + rowCnt++ + } + assert.Equal(t, testCase.expected, rowCnt) + }) + } +}