diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 6a997befc061..1dfb6f8302be 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -34,7 +34,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" fieldmaskpb "google.golang.org/protobuf/types/known/fieldmaskpb" ) @@ -1500,23 +1499,10 @@ func (w *gRPCWriter) startResumableUpload() error { WriteObjectSpec: spec, CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey), } - // TODO: Currently the checksums are only sent on the first message - // of the stream, but in the future, we must also support sending it - // on the *last* message of the stream (instead of the first). - if w.sendCRC32C { - req.ObjectChecksums = &storagepb.ObjectChecksums{ - Crc32C: proto.Uint32(w.attrs.CRC32C), - } - } - if len(w.attrs.MD5) != 0 { - if cs := req.GetObjectChecksums(); cs == nil { - req.ObjectChecksums = &storagepb.ObjectChecksums{ - Md5Hash: w.attrs.MD5, - } - } else { - cs.Md5Hash = w.attrs.MD5 - } - } + // TODO: Currently the checksums are only sent on the request to initialize + // the upload, but in the future, we must also support sending it + // on the *last* message of the stream. + req.ObjectChecksums = toProtoChecksums(w.sendCRC32C, w.attrs) return run(w.ctx, func() error { upres, err := w.c.raw.StartResumableWrite(w.ctx, req) w.upid = upres.GetUploadId() @@ -1603,6 +1589,11 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st WriteObjectSpec: spec, } req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(w.encryptionKey) + // For a non-resumable upload, checksums must be sent in this message. + // TODO: Currently the checksums are only sent on the first message + // of the stream, but in the future, we must also support sending it + // on the *last* message of the stream (instead of the first). + req.ObjectChecksums = toProtoChecksums(w.sendCRC32C, w.attrs) } } diff --git a/storage/integration_test.go b/storage/integration_test.go index 9d9f3158975d..eb5b27557d20 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -2695,7 +2695,7 @@ func TestIntegration_HashesOnUpload(t *testing.T) { w.CRC32C = crc32c w.SendCRC32C = true if err := write(w); err != nil { - t.Fatal(err) + t.Error(err) } // If we change the CRC, validation should fail. @@ -2703,14 +2703,14 @@ func TestIntegration_HashesOnUpload(t *testing.T) { w.CRC32C = crc32c + 1 w.SendCRC32C = true if err := write(w); err == nil { - t.Fatal("write with bad CRC32c: want error, got nil") + t.Error("write with bad CRC32c: want error, got nil") } // If we have the wrong CRC but forget to send it, we succeed. w = obj.NewWriter(ctx) w.CRC32C = crc32c + 1 if err := write(w); err != nil { - t.Fatal(err) + t.Error(err) } // MD5 @@ -2719,7 +2719,7 @@ func TestIntegration_HashesOnUpload(t *testing.T) { w = obj.NewWriter(ctx) w.MD5 = md5[:] if err := write(w); err != nil { - t.Fatal(err) + t.Error(err) } // If we change the MD5, validation should fail. @@ -2727,7 +2727,7 @@ func TestIntegration_HashesOnUpload(t *testing.T) { w.MD5 = append([]byte(nil), md5[:]...) w.MD5[0]++ if err := write(w); err == nil { - t.Fatal("write with bad MD5: want error, got nil") + t.Error("write with bad MD5: want error, got nil") } }) } diff --git a/storage/storage.go b/storage/storage.go index b5c10efc8a75..c4a771593249 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -2079,6 +2079,25 @@ func toProtoCommonObjectRequestParams(key []byte) *storagepb.CommonObjectRequest } } +func toProtoChecksums(sendCRC32C bool, attrs *ObjectAttrs) *storagepb.ObjectChecksums { + var checksums *storagepb.ObjectChecksums + if sendCRC32C { + checksums = &storagepb.ObjectChecksums{ + Crc32C: proto.Uint32(attrs.CRC32C), + } + } + if len(attrs.MD5) != 0 { + if checksums == nil { + checksums = &storagepb.ObjectChecksums{ + Md5Hash: attrs.MD5, + } + } else { + checksums.Md5Hash = attrs.MD5 + } + } + return checksums +} + // ServiceAccount fetches the email address of the given project's Google Cloud Storage service account. func (c *Client) ServiceAccount(ctx context.Context, projectID string) (string, error) { o := makeStorageOpts(true, c.retry, "")