
Consider Using object_store as IO Abstraction #172

Open
tustvold opened this issue Jan 25, 2024 · 36 comments

Comments

@tustvold

I have debated filing this ticket for a while, but largely held off as I wasn't sure how well it would be received, especially as I am acutely aware that this crate currently makes use of OpenDAL and @Xuanwo is an active contributor to both repositories. However, I feel it is important to have these discussions, and part of my role as a maintainer of object_store is to engage with others in the community and hear about how its offering could be made more compelling.

That all being said, I think object_store provides some quite compelling functionality that might be of particular interest to this project:

  • First-party integration with arrow-rs, parquet, DataFusion and polars, including sophisticated vectored and streaming IO
  • Support for conditional writes, which would allow iceberg-rs to support multiple concurrent writers directly against object storage, without needing an external catalog
  • A flexible configuration system developed in partnership with, and used by both the polars and delta-rs communities
  • Extensive support for the various cloud provider credential sources, with extension points for users to further customise this
  • APIs that mirror that of object stores and not filesystems, which helps to understand what and how IO is being performed, and allows support for object store specific functionality like tags, partial range requests, and more...
  • Battle-tested in multiple production systems, and with a substantial and growing user-base

The major area where object_store is limited, somewhat intentionally, is the number of first-party implementations: it supports only S3-compatible stores, Google Cloud Storage, Azure Blob Storage, in-memory and local filesystems. However, the object-safe design does allow for third-party implementations of things like HDFS.

I look forward to hearing your thoughts, but also fully understand if this is not a discussion you would like to engage with at this time.

@alamb

alamb commented Jan 25, 2024

cc @liurenjie1024

@liurenjie1024
Contributor

Hi @tustvold @alamb, thanks for this proposal and write-up; object_store looks great to me!

In iceberg's design, all file IO is hidden behind the FileIO interface, and the backends (i.e. OpenDAL or object_store) are not directly exposed to the user, so I think we can integrate it without any breaking changes.

Currently OpenDAL works well for us and we are focused on implementing more features for iceberg-rust, so it may take a while to evaluate object_store and integrate it into this crate.

First-party integration with arrow-rs, parquet, DataFusion and polars, including sophisticated vectored and streaming IO

I'm quite interested in this since we are about to add support for file readers/writers, which will depend heavily on arrow-rs, parquet, etc., so I think object_store is quite promising.

@liurenjie1024
Contributor

cc @Xuanwo

@Xuanwo
Member

Xuanwo commented Jan 26, 2024

Hi @tustvold, thank you for initiating this discussion! I will do my best to offer a multifaceted response, wearing different hats.

Putting my iceberg-rust developer hat on

As @liurenjie1024 mentioned, iceberg-rust features its own FileIO interface to abstract IO operations. OpenDAL and object_store are merely implementation details with no current plans for external exposure.

It's fine to integrate with object_store, as that is precisely what we created FileIO for. However, it's important to note that we are in the initial stages of this project: currently focusing on the first release and implementing read/write capabilities.

Here are some remarks regarding the object_store feature set:

A flexible configuration system developed in partnership with, and used by both the polars and delta-rs communities

iceberg-rust is aligned with Iceberg and PyIceberg, sharing the same configuration logic; therefore, the object_store's configuration system is redundant for our purposes.

Support for conditional writes, which would allow iceberg-rs to support multiple concurrent writers directly against object storage, without needing an external catalog

While the conditional put feature offers certain advantages, it may not be as crucial for our current use cases in iceberg-rust, where integration with a catalog like Hive or REST is more common.

As an iceberg-rust developer, I am eager to unlock more potential within the project.

Putting my OpenDAL maintainer hat on

Firstly, opendal and object_store are not competitors. (And remember, I'm also a contributor to object_store!) Rather than discussing replacements, I'd prefer to explore how we can coexist to offer our users more choices and possibilities.

I believe opendal integrates seamlessly with object_store, which is why our community created object_store_opendal, enabling users to utilize opendal as an implementation of object_store.

Here are a few reasons why OpenDAL is beneficial for iceberg-rust.

  • OpenDAL offers native support for OSS, B2, HDFS, and WebHDFS in addition to the existing S3, GCS, and AzBlob. All have passed identical behavior test suites, simplifying integration for users without fear of unexpected breakage.
  • OpenDAL offers a comprehensive API that supports range retrieval and conditional fetching through the robust read_with() function.
  • OpenDAL enables users to freely utilize its API. For instance, they can directly use Writer without needing to understand MultipartUpload.
  • OpenDAL features powerful layers such as retry, logging, tracing, metrics, prometheus, timeout, and more that can significantly reduce the workload typically associated with managing these aspects manually.
  • OpenDAL features object_store_opendal integration, enabling seamless connection to existing object_store-based systems.

I also found some places where OpenDAL can improve (thanks @tustvold!).

As an OpenDAL maintainer, I believe OpenDAL offers features that could be beneficial for iceberg-rust, potentially simplifying some aspects of storage management. And I will be happy to collaborate with object_store to ensure the success of iceberg-rust.

@tustvold
Author

tustvold commented Jan 26, 2024

Thank you both for the responses.

In iceberg's design, all file ios are hidden under the FileIO interface, and the backends, i.e. OpenDAL or object_store are not directly exposed to user, so I think we can integrate it without any breaking changes.

Glad to hear efforts are being made to keep the IO primitives abstracted and pluggable 👍. I would just observe that FileIO appears to mirror filesystem APIs, and this has historically been a pain point for systems that chose this path. For example, Spark has had a very hard time getting a performant S3 integration, with proper vectored IO only added to OSS Spark very recently. By contrast, the object_store APIs mirror those of the actual stores and are designed to work well with the APIs in arrow-rs, avoiding all the complexities of prefetching heuristics and the like.

discussing replacements

I entirely agree, I guess I was more suggesting that the IO abstraction mirror object_store as this is what both the upstream crates use and expect, and what the underlying stores provide. If people then wanted additional backend support they could plug OpenDAL into this interface?

I'm quite interested in this since we are about to add support for file reader/writer

I'd be happy to help out with this. If you're open to contributions, both I and my employer are very interested in native iceberg support in the Rust ecosystem.

@alamb

alamb commented Jan 26, 2024

Thank you all -- this is a great conversation.

I entirely agree, I guess I was more suggesting that the IO abstraction mirror object_store as this is what both the upstream crates use and expect, and what the underlying stores provide. If people then wanted additional backend support they could plug OpenDAL into this interface?

I took a look at the FileIO interface that @liurenjie1024 and @Xuanwo pointed to. Ultimately it seems to provide something that implements AsyncRead and AsyncWrite.

While it is true that AsyncRead and AsyncWrite's interfaces (seek, random IO, etc.) can be used in ways that perform very poorly against remote object storage, I think if users are judicious, provide sufficient hints, and buffer the reads, the performance difference will be negligible.

The "benefit" that one might get from using object_store is that its API is more opinionated and makes it very awkward to use poorly

In my opinion, the use of OpenDAL to connect to storage systems other than object stores is pretty compelling.

Perhaps as you proceed with integrating iceberg-rust with arrow-rs/parquet/datafusion we will learn more about how these various systems can be integrated, and whether any adjustments need to be made in OpenDAL, object_store, or downstream in some other crates.

@liurenjie1024
Contributor

Thanks everyone for this very nice discussion.

I'd be happy to help out with this, if you're open to contributions, both myself and my employer are very interested in native iceberg support for the Rust ecosystem

Of course we are open to contributions from everyone; that's the key spirit of open source. Please note that this is an Apache project, and everyone is welcome to contribute.

As for the FileIO interface, it's inspired by iceberg's Java/Python implementations. I have to admit that I don't have much experience working with object stores such as S3, and I don't know much about their differences from filesystems such as HDFS. I believe the whole iceberg community welcomes ideas and designs as long as they are reasonable and provide performance benefits.

@tustvold
Author

tustvold commented Jan 26, 2024

I think if users are judicious and provide sufficients hints, and buffer the reads the performance difference will be negligible.

If primarily performing sequential IO I would tend to agree: the AsyncRead abstraction will be less efficient than a streaming request, but if pre-fetching is configured appropriately the end-to-end latency should be similar. However, it is with "random" IO, such as occurs when reading structured file formats like parquet, that this difference becomes more stark.

Fortunately the fix is extremely simple: adding an InputFile::get_ranges that can be called by AsyncFileReader. This can then call through to vectorised IO primitives where supported.
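A minimal sketch of what such a vectored-read hook could look like (names and signatures are hypothetical, and the sketch is synchronous where the real API would be async):

```rust
use std::ops::Range;

// Hypothetical, simplified stand-in for the proposed
// InputFile::get_ranges; the real interface would be async.
pub trait InputFile {
    /// Read a single byte range.
    fn get_range(&self, range: Range<usize>) -> Vec<u8>;

    /// Vectored read: fetch several ranges in one logical call.
    /// Backends with native support (e.g. parallel range requests
    /// against an object store) can override this; the default
    /// falls back to one read per range.
    fn get_ranges(&self, ranges: &[Range<usize>]) -> Vec<Vec<u8>> {
        ranges.iter().map(|r| self.get_range(r.clone())).collect()
    }
}

/// In-memory backend used purely to exercise the trait.
pub struct MemoryFile(pub Vec<u8>);

impl InputFile for MemoryFile {
    fn get_range(&self, range: Range<usize>) -> Vec<u8> {
        self.0[range].to_vec()
    }
}

fn main() {
    let file = MemoryFile(b"hello world".to_vec());
    let ranges = file.get_ranges(&[0..5, 6..11]);
    assert_eq!(ranges, vec![b"hello".to_vec(), b"world".to_vec()]);
}
```

A reader such as parquet's AsyncFileReader could then hand all of its column-chunk ranges to the backend at once, letting an object-store implementation coalesce or parallelise them.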

Of course we are open to contributions from everyone
In iceberg's design, all file ios are hidden under the FileIO interface

Would you be open to a PR to allow using either OpenDAL or object_store, along with corresponding feature flags, or would you prefer to not complicate matters at this time? I think this could be achieved in a fairly unobtrusive manner.

@Fokko
Contributor

Fokko commented Jan 26, 2024

Thanks @tustvold for raising this and please don't hesitate to open an issue or PR.

For example Spark has had a very hard time getting a performant S3 integration, with proper vectored IO only being added to OSS Spark apache/datafusion#2205 (comment).

This is why the Iceberg Java implementation ships with its own vectorized parquet reader :)

It looks to me that object_store and FileIO aim to solve the same problem. Iceberg was designed from the start to work on object stores, not filesystems. Similar to object_store, the FileIO concept is very opinionated. Since many people are still on HDFS, that is also supported, as filesystems offer stronger guarantees than object stores. If you want to learn more, this is a good primer on the FileIO concept.

@tustvold
Author

tustvold commented Jan 26, 2024

It looks to me that object_store and FileIO aim to solve the same problem

That's awesome, thank you for the link. That is exactly what object_store is, an opinionated abstraction that ensures workloads are not overly reliant on filesystem-specific APIs and behaviour. Really cool that the iceberg community chose to take this approach, I agree with it wholeheartedly 👍

FWIW I notice that the InputFile contract is not itself vectorised, but I guess if you have a custom parquet reader you could lift the range coalescing into it.

@liurenjie1024
Contributor

Would you be open to a PR to allow using either OpenDAL or object_store, along with corresponding feature flags, or would you prefer to not complicate matters at this time? I think this could be achieved in a fairly unobtrusive manner

Hi @tustvold, you're welcome to open a PR for this.

About the timing, my suggestion is to wait a moment. This crate has finished the REST catalog, serialization/deserialization of metadata, and basic file-based table scan planning. We expect to implement two things next: a parquet file writer that writes arrow record batches, and reading parquet files into an arrow record batch stream. These two features depend heavily on FileIO and would provide solid, concrete use cases for our new IO interface, so that we can have a better understanding and discussion of the benefits of these changes. What do you think?

@BlakeOrth

Hi, I'm a relatively new user of the iceberg-rust crate(s) and was hoping I could bring this discussion back to get some movement here. While Iceberg is a relatively new ecosystem for me, I'm reasonably well versed in both object stores and parquet. As such, I was hoping to be able to use the object_store crate that @tustvold surfaced here with iceberg-rust, so I could have a unified storage backend for an application I'm working on. Unfortunately, since iceberg-rust relies on the FileIO struct for all IO operations, I don't immediately see a way to use anything except the provided implementations, short of reimplementing the existing parquet writer and all of its column statistics functionality.

I've read over the discussion here, as well as #356, reviewed the ethos behind FileIO from @Fokko's link, and have been looking into the various uses of FileIO throughout the code. While the easy and least disruptive answer here seems to be simply exposing FileIO as a trait instead of a struct, I question whether that's the best path forward. I understand the desire to retain FileIO for uniformity across the Iceberg ecosystem, but there also seems to be value in adopting the object_store trait for uniformity across the Rust Arrow/Parquet ecosystem. Given the very small API surface area of FileIO (newInput(), newOutput(), delete()), I'm wondering how everyone would feel about adopting object_store as the core storage solution for iceberg-rust and implementing FileIO on top of object_store instead of OpenDAL directly. It seems like this would allow iceberg-rust to take advantage of the robust and well-tested ecosystem surrounding object_store, including all of its existing integrations like OpenDAL, while retaining the option for Iceberg users to leverage the familiar FileIO if they wanted. @liurenjie1024, do you think this would satisfy the concerns you voiced in #356?

I'm not sure if @tustvold still has the time to open a PR, but I would be happy to help contribute towards this effort as well.

@Xuanwo
Member

Xuanwo commented Nov 13, 2024

Hi, thank you, @BlakeOrth, for bringing this up.

It's part of our community's philosophy not to choose a winner. All implementations of Iceberg don't directly expose the underlying storage implementations. Instead, we provide a trait (interface) called FileIO to end users, allowing them to switch to any storage abstraction they prefer or even implement their own.

We reached a consensus in previous discussions to expose such a trait, allowing users to choose between opendal or object_store based on their needs. Any downstream usage should be built on top of our FileIO to prevent leaking internal implementation details.

A similar discussion also occurred on arrow-rs, where users want to use opendal along with parquet and datafusion. I'm willing to help work on those projects too.

@alamb

alamb commented Nov 13, 2024

@BlakeOrth I don't fully understand what you are trying to accomplish.

If the goal is to use iceberg-rust without OpenDAL, I thought it was possible to implement an adapter/wrapper around an object_store instance that implements the FileIO trait?

Is the issue there is no community maintained adapter?

@tustvold
Author

instance that implements the FileIO trait?

At the moment FileIO is not a trait but a struct - https://docs.rs/iceberg/latest/iceberg/io/struct.FileIO.html

I personally think it would be valuable to make this a trait so people can bring their own IO implementations, be they based on object_store or something else. This is the approach we have taken elsewhere in the ecosystem and it works well

@Fokko
Contributor

Fokko commented Nov 13, 2024

To add some context here. As mentioned, FileIO should be pluggable. Right now, I've noticed that FileIO is a concrete type, where I would expect a trait.

The goal behind FileIO is to provide an object-store-native abstraction. Today we rely heavily on OpenDAL, since it provides the most functionality out of the box. Still, there is always room for other implementations (as long as they are well-maintained and compatible with the Apache license).

Another important aspect of the FileIO is that it harmonizes the configuration, so you can easily swap out FileIO, or even force a specific FileIO through properties (io-impl in Java, py-io-impl in Python or rs-io-impl in Rust?).

Edit: @tustvold and I commented at the same time. I agree that it should be a trait, feel free to ping me for a review!

@Xuanwo
Member

Xuanwo commented Nov 13, 2024

Yes, I think it should be a trait as well. Its current form has historical reasons, and I would be happy to refine it into the shape we envisioned. If someone wants to work in this area, feel free to reach out to me through issues/PRs or on Slack.

@Xuanwo
Member

Xuanwo commented Nov 13, 2024

By the way, we will need to present FileIO as a struct capable of parsing configurations and being used universally without concerns about lifecycle or generic parameters. The trait we develop might resemble Storage or something similar.

@tustvold
Author

tustvold commented Nov 13, 2024

present FileIO as a struct capable of parsing configurations and being used universally without concerns about lifecycle or generic parameters

What about adding a FileIOProvider that constructs an Arc<dyn FileIO> from a given configuration? I think it would be really nice if a (default?) FileIOProvider could hook into DataFusion's SessionContext, and its ObjectStoreRegistry in particular, so that iceberg-rust could be a drop-in for people already using DataFusion

Edit: although that does naturally lead to the question of whether iceberg-rust even really needs to rebuild these abstractions when they exist upstream...
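A rough sketch of the FileIOProvider idea (all names here are hypothetical and the sketch is synchronous and stubbed; a real provider would be async, and the DataFusion variant would delegate to the SessionContext's ObjectStoreRegistry):

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical trait names, sync for brevity.
pub trait FileIO {
    fn scheme(&self) -> &str;
}

pub trait FileIOProvider {
    /// Build (or look up) a FileIO for the given scheme and
    /// configuration properties.
    fn get(&self, scheme: &str, props: &HashMap<String, String>) -> Option<Arc<dyn FileIO>>;
}

/// Stub backend standing in for a real S3-backed implementation.
struct S3FileIO;
impl FileIO for S3FileIO {
    fn scheme(&self) -> &str { "s3" }
}

/// A provider that only knows about one backend; a DataFusion
/// integration could instead resolve the scheme through the
/// registry it already maintains.
struct DefaultProvider;

impl FileIOProvider for DefaultProvider {
    fn get(&self, scheme: &str, _props: &HashMap<String, String>) -> Option<Arc<dyn FileIO>> {
        match scheme {
            "s3" => Some(Arc::new(S3FileIO)),
            _ => None,
        }
    }
}

fn main() {
    let provider = DefaultProvider;
    let io = provider.get("s3", &HashMap::new()).unwrap();
    assert_eq!(io.scheme(), "s3");
    assert!(provider.get("ftp", &HashMap::new()).is_none());
}
```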

@liurenjie1024
Contributor

I agree that we should make FileIO pluggable, allowing users to choose. But I have a concern about making FileIO a trait, since it forces users to use Arc<dyn FileIO> everywhere while leaving the FileIO trait quite limited due to object-safety constraints in Rust. I think it would be better to have a design doc and some example code so that we can have a better discussion.
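To make the object-safety concern concrete, here is a std-only sketch (all names hypothetical): generic methods and impl-Trait returns must give way to concrete argument types and boxed return values, which is exactly what keeps `Arc<dyn FileIO>` usable:

```rust
use std::sync::Arc;

// A generic method such as
//     fn read<R: AsRef<str>>(&self, path: R) -> Vec<u8>;
// would make the trait unusable as `dyn FileIO`. The object-safe
// variant below takes concrete argument types and returns boxed
// trait objects instead.

pub trait FileRead {
    fn read_all(&self) -> Vec<u8>;
}

pub trait FileIO {
    fn new_input(&self, path: &str) -> Box<dyn FileRead>;
}

/// Toy backend whose "file contents" are just the path bytes.
struct InMemory;
struct InMemoryRead(String);

impl FileRead for InMemoryRead {
    fn read_all(&self) -> Vec<u8> {
        self.0.as_bytes().to_vec()
    }
}

impl FileIO for InMemory {
    fn new_input(&self, path: &str) -> Box<dyn FileRead> {
        Box::new(InMemoryRead(path.to_string()))
    }
}

fn main() {
    // Because the trait is object safe, it can be shared as Arc<dyn FileIO>.
    let io: Arc<dyn FileIO> = Arc::new(InMemory);
    assert_eq!(io.new_input("a/b").read_all(), b"a/b".to_vec());
}
```

The cost is the boxing and loss of generic flexibility the comment above alludes to; the benefit is that user code can hold and swap implementations behind one pointer type.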

@alamb

alamb commented Nov 13, 2024

One potential API that would not disrupt current consumers would be to add an extension trait, like we do with UserDefinedLogicalNode in DataFusion.

The user would create a FileIO like

let my_wrapper = Arc::new(ObjectStoreWrapper::new(...))
...
// FileIO remains a struct that wraps a Storage  enum
let file_io = FileIO::from_extension(my_wrapper)

We could add a new variant of Storage like Storage::Extension

pub(crate) enum Storage {
    // ...existing variants...
    Extension(Arc<dyn FileIOExtension>), // hypothetical new variant
}

A user would create one like

struct ObjectStoreWrapper {
...
}

/// New trait that users would have to implement
impl FileIOExtension for ObjectStoreWrapper {
...
}

@tustvold
Author

tustvold commented Nov 13, 2024

while making FileIO trait quite limited due to object safe in rust

The transformation from the current abstraction to an object-safe version could be done largely mechanically:

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileIOProps(HashMap<String, String>);

impl FileIOProps {
    pub fn with_prop() {...}

    pub fn iter(&self) -> impl Iterator<Item = (&str, &str)> {
        self.0.iter().map(|(k, v)| (k.as_str(), v.as_str()))
    }
}

pub trait FileIOBuilder {
    fn get(&self, scheme: &str, props: &FileIOProps) -> Result<Arc<dyn FileIO>>;
}

#[async_trait]
pub trait FileIO {
    async fn is_exist(&self, path: &str) -> Result<bool>;

    async fn new_input(&self, path: &str) -> Result<Box<dyn InputFile>>;

    ...
}

#[async_trait]
pub trait InputFile {
    async fn metadata(&self) -> Result<FileMetadata>;

    fn reader(&self) -> Result<Arc<dyn FileRead>>;
}

#[async_trait]
pub trait OutputFile {
    async fn metadata(&self) -> Result<FileMetadata>;

    fn writer(&self) -> Result<Box<dyn FileWrite>>;

    ...
}

This would of course be more disruptive than what @alamb proposes, but would be more extensible and avoid the need to define a secondary extension interface. It would also more closely mirror the Java APIs, which I understand is a goal here.

Edit: That being said if providing an idiomatic API for Rust users is more important than matching the Java APIs I can make alternative suggestions along those lines.

@JanKaul
Collaborator

JanKaul commented Nov 13, 2024

My two cents:

I think it would make sense to list all requirements. The main requirements I can think of are:

  1. Abstraction over different object stores
  2. Harmonization of configuration specific to object storage

If the requirements mentioned are the main motivation, I think it makes sense to use ObjectStore directly. I don't see the value in wrapping one abstraction in another. And as people have stated before, ObjectStore is a widely used Rust library with nice integration with arrow/parquet/datafusion.

Regarding 2: I think the common iceberg configuration could be handled with something like the existing parse_url_opt, which parses a given iceberg configuration. This doesn't require a struct. In my opinion, the rest of the table configuration should be part of the table, not of FileIO.

So for me it comes down to: what are the other requirements, and do they prohibit the use of a trait because of object safety?

@gruuya
Contributor

gruuya commented Nov 13, 2024

+1 for making FileIO a trait (if not using ObjectStore directly).

There's an analogy here with delta-rs, which also has its own high-level abstraction, LogStore, with which you construct DeltaTables. However, ObjectStore is a first-class citizen there, and it is straightforward to construct a LogStore out of an ObjectStore, making the integration smooth(er).

what are the other requirements?

Perhaps ergonomics can be counted as one? Things like wrapping your FileIO/store/client with a path prefix, listing objects under a given path (I find this useful for debugging sometimes), and more sophisticated operations such as multipart uploads are all quite useful to us (and they're built into the object_store crate).

@JanKaul
Collaborator

JanKaul commented Nov 13, 2024

To improve ergonomics, one could introduce an additional trait implemented on top of ObjectStore, like so:

#[async_trait]
pub trait FileIO {
    async fn get_metadata(&self, location: Path) -> Result<TableMetadata, Error>;
}

#[async_trait]
impl<T: ObjectStore> FileIO for T {
    async fn get_metadata(&self, location: Path) -> Result<TableMetadata, Error> {
        let bytes = self.get(&location).await?.bytes().await?;
        Ok(serde_json::from_slice(&bytes)?)
    }
}

This provides better ergonomics while maintaining all the flexibility that you get with ObjectStore.
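For illustration, the same blanket-impl pattern can be shown std-only, with a stand-in Store trait playing the role of ObjectStore (all names hypothetical):

```rust
// Extension trait automatically implemented for every type that
// implements the base trait (the stand-in for ObjectStore).
pub trait Store {
    fn get(&self, location: &str) -> Vec<u8>;
}

pub trait StoreExt: Store {
    /// Higher-level helper built purely from the base trait,
    /// analogous to get_metadata in the comment above.
    fn get_string(&self, location: &str) -> String {
        String::from_utf8(self.get(location)).unwrap_or_default()
    }
}

// Blanket impl: every Store gets StoreExt for free.
impl<T: Store + ?Sized> StoreExt for T {}

/// Toy backend that echoes the location as its contents.
struct Echo;
impl Store for Echo {
    fn get(&self, location: &str) -> Vec<u8> {
        location.as_bytes().to_vec()
    }
}

fn main() {
    assert_eq!(Echo.get_string("hello"), "hello");
}
```

The design point is that the higher-level helper lives outside the base trait, so it never constrains which ObjectStore implementation is used underneath.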

@Xuanwo
Member

Xuanwo commented Nov 13, 2024

Hi, the FileIOProvider, FileIOExtension, and FileIO trait all look good to me. We can initiate a design to gradually implement them. I can imagine having crates like iceberg-fileio-opendal, iceberg-fileio-object-store, iceberg-fileio-hdfs, and iceberg-fileio-iouring. We should also allow users to implement file I/O on their own, as we do for Catalog.

Using object_store directly doesn't seem ideal to me. iceberg-rust needs to build its own storage abstraction and integrate with the ecosystem, similar to what we do for our expression framework, so iceberg can always select the best implementations.

@BlakeOrth

Wow, I did not expect my comment digging up a nearly year-old issue to result in this much discussion!

I likely have less skin in the game than any of the maintainers here, but I think @gruuya and @JanKaul's most recent comments mirror what I personally had in mind. That being said, I think pretty much any of the proposed solutions would allow users to solve the underlying problem here.

I would also like to highlight @tustvold's comment:

It would also more closely mirror the java APIs which I understand is a goal here.
Edit: That being said if providing an idiomatic API for Rust users is more important than matching the Java APIs I can make alternative suggestions along those lines.

I personally think having an API that's idiomatic ends up being very important for long-term usability, and in turn the long-term health, of a given package. Maintaining similar units of abstraction, and the names of those units of abstraction, across a multi-language ecosystem such as arrow/iceberg/etc. is important for developer familiarity. However, I have personally seen multiple instances where the community has entirely re-implemented a package because the original API was mirrored from another language and was not ergonomic to use.

@liurenjie1024
Contributor

Thanks everyone for joining the discussion here. I think we have reached some conclusions:

  1. We need to support different storages, like S3 and Google Cloud Storage.
  2. We need to support different IO layers, like opendal and object_store.

One thing left undetermined is the relationship with the Java API. The current design tries to stay close to the Java API while remaining idiomatic for Rust users. That's why we made FileIO, InputFile, and OutputFile structs rather than traits.

While both @tustvold's and @alamb's suggestions are great, one of my concerns is breaking the current API. In fact, the current design has enough room for extension. For example, if we want to use object_store for S3, we could extend the Storage enum, which is invisible to the user. Currently InputFile and OutputFile have a field for opendal's Operator, but these are private fields hidden from the user. So, I would suggest the following changes to extend FileIO to support the object_store crate:

  1. Make Storage a trait rather than an enum, like the following:

#[async_trait]
pub(crate) trait Storage {
  async fn create_reader(&self, path: &str) -> Result<Arc<dyn FileRead>>;
  async fn create_writer(&self, path: &str) -> Result<Arc<dyn FileWrite>>;
}

  2. Change FileIOBuilder's behavior to take one extra parameter into account:

s3.provider = (opendal) | (object_store)

  3. Add different implementations of the Storage trait, for example OpenDALStorage and ObjectStoreStorage.

One missing piece of this design is that we don't allow users to provide an external Storage implementation not included in this crate. While I'm not sure whether there is a requirement for this, it would still be possible to allow users to inject something like a StorageProvider in FileIOBuilder.
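The selection step of this proposal could be sketched synchronously like so (the Storage trait is stubbed, and the `s3.provider` property and the OpenDALStorage/ObjectStoreStorage names are part of the proposal, not an existing API):

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Stubbed, synchronous stand-in for the proposed (async) Storage trait.
pub trait Storage {
    fn name(&self) -> &'static str;
}

/// Would delegate to opendal in the real implementation.
struct OpenDALStorage;
impl Storage for OpenDALStorage {
    fn name(&self) -> &'static str { "opendal" }
}

/// Would delegate to object_store in the real implementation.
struct ObjectStoreStorage;
impl Storage for ObjectStoreStorage {
    fn name(&self) -> &'static str { "object_store" }
}

/// FileIOBuilder-style selection consulting the proposed
/// `s3.provider` property, defaulting to opendal.
pub fn build_storage(props: &HashMap<String, String>) -> Arc<dyn Storage> {
    match props.get("s3.provider").map(String::as_str) {
        Some("object_store") => Arc::new(ObjectStoreStorage),
        _ => Arc::new(OpenDALStorage),
    }
}

fn main() {
    let mut props = HashMap::new();
    assert_eq!(build_storage(&props).name(), "opendal");
    props.insert("s3.provider".to_string(), "object_store".to_string());
    assert_eq!(build_storage(&props).name(), "object_store");
}
```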

@tustvold
Author

Sounds like a good compromise. Did you have any thoughts on how this might integrate with the existing DataFusion machinery? I'm mainly thinking of configuration, so users get a good out-of-the-box experience, but one could also see the benefits of iceberg using more of DataFusion's first-party support for things like parquet

@liurenjie1024
Contributor

Sounds like a good compromise. Did you have any thoughts on how this might integrate with the existing DataFusion machinery? I'm mainly thinking of configuration, so users get a good out-of-the-box experience, but one could also see the benefits of iceberg using more of DataFusion's first-party support for things like parquet

I'm not sure I get your point about the existing DataFusion machinery; do you mean how to pass configuration down to object_store in FileIOBuilder?

@tustvold
Author

tustvold commented Nov 18, 2024

point about the existing Datafusion machinery

DataFusion provides an ObjectStoreRegistry as part of the SessionContext. This is then what various abstractions like ParquetExec hook into.

By integrating with this iceberg-rs would better interoperate with the rest of the DataFusion ecosystem, be they other catalogs like listing table, deltalake, Hive, etc... or unusual deployment scenarios with custom caching object stores, etc... It seems unfortunate for users to need to configure iceberg-rs separately from the rest of DataFusion. It would also benefit from the ongoing work to improve those components and systems.

I don't know to what extent the desire is to make iceberg-rs a standalone library that mirrors the Java APIs and configuration, but I thought it worthwhile to at least make the case for closer integration with DataFusion. It seems like quite a lot of undifferentiated toil to rebuild the quite subtle logic around predicate pushdown, concurrent decode, etc...

Edit: to ground this a bit more, the advantage of a trait based approach, is the DF bindings could provide a component wrapping SessionContext or similar, without forcing iceberg to take a dependency on DF or maintain this mapping

@liurenjie1024
Contributor

point about the existing Datafusion machinery


If we want to allow integration with ObjectStoreRegistry, we would need one more trait, something like StorageProvider:

#[async_trait]
pub trait StorageProvider {
  async fn build(&self, configs: &HashMap<String, String>, url: &str) -> Arc<dyn Storage>;
}

@tustvold
Copy link
Author

I think that should work; the DataFusion wrapper can hook the iceberg metadata operations in via that StorageProvider trait, and then use the DataFusion machinery directly for the actual file I/O. I presume this would be possible, or does this library expect to also perform the file I/O?

@BlakeOrth
Copy link

@liurenjie1024 I have taken some time to explore an implementation based on your suggestion above, just as I did for the user-extensible Storage proposed earlier. Unfortunately, I don't currently see a way to implement what you've suggested without either an API-breaking change or making every component in the FileIO chain (Storage, FileIO, InputFile and OutputFile) aware of its concrete implementation, effectively rendering the proposed Storage trait somewhat moot. I'll summarize the issues here, and would be happy to push a draft PR if anyone would like to look at the full implementation details.

There's a chance I didn't fully grasp your proposed solution, so if this solution isn't what you had in mind, please let me know. Based on your proposal, I changed Storage from an enum to a trait that fulfills the basic functions of the existing FileIO API.

#[async_trait::async_trait]
pub(crate) trait Storage: Debug {
    async fn create_reader(&self, path: &str) -> Result<Arc<dyn FileRead>>;
    async fn create_writer(&self, path: &str) -> Result<Arc<dyn FileWrite>>;
    async fn metadata(&self, path: &str) -> Result<FileMetadata>;
    async fn exists(&self, path: &str) -> Result<bool>;
    async fn delete(&self, path: &str) -> Result<()>;
}

Ideally I believe FileIO, InputFile and OutputFile would end up looking as follows:

#[derive(Clone, Debug)]
pub struct FileIO {
    inner: Arc<dyn Storage>,
}

pub struct InputFile {
    storage: Arc<dyn Storage>,
    path: String,
}

pub struct OutputFile {
    storage: Arc<dyn Storage>,
    path: String,
}

It seems like the Storage trait as defined above should allow each of them to operate as expected, but the APIs of both InputFile and OutputFile have methods that are incompatible with it.

impl InputFile {
    ...
    pub async fn reader(&self) -> crate::Result<impl FileRead> {
        ...
    }
}

impl OutputFile {
    ...
    pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
       ...
    }
}

Since both of these methods return an owned trait implementation, the problems with them are similar. For reader, it seemed simple enough to write impl FileRead for Arc<dyn FileRead>, but this unfortunately requires that FileRead become Send, which breaks the existing API. For writer, the only way to consistently go from an Arc<dyn FileWrite> to a Box<dyn FileWrite> would be for FileWrite to be Clone, which also breaks the existing API.

To get around the above issues I experimented with embedding an implementation-specific Config inside InputFile and OutputFile that allows the full creation of a new, owned FileRead and FileWrite. While this seems possible to do, it makes every component implementation-specific, which I feel goes against the purpose of the proposed Storage trait.

As I said earlier, I'm happy to post a draft PR with the full change set (there are multiple other items to work out that I'm ignoring here) if anyone would like to see it. If there are other ideas on how to make the Storage trait work with the above methods I'd be happy to explore additional changes there as well.

@liurenjie1024
Copy link
Contributor

I presume this would be possible, or does this library expect to also perform the file I/O?

I think that depends. This library provides several pieces of functionality for reading:

  1. Planning the data files to read for a table scan. This involves access to metadata files, and it uses FileIO to do that.
  2. A builtin reader that converts planned table scan tasks into an arrow record batch stream. In this case, we also use FileIO to access the data files.

For integration with query engines like DataFusion, the engine only needs to use the file-planning API and can perform the scanning itself.

@liurenjie1024
Copy link
Contributor

liurenjie1024 commented Nov 25, 2024

Hi @BlakeOrth, thanks for trying this; yes, it's quite close to what I had in mind. So the question is whether we can keep the return types of the reader/writer functions? I think there are two approaches to this problem:

  1. Breaking current api and change return type to Arc<dyn FileRead>/Arc<dyn FileWrite> .
  2. Provide a wrapper around this. For example,
struct FileReadWrapper(Arc<dyn FileRead>);
impl FileRead for FileReadWrapper {
    ...
}

Solution 2 is expected to maintain backward compatibility, but it's somewhat ugly, since it adds another layer of unnecessary abstraction. There is a solution that combines the benefits of both:

  1. Use solution 2.
  2. Mark the current reader/writer methods as deprecated.
  3. Provide another method that returns Result<Arc<dyn FileRead>>/Result<Arc<dyn FileWrite>>, and suggest that callers use it instead.
  4. Remove the deprecated methods in a future release.
