Skip to content

Commit

Permalink
Implement ZeroCopyInputStream::ReadCord() in terms of absl::CordBuffer
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 492078322
  • Loading branch information
martijnvels authored and copybara-github committed Dec 1, 2022
1 parent 9b0a8d3 commit 75d31be
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 0 deletions.
63 changes: 63 additions & 0 deletions src/google/protobuf/io/zero_copy_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,76 @@

#include "google/protobuf/io/zero_copy_stream.h"

#include <utility>

#include "google/protobuf/stubs/logging.h"
#include "google/protobuf/stubs/common.h"
#include "absl/strings/cord.h"
#include "absl/strings/cord_buffer.h"
#include "absl/strings/string_view.h"
#include "absl/types/span.h"

// Must be included last.
#include "google/protobuf/port_def.inc"

namespace google {
namespace protobuf {
namespace io {

bool ZeroCopyInputStream::ReadCord(absl::Cord* cord, int count) {
if (count <= 0) return true;

absl::CordBuffer cord_buffer = cord->GetAppendBuffer(count);
absl::Span<char> out = cord_buffer.available_up_to(count);

auto FetchNextChunk = [&]() -> absl::Span<const char> {
const void* buffer;
int size;
if (!Next(&buffer, &size)) return {};

if (size > count) {
BackUp(size - count);
size = count;
}
return absl::MakeConstSpan(static_cast<const char*>(buffer), size);
};

auto AppendFullBuffer = [&]() -> absl::Span<char> {
cord->Append(std::move(cord_buffer));
cord_buffer = absl::CordBuffer::CreateWithDefaultLimit(count);
return cord_buffer.available_up_to(count);
};

auto CopyBytes = [&](absl::Span<char>& dst, absl::Span<const char>& src,
size_t bytes) {
memcpy(dst.data(), src.data(), bytes);
dst.remove_prefix(bytes);
src.remove_prefix(bytes);
count -= bytes;
cord_buffer.IncreaseLengthBy(bytes);
};

do {
absl::Span<const char> in = FetchNextChunk();
if (in.empty()) {
// Append whatever we have pending so far.
cord->Append(std::move(cord_buffer));
return false;
}

if (out.empty()) out = AppendFullBuffer();

while (in.size() > out.size()) {
CopyBytes(out, in, out.size());
out = AppendFullBuffer();
}

CopyBytes(out, in, in.size());
} while (count > 0);

cord->Append(std::move(cord_buffer));
return true;
}

bool ZeroCopyOutputStream::WriteAliasedRaw(const void* /* data */,
int /* size */) {
Expand Down
13 changes: 13 additions & 0 deletions src/google/protobuf/io/zero_copy_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
#define GOOGLE_PROTOBUF_IO_ZERO_COPY_STREAM_H__

#include "google/protobuf/stubs/common.h"
#include "absl/strings/cord.h"
#include "google/protobuf/port.h"


Expand Down Expand Up @@ -183,6 +184,18 @@ class PROTOBUF_EXPORT ZeroCopyInputStream {
// Returns the total number of bytes read since this object was created.
virtual int64_t ByteCount() const = 0;

// Read the next `count` bytes and append it to the given Cord.
//
// In the case of a read error, the method reads as much data as possible into
// the cord before returning false. The default implementation iterates over
// the buffers and appends up to `count` bytes of data into `cord` using the
// `absl::CordBuffer` API.
//
// Some streams may implement this in a way that avoids copying by sharing or
// reference counting existing data managed by the stream implementation.
//
virtual bool ReadCord(absl::Cord* cord, int count);

};

// Abstract interface similar to an output stream but designed to minimize
Expand Down
88 changes: 88 additions & 0 deletions src/google/protobuf/io/zero_copy_stream_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
// "parametized tests" so that one set of tests can be used on all the
// implementations.

#include <algorithm>
#include <chrono>
#include <thread>

Expand All @@ -62,6 +63,7 @@
#include <memory>
#include <sstream>
#include <utility>
#include <vector>

#include "google/protobuf/testing/file.h"
#include "google/protobuf/io/coded_stream.h"
Expand All @@ -79,6 +81,9 @@
#include "google/protobuf/testing/googletest.h"
#include <gtest/gtest.h>

// Must be included last.
#include "google/protobuf/port_def.inc"

namespace google {
namespace protobuf {
namespace io {
Expand Down Expand Up @@ -737,6 +742,89 @@ TEST_F(IoTest, LargeOutput) {
#endif // THREAD_SANITIZER
}

TEST(DefaultReadCordTest, ReadSmallCord) {
std::string source = "abcdefghijk";
ArrayInputStream input(source.data(), source.size());

absl::Cord dest;
EXPECT_TRUE(input.Skip(1));
EXPECT_TRUE(input.ReadCord(&dest, source.size() - 2));

EXPECT_EQ(dest, "bcdefghij");
}

TEST(DefaultReadCordTest, ReadSmallCordAfterBackUp) {
std::string source = "abcdefghijk";
ArrayInputStream input(source.data(), source.size());

absl::Cord dest;
const void* buffer;
int size;
EXPECT_TRUE(input.Next(&buffer, &size));
input.BackUp(size - 1);

EXPECT_TRUE(input.ReadCord(&dest, source.size() - 2));

EXPECT_EQ(dest, "bcdefghij");
}

TEST(DefaultReadCordTest, ReadLargeCord) {
std::string source = "abcdefghijk";
for (int i = 0; i < 1024; i++) {
source.append("abcdefghijk");
}

absl::Cord dest;
ArrayInputStream input(source.data(), source.size());
EXPECT_TRUE(input.Skip(1));
EXPECT_TRUE(input.ReadCord(&dest, source.size() - 2));

absl::Cord expected(source);
expected.RemovePrefix(1);
expected.RemoveSuffix(1);

EXPECT_EQ(expected, dest);
}

TEST(DefaultReadCordTest, ReadLargeCordAfterBackup) {
std::string source = "abcdefghijk";
for (int i = 0; i < 1024; i++) {
source.append("abcdefghijk");
}

absl::Cord dest;
ArrayInputStream input(source.data(), source.size());

const void* buffer;
int size;
EXPECT_TRUE(input.Next(&buffer, &size));
input.BackUp(size - 1);

EXPECT_TRUE(input.ReadCord(&dest, source.size() - 2));

absl::Cord expected(source);
expected.RemovePrefix(1);
expected.RemoveSuffix(1);

EXPECT_EQ(expected, dest);

EXPECT_TRUE(input.Next(&buffer, &size));
EXPECT_EQ("k", std::string(reinterpret_cast<const char*>(buffer), size));
}

TEST(DefaultReadCordTest, ReadCordEof) {
std::string source = "abcdefghijk";

absl::Cord dest;
ArrayInputStream input(source.data(), source.size());
input.Skip(1);
EXPECT_FALSE(input.ReadCord(&dest, source.size()));

absl::Cord expected(source);
expected.RemovePrefix(1);
EXPECT_EQ(expected, dest);
}


// To test files, we create a temporary file, write, read, truncate, repeat.
TEST_F(IoTest, FileIo) {
Expand Down

0 comments on commit 75d31be

Please sign in to comment.