Skip to content

Commit

Permalink
ARROW-280: [C++] Refactor IPC / memory map IO to use common arrow_io …
Browse files Browse the repository at this point in the history
…interfaces. Create arrow_ipc leaf library

Several things here

* Clean up IO interface class structure to be able to indicate precise characteristics of an implementation
* Make the IPC reader/writer use more generic interfaces -- writing only needs an output stream, reading only needs a random access reader. This will unblock ARROW-267
* Create a separate arrow_ipc shared library

Author: Wes McKinney <[email protected]>

Closes #138 from wesm/ARROW-280 and squashes the following commits:

6a59eb6 [Wes McKinney] * Restructure IO interfaces to accommodate more configurations. * Refactor memory mapped IO interfaces to be in line with other arrow::io   classes. * Split arrow_ipc into a leaf library * Refactor pyarrow and arrow_parquet to suit. Move BufferReader to   arrow_io. Pyarrow parquet tests currently segfault
  • Loading branch information
wesm committed Sep 18, 2016
1 parent 17e90e1 commit 559b865
Show file tree
Hide file tree
Showing 39 changed files with 873 additions and 603 deletions.
6 changes: 0 additions & 6 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -626,12 +626,6 @@ set(ARROW_SRCS
src/arrow/table.cc
src/arrow/type.cc

# IPC / Shared memory library; to be turned into an optional component
src/arrow/ipc/adapter.cc
src/arrow/ipc/memory.cc
src/arrow/ipc/metadata.cc
src/arrow/ipc/metadata-internal.cc

src/arrow/types/construct.cc
src/arrow/types/decimal.cc
src/arrow/types/json.cc
Expand Down
11 changes: 9 additions & 2 deletions cpp/src/arrow/io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

# Libraries every arrow_io target links against.
# CMAKE_DL_LIBS names the library that provides dlopen/dlclose on the current
# platform, which avoids hard-coding "dl" where that library does not exist.
set(ARROW_IO_LINK_LIBS
  arrow_shared
  ${CMAKE_DL_LIBS}
)

if (ARROW_BOOST_USE_SHARED)
Expand All @@ -37,6 +38,7 @@ set(ARROW_IO_TEST_LINK_LIBS
${ARROW_IO_PRIVATE_LINK_LIBS})

set(ARROW_IO_SRCS
memory.cc
)

if(ARROW_HDFS)
Expand Down Expand Up @@ -71,8 +73,8 @@ if(ARROW_HDFS)
${ARROW_HDFS_SRCS}
${ARROW_IO_SRCS})

ADD_ARROW_TEST(hdfs-io-test)
ARROW_TEST_LINK_LIBRARIES(hdfs-io-test
ADD_ARROW_TEST(io-hdfs-test)
ARROW_TEST_LINK_LIBRARIES(io-hdfs-test
${ARROW_IO_TEST_LINK_LIBS})
endif()

Expand Down Expand Up @@ -101,10 +103,15 @@ if (APPLE)
INSTALL_NAME_DIR "@rpath")
endif()

ADD_ARROW_TEST(io-memory-test)
ARROW_TEST_LINK_LIBRARIES(io-memory-test
${ARROW_IO_TEST_LINK_LIBS})

# Headers: top level
install(FILES
hdfs.h
interfaces.h
memory.h
DESTINATION include/arrow/io)

install(TARGETS arrow_io
Expand Down
35 changes: 22 additions & 13 deletions cpp/src/arrow/io/hdfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ Status HdfsReadableFile::ReadAt(
return impl_->ReadAt(position, nbytes, bytes_read, buffer);
}

// Buffer-returning overload of ReadAt. Not available for HDFS files yet: the
// arguments are ignored and a NotImplemented status is always returned.
Status HdfsReadableFile::ReadAt(
int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
return Status::NotImplemented("Not yet implemented");
}

// Reports that this file type does not offer zero-copy reads.
bool HdfsReadableFile::supports_zero_copy() const {
return false;
}

// Forwards the read to the private implementation object unchanged; the
// resulting Status is whatever impl_->Read returns.
Status HdfsReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
return impl_->Read(nbytes, bytes_read, buffer);
}
Expand All @@ -162,9 +171,9 @@ Status HdfsReadableFile::Tell(int64_t* position) {
// File writing

// Private implementation for writeable-only files
class HdfsWriteableFile::HdfsWriteableFileImpl : public HdfsAnyFileImpl {
class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl {
public:
HdfsWriteableFileImpl() {}
HdfsOutputStreamImpl() {}

Status Close() {
if (is_open_) {
Expand All @@ -185,29 +194,29 @@ class HdfsWriteableFile::HdfsWriteableFileImpl : public HdfsAnyFileImpl {
}
};

HdfsWriteableFile::HdfsWriteableFile() {
impl_.reset(new HdfsWriteableFileImpl());
// Creates the stream together with a fresh private implementation object.
HdfsOutputStream::HdfsOutputStream()
    : impl_(new HdfsOutputStreamImpl()) {}

HdfsWriteableFile::~HdfsWriteableFile() {
// Closes the implementation object on destruction. The Status returned by
// Close() is discarded here, so a failure while closing is not reported.
HdfsOutputStream::~HdfsOutputStream() {
impl_->Close();
}

Status HdfsWriteableFile::Close() {
// Closes the stream by delegating to the implementation object and returns
// its Status.
Status HdfsOutputStream::Close() {
return impl_->Close();
}

Status HdfsWriteableFile::Write(
// Forwards the write to the implementation object unchanged.
// NOTE(review): the out-parameter is named bytes_read although this is a write
// path; presumably it receives the number of bytes written -- confirm against
// the implementation's Write.
Status HdfsOutputStream::Write(
const uint8_t* buffer, int64_t nbytes, int64_t* bytes_read) {
return impl_->Write(buffer, nbytes, bytes_read);
}

Status HdfsWriteableFile::Write(const uint8_t* buffer, int64_t nbytes) {
// Convenience overload for callers that do not need the byte count reported by
// the three-argument Write: the count is collected in a throwaway local.
Status HdfsOutputStream::Write(const uint8_t* buffer, int64_t nbytes) {
  int64_t ignored_count = 0;
  return Write(buffer, nbytes, &ignored_count);
}

Status HdfsWriteableFile::Tell(int64_t* position) {
// Delegates to the implementation object, which fills in *position, and
// returns its Status.
Status HdfsOutputStream::Tell(int64_t* position) {
return impl_->Tell(position);
}

Expand Down Expand Up @@ -347,7 +356,7 @@ class HdfsClient::HdfsClientImpl {

Status OpenWriteable(const std::string& path, bool append, int32_t buffer_size,
int16_t replication, int64_t default_block_size,
std::shared_ptr<HdfsWriteableFile>* file) {
std::shared_ptr<HdfsOutputStream>* file) {
int flags = O_WRONLY;
if (append) flags |= O_APPEND;

Expand All @@ -362,7 +371,7 @@ class HdfsClient::HdfsClientImpl {
}

// std::make_shared does not work with private ctors
*file = std::shared_ptr<HdfsWriteableFile>(new HdfsWriteableFile());
*file = std::shared_ptr<HdfsOutputStream>(new HdfsOutputStream());
(*file)->impl_->set_members(path, fs_, handle);

return Status::OK();
Expand Down Expand Up @@ -440,13 +449,13 @@ Status HdfsClient::OpenReadable(

Status HdfsClient::OpenWriteable(const std::string& path, bool append,
int32_t buffer_size, int16_t replication, int64_t default_block_size,
std::shared_ptr<HdfsWriteableFile>* file) {
std::shared_ptr<HdfsOutputStream>* file) {
return impl_->OpenWriteable(
path, append, buffer_size, replication, default_block_size, file);
}

Status HdfsClient::OpenWriteable(
const std::string& path, bool append, std::shared_ptr<HdfsWriteableFile>* file) {
const std::string& path, bool append, std::shared_ptr<HdfsOutputStream>* file) {
return OpenWriteable(path, append, 0, 0, 0, file);
}

Expand Down
29 changes: 18 additions & 11 deletions cpp/src/arrow/io/hdfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@

namespace arrow {

class Buffer;
class Status;

namespace io {

class HdfsClient;
class HdfsReadableFile;
class HdfsWriteableFile;
class HdfsOutputStream;

struct HdfsPathInfo {
ObjectType::type kind;
Expand Down Expand Up @@ -139,14 +140,14 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient {
// @param default_block_size, 0 for default
Status OpenWriteable(const std::string& path, bool append, int32_t buffer_size,
int16_t replication, int64_t default_block_size,
std::shared_ptr<HdfsWriteableFile>* file);
std::shared_ptr<HdfsOutputStream>* file);

Status OpenWriteable(
const std::string& path, bool append, std::shared_ptr<HdfsWriteableFile>* file);
const std::string& path, bool append, std::shared_ptr<HdfsOutputStream>* file);

private:
friend class HdfsReadableFile;
friend class HdfsWriteableFile;
friend class HdfsOutputStream;

class ARROW_NO_EXPORT HdfsClientImpl;
std::unique_ptr<HdfsClientImpl> impl_;
Expand All @@ -155,7 +156,7 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient {
DISALLOW_COPY_AND_ASSIGN(HdfsClient);
};

class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile {
class ARROW_EXPORT HdfsReadableFile : public ReadableFileInterface {
public:
~HdfsReadableFile();

Expand All @@ -166,6 +167,10 @@ class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile {
Status ReadAt(
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;

Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;

bool supports_zero_copy() const override;

Status Seek(int64_t position) override;
Status Tell(int64_t* position) override;

Expand All @@ -183,9 +188,11 @@ class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile {
DISALLOW_COPY_AND_ASSIGN(HdfsReadableFile);
};

class ARROW_EXPORT HdfsWriteableFile : public WriteableFile {
// Named OutputStream rather than WriteableFile because it does not support
// seeking, which the WriteableFile interface requires
class ARROW_EXPORT HdfsOutputStream : public OutputStream {
public:
~HdfsWriteableFile();
~HdfsOutputStream();

Status Close() override;

Expand All @@ -196,14 +203,14 @@ class ARROW_EXPORT HdfsWriteableFile : public WriteableFile {
Status Tell(int64_t* position) override;

private:
class ARROW_NO_EXPORT HdfsWriteableFileImpl;
std::unique_ptr<HdfsWriteableFileImpl> impl_;
class ARROW_NO_EXPORT HdfsOutputStreamImpl;
std::unique_ptr<HdfsOutputStreamImpl> impl_;

friend class HdfsClient::HdfsClientImpl;

HdfsWriteableFile();
HdfsOutputStream();

DISALLOW_COPY_AND_ASSIGN(HdfsWriteableFile);
DISALLOW_COPY_AND_ASSIGN(HdfsOutputStream);
};

Status ARROW_EXPORT ConnectLibHdfs();
Expand Down
71 changes: 61 additions & 10 deletions cpp/src/arrow/io/interfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
#include <cstdint>
#include <memory>

#include "arrow/util/macros.h"

namespace arrow {

class Buffer;
class Status;

namespace io {
Expand All @@ -40,30 +43,78 @@ class FileSystemClient {
virtual ~FileSystemClient() {}
};

class FileBase {
class FileInterface {
public:
virtual ~FileInterface() {}
virtual Status Close() = 0;
virtual Status Tell(int64_t* position) = 0;

FileMode::type mode() const { return mode_; }

protected:
FileInterface() {}
FileMode::type mode_;

void set_mode(FileMode::type mode) { mode_ = mode; }

private:
DISALLOW_COPY_AND_ASSIGN(FileInterface);
};

class ReadableFile : public FileBase {
// Mixin for objects whose position can be moved with Seek().
class Seekable {
 public:
  // Virtual so that destroying a derived object through a Seekable pointer is
  // well defined.
  virtual ~Seekable() {}

  // NOTE(review): a positional read is declared on this mixin next to Seek();
  // confirm it is meant to live here and not only on the readable-file
  // interface.
  virtual Status ReadAt(
      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0;

  virtual Status Seek(int64_t position) = 0;
};

virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0;
// Mixin for objects that accept a block of bytes through Write().
class Writeable {
 public:
  // Virtual so that destroying a derived object through a Writeable pointer is
  // well defined.
  virtual ~Writeable() {}

  virtual Status Write(const uint8_t* data, int64_t nbytes) = 0;
};

virtual Status GetSize(int64_t* size) = 0;
// Mixin for objects that can fill a caller-provided buffer through Read().
class Readable {
 public:
  // Virtual so that destroying a derived object through a Readable pointer is
  // well defined.
  virtual ~Readable() {}

  virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) = 0;
};

// Write-only stream interface: combines FileInterface (Close/Tell) with
// Writeable (Write). It does not derive from Seekable. The constructor is
// protected, so only derived classes can construct it.
class OutputStream : public FileInterface, public Writeable {
protected:
OutputStream() {}
};

class RandomAccessFile : public ReadableFile {
// Read-only stream interface: combines FileInterface (Close/Tell) with
// Readable (Read). It does not derive from Seekable. The constructor is
// protected, so only derived classes can construct it.
class InputStream : public FileInterface, public Readable {
protected:
InputStream() {}
};

// Interface for a readable file: an input stream that can also seek, read at
// an explicit position and report a size. Its constructor records
// FileMode::READ as the access mode.
class ReadableFileInterface : public InputStream, public Seekable {
public:
virtual Status Seek(int64_t position) = 0;
// Positional read into the caller-provided memory at out; bytes_read is an
// output parameter.
virtual Status ReadAt(
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) = 0;

// Stores the size in *size.
virtual Status GetSize(int64_t* size) = 0;

// Positional read that hands the result back as a Buffer; intended to avoid
// copying the data when a copy is not necessary.
virtual Status ReadAt(
int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) = 0;

// Whether this implementation offers zero-copy reads.
virtual bool supports_zero_copy() const = 0;

protected:
ReadableFileInterface() { set_mode(FileMode::READ); }
};

class WriteableFile : public FileBase {
// Interface for a writeable file: an output stream that can also seek and
// write at an explicit position.
class WriteableFileInterface : public OutputStream, public Seekable {
 public:
  virtual Status Write(const uint8_t* buffer, int64_t nbytes) = 0;

  // Positional counterpart of Write(): takes the target position explicitly.
  virtual Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) = 0;

 protected:
  // A writeable file records WRITE as its access mode; recording READ here
  // would make mode() misreport every writeable file as read-only.
  WriteableFileInterface() { set_mode(FileMode::WRITE); }
};

// Interface for a file that is both readable and writeable.
// FileInterface is reached through both bases without virtual inheritance, so
// an unqualified set_mode() would be ambiguous; the qualified call records
// READWRITE on the FileInterface subobject that belongs to
// ReadableFileInterface only.
class ReadWriteFileInterface : public ReadableFileInterface,
public WriteableFileInterface {
protected:
ReadWriteFileInterface() { ReadableFileInterface::set_mode(FileMode::READWRITE); }
};

} // namespace io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class TestHdfsClient : public ::testing::Test {
Status WriteDummyFile(const std::string& path, const uint8_t* buffer, int64_t size,
bool append = false, int buffer_size = 0, int replication = 0,
int default_block_size = 0) {
std::shared_ptr<HdfsWriteableFile> file;
std::shared_ptr<HdfsOutputStream> file;
RETURN_NOT_OK(client_->OpenWriteable(
path, append, buffer_size, replication, default_block_size, &file));

Expand Down
Loading

0 comments on commit 559b865

Please sign in to comment.