Count rows as a metadata-only operation #1223

Open · Tracked by #1
Visorgood opened this issue Oct 8, 2024 · 8 comments

Visorgood commented Oct 8, 2024

Feature Request / Improvement

Hello!

I'm using PyIceberg 0.7.1.

I have a use case where I need to count rows matching a certain filter, and I was expecting this to be doable with PyIceberg as a metadata-only operation, given that manifest files contain the row count of each data file.

I figured out this code to count rows:

query = "col1 = 'val_X' AND col2 = 'val_Y' AND ..."
scan = table.scan(row_filter=query)
con = scan.to_duckdb("data")  # registers the filtered rows as an in-memory DuckDB table
res = con.sql("SELECT count(*) FROM data").fetchone()[0]

but this loads the filtered data (matching the query expression) into memory first and only then computes the count.
I couldn't figure out how to get the result without first converting to a DuckDB connection or a PyArrow table.

Is there a way to do such an operation without loading the data into memory, i.e. as a metadata-only operation?
If not, I believe this would be a good feature to have in PyIceberg.

I have tried Daft, which is supposed to be a fully lazy, optimized query-engine interface on top of PyIceberg tables, but it still seems to need to load data into memory, even when I do .limit(1).

sungwy (Collaborator) commented Oct 8, 2024

Hi @Visorgood - thank you for raising this issue.

As you mentioned, scan.to_duckdb unfortunately loads the data that matches the filter into an Arrow table first, and then loads that data into a DuckDB connection. So it eagerly fetches all of the data rather than lazily evaluating it.

Is there a way to do such operation without loading data into memory - as a metadata-only operation?
If not, I believe this would be a good feature to have in PyIceberg.

Woot! I'm a big fan of this idea as well, and if implemented well, I think we could extend a similar approach to other aggregations that rely on statistics, like min and max. However, there are challenges in making this a purely metadata-only operation:

  1. If the row_filter in the scan uses more than just partition columns, it is no longer a metadata-only operation. A manifest file contains one entry per data file, and data files are created at the granularity of a partition.
  2. Similar to the above, if the partition spec has evolved, it is also no longer a metadata-only operation. For example, for a given table with columns A and B, if only column A used to be a partition column and now both A and B are, the older data files are at the granularity of A and the newer ones at A x B. This means the count of rows where "A = 1 and B = 2" cannot be evaluated as a metadata-only operation.
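
For illustration, a minimal sketch of the metadata-only fast path under the assumptions above: the row_filter prunes whole data files and there are no delete files. plan_files() and the record_count field on each data file are existing PyIceberg APIs; the catalog and table names are placeholders.

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")        # placeholder catalog name
table = catalog.load_table("db.events")  # placeholder table identifier

scan = table.scan(row_filter="A = 1")

# Planning only touches metadata (manifest lists and manifests); no data files are read.
tasks = list(scan.plan_files())

# In general this is only an upper bound: a planned file may match the filter
# only partially, and positional/equality deletes are not accounted for here.
upper_bound = sum(task.file.record_count for task in tasks)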

@kevinjqliu (Contributor)

This is a great idea! We should leverage Iceberg's robust metadata whenever possible.

As mentioned, this would be a specific optimization for querying an Iceberg table under specific circumstances.

There is some prior art for this optimization. Trino has implemented it for count(*) and for reading only partition columns; see trino/#19303.
Min/max is also discussed in trino/#10974.

@kevinjqliu (Contributor)

I think this is an optimization for the engine side.
I want to balance "PyIceberg, the Python library for Iceberg" against "PyIceberg, the engine to run queries on Iceberg".

Fokko (Contributor) commented Oct 30, 2024

Thanks @Visorgood for reaching out here; that's an excellent idea. We actually already do this in a project like DataHub, see: https://github.com/datahub-project/datahub/blob/0e62c699fc2e4cf2d3525e899037b8277541cfd6/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_profiler.py#L141-L162

There are some limitations that @sungwy already pointed out, such as applying a filter. There are a couple more: when you have positional deletes, the row counts are no longer accurate; you would need to apply the deletes and then count, which requires computation. Also, the upper and lower bounds are truncated by default when the column is a string. For DataHub this is fine, but you need to be aware of the limitations.
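
To make the positional-delete caveat concrete, here is a small, hedged illustration (not DataHub's code): delete_files on a planned FileScanTask tells you whether a metadata-only sum of record counts would still be exact.

scan = table.scan()
tasks = list(scan.plan_files())

# Positional/equality deletes mean the manifest record counts over-count the live
# rows, so a metadata-only sum is only an upper bound whenever deletes are present.
exact_from_metadata = not any(task.delete_files for task in tasks)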

That said, I do think there is value in a special API to quickly get table/column statistics. I think adding this to the metadata tables is the right place. WDYT?

@tusharchou

RCA

Hi @Visorgood,
The behavior expected here is a simple partition push-down implementation in DuckDB, which this PR solves for:
duckdb/duckdb-iceberg#72

@Fokko please assign this to me.

Fokko (Contributor) commented Nov 26, 2024

@tusharchou Thanks. I was noodling on this, and instead of going through .to_arrow(), we could also have a .count() that returns the number of rows matching the predicate. This would leverage the Iceberg metadata as much as possible, but we might still need to open files if there are positional deletes, or if we cannot get a definitive answer from the statistics we have. We would be able to reuse a lot of the logic of the current read path. Any thoughts so far?
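
Purely as a sketch of the idea (this is not PyIceberg's actual API), such a .count() could take the metadata fast path when it is safe and otherwise stream record batches so memory stays flat. AlwaysTrue, row_filter, plan_files(), delete_files, record_count and to_arrow_batch_reader() are existing PyIceberg names; the function itself is hypothetical.

from pyiceberg.expressions import AlwaysTrue

def count_rows(scan) -> int:
    # Hypothetical helper; a real DataScan.count() could also use column statistics
    # to decide per file whether the predicate matches all of its rows.
    tasks = list(scan.plan_files())
    metadata_only = isinstance(scan.row_filter, AlwaysTrue) and not any(
        task.delete_files for task in tasks
    )
    if metadata_only:
        # Fast path: every planned file fully matches (no filter, no deletes).
        return sum(task.file.record_count for task in tasks)
    # Fallback: read, but stream batches instead of materializing the whole table.
    return sum(batch.num_rows for batch in scan.to_arrow_batch_reader())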

tusharchou added a commit to tusharchou/iceberg-python that referenced this issue Nov 28, 2024: add count in data scan and test in catalog sql
@tusharchou

Hi @Fokko,
Thank you for helping. I attempted to implement .count() in DataScan. I can test it using the SqlCatalog in catalog/test_sql; however, when I try to write a similar test in table/test_init, I run into fixture errors.

I agree that positional deletes are confusing to the user. Hence, this value cannot be used as a business metric, but it might help with skewness analysis or load evaluation.

I would like to add more test cases, so please suggest any pytest tests I can reference.

tusharchou added a commit to tusharchou/iceberg-python that referenced this issue Dec 24, 2024
tusharchou added a commit to tusharchou/iceberg-python that referenced this issue Dec 31, 2024
tusharchou added a commit to tusharchou/iceberg-python that referenced this issue Dec 31, 2024
tusharchou added a commit to tusharchou/iceberg-python that referenced this issue Dec 31, 2024
@gli-chris-hao

We have the same use case and the same concern about loading too much data into memory just to count. The way I'm doing it is to use DataScan.to_arrow_batch_reader and then count rows by iterating over the batches; this should avoid memory issues for a large scan:

count = 0
for batch in datascan.to_arrow_batch_reader():  # streams RecordBatches instead of materializing the full table
    count += batch.num_rows
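
A small variant of the same idea, assuming you only need the count: narrowing the projection with selected_fields (an existing Table.scan parameter) keeps each streamed batch small, since only one column has to be read and decoded. The column name here is just a placeholder.

datascan = table.scan(
    row_filter="col1 = 'val_X'",
    selected_fields=("id",),  # placeholder: any single narrow column works
)
count = sum(batch.num_rows for batch in datascan.to_arrow_batch_reader())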

tusharchou added a commit to tusharchou/iceberg-python that referenced this issue Jan 6, 2025