From f8b2a8037e2808be29ae096cbbd7c18aec6cda61 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 2 Jun 2022 18:22:28 +0800 Subject: [PATCH] flush cache before segment merge (#4955) (#4966) close pingcap/tiflash#4956 --- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 23 +++++++++++ dbms/src/Storages/DeltaMerge/Segment.cpp | 38 ++++++++++++++++++- dbms/src/Storages/DeltaMerge/Segment.h | 2 +- .../DeltaMerge/tests/gtest_dm_segment.cpp | 28 +++++++++----- 4 files changed, 79 insertions(+), 12 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 7b1c7eff457..315fdb3d202 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1721,6 +1721,29 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le (is_foreground ? "Foreground" : "Background") << " merge Segment [" << left->info() << "] and [" << right->info() << "], safe point:" << dm_context.min_version); + /// This segment may contain some rows that not belong to this segment range which is left by previous split operation. + /// And only saved data in this segment will be filtered by the segment range in the merge process, + /// unsaved data will be directly copied to the new segment. + /// So we flush here to make sure that all potential data left by previous split operation is saved. + while (!left->flushCache(dm_context)) + { + // keep flush until success if not abandoned + if (left->hasAbandoned()) + { + LOG_DEBUG(log, "Give up merge segments left [" << left->segmentId() << "], right [" << right->segmentId() << "]"); + return; + } + } + while (!right->flushCache(dm_context)) + { + // keep flush until success if not abandoned + if (right->hasAbandoned()) + { + LOG_DEBUG(log, "Give up merge segments left [" << left->segmentId() << "], right [" << right->segmentId() << "]"); + return; + } + } + SegmentSnapshotPtr left_snap; SegmentSnapshotPtr right_snap; ColumnDefinesPtr schema_snap; diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index f1f1d0553cb..be38ed9702e 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -286,7 +286,7 @@ bool Segment::writeToCache(DMContext & dm_context, const Block & block, size_t o return delta->appendToCache(dm_context, block, offset, limit); } -bool Segment::write(DMContext & dm_context, const Block & block) +bool Segment::write(DMContext & dm_context, const Block & block, bool flush_cache) { LOG_TRACE(log, "Segment [" << segment_id << "] write to disk rows: " << block.rows()); WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter()); @@ -296,7 +296,14 @@ bool Segment::write(DMContext & dm_context, const Block & block) if (delta->appendPack(dm_context, pack)) { - flushCache(dm_context); + if (flush_cache) + { + while (!flushCache(dm_context)) + { + if (hasAbandoned()) + return false; + } + } return true; } else @@ -1056,6 +1063,29 @@ SegmentPair Segment::applySplit(DMContext & dm_context, // SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schema_snap, const SegmentPtr & left, const SegmentPtr & right) { WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter()); + /// This segment may contain some rows that not belong to this segment range which is left by previous split operation. + /// And only saved data in this segment will be filtered by the segment range in the merge process, + /// unsaved data will be directly copied to the new segment. + /// So we flush here to make sure that all potential data left by previous split operation is saved. + while (!left->flushCache(dm_context)) + { + // keep flush until success if not abandoned + if (left->hasAbandoned()) + { + LOG_DEBUG(left->log, "Give up merge segments left [" << left->segmentId() << "], right [" << right->segmentId() << "]"); + return {}; + } + } + while (!right->flushCache(dm_context)) + { + // keep flush until success if not abandoned + if (right->hasAbandoned()) + { + LOG_DEBUG(right->log, "Give up merge segments left [" << left->segmentId() << "], right [" << right->segmentId() << "]"); + return {}; + } + } + auto left_snap = left->createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentMerge); auto right_snap = right->createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentMerge); @@ -1076,6 +1106,10 @@ SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schem return merged; } +/// Segments may contain some rows that not belong to its range which is left by previous split operation. +/// And only saved data in the segment will be filtered by the segment range in the merge process, +/// unsaved data will be directly copied to the new segment. +/// So remember to do a flush for the segments before merge. StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, // const ColumnDefinesPtr & schema_snap, const SegmentPtr & left, diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index a405030be25..00f1834a422 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -116,7 +116,7 @@ class Segment : private boost::noncopyable bool writeToDisk(DMContext & dm_context, const DeltaPackPtr & pack); bool writeToCache(DMContext & dm_context, const Block & block, size_t offset, size_t limit); - bool write(DMContext & dm_context, const Block & block); // For test only + bool write(DMContext & dm_context, const Block & block, bool flush_cache = true); // For test only bool write(DMContext & dm_context, const RowKeyRange & delete_range); bool ingestPacks(DMContext & dm_context, const RowKeyRange & range, const DeltaPacks & packs, bool clear_data_in_range); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index a2d7c821ff1..566c72b7b9e 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -783,11 +783,17 @@ CATCH TEST_F(Segment_test, Split) try { - const size_t num_rows_write = 100; + const size_t num_rows_write_per_batch = 100; + const size_t num_rows_write = num_rows_write_per_batch * 2; { - // write to segment - Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); - segment->write(dmContext(), std::move(block)); + // write to segment and flush + Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write_per_batch, false); + segment->write(dmContext(), std::move(block), true); + } + { + // write to segment and don't flush + Block block = DMTestEnv::prepareSimpleWriteBlock(num_rows_write_per_batch, 2 * num_rows_write_per_batch, false); + segment->write(dmContext(), std::move(block), false); } { @@ -823,7 +829,7 @@ try size_t num_rows_seg2 = 0; { { - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStream(dmContext(), *tableColumns(), {segment->getRowKeyRange()}); in->readPrefix(); while (Block block = in->read()) { @@ -832,7 +838,7 @@ try in->readSuffix(); } { - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = new_segment->getInputStream(dmContext(), *tableColumns(), {new_segment->getRowKeyRange()}); in->readPrefix(); while (Block block = in->read()) { @@ -843,9 +849,13 @@ try ASSERT_EQ(num_rows_seg1 + num_rows_seg2, num_rows_write); } + // delete rows in the right segment + { + new_segment->write(dmContext(), /*delete_range*/ new_segment->getRowKeyRange()); + new_segment->flushCache(dmContext()); + } + // merge segments - // TODO: enable merge test! - if (false) { segment = Segment::merge(dmContext(), tableColumns(), segment, new_segment); { @@ -864,7 +874,7 @@ try num_rows_read += block.rows(); } in->readSuffix(); - EXPECT_EQ(num_rows_read, num_rows_write); + EXPECT_EQ(num_rows_read, num_rows_seg1); } } }