Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement array_agg aggregate function #1300

Merged
merged 6 commits into from
Nov 16, 2021
Merged

Conversation

viirya
Copy link
Member

@viirya viirya commented Nov 15, 2021

Which issue does this PR close?

Closes #1085.

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

@github-actions github-actions bot added the datafusion Changes in the datafusion crate label Nov 15, 2021
@houqp houqp added the enhancement New feature or request label Nov 15, 2021
Ok(vec![self.array.clone()])
}

fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you leave update_batch and merge_batch on purpose?
I think at this point it is hard to think of a much more efficient implementation (avoiding converting every item to scalars), given that we don't have columnar storage for aggregates yet.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, I just rely on the default implementation.


#[derive(Debug)]
pub(crate) struct ArrayAggAccumulator {
array: ScalarValue,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could be Vec<ScalarValue> instead.
You only need to convert it to a List in evaluate and state leading to easier update implementation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Updated.

Copy link
Contributor

@Dandandan Dandandan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! @viirya
I have one minor comment about the accumulator state.

Ok(Field::new(
&self.name,
DataType::List(Box::new(Field::new(
"element",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for catching this. should be item to be consistent.

Ok(vec![Field::new(
&format_state_name(&self.name, "array_agg"),
DataType::List(Box::new(Field::new(
"item",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe have a different name?

Copy link
Member

@jimexist jimexist left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the pull request. can you add sql test and psql comparison integration tests?

i'd like to cover 0-/1-/2-element cases and also with a group by to see how it's handled differently

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @liukun4515 -- I agree with @jimexist that having the end to end tests (aka showing that one can invoke array_agg from SQL) is quite important. Otherwise this PR is 👍 to me

Copy link
Contributor

@capkurmagati capkurmagati left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution.
I'm not sure if it's better to do in another PR, but both Trino and PostgreSQL supports order by.
Do you consider to support it?

@jimexist
Copy link
Member

Thanks for the contribution.

I'm not sure if it's better to do in another PR, but both Trino and PostgreSQL supports order by.

Do you consider to support it?

Maybe that can be put into a separate pull request? I agree that it is useful to add

@viirya
Copy link
Member Author

viirya commented Nov 15, 2021

Thanks @jimexist @alamb @capkurmagati Let me add some end to end tests. I will try to add order by in a separate PR later.

Ok(())
}

fn merge(&mut self, states: &[ScalarValue]) -> Result<()> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, after we changed the state from a ScalarValue to a Vec<ScalarValue> in early commit, we also need to update how merge works. It cannot call update directly now. Found this when adding e2e tests.

return Ok(());
};

match &states[0] {
Copy link
Contributor

@alamb alamb Nov 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would makes sense to assert here that states.len() == 1? so we don't (silently) end up ignoring any other items that might be added (accidentally / erroniously)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, let me add it.

let mut ctx = ExecutionContext::new();
register_aggregate_csv(&mut ctx).await?;
let sql =
"SELECT array_agg(c13) FROM (SELECT * FROM aggregate_test_100 LIMIT 2) test";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please add an ORDER BY to this query - as written it may be non deterministic (the first two values returned are not guaranteed to be in any order):

Suggested change
"SELECT array_agg(c13) FROM (SELECT * FROM aggregate_test_100 LIMIT 2) test";
"SELECT array_agg(c13) FROM (SELECT * FROM aggregate_test_100 ORDER BY c13 LIMIT 2) test";

}

#[tokio::test]
async fn csv_query_array_agg_empty() -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

let mut ctx = ExecutionContext::new();
register_aggregate_csv(&mut ctx).await?;
let sql =
"SELECT array_agg(c13) FROM (SELECT * FROM aggregate_test_100 LIMIT 1) test";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment here about needing an ORDER BY please

@houqp houqp requested a review from alamb November 16, 2021 06:08
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good -- thank you @viirya !

@alamb alamb merged commit 77c68b2 into apache:master Nov 16, 2021
matthewmturner pushed a commit to matthewmturner/arrow-datafusion that referenced this pull request Nov 16, 2021
* Implement array_agg aggregate function.

* Avoid copying.

* Fix clippy.

* For review comment.

* Add e2e tests.

* Add assert and order by.
@viirya
Copy link
Member Author

viirya commented Nov 16, 2021

Thanks @houqp @jimexist @Dandandan @alamb @capkurmagati !

@alamb alamb changed the title Implement array_agg aggregate function Implement array_agg aggregate function Feb 10, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
datafusion Changes in the datafusion crate enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement array_agg aggregate function
6 participants