Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-43986: [C++][Acero] Some code cleanup to Grouper #43988

Merged
merged 5 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cpp/src/arrow/acero/groupby_aggregate_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -369,13 +369,14 @@ Status GroupByNode::InputReceived(ExecNode* input, ExecBatch batch) {
DCHECK_EQ(input, inputs_[0]);

auto handler = [this](const ExecBatch& full_batch, const Segment& segment) {
if (!segment.extends && segment.offset == 0) RETURN_NOT_OK(OutputResult(false));
if (!segment.extends && segment.offset == 0)
RETURN_NOT_OK(OutputResult(/*is_last=*/false));
auto exec_batch = full_batch.Slice(segment.offset, segment.length);
auto batch = ExecSpan(exec_batch);
RETURN_NOT_OK(Consume(batch));
RETURN_NOT_OK(
ExtractSegmenterValues(&segmenter_values_, exec_batch, segment_key_field_ids_));
if (!segment.is_open) RETURN_NOT_OK(OutputResult(false));
if (!segment.is_open) RETURN_NOT_OK(OutputResult(/*is_last=*/false));
return Status::OK();
};
ARROW_RETURN_NOT_OK(
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/acero/scalar_aggregate_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ Status ScalarAggregateNode::InputReceived(ExecNode* input, ExecBatch batch) {
// (1) The segment is starting of a new segment group and points to
// the beginning of the batch, then it means no data in the batch belongs
// to the current segment group. We can output and reset kernel states.
if (!segment.extends && segment.offset == 0) RETURN_NOT_OK(OutputResult(false));
if (!segment.extends && segment.offset == 0)
RETURN_NOT_OK(OutputResult(/*is_last=*/false));

// We add segment to the current segment group aggregation
auto exec_batch = full_batch.Slice(segment.offset, segment.length);
Expand All @@ -244,7 +245,7 @@ Status ScalarAggregateNode::InputReceived(ExecNode* input, ExecBatch batch) {

// If the segment closes the current segment group, we can output segment group
// aggregation.
if (!segment.is_open) RETURN_NOT_OK(OutputResult(false));
if (!segment.is_open) RETURN_NOT_OK(OutputResult(/*is_last=*/false));

return Status::OK();
};
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/compute/kernels/hash_aggregate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2533,11 +2533,11 @@ struct GroupedCountDistinctImpl : public GroupedAggregator {
struct GroupedDistinctImpl : public GroupedCountDistinctImpl {
Result<Datum> Finalize() override {
ARROW_ASSIGN_OR_RAISE(auto uniques, grouper_->GetUniques());
ARROW_ASSIGN_OR_RAISE(auto groupings, grouper_->MakeGroupings(
*uniques[1].array_as<UInt32Array>(),
static_cast<uint32_t>(num_groups_), ctx_));
ARROW_ASSIGN_OR_RAISE(
auto list, grouper_->ApplyGroupings(*groupings, *uniques[0].make_array(), ctx_));
auto groupings, Grouper::MakeGroupings(*uniques[1].array_as<UInt32Array>(),
static_cast<uint32_t>(num_groups_), ctx_));
ARROW_ASSIGN_OR_RAISE(
auto list, Grouper::ApplyGroupings(*groupings, *uniques[0].make_array(), ctx_));
const auto& values = list->values();
DCHECK_EQ(values->offset(), 0);
auto* offsets = list->value_offsets()->mutable_data_as<int32_t>();
Expand Down
32 changes: 0 additions & 32 deletions cpp/src/arrow/compute/row/grouper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -332,38 +332,6 @@ Result<std::unique_ptr<RowSegmenter>> RowSegmenter::Make(

namespace {

struct GrouperNoKeysImpl : Grouper {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is never used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, not used anywhere.

Result<std::shared_ptr<Array>> MakeConstantGroupIdArray(int64_t length,
group_id_t value) {
std::unique_ptr<ArrayBuilder> a_builder;
RETURN_NOT_OK(MakeBuilder(default_memory_pool(), g_group_id_type, &a_builder));
using GroupIdBuilder = typename TypeTraits<GroupIdType>::BuilderType;
auto builder = checked_cast<GroupIdBuilder*>(a_builder.get());
if (length != 0) {
RETURN_NOT_OK(builder->Resize(length));
}
for (int64_t i = 0; i < length; i++) {
builder->UnsafeAppend(value);
}
std::shared_ptr<Array> array;
RETURN_NOT_OK(builder->Finish(&array));
return array;
}
Status Reset() override { return Status::OK(); }
Result<Datum> Consume(const ExecSpan& batch, int64_t offset, int64_t length) override {
ARROW_ASSIGN_OR_RAISE(auto array, MakeConstantGroupIdArray(length, 0));
return Datum(array);
}
Result<ExecBatch> GetUniques() override {
auto data = ArrayData::Make(uint32(), 1, 0);
auto values = data->GetMutableValues<uint32_t>(0);
values[0] = 0;
ExecBatch out({Datum(data)}, 1);
return out;
}
uint32_t num_groups() const override { return 1; }
};

struct GrouperImpl : public Grouper {
static Result<std::unique_ptr<GrouperImpl>> Make(
const std::vector<TypeHolder>& key_types, ExecContext* ctx) {
Expand Down
12 changes: 6 additions & 6 deletions cpp/src/arrow/compute/row/grouper.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ inline bool operator!=(const Segment& segment1, const Segment& segment2) {

/// \brief a helper class to divide a batch into segments of equal values
///
/// For example, given a batch with two rows:
/// For example, given a batch with two columns specifed as segment keys:
///
/// A A
/// A A
/// A B
/// A B
/// A A
/// A A [other columns]...
/// A A ...
/// A B ...
/// A B ...
/// A A ...
///
/// Then the batch could be divided into 3 segments. The first would be rows 0 & 1,
/// the second would be rows 2 & 3, and the third would be row 4.
Expand Down
Loading