
Add TFXIO for reading parquet #52

Merged — 7 commits merged into tensorflow:master on Mar 15, 2022

Conversation

martinbomio (Contributor, Author):

These changes add a TFXIO for reading Parquet files.
The implementation uses a TensorFlow Metadata schema (schema_pb2.Schema) to describe the records being read, but it could be further generalized to accept other schema formats (e.g. Avro schemas) if needed.

The implementation uses Beam's ReadFromParquetBatched to read the files into pyarrow Tables and then uses pa.Table.to_batches to convert them into pa.RecordBatches.
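
A minimal sketch of that read path (the helper names here are illustrative, not the actual tfx-bsl code; ReadFromParquetBatched and the pyarrow calls are the real APIs):

import apache_beam as beam
import pyarrow as pa
from apache_beam.io.parquetio import ReadFromParquetBatched
from typing import List, Optional


def _table_to_record_batches(
    table: pa.Table,
    batch_size: Optional[int] = None) -> List[pa.RecordBatch]:
  # Split each pyarrow Table emitted by Beam into smaller RecordBatches.
  return table.to_batches(max_chunksize=batch_size)


def read_parquet_as_record_batches(pipeline, file_pattern, batch_size=None):
  # ReadFromParquetBatched yields one pa.Table per Parquet row group; flatten
  # the per-table batches into a PCollection of pa.RecordBatch.
  return (pipeline
          | "ReadParquet" >> ReadFromParquetBatched(file_pattern=file_pattern)
          | "ToRecordBatches" >> beam.FlatMap(_table_to_record_batches,
                                              batch_size=batch_size))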

google-cla bot commented Mar 9, 2022:

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

For more information, open the CLA check for this pull request.

  def _TableToRecordBatch(
      self,
      table: pa.Table,
      batch_size: Optional[int] = None) -> List[pa.RecordBatch]:
    return table.to_batches(self, max_chunksize=batch_size)
martinbomio (Contributor, Author):

this line here throws the following exception when running tests:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 718, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 843, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/Users/martinbomio/Spotify/Personal/tfx-bsl/tfx_bsl/tfxio/parquet_tfxio.py", line 87, in _TableToRecordBatch
    return table.to_batches(self, max_chunksize=batch_size)
  File "pyarrow/table.pxi", line 1701, in pyarrow.lib.Table.to_batches
TypeError: to_batches() got multiple values for keyword argument 'max_chunksize'

Collaborator:

I don't think self should be passed to to_batches

martinbomio (Contributor, Author):

🤦 of course
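
For reference, the fix is simply to drop the stray self argument so that the call reads:

    return table.to_batches(max_chunksize=batch_size)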

from tfx_bsl.tfxio.tfxio import TFXIO


class ParquetTFXIO(TFXIO):
martinbomio (Contributor, Author):

I also have another working implementation that uses ReadFromParquet instead of ReadFromParquetBatched, then batches the rows and finally transforms the dicts into pa.RecordBatches.
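
Roughly, that alternative would look like the sketch below (names are illustrative; ReadFromParquet yields one dict per record, which is then batched and pivoted into a pa.RecordBatch):

import apache_beam as beam
import pyarrow as pa
from apache_beam.io.parquetio import ReadFromParquet
from typing import Any, Dict, List


def _dicts_to_record_batch(rows: List[Dict[str, Any]]) -> pa.RecordBatch:
  # Pivot a list of row dicts into a dict of columns and build a RecordBatch.
  columns = {name: [row[name] for row in rows] for name in rows[0]}
  return pa.RecordBatch.from_pydict(columns)


def read_parquet_rows(pipeline, file_pattern, batch_size=1000):
  return (pipeline
          | "ReadParquet" >> ReadFromParquet(file_pattern=file_pattern)
          | "Batch" >> beam.BatchElements(max_batch_size=batch_size)
          | "ToRecordBatch" >> beam.Map(_dicts_to_record_batch))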

Collaborator:

Both are experimental, so there's no difference in API stability; I would prefer to keep batching on Beam's side.

martinbomio (Contributor, Author):

sounds good, I'll keep this one then

martinbomio (Contributor, Author):

@iindyk

class ParquetTFXIO(TFXIO):
  """TFXIO implementation for Parquet."""

  def __init__(self,
martinbomio (Contributor, Author):

This implementation is not really doing any profiling; the parquet IO does not provide an easy way to get the raw records. Do we want to implement a custom telemetry DoFn that iterates over each record?

Collaborator:

I would not undo Beam's batching just to get telemetry, or implement custom telemetry for tables; let's just add telemetry for the resulting RecordBatches in BeamSource like this
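
As a rough illustration of the kind of telemetry being discussed (this is not the actual tfx-bsl telemetry transform, just a sketch of a counter DoFn applied to the resulting RecordBatches):

import apache_beam as beam
import pyarrow as pa
from apache_beam.metrics import Metrics


class _RecordBatchTelemetryDoFn(beam.DoFn):
  """Counts rows and bytes for each pa.RecordBatch passing through."""

  def __init__(self, namespace: str = "parquet_tfxio"):
    self._num_rows = Metrics.counter(namespace, "num_rows")
    self._num_bytes = Metrics.counter(namespace, "record_batch_bytes")

  def process(self, record_batch: pa.RecordBatch):
    self._num_rows.inc(record_batch.num_rows)
    self._num_bytes.inc(record_batch.nbytes)
    yield record_batch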

martinbomio (Contributor, Author):

done in bcac68d

iindyk (Collaborator) left a comment:

Thanks Martin!

martinbomio marked this pull request as ready for review on March 11, 2022, 02:34
martinbomio (Contributor, Author):

@iindyk I finished the implementation with your suggestion. I believe this should be ready for review.

schema should contain exactly the same features as column_names.
validate: Boolean flag to verify that the files exist during the pipeline
creation time.
"""
Collaborator:

add telemetry_descriptors doc section
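
Something along these lines, for example (wording adapted from other TFXIO implementations and only illustrative, not the exact text that was added):

    telemetry_descriptors: A set of descriptors that identify the component
      that is instantiating this TFXIO. These will be used to construct the
      namespace to contain metrics for profiling and are therefore expected to
      be identifiers of the component itself and not individual instances of
      source use.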

file_pattern: Text,
column_names: List[Text],
min_bundle_size: int = 0,
schema: Optional[schema_pb2.Schema] = None,
Collaborator:

Can Parquet store other data types apart from dense ones, e.g. varying-length features?
In other words, would it be possible to infer the schema automatically if it is not provided?

martinbomio (Contributor, Author):

Yeah, it can be inferred. If you look at the tests, there's one without specifying a schema. The missing part is inferring the schema. Do you think that is something we want to do? The inference will need to happen by reading one of the parquet files.
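
A rough sketch of what that inference could look like (illustrative only, not the code from 217cb9c; it ignores list/nested Parquet types):

import pyarrow as pa
import pyarrow.parquet as pq
from tensorflow_metadata.proto.v0 import schema_pb2


def _infer_schema_from_parquet_file(path: str) -> schema_pb2.Schema:
  # Read only the file metadata to obtain the Arrow schema (no data is read).
  arrow_schema = pq.read_schema(path)
  schema = schema_pb2.Schema()
  for field in arrow_schema:
    feature = schema.feature.add(name=field.name)
    # Map Arrow value types onto the coarse TFMD feature types.
    if pa.types.is_integer(field.type) or pa.types.is_boolean(field.type):
      feature.type = schema_pb2.INT
    elif pa.types.is_floating(field.type):
      feature.type = schema_pb2.FLOAT
    else:
      feature.type = schema_pb2.BYTES
  return schema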

martinbomio (Contributor, Author):

@iindyk done in 217cb9c

iindyk (Collaborator) left a comment:

Thanks Martin! Let me import and submit this internally.

class ParquetTFXIO(TFXIO):
  """TFXIO implementation for Parquet."""

  def __init__(self,
Collaborator:

Actually, one more small usability thing: can we make all args after file_pattern and column_names keyword-only, i.e. def __init__(self, file_pattern:..., column_names:..., *, min_bundle_size:...,...
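
Concretely, the suggestion amounts to something like the following (parameters taken from the diff shown in this conversation; the exact defaults in the merged code may differ):

  def __init__(self,
               file_pattern: Text,
               column_names: List[Text],
               *,  # everything below must be passed by keyword
               min_bundle_size: int = 0,
               schema: Optional[schema_pb2.Schema] = None,
               validate: bool = True,
               telemetry_descriptors: Optional[List[Text]] = None):
    ...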

martinbomio (Contributor, Author):

done in ae1b00f.

martinbomio (Contributor, Author):

also note that the other TFXIOs do not use this pattern

Collaborator:

Thanks! Yeah, that's unfortunate, but I'd like to have it moving forward: I think it helps prevent some errors.

jay90099 merged commit d713de3 into tensorflow:master on Mar 15, 2022
martinbomio deleted the parquet-tfxio branch on March 15, 2022, 22:51
martinbomio (Contributor, Author):

@iindyk @jay90099 what would be the process of cutting a release so I can add this new IO to the tfxio factory in tfx?

martinbomio (Contributor, Author):

Hey @iindyk @jay90099 I saw that the changes introduced in this PR were reverted in 017c17b. Any reason for this?

iindyk (Collaborator) commented Mar 18, 2022:

Re the revert: this was not intentional.
@jay90099 that was my change, which was not removing these files internally, but copybara added deletion of these files to the resulting PR. How do we fix this?

iindyk (Collaborator) commented Mar 18, 2022:

It's an issue with our internal tools; could you please reopen the PR?

martinbomio restored the parquet-tfxio branch on March 19, 2022, 00:46
martinbomio mentioned this pull request on Mar 19, 2022