Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Allocate the spill source and rows memory in pool #11663

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions velox/exec/OperatorUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ void scalarGatherCopy(
BaseVector* target,
vector_size_t targetIndex,
vector_size_t count,
const std::vector<const RowVector*>& sources,
const std::vector<vector_size_t>& sourceIndices,
const std::vector<const RowVector*, memory::StlAllocator<const RowVector*>>&
sources,
const std::vector<vector_size_t, memory::StlAllocator<vector_size_t>>&
sourceIndices,
column_index_t sourceColumnChannel) {
VELOX_DCHECK(target->isFlatEncoding());

Expand Down Expand Up @@ -81,8 +83,10 @@ void complexGatherCopy(
BaseVector* target,
vector_size_t targetIndex,
vector_size_t count,
const std::vector<const RowVector*>& sources,
const std::vector<vector_size_t>& sourceIndices,
const std::vector<const RowVector*, memory::StlAllocator<const RowVector*>>&
sources,
const std::vector<vector_size_t, memory::StlAllocator<vector_size_t>>&
sourceIndices,
column_index_t sourceChannel) {
for (int i = 0; i < count; ++i) {
target->copy(
Expand All @@ -97,8 +101,10 @@ void gatherCopy(
BaseVector* target,
vector_size_t targetIndex,
vector_size_t count,
const std::vector<const RowVector*>& sources,
const std::vector<vector_size_t>& sourceIndices,
const std::vector<const RowVector*, memory::StlAllocator<const RowVector*>>&
sources,
const std::vector<vector_size_t, memory::StlAllocator<vector_size_t>>&
sourceIndices,
column_index_t sourceChannel) {
if (target->isScalar()) {
VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH(
Expand Down Expand Up @@ -326,8 +332,10 @@ void gatherCopy(
RowVector* target,
vector_size_t targetIndex,
vector_size_t count,
const std::vector<const RowVector*>& sources,
const std::vector<vector_size_t>& sourceIndices,
const std::vector<const RowVector*, memory::StlAllocator<const RowVector*>>&
sources,
const std::vector<vector_size_t, memory::StlAllocator<vector_size_t>>&
sourceIndices,
const std::vector<IdentityProjection>& columnMap) {
VELOX_DCHECK_GE(count, 0);
if (FOLLY_UNLIKELY(count <= 0)) {
Expand Down
6 changes: 4 additions & 2 deletions velox/exec/OperatorUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,10 @@ void gatherCopy(
RowVector* target,
vector_size_t targetIndex,
vector_size_t count,
const std::vector<const RowVector*>& sources,
const std::vector<vector_size_t>& sourceIndices,
const std::vector<const RowVector*, memory::StlAllocator<const RowVector*>>&
sources,
const std::vector<vector_size_t, memory::StlAllocator<vector_size_t>>&
sourceIndices,
const std::vector<IdentityProjection>& columnMap = {});

/// Generates the system-wide unique disk spill file path for an operator. It
Expand Down
4 changes: 3 additions & 1 deletion velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ SortBuffer::SortBuffer(
prefixSortConfig_(prefixSortConfig),
spillConfig_(spillConfig),
spillStats_(spillStats),
sortedRows_(0, memory::StlAllocator<char*>(*pool)) {
sortedRows_(0, memory::StlAllocator<char*>(*pool)),
spillSources_(0, memory::StlAllocator<const RowVector*>(*pool_)),
spillSourceRows_(0, memory::StlAllocator<vector_size_t>(*pool_)) {
VELOX_CHECK_GE(input_->size(), sortCompareFlags_.size());
VELOX_CHECK_GT(sortCompareFlags_.size(), 0);
VELOX_CHECK_EQ(sortColumnIndices.size(), sortCompareFlags_.size());
Expand Down
6 changes: 4 additions & 2 deletions velox/exec/SortBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,10 @@ class SortBuffer {
// Used to merge the sorted runs from in-memory rows and spilled rows on disk.
std::unique_ptr<TreeOfLosers<SpillMergeStream>> spillMerger_;
// Records the source rows to copy to 'output_' in order.
std::vector<const RowVector*> spillSources_;
std::vector<vector_size_t> spillSourceRows_;
std::vector<const RowVector*, memory::StlAllocator<const RowVector*>>
spillSources_;
std::vector<vector_size_t, memory::StlAllocator<vector_size_t>>
spillSourceRows_;

// Reusable output vector.
RowVectorPtr output_;
Expand Down
12 changes: 8 additions & 4 deletions velox/exec/tests/OperatorUtilsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,10 @@ class OperatorUtilsTest : public OperatorTestBase {
}
}

std::vector<const RowVector*> sourcesVectors(kNumRows);
std::vector<vector_size_t> sourceIndices(kNumRows);
std::vector<const RowVector*, memory::StlAllocator<const RowVector*>>
sourcesVectors(kNumRows, *pool_);
std::vector<vector_size_t, memory::StlAllocator<vector_size_t>>
sourceIndices(kNumRows, *pool_);
for (int iter = 0; iter < 5; ++iter) {
const int count =
folly::Random::oneIn(10) ? 0 : folly::Random::rand32() % kNumRows;
Expand Down Expand Up @@ -259,8 +261,10 @@ TEST_F(OperatorUtilsTest, gatherCopy) {
makeFlatVector<int64_t>(kNumRows, [](auto row) { return row % 7; }),
BaseVector::createNullConstant(UNKNOWN(), kNumRows, pool()),
});
std::vector<const RowVector*> sourceVectors(kNumRows);
std::vector<vector_size_t> sourceIndices(kNumRows);
std::vector<const RowVector*, memory::StlAllocator<const RowVector*>>
sourceVectors(kNumRows, *pool_);
std::vector<vector_size_t, memory::StlAllocator<vector_size_t>> sourceIndices(
kNumRows, *pool_);
for (int i = 0; i < kNumRows; ++i) {
sourceVectors[i] = sourceVector.get();
sourceIndices[i] = kNumRows - i - 1;
Expand Down
6 changes: 4 additions & 2 deletions velox/exec/tests/SpillerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -687,8 +687,10 @@ class SpillerTest : public exec::test::RowContainerTestBase {
int i = 0;
int outputRow = 0;
int outputSize = 0;
std::vector<const RowVector*> sourceVectors(outputBatchSize);
std::vector<vector_size_t> sourceIndices(outputBatchSize);
std::vector<const RowVector*, memory::StlAllocator<const RowVector*>>
sourceVectors(outputBatchSize, *pool_);
std::vector<vector_size_t, memory::StlAllocator<vector_size_t>>
sourceIndices(outputBatchSize, *pool_);
for (;;) {
auto stream = merge->next();
if (stream == nullptr) {
Expand Down
Loading