diff --git a/velox/common/base/SpillConfig.cpp b/velox/common/base/SpillConfig.cpp index ff7d759aa9d58..d3a51549968ed 100644 --- a/velox/common/base/SpillConfig.cpp +++ b/velox/common/base/SpillConfig.cpp @@ -24,7 +24,6 @@ SpillConfig::SpillConfig( std::string _fileNamePrefix, uint64_t _maxFileSize, uint64_t _writeBufferSize, - uint64_t _minSpillRunSize, folly::Executor* _executor, int32_t _minSpillableReservationPct, int32_t _spillableReservationGrowthPct, @@ -42,7 +41,6 @@ SpillConfig::SpillConfig( _maxFileSize == 0 ? std::numeric_limits::max() : _maxFileSize), writeBufferSize(_writeBufferSize), - minSpillRunSize(_minSpillRunSize), executor(_executor), minSpillableReservationPct(_minSpillableReservationPct), spillableReservationGrowthPct(_spillableReservationGrowthPct), diff --git a/velox/common/base/SpillConfig.h b/velox/common/base/SpillConfig.h index ad5f4b3ab5b48..a51801a62a490 100644 --- a/velox/common/base/SpillConfig.h +++ b/velox/common/base/SpillConfig.h @@ -51,7 +51,6 @@ struct SpillConfig { std::string _filePath, uint64_t _maxFileSize, uint64_t _writeBufferSize, - uint64_t _minSpillRunSize, folly::Executor* _executor, int32_t _minSpillableReservationPct, int32_t _spillableReservationGrowthPct, @@ -94,15 +93,6 @@ struct SpillConfig { /// storage system for io efficiency. uint64_t writeBufferSize; - /// The min spill run size (bytes) limit used to select partitions for - /// spilling. The spiller tries to spill a previously spilled partitions if - /// its data size exceeds this limit, otherwise it spills the partition with - /// most data. If the limit is zero, then the spiller always spill a - /// previously spilled partition if it has any data. This is to avoid spill - /// from a partition with a small amount of data which might result in - /// generating too many small spilled files. - uint64_t minSpillRunSize; - /// Executor for spilling. If nullptr spilling writes on the Driver's thread. folly::Executor* executor; // Not owned. 
diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index a541bdbc356e7..397656d67d922 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -242,15 +242,6 @@ class QueryConfig { /// The max allowed spill file size. If it is zero, then there is no limit. static constexpr const char* kMaxSpillFileSize = "max_spill_file_size"; - /// The min spill run size limit used to select partitions for spilling. The - /// spiller tries to spill a previously spilled partitions if its data size - /// exceeds this limit, otherwise it spills the partition with most data. - /// If the limit is zero, then the spiller always spill a previously spilled - /// partition if it has any data. This is to avoid spill from a partition with - /// a small amount of data which might result in generating too many small - /// spilled files. - static constexpr const char* kMinSpillRunSize = "min_spill_run_size"; - static constexpr const char* kSpillCompressionKind = "spill_compression_codec"; @@ -616,11 +607,6 @@ class QueryConfig { return get(kMaxSpillFileSize, kDefaultMaxFileSize); } - uint64_t minSpillRunSize() const { - constexpr uint64_t kDefaultMinSpillRunSize = 256 << 20; // 256MB. 
- return get(kMinSpillRunSize, kDefaultMinSpillRunSize); - } - std::string spillCompressionKind() const { return get(kSpillCompressionKind, "none"); } diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 849252a583840..5dac9f9d6ec67 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -150,7 +150,6 @@ std::optional DriverCtx::makeSpillConfig( spillFilePrefix, queryConfig.maxSpillFileSize(), queryConfig.spillWriteBufferSize(), - queryConfig.minSpillRunSize(), task->queryCtx()->spillExecutor(), queryConfig.minSpillableReservationPct(), queryConfig.spillableReservationGrowthPct(), diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp index b1e54eed605fe..b5331094f17f0 100644 --- a/velox/exec/GroupingSet.cpp +++ b/velox/exec/GroupingSet.cpp @@ -70,6 +70,6 @@ GroupingSet::GroupingSet( rows_(operatorCtx->pool()), isAdaptive_(queryConfig_.hashAdaptivityEnabled()), pool_(*operatorCtx->pool()) { VELOX_CHECK_NOT_NULL(nonReclaimableSection_); VELOX_CHECK(pool_.trackUsage()); for (auto& hasher : hashers_) { @@ -933,6 +934,7 @@ void GroupingSet::spill() { if (!hasSpilled()) { auto rows = table_->rows(); VELOX_DCHECK(pool_.trackUsage()); + VELOX_CHECK_EQ(numDistinctTableSpilledFiles_, 0); spiller_ = std::make_unique( Spiller::Type::kAggregateInput, rows, @@ -940,8 +942,13 @@ void GroupingSet::spill() { rows->keyTypes().size(), std::vector(), spillConfig_); + VELOX_CHECK_EQ(spiller_->state().maxPartitions(), 1); } spiller_->spill(); + if (isDistinct() && numDistinctTableSpilledFiles_ == 0) { + numDistinctTableSpilledFiles_ = spiller_->state().numFinishedFiles(0); + VELOX_CHECK_GT(numDistinctTableSpilledFiles_, 0); + } if (sortedAggregations_) { sortedAggregations_->clear(); } @@ -1056,16 +1063,19 @@ bool GroupingSet::mergeNextWithoutAggregates( const RowVectorPtr& result) { VELOX_CHECK_NOT_NULL(merge_); VELOX_CHECK(isDistinct()); + VELOX_CHECK_GT(numDistinctTableSpilledFiles_, 0); // We are looping 
over sorted rows produced by tree-of-losers. We logically // split the stream into runs of duplicate rows. As we process each run we - // track whether one of the values comes from stream 0, in which case we - // should not produce a result from that run. Otherwise, we produce a result - // at the end of the run (when we know for sure whether the run contains a row - // from stream 0 or not). + // track whether one of the values comes from a distinct stream, in which case + // we should not produce a result from that run. Otherwise, we produce a + // result at the end of the run (when we know for sure whether the run + // contains a row from the distinct streams). // - // NOTE: stream 0 contains rows which has already been output as distinct - // before we trigger spilling. + // NOTE: the distinct stream refers to the stream that contains the spilled + // distinct hash table. A distinct stream contains rows which have already + // been output as distinct before we trigger spilling. A distinct stream id is + // less than 'numDistinctTableSpilledFiles_'. bool newDistinct{true}; int32_t numOutputRows{0}; while (numOutputRows < maxOutputRows) { @@ -1074,7 +1084,7 @@ bool GroupingSet::mergeNextWithoutAggregates( if (stream == nullptr) { break; } - if (stream->id() == 0) { + if (stream->id() < numDistinctTableSpilledFiles_) { newDistinct = false; } if (next.second) { diff --git a/velox/exec/GroupingSet.h b/velox/exec/GroupingSet.h index bf489c4621afb..44da9778d209b 100644 --- a/velox/exec/GroupingSet.h +++ b/velox/exec/GroupingSet.h @@ -326,6 +326,10 @@ class GroupingSet { bool remainingMayPushdown_; std::unique_ptr spiller_; + // Set to the number of files storing the spilled distinct hash table, which + // are the files spilled on the first spill call. This only applies for + // distinct aggregation. + size_t numDistinctTableSpilledFiles_{0}; std::unique_ptr> merge_; // Container for materializing batches of output from spilling. 
diff --git a/velox/exec/Spill.cpp b/velox/exec/Spill.cpp index eb0bab730f285..e972ce7df20d8 100644 --- a/velox/exec/Spill.cpp +++ b/velox/exec/Spill.cpp @@ -155,6 +155,14 @@ void SpillState::finishFile(uint32_t partition) { writer->finishFile(); } +size_t SpillState::numFinishedFiles(uint32_t partition) const { + auto* writer = partitionWriter(partition); + if (writer == nullptr) { + return 0; + } + return writer->numFinishedFiles(); +} + SpillFiles SpillState::finish(uint32_t partition) { auto* writer = partitionWriter(partition); if (writer == nullptr) { diff --git a/velox/exec/Spill.h b/velox/exec/Spill.h index d306a39a78cb8..6f3614927186a 100644 --- a/velox/exec/Spill.h +++ b/velox/exec/Spill.h @@ -380,6 +380,9 @@ class SpillState { /// far. void finishFile(uint32_t partition); + /// Returns the current number of finished files. + size_t numFinishedFiles(uint32_t partition) const; + /// Returns the spill file objects from a given 'partition'. The function /// returns an empty list if either the partition has not been spilled or has /// no spilled data. diff --git a/velox/exec/SpillFile.cpp b/velox/exec/SpillFile.cpp index 240da4a9c952b..46ae2cc9b44f6 100644 --- a/velox/exec/SpillFile.cpp +++ b/velox/exec/SpillFile.cpp @@ -137,6 +137,10 @@ void SpillWriter::closeFile() { currentFile_.reset(); } +size_t SpillWriter::numFinishedFiles() const { + return finishedFiles_.size(); +} + uint64_t SpillWriter::flush() { if (batch_ == nullptr) { return 0; diff --git a/velox/exec/SpillFile.h b/velox/exec/SpillFile.h index bb7a54299cd0d..f05e8f076dc46 100644 --- a/velox/exec/SpillFile.h +++ b/velox/exec/SpillFile.h @@ -135,11 +135,15 @@ class SpillWriter { /// start a new one. void finishFile(); + /// Returns the current number of files that have been written and closed. + size_t numFinishedFiles() const; + /// Finishes this file writer and returns the written spill files info. 
/// /// NOTE: we don't allow write to a spill writer after t SpillFiles finish(); + std::vector testingSpilledFilePaths() const; std::vector testingSpilledFileIds() const; diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.cpp b/velox/exec/fuzzer/AggregationFuzzerBase.cpp index 82866a680de65..bf43a5250d051 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.cpp +++ b/velox/exec/fuzzer/AggregationFuzzerBase.cpp @@ -87,6 +87,10 @@ DEFINE_bool( namespace facebook::velox::exec::test { +int32_t AggregationFuzzerBase::randInt(int32_t min, int32_t max) { + return boost::random::uniform_int_distribution(min, max)(rng_); +} + bool AggregationFuzzerBase::isSupportedType(const TypePtr& type) const { // Date / IntervalDayTime/ Unknown are not currently supported by DWRF. if (type->isDate() || type->isIntervalDayTime() || type->isUnKnown()) { @@ -403,8 +407,12 @@ velox::test::ResultOrError AggregationFuzzerBase::execute( spillDirectory = exec::test::TempDirectoryPath::create(); builder.spillDirectory(spillDirectory->path) .config(core::QueryConfig::kSpillEnabled, "true") - .config(core::QueryConfig::kAggregationSpillEnabled, "true"); - spillPct = 100; + .config(core::QueryConfig::kAggregationSpillEnabled, "true") + .config( + core::QueryConfig::kMaxSpillRunRows, + std::to_string(randInt(32, 1L << 30))); + // Randomize the spill injection with a percentage less than 100. 
+ spillPct = 20; } if (abandonPartial) { diff --git a/velox/exec/fuzzer/AggregationFuzzerBase.h b/velox/exec/fuzzer/AggregationFuzzerBase.h index fec35c5e2da76..533c189ffb3f8 100644 --- a/velox/exec/fuzzer/AggregationFuzzerBase.h +++ b/velox/exec/fuzzer/AggregationFuzzerBase.h @@ -141,6 +141,8 @@ class AggregationFuzzerBase { AggregationFuzzerBase::ReferenceQueryErrorCode errorCode); }; + int32_t randInt(int32_t min, int32_t max); + bool addSignature( const std::string& name, const FunctionSignaturePtr& signature); diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index 107cc59099dbc..0947f5cbabeed 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -3186,112 +3186,96 @@ TEST_F(AggregationTest, maxSpillBytes) { } DEBUG_ONLY_TEST_F(AggregationTest, reclaimFromAggregation) { - std::vector vectors = createVectors(8, rowType_, fuzzerOpts_); + const int numInputs = 8; + std::vector vectors = + createVectors(numInputs, rowType_, fuzzerOpts_); createDuckDbTable(vectors); - std::vector sameQueries = {false, true}; - for (bool sameQuery : sameQueries) { - SCOPED_TRACE(fmt::format("sameQuery {}", sameQuery)); - const auto spillDirectory = exec::test::TempDirectoryPath::create(); - auto fakeQueryCtx = std::make_shared(executor_.get()); - std::shared_ptr aggregationQueryCtx; - if (sameQuery) { - aggregationQueryCtx = fakeQueryCtx; - } else { - aggregationQueryCtx = std::make_shared(executor_.get()); - } + for (const auto maxSpillRunRows : std::vector({32, 1UL << 30})) { + SCOPED_TRACE(fmt::format("maxSpillRunRows {}", maxSpillRunRows)); - folly::EventCount arbitrationWait; - std::atomic_bool arbitrationWaitFlag{true}; - folly::EventCount taskPauseWait; - std::atomic_bool taskPauseWaitFlag{true}; - - std::atomic_int numInputs{0}; + std::atomic_int inputCount{0}; SCOPED_TESTVALUE_SET( "facebook::velox::exec::Driver::runInternal::addInput", - std::function(([&](Operator* op) { - if 
(op->operatorType() != "Aggregation") { + std::function(([&](exec::Operator* op) { + if (op->testingOperatorCtx()->operatorType() != "Aggregation") { return; } - if (++numInputs != 5) { + // Inject spill in the middle of aggregation input processing. + if (++inputCount != numInputs / 2) { return; } - arbitrationWaitFlag = false; - arbitrationWait.notifyAll(); - - // Wait for task pause to be triggered. - taskPauseWait.await([&] { return !taskPauseWaitFlag.load(); }); + testingRunArbitration(op->pool()); }))); - SCOPED_TESTVALUE_SET( - "facebook::velox::exec::Task::requestPauseLocked", - std::function(([&](Task* /*unused*/) { - taskPauseWaitFlag = false; - taskPauseWait.notifyAll(); - }))); - - std::thread aggregationThread([&]() { - core::PlanNodeId aggrNodeId; - auto task = - AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) - .config(core::QueryConfig::kSpillEnabled, true) - .config(core::QueryConfig::kAggregationSpillEnabled, true) - .queryCtx(aggregationQueryCtx) - .plan(PlanBuilder() - .values(vectors) - .singleAggregation({"c0", "c1"}, {"array_agg(c2)"}) - .capturePlanNodeId(aggrNodeId) - .planNode()) - .assertResults( - "SELECT c0, c1, array_agg(c2) FROM tmp GROUP BY c0, c1"); - auto taskStats = exec::toPlanStats(task->taskStats()); - auto& planStats = taskStats.at(aggrNodeId); - ASSERT_GT(planStats.spilledBytes, 0); - ASSERT_GT(planStats.customStats["memoryArbitrationWallNanos"].sum, 0); - }); - - arbitrationWait.await([&] { return !arbitrationWaitFlag.load(); }); - - auto fakePool = fakeQueryCtx->pool()->addLeafChild( - "fakePool", true, exec::MemoryReclaimer::create()); - fakePool->maybeReserve(memory::memoryManager()->arbitrator()->capacity()); - - aggregationThread.join(); - + const auto spillDirectory = exec::test::TempDirectoryPath::create(); + core::PlanNodeId aggrNodeId; + auto task = + AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->path) + .config(core::QueryConfig::kSpillEnabled, true) + 
.config(core::QueryConfig::kAggregationSpillEnabled, true) + .config( + core::QueryConfig::kMaxSpillRunRows, + std::to_string(maxSpillRunRows)) + .plan(PlanBuilder() + .values(vectors) + .singleAggregation({"c0", "c1"}, {"array_agg(c2)"}) + .capturePlanNodeId(aggrNodeId) + .planNode()) + .assertResults( + "SELECT c0, c1, array_agg(c2) FROM tmp GROUP BY c0, c1"); + auto taskStats = exec::toPlanStats(task->taskStats()); + auto& planStats = taskStats.at(aggrNodeId); + ASSERT_GT(planStats.spilledBytes, 0); + ASSERT_GT(planStats.customStats["memoryArbitrationWallNanos"].sum, 0); + task.reset(); waitForAllTasksToBeDeleted(); } } -TEST_F(AggregationTest, reclaimFromDistinctAggregation) { - const uint64_t maxQueryCapacity = 20L << 20; +DEBUG_ONLY_TEST_F(AggregationTest, reclaimFromDistinctAggregation) { + const int numInputs = 32; std::vector vectors = - createVectors(rowType_, maxQueryCapacity * 10, fuzzerOpts_); + createVectors(numInputs, rowType_, fuzzerOpts_); createDuckDbTable(vectors); - const auto spillDirectory = exec::test::TempDirectoryPath::create(); - core::PlanNodeId aggrNodeId; - std::shared_ptr queryCtx = std::make_shared( - executor_.get(), - core::QueryConfig({}), - std::unordered_map>{}, - cache::AsyncDataCache::getInstance(), - memory::memoryManager()->addRootPool( - "test-root", maxQueryCapacity, exec::MemoryReclaimer::create())); - auto task = AssertQueryBuilder(duckDbQueryRunner_) - .spillDirectory(spillDirectory->path) - .config(core::QueryConfig::kSpillEnabled, true) - .config(core::QueryConfig::kAggregationSpillEnabled, true) - .queryCtx(queryCtx) - .plan(PlanBuilder() - .values(vectors) - .singleAggregation({"c0"}, {}) - .capturePlanNodeId(aggrNodeId) - .planNode()) - .assertResults("SELECT distinct c0 FROM tmp"); - auto taskStats = exec::toPlanStats(task->taskStats()); - auto& planStats = taskStats.at(aggrNodeId); - ASSERT_GT(planStats.spilledBytes, 0); - task.reset(); - waitForAllTasksToBeDeleted(); + for (const auto maxSpillRunRows : 
std::vector({32, 1UL << 30})) { + SCOPED_TRACE(fmt::format("maxSpillRunRows {}", maxSpillRunRows)); + + std::atomic_int inputCount{0}; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Driver::runInternal::addInput", + std::function(([&](exec::Operator* op) { + if (op->testingOperatorCtx()->operatorType() != "Aggregation") { + return; + } + // Inject spill in the middle of aggregation input processing. + if (++inputCount != numInputs / 2) { + return; + } + testingRunArbitration(op->pool()); + }))); + + const auto spillDirectory = exec::test::TempDirectoryPath::create(); + core::PlanNodeId aggrNodeId; + auto task = AssertQueryBuilder(duckDbQueryRunner_) + .spillDirectory(spillDirectory->path) + .config(core::QueryConfig::kSpillEnabled, true) + .config(core::QueryConfig::kAggregationSpillEnabled, true) + .config( + core::QueryConfig::kMaxSpillRunRows, + std::to_string(maxSpillRunRows)) + .plan(PlanBuilder() + .values(vectors) + .singleAggregation({"c0"}, {}) + .capturePlanNodeId(aggrNodeId) + .planNode()) + .assertResults("SELECT distinct c0 FROM tmp"); + auto taskStats = exec::toPlanStats(task->taskStats()); + auto& planStats = taskStats.at(aggrNodeId); + ASSERT_GT(planStats.spilledBytes, 0); + task.reset(); + waitForAllTasksToBeDeleted(); + } } DEBUG_ONLY_TEST_F(AggregationTest, reclaimFromAggregationOnNoMoreInput) { diff --git a/velox/exec/tests/SortBufferTest.cpp b/velox/exec/tests/SortBufferTest.cpp index f965652651acd..9a46354bf32cf 100644 --- a/velox/exec/tests/SortBufferTest.cpp +++ b/velox/exec/tests/SortBufferTest.cpp @@ -49,7 +49,6 @@ class SortBufferTest : public OperatorTestBase { "0.0.0", 0, 0, - 0, executor_.get(), 5, 10, @@ -291,7 +290,6 @@ TEST_F(SortBufferTest, batchOutput) { "0.0.0", 1000, 0, - 1000, executor_.get(), 5, 10, @@ -386,7 +384,6 @@ TEST_F(SortBufferTest, spill) { "0.0.0", 1000, 0, - 1000, executor_.get(), 100, spillableReservationGrowthPct,