diff --git a/velox/common/base/SpillConfig.cpp b/velox/common/base/SpillConfig.cpp index ff7d759aa9d5..d3a51549968e 100644 --- a/velox/common/base/SpillConfig.cpp +++ b/velox/common/base/SpillConfig.cpp @@ -24,7 +24,6 @@ SpillConfig::SpillConfig( std::string _fileNamePrefix, uint64_t _maxFileSize, uint64_t _writeBufferSize, - uint64_t _minSpillRunSize, folly::Executor* _executor, int32_t _minSpillableReservationPct, int32_t _spillableReservationGrowthPct, @@ -42,7 +41,6 @@ SpillConfig::SpillConfig( _maxFileSize == 0 ? std::numeric_limits::max() : _maxFileSize), writeBufferSize(_writeBufferSize), - minSpillRunSize(_minSpillRunSize), executor(_executor), minSpillableReservationPct(_minSpillableReservationPct), spillableReservationGrowthPct(_spillableReservationGrowthPct), diff --git a/velox/common/base/SpillConfig.h b/velox/common/base/SpillConfig.h index ad5f4b3ab5b4..a51801a62a49 100644 --- a/velox/common/base/SpillConfig.h +++ b/velox/common/base/SpillConfig.h @@ -51,7 +51,6 @@ struct SpillConfig { std::string _filePath, uint64_t _maxFileSize, uint64_t _writeBufferSize, - uint64_t _minSpillRunSize, folly::Executor* _executor, int32_t _minSpillableReservationPct, int32_t _spillableReservationGrowthPct, @@ -94,15 +93,6 @@ struct SpillConfig { /// storage system for io efficiency. uint64_t writeBufferSize; - /// The min spill run size (bytes) limit used to select partitions for - /// spilling. The spiller tries to spill a previously spilled partitions if - /// its data size exceeds this limit, otherwise it spills the partition with - /// most data. If the limit is zero, then the spiller always spill a - /// previously spilled partition if it has any data. This is to avoid spill - /// from a partition with a small amount of data which might result in - /// generating too many small spilled files. - uint64_t minSpillRunSize; - /// Executor for spilling. If nullptr spilling writes on the Driver's thread. folly::Executor* executor; // Not owned. diff --git a/velox/common/base/tests/SpillConfigTest.cpp b/velox/common/base/tests/SpillConfigTest.cpp index 6dac69a074a5..bdcbf1b0f22c 100644 --- a/velox/common/base/tests/SpillConfigTest.cpp +++ b/velox/common/base/tests/SpillConfigTest.cpp @@ -31,7 +31,6 @@ TEST(SpillConfig, spillLevel) { "fakeSpillPath", 0, 0, - 0, nullptr, 0, 0, @@ -116,7 +115,6 @@ TEST(SpillConfig, spillLevelLimit) { "fakeSpillPath", 0, 0, - 0, nullptr, 0, 0, @@ -163,7 +161,6 @@ TEST(SpillConfig, spillableReservationPercentages) { "spillableReservationPercentages", 0, 0, - 0, nullptr, testData.minPct, testData.growthPct, diff --git a/velox/connectors/hive/tests/HiveDataSinkTest.cpp b/velox/connectors/hive/tests/HiveDataSinkTest.cpp index 7695c4e42993..b9b019232851 100644 --- a/velox/connectors/hive/tests/HiveDataSinkTest.cpp +++ b/velox/connectors/hive/tests/HiveDataSinkTest.cpp @@ -87,7 +87,6 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { "", 0, 0, - 0, spillExecutor_.get(), 10, 20, diff --git a/velox/dwio/dwrf/test/E2EWriterTest.cpp b/velox/dwio/dwrf/test/E2EWriterTest.cpp index 4aa4bc86d317..a494230d486e 100644 --- a/velox/dwio/dwrf/test/E2EWriterTest.cpp +++ b/velox/dwio/dwrf/test/E2EWriterTest.cpp @@ -249,7 +249,6 @@ class E2EWriterTest : public testing::Test { "fakeSpillConfig", 0, 0, - 0, nullptr, minSpillableReservationPct, spillableReservationGrowthPct, diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 849252a58384..5dac9f9d6ec6 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -150,7 +150,6 @@ std::optional DriverCtx::makeSpillConfig( spillFilePrefix, queryConfig.maxSpillFileSize(), queryConfig.spillWriteBufferSize(), - queryConfig.minSpillRunSize(), task->queryCtx()->spillExecutor(), queryConfig.minSpillableReservationPct(), queryConfig.spillableReservationGrowthPct(), diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index 4c0ef7843cea..b1ad5e5742da 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -936,6 +936,7 @@ void GroupingSet::spill() { if (!hasSpilled()) { auto rows = table_->rows(); VELOX_DCHECK(pool_.trackUsage()); + VELOX_CHECK_EQ(numDistinctSpilledFiles_, 0); spiller_ = std::make_unique( Spiller::Type::kAggregateInput, rows, @@ -944,8 +945,13 @@ void GroupingSet::spill() { std::vector(), spillConfig_, spillStats_); + VELOX_CHECK_EQ(spiller_->state().maxPartitions(), 1); } spiller_->spill(); + if (isDistinct() && numDistinctSpilledFiles_ == 0) { + numDistinctSpilledFiles_ = spiller_->state().numFinishedFiles(0); + VELOX_CHECK_GT(numDistinctSpilledFiles_, 0); + } if (sortedAggregations_) { sortedAggregations_->clear(); } @@ -1064,16 +1070,19 @@ bool GroupingSet::mergeNextWithoutAggregates( const RowVectorPtr& result) { VELOX_CHECK_NOT_NULL(merge_); VELOX_CHECK(isDistinct()); + VELOX_CHECK_GT(numDistinctSpilledFiles_, 0); // We are looping over sorted rows produced by tree-of-losers. We logically // split the stream into runs of duplicate rows. As we process each run we - // track whether one of the values comes from stream 0, in which case we - // should not produce a result from that run. Otherwise, we produce a result - // at the end of the run (when we know for sure whether the run contains a row - // from stream 0 or not). + // track whether one of the values coming from distinct streams, in which case + // we should not produce a result from that run. Otherwise, we produce a + // result at the end of the run (when we know for sure whether the run + // contains a row from the distinct streams). // - // NOTE: stream 0 contains rows which has already been output as distinct - // before we trigger spilling. + // NOTE: the distinct stream refers to the stream that contains the spilled + // distinct hash table. A distinct stream contains rows which has already + // been output as distinct before we trigger spilling. A distinct stream id is + // less than 'numDistinctSpilledFiles_'. bool newDistinct{true}; int32_t numOutputRows{0}; while (numOutputRows < maxOutputRows) { @@ -1082,7 +1091,7 @@ bool GroupingSet::mergeNextWithoutAggregates( if (stream == nullptr) { break; } - if (stream->id() == 0) { + if (stream->id() < numDistinctSpilledFiles_) { newDistinct = false; } if (next.second) { diff --git a/velox/exec/GroupingSet.h b/velox/exec/GroupingSet.h index 8cbbe8ff99bc..072a5b500e4f 100644 --- a/velox/exec/GroupingSet.h +++ b/velox/exec/GroupingSet.h @@ -327,6 +327,10 @@ class GroupingSet { bool remainingMayPushdown_; std::unique_ptr spiller_; + // Sets to the number of files stores the spilled distinct hash table which + // are the files generated by the first spill call. This only applies for + // distinct hash aggregation. + size_t numDistinctSpilledFiles_{0}; std::unique_ptr> merge_; // Container for materializing batches of output from spilling. diff --git a/velox/exec/Spill.cpp b/velox/exec/Spill.cpp index eb0bab730f28..c6aa380ccdbb 100644 --- a/velox/exec/Spill.cpp +++ b/velox/exec/Spill.cpp @@ -155,6 +155,17 @@ void SpillState::finishFile(uint32_t partition) { writer->finishFile(); } +size_t SpillState::numFinishedFiles(uint32_t partition) const { + if (!isPartitionSpilled(partition)) { + return 0; + } + const auto* writer = partitionWriter(partition); + if (writer == nullptr) { + return 0; + } + return writer->numFinishedFiles(); +} + SpillFiles SpillState::finish(uint32_t partition) { auto* writer = partitionWriter(partition); if (writer == nullptr) { diff --git a/velox/exec/Spill.h b/velox/exec/Spill.h index d306a39a78cb..c350df9839df 100644 --- a/velox/exec/Spill.h +++ b/velox/exec/Spill.h @@ -380,6 +380,12 @@ class SpillState { /// far. void finishFile(uint32_t partition); + /// Returns the current number of finished files from a given partition. + /// + /// NOTE: the fucntion returns zero if the state has finished or the partition + /// is not spilled yet. + size_t numFinishedFiles(uint32_t partition) const; + /// Returns the spill file objects from a given 'partition'. The function /// returns an empty list if either the partition has not been spilled or has /// no spilled data. diff --git a/velox/exec/SpillFile.cpp b/velox/exec/SpillFile.cpp index 240da4a9c952..46ae2cc9b44f 100644 --- a/velox/exec/SpillFile.cpp +++ b/velox/exec/SpillFile.cpp @@ -137,6 +137,10 @@ void SpillWriter::closeFile() { currentFile_.reset(); } +size_t SpillWriter::numFinishedFiles() const { + return finishedFiles_.size(); +} + uint64_t SpillWriter::flush() { if (batch_ == nullptr) { return 0; diff --git a/velox/exec/SpillFile.h b/velox/exec/SpillFile.h index bb7a54299cd0..eee5a1727119 100644 --- a/velox/exec/SpillFile.h +++ b/velox/exec/SpillFile.h @@ -135,6 +135,9 @@ class SpillWriter { /// start a new one. void finishFile(); + /// Returns the number of current finished files. + size_t numFinishedFiles() const; + /// Finishes this file writer and returns the written spill files info. /// /// NOTE: we don't allow write to a spill writer after t diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.cpp b/velox/exec/fuzzer/AggregationFuzzerBase.cpp index 6594cf359c22..d826f6fbc232 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.cpp +++ b/velox/exec/fuzzer/AggregationFuzzerBase.cpp @@ -87,6 +87,10 @@ DEFINE_bool( namespace facebook::velox::exec::test { +int32_t AggregationFuzzerBase::randInt(int32_t min, int32_t max) { + return boost::random::uniform_int_distribution(min, max)(rng_); +} + bool AggregationFuzzerBase::isSupportedType(const TypePtr& type) const { // Date / IntervalDayTime/ Unknown are not currently supported by DWRF. if (type->isDate() || type->isIntervalDayTime() || type->isUnKnown()) { @@ -403,8 +407,10 @@ velox::test::ResultOrError AggregationFuzzerBase::execute( spillDirectory = exec::test::TempDirectoryPath::create(); builder.spillDirectory(spillDirectory->path) .config(core::QueryConfig::kSpillEnabled, "true") - .config(core::QueryConfig::kAggregationSpillEnabled, "true"); - spillPct = 100; + .config(core::QueryConfig::kAggregationSpillEnabled, "true") + .config(core::QueryConfig::kMaxSpillRunRows, randInt(32, 1L << 30)); + // Randomized the spill injection with a percentage less than 100. + spillPct = 20; } if (abandonPartial) { diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.h b/velox/exec/fuzzer/AggregationFuzzerBase.h index fec35c5e2da7..533c189ffb3f 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.h +++ b/velox/exec/fuzzer/AggregationFuzzerBase.h @@ -141,6 +141,8 @@ class AggregationFuzzerBase { AggregationFuzzerBase::ReferenceQueryErrorCode errorCode); }; + int32_t randInt(int32_t min, int32_t max); + bool addSignature( const std::string& name, const FunctionSignaturePtr& signature); diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 04eb340f904e..621a2d56d210 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -3180,114 +3180,96 @@ TEST_F(AggregationTest, maxSpillBytes) { } DEBUG_ONLY_TEST_F(AggregationTest, reclaimFromAggregation) { - std::vector vectors = createVectors(8, rowType_, fuzzerOpts_); + const int numInputs = 8; + std::vector vectors = + createVectors(numInputs, rowType_, fuzzerOpts_); createDuckDbTable(vectors); - std::vector sameQueries = {false, true}; - for (bool sameQuery : sameQueries) { - SCOPED_TRACE(fmt::format("sameQuery {}", sameQuery)); - const auto spillDirectory = exec::test::TempDirectoryPath::create(); - auto fakeQueryCtx = std::make_shared(executor_.get()); - std::shared_ptr aggregationQueryCtx; - if (sameQuery) { - aggregationQueryCtx = fakeQueryCtx; - } else { - aggregationQueryCtx = std::make_shared(executor_.get()); - } + for (const auto maxSpillRunRows : std::vector({32, 1UL << 30})) { + SCOPED_TRACE(fmt::format("maxSpillRunRows {}", maxSpillRunRows)); - folly::EventCount arbitrationWait; - std::atomic_bool arbitrationWaitFlag{true}; - folly::EventCount taskPauseWait; - std::atomic_bool taskPauseWaitFlag{true}; - - std::atomic_int numInputs{0}; + std::atomic_int inputCount{0}; SCOPED_TESTVALUE_SET( "facebook::velox::exec::Driver::runInternal::addInput", - std::function(([&](Operator* op) { - if (op->operatorType() != "Aggregation") { + std::function(([&](exec::Operator* op) { + if (op->testingOperatorCtx()->operatorType() != "Aggregation") { return; } - if (++numInputs != 5) { + // Inject spill in the middle of aggregation input processing. + if (++inputCount != numInputs / 2) { return; } - arbitrationWaitFlag = false; - arbitrationWait.notifyAll(); - - // Wait for task pause to be triggered. - taskPauseWait.await([&] { return !taskPauseWaitFlag.load(); }); + testingRunArbitration(op->pool()); }))); - SCOPED_TESTVALUE_SET( - "facebook::velox::exec::Task::requestPauseLocked", - std::function(([&](Task* /*unused*/) { - taskPauseWaitFlag = false; - taskPauseWait.notifyAll(); - }))); - - std::thread aggregationThread([&]() { - core::PlanNodeId aggrNodeId; - auto task = - AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) - .config(core::QueryConfig::kSpillEnabled, true) - .config(core::QueryConfig::kAggregationSpillEnabled, true) - .queryCtx(aggregationQueryCtx) - .plan(PlanBuilder() - .values(vectors) - .singleAggregation({"c0", "c1"}, {"array_agg(c2)"}) - .capturePlanNodeId(aggrNodeId) - .planNode()) - .assertResults( - "SELECT c0, c1, array_agg(c2) FROM tmp GROUP BY c0, c1"); - auto taskStats = exec::toPlanStats(task->taskStats()); - auto& planStats = taskStats.at(aggrNodeId); - ASSERT_GT(planStats.spilledBytes, 0); - ASSERT_GT(planStats.customStats["memoryArbitrationWallNanos"].sum, 0); - ASSERT_GT(planStats.customStats["memoryReclaimWallNanos"].sum, 0); - ASSERT_GT(planStats.customStats["reclaimedMemoryBytes"].sum, 0); - }); - - arbitrationWait.await([&] { return !arbitrationWaitFlag.load(); }); - - auto fakePool = fakeQueryCtx->pool()->addLeafChild( - "fakePool", true, exec::MemoryReclaimer::create()); - fakePool->maybeReserve(memory::memoryManager()->arbitrator()->capacity()); - - aggregationThread.join(); - + const auto spillDirectory = exec::test::TempDirectoryPath::create(); + core::PlanNodeId aggrNodeId; + auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->path) + .config(core::QueryConfig::kSpillEnabled, true) + .config(core::QueryConfig::kAggregationSpillEnabled, true) + .config( + core::QueryConfig::kMaxSpillRunRows, + std::to_string(maxSpillRunRows)) + .plan(PlanBuilder() + .values(vectors) + .singleAggregation({"c0", "c1"}, {"array_agg(c2)"}) + .capturePlanNodeId(aggrNodeId) + .planNode()) + .assertResults( + "SELECT c0, c1, array_agg(c2) FROM tmp GROUP BY c0, c1"); + auto taskStats = exec::toPlanStats(task->taskStats()); + auto& planStats = taskStats.at(aggrNodeId); + ASSERT_GT(planStats.spilledBytes, 0); + ASSERT_GT(planStats.customStats["memoryArbitrationWallNanos"].sum, 0); + task.reset(); waitForAllTasksToBeDeleted(); } } -TEST_F(AggregationTest, reclaimFromDistinctAggregation) { - const uint64_t maxQueryCapacity = 20L << 20; +DEBUG_ONLY_TEST_F(AggregationTest, reclaimFromDistinctAggregation) { + const int numInputs = 32; std::vector vectors = - createVectors(rowType_, maxQueryCapacity * 10, fuzzerOpts_); + createVectors(numInputs, rowType_, fuzzerOpts_); createDuckDbTable(vectors); - const auto spillDirectory = exec::test::TempDirectoryPath::create(); - core::PlanNodeId aggrNodeId; - std::shared_ptr queryCtx = std::make_shared( - executor_.get(), - core::QueryConfig({}), - std::unordered_map>{}, - cache::AsyncDataCache::getInstance(), - memory::memoryManager()->addRootPool( - "test-root", maxQueryCapacity, exec::MemoryReclaimer::create())); - auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) - .config(core::QueryConfig::kSpillEnabled, true) - .config(core::QueryConfig::kAggregationSpillEnabled, true) - .queryCtx(queryCtx) - .plan(PlanBuilder() - .values(vectors) - .singleAggregation({"c0"}, {}) - .capturePlanNodeId(aggrNodeId) - .planNode()) - .assertResults("SELECT distinct c0 FROM tmp"); - auto taskStats = exec::toPlanStats(task->taskStats()); - auto& planStats = taskStats.at(aggrNodeId); - ASSERT_GT(planStats.spilledBytes, 0); - task.reset(); - waitForAllTasksToBeDeleted(); + for (const auto maxSpillRunRows : std::vector({32, 1UL << 30})) { + SCOPED_TRACE(fmt::format("maxSpillRunRows {}", maxSpillRunRows)); + + std::atomic_int inputCount{0}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Driver::runInternal::addInput", + std::function(([&](exec::Operator* op) { + if (op->testingOperatorCtx()->operatorType() != "Aggregation") { + return; + } + // Inject spill at the end of aggregation input processing. + if (++inputCount != numInputs / 2) { + return; + } + testingRunArbitration(op->pool()); + }))); + + const auto spillDirectory = exec::test::TempDirectoryPath::create(); + core::PlanNodeId aggrNodeId; + auto task = AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->path) + .config(core::QueryConfig::kSpillEnabled, true) + .config(core::QueryConfig::kAggregationSpillEnabled, true) + .config( + core::QueryConfig::kMaxSpillRunRows, + std::to_string(maxSpillRunRows)) + .plan(PlanBuilder() + .values(vectors) + .singleAggregation({"c0"}, {}) + .capturePlanNodeId(aggrNodeId) + .planNode()) + .assertResults("SELECT distinct c0 FROM tmp"); + auto taskStats = exec::toPlanStats(task->taskStats()); + auto& planStats = taskStats.at(aggrNodeId); + ASSERT_GT(planStats.spilledBytes, 0); + task.reset(); + waitForAllTasksToBeDeleted(); + } } DEBUG_ONLY_TEST_F(AggregationTest, reclaimFromAggregationOnNoMoreInput) { diff --git a/velox/exec/tests/SortBufferTest.cpp b/velox/exec/tests/SortBufferTest.cpp index 9b6a285755a9..83a23fbefeea 100644 --- a/velox/exec/tests/SortBufferTest.cpp +++ b/velox/exec/tests/SortBufferTest.cpp @@ -49,7 +49,6 @@ class SortBufferTest : public OperatorTestBase { "0.0.0", 0, 0, - 0, executor_.get(), 5, 10, @@ -291,7 +290,6 @@ TEST_F(SortBufferTest, batchOutput) { "0.0.0", 1000, 0, - 1000, executor_.get(), 5, 10, @@ -386,7 +384,6 @@ TEST_F(SortBufferTest, spill) { "0.0.0", 1000, 0, - 1000, executor_.get(), 100, spillableReservationGrowthPct, diff --git a/velox/exec/tests/SpillTest.cpp b/velox/exec/tests/SpillTest.cpp index 05ff84fd915a..a6238db7611d 100644 --- a/velox/exec/tests/SpillTest.cpp +++ b/velox/exec/tests/SpillTest.cpp @@ -177,6 +177,7 @@ class SpillTest : public ::testing::TestWithParam, for (auto partition = 0; partition < state_->maxPartitions(); ++partition) { ASSERT_FALSE(state_->isPartitionSpilled(partition)); + ASSERT_EQ(state_->numFinishedFiles(partition), 0); // Expect an exception if partition is not set to spill. { RowVectorPtr dummyInput; @@ -224,9 +225,11 @@ class SpillTest : public ::testing::TestWithParam, ASSERT_TRUE( state_->testingNonEmptySpilledPartitionSet().contains(partition)); + ASSERT_GE(state_->numFinishedFiles(partition), 0); // Indicates that the next additions to 'partition' are not sorted // with respect to the values added so far. state_->finishFile(partition); + ASSERT_GE(state_->numFinishedFiles(partition), 1); ASSERT_TRUE( state_->testingNonEmptySpilledPartitionSet().contains(partition)); } @@ -248,6 +251,11 @@ class SpillTest : public ::testing::TestWithParam, ASSERT_GT( stats_.rlock()->spilledBytes, numPartitions * numBatches * sizeof(int64_t)); + int numFinishedFiles{0}; + for (int partition = 0; partition < numPartitions; ++partition) { + numFinishedFiles += state_->numFinishedFiles(partition); + } + ASSERT_EQ(numFinishedFiles, expectedFiles); } // 'numDuplicates' specifies the number of duplicates generated for each @@ -341,6 +349,7 @@ class SpillTest : public ::testing::TestWithParam, for (auto partition = 0; partition < state_->maxPartitions(); ++partition) { auto spillFiles = state_->finish(partition); + ASSERT_EQ(state_->numFinishedFiles(partition), 0); auto spillPartition = SpillPartition(SpillPartitionId{0, partition}, std::move(spillFiles)); auto merge = spillPartition.createOrderedReader(pool());