Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(storage): gRPC zerocopy codec #10888

Merged
merged 16 commits into from
Oct 3, 2024
84 changes: 56 additions & 28 deletions storage/grpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,39 +128,67 @@ func TestBytesCodecV2(t *testing.T) {
},
} {
t.Run(test.desc, func(t *testing.T) {
// Encode the response.
encodedResp, err := proto.Marshal(test.resp)
if err != nil {
t.Fatalf("proto.Marshal: %v", err)
}
var respData mem.BufferSlice
respData = append(respData, mem.SliceBuffer(encodedResp))

// Unmarshal and decode response using custom decoding.
var encodedBytes *mem.BufferSlice = &mem.BufferSlice{}
if err := bytesCodecV2.Unmarshal(bytesCodecV2{}, respData, encodedBytes); err != nil {
t.Fatalf("unmarshal: %v", err)
}
for _, subtest := range []struct {
desc string
splitPoints []int
}{
{
desc: "single buffer",
splitPoints: []int{},
},
{
desc: "random split buffer",
splitPoints: []int{100, 500, 700},
},
{
desc: "mid-tag split buffer",
splitPoints: []int{3, 1032},
},
} {
tritone marked this conversation as resolved.
Show resolved Hide resolved
t.Run(subtest.desc, func(t *testing.T) {
// Encode the response.
encodedResp, err := proto.Marshal(test.resp)
if err != nil {
t.Fatalf("proto.Marshal: %v", err)
}
var respData mem.BufferSlice
off := 0
// Split response into multiple buffers if needed.
for _, split := range subtest.splitPoints {
// Only split if offset is within buffer
if split > len(encodedResp) {
break
}
respData = append(respData, mem.SliceBuffer(encodedResp[off:split]))
off = split
}
respData = append(respData, mem.SliceBuffer(encodedResp[off:]))
// Unmarshal and decode response using custom decoding.
var encodedBytes *mem.BufferSlice = &mem.BufferSlice{}
if err := bytesCodecV2.Unmarshal(bytesCodecV2{}, respData, encodedBytes); err != nil {
t.Fatalf("unmarshal: %v", err)
}

decoder := &readResponseDecoder{
databufs: encodedBytes,
}
decoder := &readResponseDecoder{
databufs: encodedBytes,
}

msg, offsets, err := decoder.readFullObjectResponse()
if err != nil {
t.Fatalf("readFullObjectResponse: %v", err)
}
msg, offsets, err := decoder.readFullObjectResponse()
if err != nil {
t.Fatalf("readFullObjectResponse: %v", err)
}

// Compare the result with the original ReadObjectResponse, without the content
if diff := cmp.Diff(msg, test.resp, protocmp.Transform(), protocmp.IgnoreMessages(&storagepb.ChecksummedData{})); diff != "" {
t.Errorf("cmp.Diff message: got(-),want(+):\n%s", diff)
}
// Compare the result with the original ReadObjectResponse, without the content
if diff := cmp.Diff(msg, test.resp, protocmp.Transform(), protocmp.IgnoreMessages(&storagepb.ChecksummedData{})); diff != "" {
t.Errorf("cmp.Diff message: got(-),want(+):\n%s", diff)
}

// Compare offsets
if diff := cmp.Diff(offsets, test.offsets, cmp.AllowUnexported(bufferSliceOffsets{})); diff != "" {
t.Errorf("cmp.Diff offsets: got(-),want(+):\n%s", diff)
// Compare offsets
if diff := cmp.Diff(offsets, test.offsets, cmp.AllowUnexported(bufferSliceOffsets{})); diff != "" {
t.Errorf("cmp.Diff offsets: got(-),want(+):\n%s", diff)
}
})
}

})
}
}