From 5c67de4031ffb2c81956eb3b3d1e0eda3a09cf8a Mon Sep 17 00:00:00 2001 From: xiaoxmeng Date: Mon, 25 Mar 2024 16:32:11 -0700 Subject: [PATCH] Fix multiple files spilled for the distinct hash table (#9230) Summary: The existing distinct aggregation implementation assumes that there is one file generated for each spill run. And use stream id 0 to detect if a row read from spilled file is distinct one or not. This is no longer true after we add support to configure the max number of rows to spill in each sorted spill file for aggregation which means stream id > 0 could also contains the distinct values. This will cause incorrect data result and reported by [issue](https://github.com/facebookincubator/velox/issues/9219). This PR fixes this issue by recording the number of spilled files on the first spill in grouping set to detect the spilled files that contain the seen distinct values. Unit test is added to reproduce and verify the fix. Also removed the unused spill config Pull Request resolved: https://github.com/facebookincubator/velox/pull/9230 Reviewed By: oerling Differential Revision: D55288249 Pulled By: xiaoxmeng fbshipit-source-id: 0b96263ea3c08d8e5bd9e210f77547d642c2f2db --- velox/common/base/SpillConfig.cpp | 2 - velox/common/base/SpillConfig.h | 10 -- velox/common/base/tests/SpillConfigTest.cpp | 3 - .../hive/tests/HiveDataSinkTest.cpp | 1 - velox/dwio/dwrf/test/E2EWriterTest.cpp | 1 - velox/exec/Driver.cpp | 1 - velox/exec/GroupingSet.cpp | 23 ++- velox/exec/GroupingSet.h | 4 + velox/exec/Spill.cpp | 11 ++ velox/exec/Spill.h | 6 + velox/exec/SpillFile.cpp | 4 + velox/exec/SpillFile.h | 3 + velox/exec/fuzzer/AggregationFuzzerBase.cpp | 10 +- velox/exec/fuzzer/AggregationFuzzerBase.h | 2 + velox/exec/tests/AggregationTest.cpp | 166 ++++++++---------- velox/exec/tests/SortBufferTest.cpp | 3 - velox/exec/tests/SpillTest.cpp | 9 + 17 files changed, 137 insertions(+), 122 deletions(-) diff --git a/velox/common/base/SpillConfig.cpp b/velox/common/base/SpillConfig.cpp index ff7d759aa9d5..d3a51549968e 100644 --- a/velox/common/base/SpillConfig.cpp +++ b/velox/common/base/SpillConfig.cpp @@ -24,7 +24,6 @@ SpillConfig::SpillConfig( std::string _fileNamePrefix, uint64_t _maxFileSize, uint64_t _writeBufferSize, - uint64_t _minSpillRunSize, folly::Executor* _executor, int32_t _minSpillableReservationPct, int32_t _spillableReservationGrowthPct, @@ -42,7 +41,6 @@ SpillConfig::SpillConfig( _maxFileSize == 0 ? std::numeric_limits::max() : _maxFileSize), writeBufferSize(_writeBufferSize), - minSpillRunSize(_minSpillRunSize), executor(_executor), minSpillableReservationPct(_minSpillableReservationPct), spillableReservationGrowthPct(_spillableReservationGrowthPct), diff --git a/velox/common/base/SpillConfig.h b/velox/common/base/SpillConfig.h index ad5f4b3ab5b4..a51801a62a49 100644 --- a/velox/common/base/SpillConfig.h +++ b/velox/common/base/SpillConfig.h @@ -51,7 +51,6 @@ struct SpillConfig { std::string _filePath, uint64_t _maxFileSize, uint64_t _writeBufferSize, - uint64_t _minSpillRunSize, folly::Executor* _executor, int32_t _minSpillableReservationPct, int32_t _spillableReservationGrowthPct, @@ -94,15 +93,6 @@ struct SpillConfig { /// storage system for io efficiency. uint64_t writeBufferSize; - /// The min spill run size (bytes) limit used to select partitions for - /// spilling. The spiller tries to spill a previously spilled partitions if - /// its data size exceeds this limit, otherwise it spills the partition with - /// most data. If the limit is zero, then the spiller always spill a - /// previously spilled partition if it has any data. This is to avoid spill - /// from a partition with a small amount of data which might result in - /// generating too many small spilled files. - uint64_t minSpillRunSize; - /// Executor for spilling. If nullptr spilling writes on the Driver's thread. folly::Executor* executor; // Not owned. diff --git a/velox/common/base/tests/SpillConfigTest.cpp b/velox/common/base/tests/SpillConfigTest.cpp index 6dac69a074a5..bdcbf1b0f22c 100644 --- a/velox/common/base/tests/SpillConfigTest.cpp +++ b/velox/common/base/tests/SpillConfigTest.cpp @@ -31,7 +31,6 @@ TEST(SpillConfig, spillLevel) { "fakeSpillPath", 0, 0, - 0, nullptr, 0, 0, @@ -116,7 +115,6 @@ TEST(SpillConfig, spillLevelLimit) { "fakeSpillPath", 0, 0, - 0, nullptr, 0, 0, @@ -163,7 +161,6 @@ TEST(SpillConfig, spillableReservationPercentages) { "spillableReservationPercentages", 0, 0, - 0, nullptr, testData.minPct, testData.growthPct, diff --git a/velox/connectors/hive/tests/HiveDataSinkTest.cpp b/velox/connectors/hive/tests/HiveDataSinkTest.cpp index 7695c4e42993..b9b019232851 100644 --- a/velox/connectors/hive/tests/HiveDataSinkTest.cpp +++ b/velox/connectors/hive/tests/HiveDataSinkTest.cpp @@ -87,7 +87,6 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { "", 0, 0, - 0, spillExecutor_.get(), 10, 20, diff --git a/velox/dwio/dwrf/test/E2EWriterTest.cpp b/velox/dwio/dwrf/test/E2EWriterTest.cpp index 4aa4bc86d317..a494230d486e 100644 --- a/velox/dwio/dwrf/test/E2EWriterTest.cpp +++ b/velox/dwio/dwrf/test/E2EWriterTest.cpp @@ -249,7 +249,6 @@ class E2EWriterTest : public testing::Test { "fakeSpillConfig", 0, 0, - 0, nullptr, minSpillableReservationPct, spillableReservationGrowthPct, diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 5bcb59d2a27f..f2f43c6b451a 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -150,7 +150,6 @@ std::optional DriverCtx::makeSpillConfig( spillFilePrefix, queryConfig.maxSpillFileSize(), queryConfig.spillWriteBufferSize(), - queryConfig.minSpillRunSize(), task->queryCtx()->spillExecutor(), queryConfig.minSpillableReservationPct(), queryConfig.spillableReservationGrowthPct(), diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index 4c0ef7843cea..b1ad5e5742da 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -936,6 +936,7 @@ void GroupingSet::spill() { if (!hasSpilled()) { auto rows = table_->rows(); VELOX_DCHECK(pool_.trackUsage()); + VELOX_CHECK_EQ(numDistinctSpilledFiles_, 0); spiller_ = std::make_unique( Spiller::Type::kAggregateInput, rows, @@ -944,8 +945,13 @@ void GroupingSet::spill() { std::vector(), spillConfig_, spillStats_); + VELOX_CHECK_EQ(spiller_->state().maxPartitions(), 1); } spiller_->spill(); + if (isDistinct() && numDistinctSpilledFiles_ == 0) { + numDistinctSpilledFiles_ = spiller_->state().numFinishedFiles(0); + VELOX_CHECK_GT(numDistinctSpilledFiles_, 0); + } if (sortedAggregations_) { sortedAggregations_->clear(); } @@ -1064,16 +1070,19 @@ bool GroupingSet::mergeNextWithoutAggregates( const RowVectorPtr& result) { VELOX_CHECK_NOT_NULL(merge_); VELOX_CHECK(isDistinct()); + VELOX_CHECK_GT(numDistinctSpilledFiles_, 0); // We are looping over sorted rows produced by tree-of-losers. We logically // split the stream into runs of duplicate rows. As we process each run we - // track whether one of the values comes from stream 0, in which case we - // should not produce a result from that run. Otherwise, we produce a result - // at the end of the run (when we know for sure whether the run contains a row - // from stream 0 or not). + // track whether one of the values coming from distinct streams, in which case + // we should not produce a result from that run. Otherwise, we produce a + // result at the end of the run (when we know for sure whether the run + // contains a row from the distinct streams). // - // NOTE: stream 0 contains rows which has already been output as distinct - // before we trigger spilling. + // NOTE: the distinct stream refers to the stream that contains the spilled + // distinct hash table. A distinct stream contains rows which has already + // been output as distinct before we trigger spilling. A distinct stream id is + // less than 'numDistinctSpilledFiles_'. bool newDistinct{true}; int32_t numOutputRows{0}; while (numOutputRows < maxOutputRows) { @@ -1082,7 +1091,7 @@ bool GroupingSet::mergeNextWithoutAggregates( if (stream == nullptr) { break; } - if (stream->id() == 0) { + if (stream->id() < numDistinctSpilledFiles_) { newDistinct = false; } if (next.second) { diff --git a/velox/exec/GroupingSet.h b/velox/exec/GroupingSet.h index 8cbbe8ff99bc..072a5b500e4f 100644 --- a/velox/exec/GroupingSet.h +++ b/velox/exec/GroupingSet.h @@ -327,6 +327,10 @@ class GroupingSet { bool remainingMayPushdown_; std::unique_ptr spiller_; + // Sets to the number of files stores the spilled distinct hash table which + // are the files generated by the first spill call. This only applies for + // distinct hash aggregation. + size_t numDistinctSpilledFiles_{0}; std::unique_ptr> merge_; // Container for materializing batches of output from spilling. diff --git a/velox/exec/Spill.cpp b/velox/exec/Spill.cpp index eb0bab730f28..c6aa380ccdbb 100644 --- a/velox/exec/Spill.cpp +++ b/velox/exec/Spill.cpp @@ -155,6 +155,17 @@ void SpillState::finishFile(uint32_t partition) { writer->finishFile(); } +size_t SpillState::numFinishedFiles(uint32_t partition) const { + if (!isPartitionSpilled(partition)) { + return 0; + } + const auto* writer = partitionWriter(partition); + if (writer == nullptr) { + return 0; + } + return writer->numFinishedFiles(); +} + SpillFiles SpillState::finish(uint32_t partition) { auto* writer = partitionWriter(partition); if (writer == nullptr) { diff --git a/velox/exec/Spill.h b/velox/exec/Spill.h index d306a39a78cb..c350df9839df 100644 --- a/velox/exec/Spill.h +++ b/velox/exec/Spill.h @@ -380,6 +380,12 @@ class SpillState { /// far. void finishFile(uint32_t partition); + /// Returns the current number of finished files from a given partition. + /// + /// NOTE: the fucntion returns zero if the state has finished or the partition + /// is not spilled yet. + size_t numFinishedFiles(uint32_t partition) const; + /// Returns the spill file objects from a given 'partition'. The function /// returns an empty list if either the partition has not been spilled or has /// no spilled data. diff --git a/velox/exec/SpillFile.cpp b/velox/exec/SpillFile.cpp index 240da4a9c952..46ae2cc9b44f 100644 --- a/velox/exec/SpillFile.cpp +++ b/velox/exec/SpillFile.cpp @@ -137,6 +137,10 @@ void SpillWriter::closeFile() { currentFile_.reset(); } +size_t SpillWriter::numFinishedFiles() const { + return finishedFiles_.size(); +} + uint64_t SpillWriter::flush() { if (batch_ == nullptr) { return 0; diff --git a/velox/exec/SpillFile.h b/velox/exec/SpillFile.h index bb7a54299cd0..eee5a1727119 100644 --- a/velox/exec/SpillFile.h +++ b/velox/exec/SpillFile.h @@ -135,6 +135,9 @@ class SpillWriter { /// start a new one. void finishFile(); + /// Returns the number of current finished files. + size_t numFinishedFiles() const; + /// Finishes this file writer and returns the written spill files info. /// /// NOTE: we don't allow write to a spill writer after t diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.cpp b/velox/exec/fuzzer/AggregationFuzzerBase.cpp index 6594cf359c22..d826f6fbc232 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.cpp +++ b/velox/exec/fuzzer/AggregationFuzzerBase.cpp @@ -87,6 +87,10 @@ DEFINE_bool( namespace facebook::velox::exec::test { +int32_t AggregationFuzzerBase::randInt(int32_t min, int32_t max) { + return boost::random::uniform_int_distribution(min, max)(rng_); +} + bool AggregationFuzzerBase::isSupportedType(const TypePtr& type) const { // Date / IntervalDayTime/ Unknown are not currently supported by DWRF. if (type->isDate() || type->isIntervalDayTime() || type->isUnKnown()) { @@ -403,8 +407,10 @@ velox::test::ResultOrError AggregationFuzzerBase::execute( spillDirectory = exec::test::TempDirectoryPath::create(); builder.spillDirectory(spillDirectory->path) .config(core::QueryConfig::kSpillEnabled, "true") - .config(core::QueryConfig::kAggregationSpillEnabled, "true"); - spillPct = 100; + .config(core::QueryConfig::kAggregationSpillEnabled, "true") + .config(core::QueryConfig::kMaxSpillRunRows, randInt(32, 1L << 30)); + // Randomized the spill injection with a percentage less than 100. + spillPct = 20; } if (abandonPartial) { diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.h b/velox/exec/fuzzer/AggregationFuzzerBase.h index fec35c5e2da7..533c189ffb3f 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.h +++ b/velox/exec/fuzzer/AggregationFuzzerBase.h @@ -141,6 +141,8 @@ class AggregationFuzzerBase { AggregationFuzzerBase::ReferenceQueryErrorCode errorCode); }; + int32_t randInt(int32_t min, int32_t max); + bool addSignature( const std::string& name, const FunctionSignaturePtr& signature); diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 04eb340f904e..621a2d56d210 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -3180,114 +3180,96 @@ TEST_F(AggregationTest, maxSpillBytes) { } DEBUG_ONLY_TEST_F(AggregationTest, reclaimFromAggregation) { - std::vector vectors = createVectors(8, rowType_, fuzzerOpts_); + const int numInputs = 8; + std::vector vectors = + createVectors(numInputs, rowType_, fuzzerOpts_); createDuckDbTable(vectors); - std::vector sameQueries = {false, true}; - for (bool sameQuery : sameQueries) { - SCOPED_TRACE(fmt::format("sameQuery {}", sameQuery)); - const auto spillDirectory = exec::test::TempDirectoryPath::create(); - auto fakeQueryCtx = std::make_shared(executor_.get()); - std::shared_ptr aggregationQueryCtx; - if (sameQuery) { - aggregationQueryCtx = fakeQueryCtx; - } else { - aggregationQueryCtx = std::make_shared(executor_.get()); - } + for (const auto maxSpillRunRows : std::vector({32, 1UL << 30})) { + SCOPED_TRACE(fmt::format("maxSpillRunRows {}", maxSpillRunRows)); - folly::EventCount arbitrationWait; - std::atomic_bool arbitrationWaitFlag{true}; - folly::EventCount taskPauseWait; - std::atomic_bool taskPauseWaitFlag{true}; - - std::atomic_int numInputs{0}; + std::atomic_int inputCount{0}; SCOPED_TESTVALUE_SET( "facebook::velox::exec::Driver::runInternal::addInput", - std::function(([&](Operator* op) { - if (op->operatorType() != "Aggregation") { + std::function(([&](exec::Operator* op) { + if (op->testingOperatorCtx()->operatorType() != "Aggregation") { return; } - if (++numInputs != 5) { + // Inject spill in the middle of aggregation input processing. + if (++inputCount != numInputs / 2) { return; } - arbitrationWaitFlag = false; - arbitrationWait.notifyAll(); - - // Wait for task pause to be triggered. - taskPauseWait.await([&] { return !taskPauseWaitFlag.load(); }); + testingRunArbitration(op->pool()); }))); - SCOPED_TESTVALUE_SET( - "facebook::velox::exec::Task::requestPauseLocked", - std::function(([&](Task* /*unused*/) { - taskPauseWaitFlag = false; - taskPauseWait.notifyAll(); - }))); - - std::thread aggregationThread([&]() { - core::PlanNodeId aggrNodeId; - auto task = - AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) - .config(core::QueryConfig::kSpillEnabled, true) - .config(core::QueryConfig::kAggregationSpillEnabled, true) - .queryCtx(aggregationQueryCtx) - .plan(PlanBuilder() - .values(vectors) - .singleAggregation({"c0", "c1"}, {"array_agg(c2)"}) - .capturePlanNodeId(aggrNodeId) - .planNode()) - .assertResults( - "SELECT c0, c1, array_agg(c2) FROM tmp GROUP BY c0, c1"); - auto taskStats = exec::toPlanStats(task->taskStats()); - auto& planStats = taskStats.at(aggrNodeId); - ASSERT_GT(planStats.spilledBytes, 0); - ASSERT_GT(planStats.customStats["memoryArbitrationWallNanos"].sum, 0); - ASSERT_GT(planStats.customStats["memoryReclaimWallNanos"].sum, 0); - ASSERT_GT(planStats.customStats["reclaimedMemoryBytes"].sum, 0); - }); - - arbitrationWait.await([&] { return !arbitrationWaitFlag.load(); }); - - auto fakePool = fakeQueryCtx->pool()->addLeafChild( - "fakePool", true, exec::MemoryReclaimer::create()); - fakePool->maybeReserve(memory::memoryManager()->arbitrator()->capacity()); - - aggregationThread.join(); - + const auto spillDirectory = exec::test::TempDirectoryPath::create(); + core::PlanNodeId aggrNodeId; + auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->path) + .config(core::QueryConfig::kSpillEnabled, true) + .config(core::QueryConfig::kAggregationSpillEnabled, true) + .config( + core::QueryConfig::kMaxSpillRunRows, + std::to_string(maxSpillRunRows)) + .plan(PlanBuilder() + .values(vectors) + .singleAggregation({"c0", "c1"}, {"array_agg(c2)"}) + .capturePlanNodeId(aggrNodeId) + .planNode()) + .assertResults( + "SELECT c0, c1, array_agg(c2) FROM tmp GROUP BY c0, c1"); + auto taskStats = exec::toPlanStats(task->taskStats()); + auto& planStats = taskStats.at(aggrNodeId); + ASSERT_GT(planStats.spilledBytes, 0); + ASSERT_GT(planStats.customStats["memoryArbitrationWallNanos"].sum, 0); + task.reset(); waitForAllTasksToBeDeleted(); } } -TEST_F(AggregationTest, reclaimFromDistinctAggregation) { - const uint64_t maxQueryCapacity = 20L << 20; +DEBUG_ONLY_TEST_F(AggregationTest, reclaimFromDistinctAggregation) { + const int numInputs = 32; std::vector vectors = - createVectors(rowType_, maxQueryCapacity * 10, fuzzerOpts_); + createVectors(numInputs, rowType_, fuzzerOpts_); createDuckDbTable(vectors); - const auto spillDirectory = exec::test::TempDirectoryPath::create(); - core::PlanNodeId aggrNodeId; - std::shared_ptr queryCtx = std::make_shared( - executor_.get(), - core::QueryConfig({}), - std::unordered_map>{}, - cache::AsyncDataCache::getInstance(), - memory::memoryManager()->addRootPool( - "test-root", maxQueryCapacity, exec::MemoryReclaimer::create())); - auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) - .config(core::QueryConfig::kSpillEnabled, true) - .config(core::QueryConfig::kAggregationSpillEnabled, true) - .queryCtx(queryCtx) - .plan(PlanBuilder() - .values(vectors) - .singleAggregation({"c0"}, {}) - .capturePlanNodeId(aggrNodeId) - .planNode()) - .assertResults("SELECT distinct c0 FROM tmp"); - auto taskStats = exec::toPlanStats(task->taskStats()); - auto& planStats = taskStats.at(aggrNodeId); - ASSERT_GT(planStats.spilledBytes, 0); - task.reset(); - waitForAllTasksToBeDeleted(); + for (const auto maxSpillRunRows : std::vector({32, 1UL << 30})) { + SCOPED_TRACE(fmt::format("maxSpillRunRows {}", maxSpillRunRows)); + + std::atomic_int inputCount{0}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Driver::runInternal::addInput", + std::function(([&](exec::Operator* op) { + if (op->testingOperatorCtx()->operatorType() != "Aggregation") { + return; + } + // Inject spill at the end of aggregation input processing. + if (++inputCount != numInputs / 2) { + return; + } + testingRunArbitration(op->pool()); + }))); + + const auto spillDirectory = exec::test::TempDirectoryPath::create(); + core::PlanNodeId aggrNodeId; + auto task = AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->path) + .config(core::QueryConfig::kSpillEnabled, true) + .config(core::QueryConfig::kAggregationSpillEnabled, true) + .config( + core::QueryConfig::kMaxSpillRunRows, + std::to_string(maxSpillRunRows)) + .plan(PlanBuilder() + .values(vectors) + .singleAggregation({"c0"}, {}) + .capturePlanNodeId(aggrNodeId) + .planNode()) + .assertResults("SELECT distinct c0 FROM tmp"); + auto taskStats = exec::toPlanStats(task->taskStats()); + auto& planStats = taskStats.at(aggrNodeId); + ASSERT_GT(planStats.spilledBytes, 0); + task.reset(); + waitForAllTasksToBeDeleted(); + } } DEBUG_ONLY_TEST_F(AggregationTest, reclaimFromAggregationOnNoMoreInput) { diff --git a/velox/exec/tests/SortBufferTest.cpp b/velox/exec/tests/SortBufferTest.cpp index 9b6a285755a9..83a23fbefeea 100644 --- a/velox/exec/tests/SortBufferTest.cpp +++ b/velox/exec/tests/SortBufferTest.cpp @@ -49,7 +49,6 @@ class SortBufferTest : public OperatorTestBase { "0.0.0", 0, 0, - 0, executor_.get(), 5, 10, @@ -291,7 +290,6 @@ TEST_F(SortBufferTest, batchOutput) { "0.0.0", 1000, 0, - 1000, executor_.get(), 5, 10, @@ -386,7 +384,6 @@ TEST_F(SortBufferTest, spill) { "0.0.0", 1000, 0, - 1000, executor_.get(), 100, spillableReservationGrowthPct, diff --git a/velox/exec/tests/SpillTest.cpp b/velox/exec/tests/SpillTest.cpp index 05ff84fd915a..a6238db7611d 100644 --- a/velox/exec/tests/SpillTest.cpp +++ b/velox/exec/tests/SpillTest.cpp @@ -177,6 +177,7 @@ class SpillTest : public ::testing::TestWithParam, for (auto partition = 0; partition < state_->maxPartitions(); ++partition) { ASSERT_FALSE(state_->isPartitionSpilled(partition)); + ASSERT_EQ(state_->numFinishedFiles(partition), 0); // Expect an exception if partition is not set to spill. { RowVectorPtr dummyInput; @@ -224,9 +225,11 @@ class SpillTest : public ::testing::TestWithParam, ASSERT_TRUE( state_->testingNonEmptySpilledPartitionSet().contains(partition)); + ASSERT_GE(state_->numFinishedFiles(partition), 0); // Indicates that the next additions to 'partition' are not sorted // with respect to the values added so far. state_->finishFile(partition); + ASSERT_GE(state_->numFinishedFiles(partition), 1); ASSERT_TRUE( state_->testingNonEmptySpilledPartitionSet().contains(partition)); } @@ -248,6 +251,11 @@ class SpillTest : public ::testing::TestWithParam, ASSERT_GT( stats_.rlock()->spilledBytes, numPartitions * numBatches * sizeof(int64_t)); + int numFinishedFiles{0}; + for (int partition = 0; partition < numPartitions; ++partition) { + numFinishedFiles += state_->numFinishedFiles(partition); + } + ASSERT_EQ(numFinishedFiles, expectedFiles); } // 'numDuplicates' specifies the number of duplicates generated for each @@ -341,6 +349,7 @@ class SpillTest : public ::testing::TestWithParam, for (auto partition = 0; partition < state_->maxPartitions(); ++partition) { auto spillFiles = state_->finish(partition); + ASSERT_EQ(state_->numFinishedFiles(partition), 0); auto spillPartition = SpillPartition(SpillPartitionId{0, partition}, std::move(spillFiles)); auto merge = spillPartition.createOrderedReader(pool());