Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-194: C++: Allow read-only memory mapped source #72

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 51 additions & 3 deletions cpp/src/arrow/ipc/ipc-memory-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@

#include "arrow/ipc/memory.h"
#include "arrow/ipc/test-common.h"
#include "arrow/test-util.h"
#include "arrow/util/buffer.h"
#include "arrow/util/status.h"

namespace arrow {
namespace ipc {
Expand Down Expand Up @@ -67,6 +64,57 @@ TEST_F(TestMemoryMappedSource, WriteRead) {
}
}

TEST_F(TestMemoryMappedSource, ReadOnly) {
const int64_t buffer_size = 1024;
std::vector<uint8_t> buffer(buffer_size);

test::random_bytes(1024, 0, buffer.data());

const int reps = 5;

std::string path = "ipc-read-only-test";
CreateFile(path, reps * buffer_size);

std::shared_ptr<MemoryMappedSource> rwmmap;
ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_WRITE, &rwmmap));

int64_t position = 0;
for (int i = 0; i < reps; ++i) {
ASSERT_OK(rwmmap->Write(position, buffer.data(), buffer_size));

position += buffer_size;
}
rwmmap->Close();

std::shared_ptr<MemoryMappedSource> rommap;
ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_ONLY, &rommap));

position = 0;
std::shared_ptr<Buffer> out_buffer;
for (int i = 0; i < reps; ++i) {
ASSERT_OK(rommap->ReadAt(position, buffer_size, &out_buffer));

ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
position += buffer_size;
}
rommap->Close();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be nice to test that writing to a the file opened in read only mode gives an appropriate error status.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the good suggestion.


TEST_F(TestMemoryMappedSource, InvalidMode) {
const int64_t buffer_size = 1024;
std::vector<uint8_t> buffer(buffer_size);

test::random_bytes(1024, 0, buffer.data());

std::string path = "ipc-invalid-mode-test";
CreateFile(path, buffer_size);

std::shared_ptr<MemoryMappedSource> rommap;
ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_ONLY, &rommap));

ASSERT_RAISES(IOError, rommap->Write(0, buffer.data(), buffer_size));
}

TEST_F(TestMemoryMappedSource, InvalidFile) {
std::string non_existent_path = "invalid-file-name-asfd";

Expand Down
22 changes: 15 additions & 7 deletions cpp/src/arrow/ipc/memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ MemorySource::~MemorySource() {}

class MemoryMappedSource::Impl {
public:
Impl() : file_(nullptr), is_open_(false), data_(nullptr) {}
Impl() : file_(nullptr), is_open_(false), is_writable_(false), data_(nullptr) {}

~Impl() {
if (is_open_) {
Expand All @@ -53,10 +53,12 @@ class MemoryMappedSource::Impl {
Status Open(const std::string& path, MemorySource::AccessMode mode) {
if (is_open_) { return Status::IOError("A file is already open"); }

path_ = path;
int prot_flags = PROT_READ;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you deleting this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because this variable is currently not used anywhere. In addition, current implementation expects a file exists in a given path, which in turn means the file is already created in some other parts of the program. So, I think MemoryMappedSource is not responsible for deleting files used for mmap.
In addition, if path is required later, we can add it again easily.


if (mode == MemorySource::READ_WRITE) {
file_ = fopen(path.c_str(), "r+b");
prot_flags |= PROT_WRITE;
is_writable_ = true;
} else {
file_ = fopen(path.c_str(), "rb");
}
Expand All @@ -73,14 +75,13 @@ class MemoryMappedSource::Impl {
fseek(file_, 0L, SEEK_SET);
is_open_ = true;

// TODO(wesm): Add read-only version of this
data_ = reinterpret_cast<uint8_t*>(
mmap(nullptr, size_, PROT_READ | PROT_WRITE, MAP_SHARED, fileno(file_), 0));
if (data_ == nullptr) {
void* result = mmap(nullptr, size_, prot_flags, MAP_SHARED, fileno(file_), 0);
if (result == MAP_FAILED) {
std::stringstream ss;
ss << "Memory mapping file failed, errno: " << errno;
return Status::IOError(ss.str());
}
data_ = reinterpret_cast<uint8_t*>(result);

return Status::OK();
}
Expand All @@ -89,11 +90,15 @@ class MemoryMappedSource::Impl {

uint8_t* data() { return data_; }

bool writable() { return is_writable_; }

bool opened() { return is_open_; }

private:
std::string path_;
FILE* file_;
int64_t size_;
bool is_open_;
bool is_writable_;

// The memory map
uint8_t* data_;
Expand Down Expand Up @@ -134,6 +139,9 @@ Status MemoryMappedSource::ReadAt(
}

Status MemoryMappedSource::Write(int64_t position, const uint8_t* data, int64_t nbytes) {
if (!impl_->opened() || !impl_->writable()) {
return Status::IOError("Unable to write");
}
if (position < 0 || position >= impl_->size()) {
return Status::Invalid("position is out of bounds");
}
Expand Down