Skip to content

Commit

Permalink
ARROW-16226: [C++] Add better coverage for filesystem tell. (#14064)
Browse files Browse the repository at this point in the history
Based on [ARROW-16226](https://issues.apache.org/jira/browse/ARROW-16226).

Adds coverage to GenericFileSystemTest::TestOpenInput(Stream|File) for validating Tell() and reads after seeking.

Authored-by: benibus <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
benibus authored Sep 22, 2022
1 parent 4cb7b50 commit 44ae852
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 14 deletions.
40 changes: 28 additions & 12 deletions cpp/src/arrow/filesystem/gcsfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,20 @@ class GcsInputStream : public arrow::io::InputStream {
return Status::OK();
}

Result<int64_t> Tell() const override {
Result<int64_t> Tell() const override { return TellOr(nread_); }

// At EOF, gcs::ObjectReadStream::tellg() returns -1, but our APIs canonically return
// the stream size. This method helps with the conversion.
Result<int64_t> TellOr(int64_t max_pos) const {
if (closed()) return Status::Invalid("Cannot use Tell() on a closed stream");
return stream_.tellg();
int64_t pos = stream_.tellg();
if (pos < 0) {
if (!stream_.eof()) {
return Status::IOError("Tell() failed before end of stream");
}
return max_pos;
}
return pos;
}

// A gcs::ObjectReadStream can be "born closed". For small objects the stream returns
Expand All @@ -140,15 +151,19 @@ class GcsInputStream : public arrow::io::InputStream {
if (closed()) return Status::Invalid("Cannot read from a closed stream");
stream_.read(static_cast<char*>(out), nbytes);
ARROW_GCS_RETURN_NOT_OK(stream_.status());
return stream_.gcount();
int64_t nread = stream_.gcount();
nread_ += nread;
return nread;
}

Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
if (closed()) return Status::Invalid("Cannot read from a closed stream");
ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nbytes));
stream_.read(reinterpret_cast<char*>(buffer->mutable_data()), nbytes);
ARROW_GCS_RETURN_NOT_OK(stream_.status());
RETURN_NOT_OK(buffer->Resize(stream_.gcount(), true));
int64_t nread = stream_.gcount();
nread_ += nread;
RETURN_NOT_OK(buffer->Resize(nread, true));
return std::shared_ptr<Buffer>(std::move(buffer));
}
//@}
Expand All @@ -167,6 +182,7 @@ class GcsInputStream : public arrow::io::InputStream {
GcsPath path_;
gcs::Generation generation_;
gcs::Client client_;
int64_t nread_ = 0; // Total bytes consumed (updated after each Read())
bool closed_ = false;
};

Expand Down Expand Up @@ -226,13 +242,13 @@ class GcsOutputStream : public arrow::io::OutputStream {
bool closed_ = false;
};

using InputStreamFactory = std::function<Result<std::shared_ptr<io::InputStream>>(
using InputStreamFactory = std::function<Result<std::shared_ptr<GcsInputStream>>(
gcs::Generation, gcs::ReadFromOffset)>;

class GcsRandomAccessFile : public arrow::io::RandomAccessFile {
public:
GcsRandomAccessFile(InputStreamFactory factory, gcs::ObjectMetadata metadata,
std::shared_ptr<io::InputStream> stream)
std::shared_ptr<GcsInputStream> stream)
: factory_(std::move(factory)),
metadata_(std::move(metadata)),
stream_(std::move(stream)) {}
Expand All @@ -242,7 +258,7 @@ class GcsRandomAccessFile : public arrow::io::RandomAccessFile {
// @name FileInterface
Status Close() override { return stream_->Close(); }
Status Abort() override { return stream_->Abort(); }
Result<int64_t> Tell() const override { return stream_->Tell(); }
Result<int64_t> Tell() const override { return stream_->TellOr(metadata_.size()); }
bool closed() const override { return stream_->closed(); }
//@}

Expand Down Expand Up @@ -296,7 +312,7 @@ class GcsRandomAccessFile : public arrow::io::RandomAccessFile {
private:
InputStreamFactory factory_;
gcs::ObjectMetadata metadata_;
std::shared_ptr<io::InputStream> stream_;
std::shared_ptr<GcsInputStream> stream_;
};

google::cloud::Options AsGoogleCloudOptions(const GcsOptions& o) {
Expand Down Expand Up @@ -598,15 +614,15 @@ class GcsFileSystem::Impl {
return internal::ToArrowStatus(metadata.status());
}

Result<std::shared_ptr<io::InputStream>> OpenInputStream(const GcsPath& path,
gcs::Generation generation,
gcs::ReadFromOffset offset) {
Result<std::shared_ptr<GcsInputStream>> OpenInputStream(const GcsPath& path,
gcs::Generation generation,
gcs::ReadFromOffset offset) {
auto stream = client_.ReadObject(path.bucket, path.object, generation, offset);
ARROW_GCS_RETURN_NOT_OK(stream.status());
return std::make_shared<GcsInputStream>(std::move(stream), path, generation, client_);
}

Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
Result<std::shared_ptr<GcsOutputStream>> OpenOutputStream(
const GcsPath& path, const std::shared_ptr<const KeyValueMetadata>& metadata) {
std::shared_ptr<const KeyValueMetadata> resolved_metadata = metadata;
if (resolved_metadata == nullptr && options_.default_metadata != nullptr) {
Expand Down
26 changes: 24 additions & 2 deletions cpp/src/arrow/filesystem/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -971,12 +971,21 @@ void GenericFileSystemTest::TestOpenInputStream(FileSystem* fs) {
ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream("AB/abc"));
ASSERT_OK_AND_ASSIGN(auto metadata, stream->ReadMetadata());
// XXX we cannot really test anything more about metadata...
ASSERT_OK_AND_EQ(0, stream->Tell());
ASSERT_OK_AND_ASSIGN(buffer, stream->Read(4));
AssertBufferEqual(*buffer, "some");
ASSERT_OK_AND_ASSIGN(buffer, stream->Read(6));
ASSERT_OK_AND_ASSIGN(buffer, stream->Read(6 /*Remaining + 1*/));
AssertBufferEqual(*buffer, " data");
ASSERT_OK_AND_ASSIGN(buffer, stream->Read(1));
AssertBufferEqual(*buffer, "");
ASSERT_OK_AND_EQ(9, stream->Tell());
ASSERT_OK(stream->Close());

ASSERT_OK_AND_ASSIGN(stream, fs->OpenInputStream("AB/abc"));
ASSERT_OK(stream->Advance(4));
ASSERT_OK_AND_EQ(4, stream->Tell());
ASSERT_OK_AND_ASSIGN(buffer, stream->Read(6 /*Remaining + 1*/));
AssertBufferEqual(*buffer, " data");
ASSERT_OK(stream->Close());
ASSERT_RAISES(Invalid, stream->Read(1)); // Stream is closed

Expand Down Expand Up @@ -1056,8 +1065,21 @@ void GenericFileSystemTest::TestOpenInputFile(FileSystem* fs) {
std::shared_ptr<io::RandomAccessFile> file;
std::shared_ptr<Buffer> buffer;
ASSERT_OK_AND_ASSIGN(file, fs->OpenInputFile("AB/abc"));
ASSERT_OK_AND_ASSIGN(buffer, file->ReadAt(5, 6));
ASSERT_OK_AND_EQ(0, file->Tell());
ASSERT_OK(file->Seek(10));
ASSERT_OK_AND_EQ(10, file->Tell());
ASSERT_OK_AND_ASSIGN(buffer, file->Read(6 /*Remaining + 1*/));
AssertBufferEqual(*buffer, " data");
ASSERT_OK_AND_ASSIGN(buffer, file->Read(1));
AssertBufferEqual(*buffer, "");
ASSERT_OK_AND_EQ(15, file->Tell());
ASSERT_OK(file->Seek(5));
ASSERT_OK_AND_EQ(5, file->Tell());
ASSERT_OK_AND_ASSIGN(buffer, file->Read(6));
AssertBufferEqual(*buffer, "other ");
// Should return the same slice independent of the current position
ASSERT_OK_AND_ASSIGN(buffer, file->ReadAt(2, 3));
AssertBufferEqual(*buffer, "me ");
ASSERT_OK_AND_EQ(15, file->GetSize());
ASSERT_OK(file->Close());
ASSERT_RAISES(Invalid, file->ReadAt(1, 1)); // Stream is closed
Expand Down

0 comments on commit 44ae852

Please sign in to comment.