Skip to content

Commit

Permalink
chore(storage): fix checksum for grpc simple write (#7195)
Browse files Browse the repository at this point in the history
In #7193 I moved the checksum logic to send on the request that
initiates the upload, but I inadvertently removed sending the
checksum for one-shot uploads. This PR restores the checksum
logic for simple uploads.

Fixes #7194
  • Loading branch information
tritone authored Dec 28, 2022
1 parent a7720e4 commit 3592917
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 23 deletions.
27 changes: 9 additions & 18 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
fieldmaskpb "google.golang.org/protobuf/types/known/fieldmaskpb"
)

Expand Down Expand Up @@ -1500,23 +1499,10 @@ func (w *gRPCWriter) startResumableUpload() error {
WriteObjectSpec: spec,
CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
}
// TODO: Currently the checksums are only sent on the first message
// of the stream, but in the future, we must also support sending it
// on the *last* message of the stream (instead of the first).
if w.sendCRC32C {
req.ObjectChecksums = &storagepb.ObjectChecksums{
Crc32C: proto.Uint32(w.attrs.CRC32C),
}
}
if len(w.attrs.MD5) != 0 {
if cs := req.GetObjectChecksums(); cs == nil {
req.ObjectChecksums = &storagepb.ObjectChecksums{
Md5Hash: w.attrs.MD5,
}
} else {
cs.Md5Hash = w.attrs.MD5
}
}
// TODO: Currently the checksums are only sent on the request to initialize
// the upload, but in the future, we must also support sending it
// on the *last* message of the stream.
req.ObjectChecksums = toProtoChecksums(w.sendCRC32C, w.attrs)
return run(w.ctx, func() error {
upres, err := w.c.raw.StartResumableWrite(w.ctx, req)
w.upid = upres.GetUploadId()
Expand Down Expand Up @@ -1603,6 +1589,11 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
WriteObjectSpec: spec,
}
req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(w.encryptionKey)
// For a non-resumable upload, checksums must be sent in this message.
// TODO: Currently the checksums are only sent on the first message
// of the stream, but in the future, we must also support sending it
// on the *last* message of the stream (instead of the first).
req.ObjectChecksums = toProtoChecksums(w.sendCRC32C, w.attrs)
}

}
Expand Down
10 changes: 5 additions & 5 deletions storage/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2695,22 +2695,22 @@ func TestIntegration_HashesOnUpload(t *testing.T) {
w.CRC32C = crc32c
w.SendCRC32C = true
if err := write(w); err != nil {
t.Fatal(err)
t.Error(err)
}

// If we change the CRC, validation should fail.
w = obj.NewWriter(ctx)
w.CRC32C = crc32c + 1
w.SendCRC32C = true
if err := write(w); err == nil {
t.Fatal("write with bad CRC32c: want error, got nil")
t.Error("write with bad CRC32c: want error, got nil")
}

// If we have the wrong CRC but forget to send it, we succeed.
w = obj.NewWriter(ctx)
w.CRC32C = crc32c + 1
if err := write(w); err != nil {
t.Fatal(err)
t.Error(err)
}

// MD5
Expand All @@ -2719,15 +2719,15 @@ func TestIntegration_HashesOnUpload(t *testing.T) {
w = obj.NewWriter(ctx)
w.MD5 = md5[:]
if err := write(w); err != nil {
t.Fatal(err)
t.Error(err)
}

// If we change the MD5, validation should fail.
w = obj.NewWriter(ctx)
w.MD5 = append([]byte(nil), md5[:]...)
w.MD5[0]++
if err := write(w); err == nil {
t.Fatal("write with bad MD5: want error, got nil")
t.Error("write with bad MD5: want error, got nil")
}
})
}
Expand Down
19 changes: 19 additions & 0 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2079,6 +2079,25 @@ func toProtoCommonObjectRequestParams(key []byte) *storagepb.CommonObjectRequest
}
}

func toProtoChecksums(sendCRC32C bool, attrs *ObjectAttrs) *storagepb.ObjectChecksums {
var checksums *storagepb.ObjectChecksums
if sendCRC32C {
checksums = &storagepb.ObjectChecksums{
Crc32C: proto.Uint32(attrs.CRC32C),
}
}
if len(attrs.MD5) != 0 {
if checksums == nil {
checksums = &storagepb.ObjectChecksums{
Md5Hash: attrs.MD5,
}
} else {
checksums.Md5Hash = attrs.MD5
}
}
return checksums
}

// ServiceAccount fetches the email address of the given project's Google Cloud Storage service account.
func (c *Client) ServiceAccount(ctx context.Context, projectID string) (string, error) {
o := makeStorageOpts(true, c.retry, "")
Expand Down

0 comments on commit 3592917

Please sign in to comment.