Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-280: [C++] Refactor IPC / memory map IO to use common arrow_io interfaces. Create arrow_ipc leaf library #138

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -626,12 +626,6 @@ set(ARROW_SRCS
src/arrow/table.cc
src/arrow/type.cc

# IPC / Shared memory library; to be turned into an optional component
src/arrow/ipc/adapter.cc
src/arrow/ipc/memory.cc
src/arrow/ipc/metadata.cc
src/arrow/ipc/metadata-internal.cc

src/arrow/types/construct.cc
src/arrow/types/decimal.cc
src/arrow/types/json.cc
Expand Down
11 changes: 9 additions & 2 deletions cpp/src/arrow/io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

set(ARROW_IO_LINK_LIBS
arrow_shared
dl
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be needed; maybe you mixed a static and a shared build somewhere?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are using dlopen in libhdfs_shim.cc; I get a linker error without linking libdl.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, ok. I had the problem with missing dlopen symbols in parquet-cpp because static and shared test libs were mixed there. In that case linking dl would have been only a workaround, but since we explicitly use it here, it's fine.

)

if (ARROW_BOOST_USE_SHARED)
Expand All @@ -37,6 +38,7 @@ set(ARROW_IO_TEST_LINK_LIBS
${ARROW_IO_PRIVATE_LINK_LIBS})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would need an arrow_io_static here to solve the linking problems in the tests.


set(ARROW_IO_SRCS
memory.cc
)

if(ARROW_HDFS)
Expand Down Expand Up @@ -71,8 +73,8 @@ if(ARROW_HDFS)
${ARROW_HDFS_SRCS}
${ARROW_IO_SRCS})

ADD_ARROW_TEST(hdfs-io-test)
ARROW_TEST_LINK_LIBRARIES(hdfs-io-test
ADD_ARROW_TEST(io-hdfs-test)
ARROW_TEST_LINK_LIBRARIES(io-hdfs-test
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that's why you need dl. This should probably be conditional on whether we build static or shared tests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above, the issue was actually dlopen, dlsym, etc.

${ARROW_IO_TEST_LINK_LIBS})
endif()

Expand Down Expand Up @@ -101,10 +103,15 @@ if (APPLE)
INSTALL_NAME_DIR "@rpath")
endif()

ADD_ARROW_TEST(io-memory-test)
ARROW_TEST_LINK_LIBRARIES(io-memory-test
${ARROW_IO_TEST_LINK_LIBS})

# Headers: top level
install(FILES
hdfs.h
interfaces.h
memory.h
DESTINATION include/arrow/io)

install(TARGETS arrow_io
Expand Down
35 changes: 22 additions & 13 deletions cpp/src/arrow/io/hdfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ Status HdfsReadableFile::ReadAt(
return impl_->ReadAt(position, nbytes, bytes_read, buffer);
}

// Buffer-returning positional read. This overload has no HDFS implementation
// yet: the arguments are not inspected, *out is left untouched, and the call
// always reports NotImplemented.
Status HdfsReadableFile::ReadAt(
    int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
  Status unsupported = Status::NotImplemented("Not yet implemented");
  return unsupported;
}

// Reports whether reads can hand back data without copying; this class
// always answers no.
bool HdfsReadableFile::supports_zero_copy() const {
  const bool zero_copy_available = false;
  return zero_copy_available;
}

// Sequential read: passes nbytes, bytes_read and buffer straight through to
// the private implementation and returns its Status unchanged.
Status HdfsReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
  Status read_status = impl_->Read(nbytes, bytes_read, buffer);
  return read_status;
}
Expand All @@ -162,9 +171,9 @@ Status HdfsReadableFile::Tell(int64_t* position) {
// File writing

// Private implementation for writeable-only files
class HdfsWriteableFile::HdfsWriteableFileImpl : public HdfsAnyFileImpl {
class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl {
public:
HdfsWriteableFileImpl() {}
HdfsOutputStreamImpl() {}

Status Close() {
if (is_open_) {
Expand All @@ -185,29 +194,29 @@ class HdfsWriteableFile::HdfsWriteableFileImpl : public HdfsAnyFileImpl {
}
};

HdfsWriteableFile::HdfsWriteableFile() {
impl_.reset(new HdfsWriteableFileImpl());
// Creates the stream with a fresh private implementation object owned by
// impl_.
HdfsOutputStream::HdfsOutputStream() : impl_(new HdfsOutputStreamImpl()) {}

HdfsWriteableFile::~HdfsWriteableFile() {
// Closes the underlying implementation on destruction. A destructor has no
// way to hand a Status back to the caller, so the result of Close() is
// deliberately discarded.
HdfsOutputStream::~HdfsOutputStream() {
  const Status close_status = impl_->Close();
  static_cast<void>(close_status);
}

Status HdfsWriteableFile::Close() {
// Closes the stream by delegating to the private implementation; its Status
// is returned unchanged.
Status HdfsOutputStream::Close() {
  Status close_status = impl_->Close();
  return close_status;
}

Status HdfsWriteableFile::Write(
// Writes from buffer by forwarding all three arguments to the private
// implementation and returning its Status.
//
// The out-parameter was previously called bytes_read, which is misleading on
// a write path; it is named bytes_written here to match the two-argument
// overload's bytes_written_dummy. Presumably it receives the number of bytes
// actually written -- confirm against the implementation's Write().
Status HdfsOutputStream::Write(
    const uint8_t* buffer, int64_t nbytes, int64_t* bytes_written) {
  return impl_->Write(buffer, nbytes, bytes_written);
}

Status HdfsWriteableFile::Write(const uint8_t* buffer, int64_t nbytes) {
// Convenience overload for callers that do not need the byte count: calls
// the three-argument Write() with a throwaway counter.
Status HdfsOutputStream::Write(const uint8_t* buffer, int64_t nbytes) {
  int64_t discarded_count = 0;
  Status write_status = Write(buffer, nbytes, &discarded_count);
  return write_status;
}

Status HdfsWriteableFile::Tell(int64_t* position) {
// Position query: forwards position to the private implementation and
// returns its Status unchanged.
Status HdfsOutputStream::Tell(int64_t* position) {
  Status tell_status = impl_->Tell(position);
  return tell_status;
}

Expand Down Expand Up @@ -347,7 +356,7 @@ class HdfsClient::HdfsClientImpl {

Status OpenWriteable(const std::string& path, bool append, int32_t buffer_size,
int16_t replication, int64_t default_block_size,
std::shared_ptr<HdfsWriteableFile>* file) {
std::shared_ptr<HdfsOutputStream>* file) {
int flags = O_WRONLY;
if (append) flags |= O_APPEND;

Expand All @@ -362,7 +371,7 @@ class HdfsClient::HdfsClientImpl {
}

// std::make_shared does not work with private ctors
*file = std::shared_ptr<HdfsWriteableFile>(new HdfsWriteableFile());
*file = std::shared_ptr<HdfsOutputStream>(new HdfsOutputStream());
(*file)->impl_->set_members(path, fs_, handle);

return Status::OK();
Expand Down Expand Up @@ -440,13 +449,13 @@ Status HdfsClient::OpenReadable(

Status HdfsClient::OpenWriteable(const std::string& path, bool append,
int32_t buffer_size, int16_t replication, int64_t default_block_size,
std::shared_ptr<HdfsWriteableFile>* file) {
std::shared_ptr<HdfsOutputStream>* file) {
return impl_->OpenWriteable(
path, append, buffer_size, replication, default_block_size, file);
}

Status HdfsClient::OpenWriteable(
const std::string& path, bool append, std::shared_ptr<HdfsWriteableFile>* file) {
const std::string& path, bool append, std::shared_ptr<HdfsOutputStream>* file) {
return OpenWriteable(path, append, 0, 0, 0, file);
}

Expand Down
29 changes: 18 additions & 11 deletions cpp/src/arrow/io/hdfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@

namespace arrow {

class Buffer;
class Status;

namespace io {

class HdfsClient;
class HdfsReadableFile;
class HdfsWriteableFile;
class HdfsOutputStream;

struct HdfsPathInfo {
ObjectType::type kind;
Expand Down Expand Up @@ -139,14 +140,14 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient {
// @param default_block_size, 0 for default
Status OpenWriteable(const std::string& path, bool append, int32_t buffer_size,
int16_t replication, int64_t default_block_size,
std::shared_ptr<HdfsWriteableFile>* file);
std::shared_ptr<HdfsOutputStream>* file);

Status OpenWriteable(
const std::string& path, bool append, std::shared_ptr<HdfsWriteableFile>* file);
const std::string& path, bool append, std::shared_ptr<HdfsOutputStream>* file);

private:
friend class HdfsReadableFile;
friend class HdfsWriteableFile;
friend class HdfsOutputStream;

class ARROW_NO_EXPORT HdfsClientImpl;
std::unique_ptr<HdfsClientImpl> impl_;
Expand All @@ -155,7 +156,7 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient {
DISALLOW_COPY_AND_ASSIGN(HdfsClient);
};

class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile {
class ARROW_EXPORT HdfsReadableFile : public ReadableFileInterface {
public:
~HdfsReadableFile();

Expand All @@ -166,6 +167,10 @@ class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile {
Status ReadAt(
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;

Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;

bool supports_zero_copy() const override;

Status Seek(int64_t position) override;
Status Tell(int64_t* position) override;

Expand All @@ -183,9 +188,11 @@ class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile {
DISALLOW_COPY_AND_ASSIGN(HdfsReadableFile);
};

class ARROW_EXPORT HdfsWriteableFile : public WriteableFile {
// Named OutputStream rather than WriteableFile because, unlike the
// WriteableFile interface, it does not support seeking
class ARROW_EXPORT HdfsOutputStream : public OutputStream {
public:
~HdfsWriteableFile();
~HdfsOutputStream();

Status Close() override;

Expand All @@ -196,14 +203,14 @@ class ARROW_EXPORT HdfsWriteableFile : public WriteableFile {
Status Tell(int64_t* position) override;

private:
class ARROW_NO_EXPORT HdfsWriteableFileImpl;
std::unique_ptr<HdfsWriteableFileImpl> impl_;
class ARROW_NO_EXPORT HdfsOutputStreamImpl;
std::unique_ptr<HdfsOutputStreamImpl> impl_;

friend class HdfsClient::HdfsClientImpl;

HdfsWriteableFile();
HdfsOutputStream();

DISALLOW_COPY_AND_ASSIGN(HdfsWriteableFile);
DISALLOW_COPY_AND_ASSIGN(HdfsOutputStream);
};

Status ARROW_EXPORT ConnectLibHdfs();
Expand Down
71 changes: 61 additions & 10 deletions cpp/src/arrow/io/interfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
#include <cstdint>
#include <memory>

#include "arrow/util/macros.h"

namespace arrow {

class Buffer;
class Status;

namespace io {
Expand All @@ -40,30 +43,78 @@ class FileSystemClient {
virtual ~FileSystemClient() {}
};

class FileBase {
// Common base of the file-like IO interfaces declared below: closing,
// position reporting, and the FileMode recorded for the object.
// DISALLOW_COPY_AND_ASSIGN is applied, so instances are not meant to be
// copied.
class FileInterface {
 public:
  virtual ~FileInterface() {}

  // Closes the file or stream.
  virtual Status Close() = 0;

  // Reports the current position through *position.
  virtual Status Tell(int64_t* position) = 0;

  // Mode most recently recorded through set_mode().
  FileMode::type mode() const { return mode_; }

 protected:
  FileInterface() {}

  // Value-initialized so that mode() never reads an indeterminate value:
  // the InputStream and OutputStream constructors do not call set_mode().
  // NOTE(review): such streams now report the zero enumerator of
  // FileMode::type; consider having them set an explicit mode instead.
  FileMode::type mode_{};

  void set_mode(FileMode::type mode) { mode_ = mode; }

 private:
  DISALLOW_COPY_AND_ASSIGN(FileInterface);
};

class ReadableFile : public FileBase {
// Mixin interface for objects whose position can be moved with Seek().
class Seekable {
 public:
  // Virtual so an implementation can be destroyed safely through a
  // Seekable*.
  virtual ~Seekable() {}

  // NOTE(review): this ReadAt declaration looks like a removed line of the
  // old ReadableFile class left over in the diff view; confirm whether it
  // belongs to Seekable before relying on it.
  virtual Status ReadAt(
      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0;

  // Moves the current position to `position`.
  virtual Status Seek(int64_t position) = 0;
};

virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0;
// Mixin interface for sinks that accept a run of bytes.
class Writeable {
 public:
  // Virtual so an implementation can be destroyed safely through a
  // Writeable*.
  virtual ~Writeable() {}

  // Writes nbytes bytes starting at data.
  virtual Status Write(const uint8_t* data, int64_t nbytes) = 0;
};

virtual Status GetSize(int64_t* size) = 0;
// Mixin interface for sources that can be read sequentially.
class Readable {
 public:
  // Virtual so an implementation can be destroyed safely through a
  // Readable*.
  virtual ~Readable() {}

  // Reads into out; nbytes is the requested amount and bytes_read is the
  // out-parameter for the count (presumably the number of bytes actually
  // read -- confirm against implementations).
  virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) = 0;
};

// Write-only stream: combines FileInterface (Close/Tell/mode) with Writeable.
// The constructor is protected, so only derived classes can create one, and
// it does not call set_mode().
class OutputStream : public FileInterface, public Writeable {
protected:
OutputStream() {}
};

class RandomAccessFile : public ReadableFile {
// Read-only stream: combines FileInterface (Close/Tell/mode) with Readable.
// The constructor is protected, so only derived classes can create one, and
// it does not call set_mode().
class InputStream : public FileInterface, public Readable {
protected:
InputStream() {}
};

// Readable file with random access: an InputStream that is also Seekable and
// adds positional reads, a size query and a zero-copy capability flag.
class ReadableFileInterface : public InputStream, public Seekable {
public:
// NOTE(review): Seek is also declared by the Seekable base; this line may be
// a removed-line remnant of the diff view -- confirm.
virtual Status Seek(int64_t position) = 0;
// Positional read into a caller-supplied buffer `out`, starting at
// `position`; bytes_read is the out-parameter for the count (presumably the
// number of bytes actually read -- confirm against implementations).
virtual Status ReadAt(
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) = 0;

// Reports the file size through *size.
virtual Status GetSize(int64_t* size) = 0;

// Positional read that returns the data as a Buffer through *out.
// Does not copy if not necessary
virtual Status ReadAt(
int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) = 0;

// Whether the implementation can serve reads without copying.
virtual bool supports_zero_copy() const = 0;

protected:
// Records READ as this object's mode.
ReadableFileInterface() { set_mode(FileMode::READ); }
};

class WriteableFile : public FileBase {
// Writeable file with random access: an OutputStream that is also Seekable
// and adds positional writes.
class WriteableFileInterface : public OutputStream, public Seekable {
 public:
  // Sequential write of nbytes bytes from buffer (also declared by the
  // Writeable base).
  virtual Status Write(const uint8_t* buffer, int64_t nbytes) = 0;

  // Writes nbytes bytes from data at the given position.
  virtual Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) = 0;

 protected:
  // A write-only file previously recorded FileMode::READ here, so mode()
  // mis-reported it as readable. Record WRITE instead.
  // NOTE(review): assumes FileMode declares a WRITE enumerator alongside
  // READ and READWRITE; the enum definition is outside this view -- confirm.
  WriteableFileInterface() { set_mode(FileMode::WRITE); }
};

class ReadWriteFileInterface : public ReadableFileInterface,
public WriteableFileInterface {
protected:
ReadWriteFileInterface() { ReadableFileInterface::set_mode(FileMode::READWRITE); }
};

} // namespace io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class TestHdfsClient : public ::testing::Test {
Status WriteDummyFile(const std::string& path, const uint8_t* buffer, int64_t size,
bool append = false, int buffer_size = 0, int replication = 0,
int default_block_size = 0) {
std::shared_ptr<HdfsWriteableFile> file;
std::shared_ptr<HdfsOutputStream> file;
RETURN_NOT_OK(client_->OpenWriteable(
path, append, buffer_size, replication, default_block_size, &file));

Expand Down
Loading