Skip to content

Commit

Permalink
fix(storage): recluster endless loop (#16831)
Browse files Browse the repository at this point in the history
* fix recluster endless loop

* fix

* fix test

* fix test
  • Loading branch information
zhyass authored Nov 13, 2024
1 parent 18bd255 commit e909334
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,42 +94,24 @@ impl AccumulatingTransform for BlockCompactNoSplitBuilder {
res.push(Self::create_output_data(&mut self.staged_blocks));
}

if !self.check_for_compact() && !self.pending_blocks.is_empty() {
// blocks > 2N
res.push(Self::create_output_data(&mut self.pending_blocks));
} else {
if self.pending_blocks.is_empty() || self.check_for_compact() {
// N <= blocks < 2N
std::mem::swap(&mut self.staged_blocks, &mut self.pending_blocks);
} else {
// blocks > 2N
res.push(Self::create_output_data(&mut self.pending_blocks));
}
self.staged_blocks.push(data);
self.reset_accumulated();
Ok(res)
}

fn on_finish(&mut self, _output: bool) -> Result<Vec<DataBlock>> {
match (
self.pending_blocks.is_empty(),
self.staged_blocks.is_empty(),
) {
(true, true) => Ok(vec![]),
(true, false) => Ok(vec![Self::create_output_data(&mut self.staged_blocks)]),
(false, true) => Ok(vec![Self::create_output_data(&mut self.pending_blocks)]),
(false, false) => {
for block in &self.staged_blocks {
self.accumulated_rows += block.num_rows();
self.accumulated_bytes += block.memory_size();
}
if self.check_for_compact() {
self.staged_blocks.append(&mut self.pending_blocks);
Ok(vec![Self::create_output_data(&mut self.staged_blocks)])
} else {
// blocks > 2N
Ok(vec![
Self::create_output_data(&mut self.staged_blocks),
Self::create_output_data(&mut self.pending_blocks),
])
}
}
self.staged_blocks.append(&mut self.pending_blocks);
if self.staged_blocks.is_empty() {
Ok(vec![])
} else {
Ok(vec![Self::create_output_data(&mut self.staged_blocks)])
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ statement ok
ALTER TABLE t3 RECLUSTER

query FFT
select average_overlaps, average_depth, block_depth_histogram from clustering_information('db_09_0016','t3')
select average_overlaps, average_depth from clustering_information('db_09_0016','t3')
----
0.0 1.0 {"00001":2}
0.0 1.0

# test trim string
statement ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use i15760;
statement ok
set auto_compaction_imperfect_blocks_threshold = 3;

statement ok
set enable_parallel_multi_merge_sort = 0;

# ISSUE 15760
statement ok
Expand Down Expand Up @@ -83,17 +85,17 @@ statement ok
insert into t1 values(1),(2),(7);

statement ok
insert into t1 values(3),(5),(9);
insert into t1 values(3),(5),(8);

statement ok
insert into t1 values(4),(6),(8);
insert into t1 values(4),(6);

query III
select segment_count, block_count, row_count from fuse_snapshot('i15760', 't1') limit 10;
----
1 2 9
1 2 9
3 3 9
1 1 8
1 2 8
3 3 8
2 2 6
1 1 3

Expand Down

0 comments on commit e909334

Please sign in to comment.