Apache Parquet Support #903

Closed
bmcdonald3 opened this issue Aug 26, 2021 · 11 comments · Fixed by #967 or #1028

@bmcdonald3
Contributor

bmcdonald3 commented Aug 26, 2021

As I am starting to work on implementing the reading of Parquet files into Chapel arrays, I thought I would summarize what I have been finding here to get feedback and make sure I am going down the right path.

Summary of Parquet I/O using GLib Interface

  • Parquet files store columns of data, each column being of a single data type (although different columns in the same file can have different data types)
  • Columns are stored in "chunk arrays", which are not guaranteed to be contiguous in memory (which, among other reasons, makes a simple pointer swap on the buffer difficult; see the sketch after this list)
  • Reading in Parquet columns (using the GLib interface) requires that the columns be read into Arrow arrays first, before being copied to Chapel arrays
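
Not the GLib C interface or Chapel, but a minimal pyarrow sketch of the chunked layout described above; when a file with multiple row groups is read, each column typically comes back as one chunk per row group rather than as a single contiguous buffer:

import pyarrow as pa

# Build a column from two chunks to mimic what a multi-row-group read produces.
chunked = pa.chunked_array([[1, 2, 3], [4, 5, 6]])
print(chunked.num_chunks)      # 2 -- the values live in separate buffers
print(len(chunked.chunks[0]))  # 3 -- each chunk is an independent Arrow array

# Getting one contiguous buffer requires an explicit copy.
contiguous = pa.concat_arrays(chunked.chunks)
print(len(contiguous))         # 6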

Interacting with Parquet files

From what I have gathered, the industry norm for interacting with Parquet files looks something like the following:

  1. Loading the "table" (that is, all of the columns in the file) into memory
  2. Reading only a specific column, on an as-needed basis, to interact with
  3. Keeping the table in memory for easy access to other columns when needed (a rough sketch of this pattern follows this list)
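
As a point of reference, here is a minimal pyarrow sketch of that pattern (not the GLib interface used in the Chapel work; the file path and column names are hypothetical):

import pyarrow.parquet as pq

# 1. Load the whole table (all columns) into memory once.
table = pq.read_table('./data/file.parquet')

# 2. Pull out a single column on an as-needed basis (no extra file I/O).
ints = table.column('ints')
strs = table.column('strs')

# 3. Keep the table around so later column accesses are cheap.
print(table.num_rows, table.num_columns)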

Chapel Implementation

First proposal: reading specific columns from a file

Based on this information, I have been taking the approach of allowing the reading of specific columns from the Parquet file, which looks something like this:

var chplIntArr = getIntColumn(path="./data/file.parquet", col=0);
var chplStrArr = getStringColumn(path="./data/file.parquet", col=1);

Second proposal: reading entire files at a time

// tuple size matches the number of columns in the parquet file
(chplIntArr, chplStrArr) = readParquetFile(path="./data/file.parquet", numCols=2);

edit: adding this third proposal

Third proposal: reading columns from in-memory table (not file)

When Parquet files are read through Arrow, they are stored in Arrow tables. Since the implementation of that step is largely out of our control (and based on the info in the Interacting with Parquet Files section above), this seems to be the most performant method for large files. The table is read in once and then passed to the various methods, with the data already in memory.

var table = readParquetFile(path="./data/file.parquet");
var chplIntArr = getIntColumn(table=table, col=0);
var chplStrArr = getStringColumn(table=table, col=1);

Remaining questions

  1. Are you expecting multiple columns per file or just a single column per file?
  2. Would it be valuable to allow the reading in of Parquet metadata/schema information to Chapel?
  3. Is the entire file approach or the column-by-column approach preferred?
  4. Does the information here align with your understanding? Do you have any suggestions or alternate interface proposals?
  5. Are the types of the columns known in advance, or would it be valuable to have auto-type detection? (e.g., just getColumn() rather than getIntColumn())

Lastly, any sample files that match the expected size and format of the Parquet files that you intend to work with would be greatly appreciated.

@bmcdonald3
Contributor Author

Also, if anyone is interested in seeing the work that is being done, here is the branch that I have been working on: https://github.com/bmcdonald3/chapel/tree/parquet

@hokiegeek2
Contributor

@ben-albrecht Awesome! Thanks for the update and really glad that you're bringing this across the finish line. Just my two cents regarding your questions:

  1. I can definitely envisage use cases for both (multi and single), especially since I've had and currently have use cases for both. 💯
  2. Yes, great idea, IMHO
  3. Ideally both, again 'cause I can envisage use cases for both, but the individual column one to me would be the most important one to focus on if you need to choose for v1.
  4. All three proposals look great. The in-memory one would be really awesome as we could then do distributed, zero-copy data sharing between multiple languages such as Chapel, Python, and/or Scala.
  5. Inference would be cool, but that could wait for v2, IMHO

Again, just my opinions, will definitely defer to @mhmerrill and @reuster986

@reuster986
Collaborator

@bmcdonald3 welcome aboard and thanks for taking on this important work! I appreciate and agree with your general overview of parquet format.

One thing to note up front is that we will almost always want to read the same columns from multiple files at once, since our data feeds come in as many files per day, all with the same schema.
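
For reference, a pyarrow sketch of that access pattern, reading the same column from several same-schema files and concatenating the results (file and column names are made up):

import pyarrow as pa
import pyarrow.parquet as pq

files = ['day1.parquet', 'day2.parquet', 'day3.parquet']  # same schema

# Read only the column of interest from each file, then stitch the pieces together.
pieces = [pq.read_table(f, columns=['value']) for f in files]
combined = pa.concat_tables(pieces)
print(combined.num_rows)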

To your questions:

  1. All our current data sources have multiple columns per file, and I would expect that to be the dominant case elsewhere.
  2. Yes, definitely. Schema discovery in these self-describing formats is a huge benefit.
  3. We would at least require the column-by-column approach, because in most cases we only want to read in a subset of columns in our data. If reading in the whole file is faster, we could look at also supporting that case, but I would not expect much speedup because the format is already column-major.
  4. I think proposal 1 looks most compatible with the current design of arkouda. Proposal 2 risks unnecessary I/O on unused columns. Regarding proposal 3, arkouda is already going to persist each column as a distributed chapel array in memory, so having another copy of the data in Arrow might be redundant. But the Arrow approach could be interesting for a possible future where we factor apart the algorithms and the object store in arkouda.
  5. Auto type detection is strongly preferred. The current I/O layer does not even allow the user to specify types, but expects them to be discoverable by reading the (HDF5) schema. We'd like to keep this design, if possible (see the sketch after this list).
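
To illustrate the schema discovery mentioned in points 2 and 5: Parquet files are self-describing, so column names and types can be read from the file footer without touching the data. A pyarrow sketch (not the GLib interface; the file name is hypothetical):

import pyarrow.parquet as pq

pf = pq.ParquetFile('sample.parquet')

# Column names and Arrow types, read from the footer only.
for field in pf.schema_arrow:
    print(field.name, field.type)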

Unfortunately, I can't give you any sample files at the moment, but I can work on generating some based on an educated guess.

Thanks!

@bmcdonald3
Contributor Author

Thanks for the feedback, guys. I have been working on getting everything in good shape and will hopefully have something to show for it soon. One question, however, that hadn't occurred to me when creating this issue: is the ability to write to a Parquet file also valuable/important?

Is the Parquet use-case entirely going to be based on reading in Parquet files and any writing would be done to an HDF5 file and not a Parquet file? If writing to Parquet files is important, what functionality would you envision being useful for that? Thank you.

@hokiegeek2
Contributor

@bmcdonald3 yes, writing to Parquet is very important! Basically, however we read in, we should be able to write out.

@reuster986
Collaborator

@bmcdonald3 I second @hokiegeek2 that being able to round-trip arrays to/from parquet is important. One place where this shows up is in the unit tests/CI, where we will typically generate a few random arrays in arkouda, write them to disk, and read them back in to make sure the result is the same as the original. Having that capability with parquet will be very important for writing useful tests (as well as for users to get data out).
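
A minimal sketch of such a round-trip check in pyarrow (not the arkouda/Chapel code path; column and file names are made up):

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

# Generate a random array, write it to Parquet, read it back, and compare.
original = pa.table({'a': np.random.randint(0, 100, size=1000)})
pq.write_table(original, 'roundtrip.parquet')
restored = pq.read_table('roundtrip.parquet')
assert restored.equals(original)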

@bmcdonald3
Contributor Author

Here is a quick summary of the progress that has been made with Parquet so far.

Summary

Using the Arrow GLib C interface, we have been able to get Parquet I/O working in Arkouda, but it is by no means perfect just yet and there is quite a bit of remaining work. Having a discussion at next week's Arkouda call (or another time) about what is needed on the Arkouda end to get this to a usable place would be valuable for determining where to focus our efforts.

read from parquet file

>>> ak.read_parquet(['test1.parquet', 'test2.parquet'])
array([0 1 2 3 4 5 6 7 0 1 2 3 4])

write to parquet file (on 2 locales)

>>> a = ak.read_parquet(['test1.parquet', 'test2.parquet'])
>>> a.save_parquet('write-test')
'wrote array to file'
>>> glob.glob('*.parquet')
['write-test_LOCALE0.parquet', 'write-test_LOCALE1.parquet']
>>> ak.read_parquet('write-test_LOCALE0.parquet')
array([0 1 2 3 4 5 6])

Challenges

  • The Arrow GLib C interface doesn't seem to allow reading column sizes and other information from the metadata, even though I know it is stored there (see the sketch after this list)
    • We are experimenting with the C++ interface for this task specifically
  • The Arkouda HDF5 string-array reading doesn't seem to have a parallel in Parquet's API, so that isn't working just yet and will likely need to be done differently than how HDF5 string arrays are read in Arkouda today
  • Getting the Arrow/Parquet module into Chapel main after the release
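
As a point of reference for the first challenge, the column sizes and row counts are indeed stored in the Parquet footer; here is an illustrative pyarrow sketch of pulling them out (hypothetical file name, not the GLib or C++ interface we are using):

import pyarrow.parquet as pq

md = pq.ParquetFile('sample.parquet').metadata

print(md.num_rows, md.num_row_groups)
for i in range(md.num_row_groups):
    rg = md.row_group(i)
    # Per-row-group row count and per-column compressed size, all from the footer.
    print(i, rg.num_rows, rg.column(0).total_compressed_size)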

Conclusion

In conclusion, I am mainly wondering:

  1. What would need to get done on this effort for it to be usable/production-ready for Arkouda?
  2. Is there any functionality beyond the basic reading/writing shown above that you are hoping to see?

I am planning to give a quick demo of the code at the next Arkouda call and would appreciate any feedback there (tagging @glitch per Mike's recommendation).
Here is the branch that I am working on, if anyone is interested: https://github.com/bmcdonald3/arkouda/tree/parquet.

@hokiegeek2
Contributor

@bmcdonald3 this is really cool, really exciting! Are you also planning on adding read-write capability to the Plasma Object Store?

@bmcdonald3
Contributor Author

bmcdonald3 commented Sep 15, 2021

@hokiegeek2 that has not been a priority of mine as of late, since I have been focusing on Parquet I/O with Arrow, but I did spend some time looking into the Plasma Object Store prior to this work. It seemed to me there were some pretty strong limitations with it (objects are read-only, must be in shared memory, etc.), so we put that on hold for the time being, but we could invest more into that effort if it would be valuable/helpful to the Arkouda team.

Is that something that would be valuable to you/others? If so, could you provide an example use-case that you would like to be able to use in Arkouda?

@hokiegeek2
Contributor

hokiegeek2 commented Sep 16, 2021

@bmcdonald3 My thoughts are that enabling Chapel I/O with the Plasma Object Store could enable zero-copy data sharing between Chapel locales and distributed Python frameworks such as Ray, Dask, and PySpark, as well as distributed Java/Scala frameworks such as Spark. The main use case would be for an Arkouda user to downselect an Arkouda array and then do a zero-copy read of that array into one of these other tools: Ray, Dask, or PySpark to leverage the Python ML/DL ecosystem, Spark to enable integration with Spark tools and frameworks, and basically any other distributed framework that utilizes the Plasma Object Store for in-memory data sharing.

Chapel-Plasma Object Store integration is definitely not an immediate need, as this is not an Arkouda development priority. This is an idea I've had for a while, and my plan is to investigate it on my own time to see what's possible. I've just recently started looking into how to make this happen on the Ray/Dask/PySpark/Spark side. The Ray option seems to be the most accessible one, as Ray distributes a plasma object store with each Ray worker.

@bmcdonald3
Contributor Author

bmcdonald3 commented Sep 28, 2021

Here is a quick update on the read performance observed when reading Parquet files into Chapel arrays with various configurations, collected on a Linux server:

Config        Read speed
C/GLib        0.000344 GiB/sec
C++ RG=1      0.000681 GiB/sec
C++ RG=1MB    0.70564 GiB/sec
C++ RG=256MB  0.5691 GiB/sec
C++ RG=512MB  0.6429 GiB/sec
HDF5          2.23 GiB/sec

RG in this table refers to the "row group" size used when writing the file. This is a property set by the file writer upon file creation, so it may be outside of the user's control, but how the file was written can significantly alter the performance of reading a Parquet file.

For context, from the Apache Arrow documentation: "A Row Group is a logical horizontal partitioning of the data into rows."

In regard to file-writing performance, Parquet is currently about 15x slower than HDF5, but no work has been done yet to investigate how we could improve that, so I didn't think it was a fair comparison to display it in a table like the read speeds.

Edit: The Arrow documentation recommends large row group sizes (~512 MB-1 GB), which causes the columns to all be stored contiguously and makes reading an entire column (which is what the above benchmark does) faster.
For more information about row groups and row group sizes, see: https://parquet.apache.org/documentation/latest/
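
For illustration, row group size is controlled on the write side. In pyarrow (illustrative only, not the interface used here), it is specified as a number of rows per group rather than bytes:

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({'a': list(range(1_000_000))})

# Fewer, larger row groups generally mean more contiguous column data
# and faster whole-column reads.
pq.write_table(table, 'big-row-groups.parquet', row_group_size=1_000_000)
pq.write_table(table, 'small-row-groups.parquet', row_group_size=10_000)

print(pq.ParquetFile('small-row-groups.parquet').metadata.num_row_groups)  # 100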
