Skip to content

Commit

Permalink
Fix multiple spill files for distinct aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoxmeng committed Mar 24, 2024
1 parent 0cd6c0a commit 6367e41
Show file tree
Hide file tree
Showing 14 changed files with 126 additions and 129 deletions.
2 changes: 0 additions & 2 deletions velox/common/base/SpillConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ SpillConfig::SpillConfig(
std::string _fileNamePrefix,
uint64_t _maxFileSize,
uint64_t _writeBufferSize,
uint64_t _minSpillRunSize,
folly::Executor* _executor,
int32_t _minSpillableReservationPct,
int32_t _spillableReservationGrowthPct,
Expand All @@ -42,7 +41,6 @@ SpillConfig::SpillConfig(
_maxFileSize == 0 ? std::numeric_limits<int64_t>::max()
: _maxFileSize),
writeBufferSize(_writeBufferSize),
minSpillRunSize(_minSpillRunSize),
executor(_executor),
minSpillableReservationPct(_minSpillableReservationPct),
spillableReservationGrowthPct(_spillableReservationGrowthPct),
Expand Down
10 changes: 0 additions & 10 deletions velox/common/base/SpillConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ struct SpillConfig {
std::string _filePath,
uint64_t _maxFileSize,
uint64_t _writeBufferSize,
uint64_t _minSpillRunSize,
folly::Executor* _executor,
int32_t _minSpillableReservationPct,
int32_t _spillableReservationGrowthPct,
Expand Down Expand Up @@ -94,15 +93,6 @@ struct SpillConfig {
/// storage system for io efficiency.
uint64_t writeBufferSize;

/// The min spill run size (bytes) limit used to select partitions for
/// spilling. The spiller tries to spill a previously spilled partition if
/// its data size exceeds this limit, otherwise it spills the partition with
/// the most data. If the limit is zero, then the spiller always spills a
/// previously spilled partition if it has any data. This is to avoid spilling
/// from a partition with a small amount of data, which might result in
/// generating too many small spilled files.
uint64_t minSpillRunSize;

/// Executor for spilling. If nullptr spilling writes on the Driver's thread.
folly::Executor* executor; // Not owned.

Expand Down
14 changes: 0 additions & 14 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,6 @@ class QueryConfig {
/// The max allowed spill file size. If it is zero, then there is no limit.
static constexpr const char* kMaxSpillFileSize = "max_spill_file_size";

/// The min spill run size limit used to select partitions for spilling. The
/// spiller tries to spill a previously spilled partition if its data size
/// exceeds this limit, otherwise it spills the partition with the most data.
/// If the limit is zero, then the spiller always spills a previously spilled
/// partition if it has any data. This is to avoid spilling from a partition
/// with a small amount of data, which might result in generating too many
/// small spilled files.
static constexpr const char* kMinSpillRunSize = "min_spill_run_size";

static constexpr const char* kSpillCompressionKind =
"spill_compression_codec";

Expand Down Expand Up @@ -616,11 +607,6 @@ class QueryConfig {
return get<uint64_t>(kMaxSpillFileSize, kDefaultMaxFileSize);
}

/// Looks up 'kMinSpillRunSize' in the query config, supplying 256MB as the
/// default value.
uint64_t minSpillRunSize() const {
  // 256MB.
  constexpr uint64_t kFallbackMinSpillRunSize{256 << 20};
  return get<uint64_t>(kMinSpillRunSize, kFallbackMinSpillRunSize);
}

/// Looks up 'kSpillCompressionKind' in the query config, supplying "none" as
/// the default value.
std::string spillCompressionKind() const {
  std::string compressionKind =
      get<std::string>(kSpillCompressionKind, "none");
  return compressionKind;
}
Expand Down
1 change: 0 additions & 1 deletion velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ std::optional<common::SpillConfig> DriverCtx::makeSpillConfig(
spillFilePrefix,
queryConfig.maxSpillFileSize(),
queryConfig.spillWriteBufferSize(),
queryConfig.minSpillRunSize(),
task->queryCtx()->spillExecutor(),
queryConfig.minSpillableReservationPct(),
queryConfig.spillableReservationGrowthPct(),
Expand Down
24 changes: 17 additions & 7 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ GroupingSet::GroupingSet(
rows_(operatorCtx->pool()),
isAdaptive_(queryConfig_.hashAdaptivityEnabled()),
pool_(*operatorCtx->pool()) {
// VELOX_CHECK(!aggregates_.empty());
VELOX_CHECK_NOT_NULL(nonReclaimableSection_);
VELOX_CHECK(pool_.trackUsage());
for (auto& hasher : hashers_) {
Expand Down Expand Up @@ -933,15 +934,21 @@ void GroupingSet::spill() {
if (!hasSpilled()) {
auto rows = table_->rows();
VELOX_DCHECK(pool_.trackUsage());
VELOX_CHECK_EQ(numDistinctTableSpilledFiles_, 0);
spiller_ = std::make_unique<Spiller>(
Spiller::Type::kAggregateInput,
rows,
makeSpillType(),
rows->keyTypes().size(),
std::vector<CompareFlags>(),
spillConfig_);
VELOX_CHECK_EQ(spiller_->state().maxPartitions(), 1);
}
spiller_->spill();
if (isDistinct() && numDistinctTableSpilledFiles_ == 0) {
numDistinctTableSpilledFiles_ = spiller_->state().numFinishedFiles(0);
VELOX_CHECK_GT(numDistinctTableSpilledFiles_, 0);
}
if (sortedAggregations_) {
sortedAggregations_->clear();
}
Expand Down Expand Up @@ -1056,16 +1063,19 @@ bool GroupingSet::mergeNextWithoutAggregates(
const RowVectorPtr& result) {
VELOX_CHECK_NOT_NULL(merge_);
VELOX_CHECK(isDistinct());
VELOX_CHECK_GT(numDistinctTableSpilledFiles_, 0);

// We are looping over sorted rows produced by tree-of-losers. We logically
// split the stream into runs of duplicate rows. As we process each run we
// track whether one of the values comes from stream 0, in which case we
// should not produce a result from that run. Otherwise, we produce a result
// at the end of the run (when we know for sure whether the run contains a row
// from stream 0 or not).
// track whether one of the values comes from a distinct stream, in which case
// we should not produce a result from that run. Otherwise, we produce a
// result at the end of the run (when we know for sure whether the run
// contains a row from the distinct streams).
//
// NOTE: stream 0 contains rows which has already been output as distinct
// before we trigger spilling.
// NOTE: the distinct stream refers to the stream that contains the spilled
// distinct hash table. A distinct stream contains rows which have already
// been output as distinct before we trigger spilling. A distinct stream id is
// less than 'numDistinctTableSpilledFiles_'.
bool newDistinct{true};
int32_t numOutputRows{0};
while (numOutputRows < maxOutputRows) {
Expand All @@ -1074,7 +1084,7 @@ bool GroupingSet::mergeNextWithoutAggregates(
if (stream == nullptr) {
break;
}
if (stream->id() == 0) {
if (stream->id() < numDistinctTableSpilledFiles_) {
newDistinct = false;
}
if (next.second) {
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/GroupingSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ class GroupingSet {
bool remainingMayPushdown_;

std::unique_ptr<Spiller> spiller_;
// Set to the number of files that store the spilled distinct hash table,
// which are the files spilled on the first spill call. This only applies to
// distinct aggregation.
size_t numDistinctTableSpilledFiles_{0};
std::unique_ptr<TreeOfLosers<SpillMergeStream>> merge_;

// Container for materializing batches of output from spilling.
Expand Down
8 changes: 8 additions & 0 deletions velox/exec/Spill.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,14 @@ void SpillState::finishFile(uint32_t partition) {
writer->finishFile();
}

// Reports the finished file count of the writer that 'partitionWriter()'
// returns for 'partition'. Yields zero when there is no such writer.
size_t SpillState::numFinishedFiles(uint32_t partition) const {
  auto* spillWriter = partitionWriter(partition);
  return spillWriter != nullptr ? spillWriter->numFinishedFiles() : 0;
}

SpillFiles SpillState::finish(uint32_t partition) {
auto* writer = partitionWriter(partition);
if (writer == nullptr) {
Expand Down
3 changes: 3 additions & 0 deletions velox/exec/Spill.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,9 @@ class SpillState {
/// far.
void finishFile(uint32_t partition);

/// Returns the current number of finished files for the given 'partition'.
size_t numFinishedFiles(uint32_t partition) const;

/// Returns the spill file objects from a given 'partition'. The function
/// returns an empty list if either the partition has not been spilled or has
/// no spilled data.
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/SpillFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ void SpillWriter::closeFile() {
currentFile_.reset();
}

// Reports the current number of entries in 'finishedFiles_'.
size_t SpillWriter::numFinishedFiles() const {
  const size_t finishedCount = finishedFiles_.size();
  return finishedCount;
}

uint64_t SpillWriter::flush() {
if (batch_ == nullptr) {
return 0;
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/SpillFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,15 @@ class SpillWriter {
/// start a new one.
void finishFile();

/// Returns the current number of files that have been written and closed.
size_t numFinishedFiles() const;

/// Finishes this file writer and returns the written spill files info.
///
/// NOTE: we don't allow writing to a spill writer after this call.
SpillFiles finish();


std::vector<std::string> testingSpilledFilePaths() const;

std::vector<uint32_t> testingSpilledFileIds() const;
Expand Down
12 changes: 10 additions & 2 deletions velox/exec/fuzzer/AggregationFuzzerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ DEFINE_bool(

namespace facebook::velox::exec::test {

int32_t AggregationFuzzerBase::randInt(int32_t min, int32_t max) {
return boost::random::uniform_int_distribution<int32_t>(min, max)(rng_);
}

bool AggregationFuzzerBase::isSupportedType(const TypePtr& type) const {
// Date / IntervalDayTime/ Unknown are not currently supported by DWRF.
if (type->isDate() || type->isIntervalDayTime() || type->isUnKnown()) {
Expand Down Expand Up @@ -403,8 +407,12 @@ velox::test::ResultOrError AggregationFuzzerBase::execute(
spillDirectory = exec::test::TempDirectoryPath::create();
builder.spillDirectory(spillDirectory->path)
.config(core::QueryConfig::kSpillEnabled, "true")
.config(core::QueryConfig::kAggregationSpillEnabled, "true");
spillPct = 100;
.config(core::QueryConfig::kAggregationSpillEnabled, "true")
.config(
core::QueryConfig::kMaxSpillRunRows,
std::to_string(randInt(32, 1L << 30)));
// Randomize the spill injection with a percentage less than 100.
spillPct = 20;
}

if (abandonPartial) {
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/fuzzer/AggregationFuzzerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ class AggregationFuzzerBase {
AggregationFuzzerBase::ReferenceQueryErrorCode errorCode);
};

int32_t randInt(int32_t min, int32_t max);

bool addSignature(
const std::string& name,
const FunctionSignaturePtr& signature);
Expand Down
Loading

0 comments on commit 6367e41

Please sign in to comment.