Fix table function execution without partitioning (v2) #21558

Open. findepi wants to merge 3 commits into master from findepi/exclude-columns-streaming-with-lifecycle


findepi
Member

@findepi findepi commented Apr 15, 2024

Previously, when a table function did not declare partitioning, it would run single-threaded and first buffer all data in memory, like one big WINDOW. After this change, local execution processes input pages in a streaming fashion.

Fixes #20398

Alternative to #21378, with a different TableFunctionDataProcessor lifecycle. The implementation creates one TableFunctionDataProcessor per operator for streaming processing.
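
The difference between buffering and streaming can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the Trino implementation: `PageProcessor`, `bufferingPeak`, and `streamingPeak` are hypothetical names, and a page is modeled as a plain list of strings.

```java
import java.util.ArrayList;
import java.util.List;

class StreamingSketch {
    // Hypothetical stand-in for a per-operator data processor (not the Trino SPI).
    interface PageProcessor {
        List<String> process(List<String> page);
    }

    // Buffering: every input page is retained before any processing starts,
    // so the number of pages held at once grows with the input size.
    static int bufferingPeak(List<List<String>> pages, PageProcessor processor, List<String> output) {
        List<List<String>> buffered = new ArrayList<>();
        int peak = 0;
        for (List<String> page : pages) {
            buffered.add(page);
            peak = Math.max(peak, buffered.size());
        }
        for (List<String> page : buffered) {
            output.addAll(processor.process(page));
        }
        return peak;
    }

    // Streaming: a single processor handles each page as it arrives,
    // so at most one input page is held at a time.
    static int streamingPeak(List<List<String>> pages, PageProcessor processor, List<String> output) {
        int peak = 0;
        for (List<String> page : pages) {
            peak = Math.max(peak, 1);
            output.addAll(processor.process(page));
        }
        return peak;
    }

    public static void main(String[] args) {
        List<List<String>> pages = List.of(List.of("a", "b"), List.of("c"), List.of("d"));
        PageProcessor identity = page -> page;
        System.out.println(bufferingPeak(pages, identity, new ArrayList<>()));
        System.out.println(streamingPeak(pages, identity, new ArrayList<>()));
    }
}
```

In this toy model both variants produce the same output rows; only the number of input pages held at once differs (three versus one for the sample input).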

@findepi findepi requested review from martint and kasiafi April 15, 2024 15:47
@cla-bot cla-bot bot added the cla-signed label Apr 15, 2024
@findepi
Member Author

findepi commented Apr 15, 2024

thanks @hovaesco @aalbu @mdesmet for pointing out the lifecycle aspect

@findepi findepi force-pushed the findepi/exclude-columns-streaming-with-lifecycle branch from 5b36e66 to a31bbfe Compare April 15, 2024 15:48
@findepi
Member Author

findepi commented Apr 16, 2024

failure is related.

@hovaesco
Member

I've tested the fix and it helps with #20398. However, when writing the data using a table function, it still does not distribute the data evenly, and there is no way to control the number of files being written.

@findepi
Member Author

findepi commented Apr 23, 2024

> it still does not distribute the data evenly

between the nodes?

there is no "writer scaling" equivalent for table functions, right?

> there is no way to control the number of files being written.

do you mean control number of TF data processor instances?

@hovaesco
Member

> between the nodes?

between files; when using setSemantics(), data is distributed evenly.

> there is no "writer scaling" equivalent for table functions, right?

correct

> do you mean control number of TF data processor instances?

control the number of files; for setSemantics() it could be done using the hive.target-max-file-size property

@findepi findepi force-pushed the findepi/exclude-columns-streaming-with-lifecycle branch from a31bbfe to b358662 Compare April 23, 2024 11:58
@findepi
Member Author

findepi commented Apr 23, 2024

( rebased to resolve conflicts, no other changes )

@mdesmet
Contributor

mdesmet commented Apr 23, 2024

> do you mean control number of TF data processor instances?

This is indeed where the writers are created. But I guess this depends on the shape of the plan right before going into the table function operator?

@findepi
Member Author

findepi commented Apr 25, 2024

Planned changes moved out to #21710

@findepi findepi force-pushed the findepi/exclude-columns-streaming-with-lifecycle branch 3 times, most recently from e1bf212 to d762983 Compare April 25, 2024 12:14

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label May 16, 2024
@findepi findepi added stale-ignore and removed stale labels May 17, 2024
@findepi findepi force-pushed the findepi/exclude-columns-streaming-with-lifecycle branch from d762983 to fa9fdf0 Compare May 23, 2024 14:23
findepi added 3 commits May 23, 2024 16:52
During the migration from TestNG to JUnit, the `@TestInstance(PER_CLASS)`
annotation was added, but it implies single-threaded execution. Restore
the previous parallelism: either add `@Execution(CONCURRENT)` or inherit
it from the base class.
Previously, when a table function did not declare partitioning, it would
be globally distributed, but on a worker node it would run
single-threaded and first buffer all data in memory, like one big
WINDOW. After the change, local execution processes input pages in a
streaming fashion.

This commit also fixes property derivations for the case where a table
function is partitioned on an empty list of symbols (global grouping).
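
As a rough illustration of why partitioning on an empty list of symbols amounts to global grouping: if the partition key is built from zero columns, every row gets the same (empty) key and lands in a single partition. The sketch below is not Trino code; `PartitionSketch`, `partition`, and the row representation are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PartitionSketch {
    // Groups rows by the values found at the given column indexes.
    // With no partition columns, the key is always the empty list.
    static Map<List<String>, List<List<String>>> partition(List<List<String>> rows, List<Integer> partitionColumns) {
        Map<List<String>, List<List<String>>> partitions = new LinkedHashMap<>();
        for (List<String> row : rows) {
            List<String> key = new ArrayList<>();
            for (int column : partitionColumns) {
                key.add(row.get(column));
            }
            partitions.computeIfAbsent(key, ignored -> new ArrayList<>()).add(row);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<List<String>> rows = List.of(List.of("a", "1"), List.of("b", "2"), List.of("a", "3"));
        // Partitioned on the first column: one partition per distinct value.
        System.out.println(partition(rows, List.of(0)).size());
        // Partitioned on an empty list: all rows share one global partition.
        System.out.println(partition(rows, List.of()).size());
    }
}
```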
@findepi findepi force-pushed the findepi/exclude-columns-streaming-with-lifecycle branch from fa9fdf0 to a892820 Compare May 24, 2024 20:14
@github-actions github-actions bot added iceberg and delta-lake labels May 24, 2024
@github-actions github-actions bot added the hive label May 24, 2024
pagesIndex,
0,
pagesIndex.getPositionCount(),
new TableFunctionDataProcessor()
Member

@tbaeg tbaeg May 30, 2024

Do we need to wrap in a new anonymous class?

Locally, I directly passed the tableFunction which also resolved the TestJsonTable failures for me.

Labels
cla-signed, delta-lake, hive, iceberg, stale-ignore
Development

Successfully merging this pull request may close these issues.

Consuming large results with table functions causes "Query exceeded per-node memory limit" error
4 participants