diff --git a/storage/reader.go b/storage/reader.go index 6af5a94e0aac..68d9ca11ab26 100644 --- a/storage/reader.go +++ b/storage/reader.go @@ -22,6 +22,7 @@ import ( "io/ioutil" "net/http" "net/url" + "reflect" "strconv" "strings" @@ -74,11 +75,6 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64) return nil, err } req = withContext(req, ctx) - if length < 0 && offset > 0 { - req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset)) - } else if length > 0 { - req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1)) - } if o.userProject != "" { req.Header.Set("X-Goog-User-Project", o.userProject) } @@ -88,39 +84,57 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64) if err := setEncryptionHeaders(req.Header, o.encryptionKey, false); err != nil { return nil, err } - var res *http.Response - err = runWithRetry(ctx, func() error { - res, err = o.c.hc.Do(req) - if err != nil { - return err - } - if res.StatusCode == http.StatusNotFound { - res.Body.Close() - return ErrObjectNotExist + + // Define a function that initiates a Read with offset and length, assuming we + // have already read seen bytes. + reopen := func(seen int64) (*http.Response, error) { + start := offset + seen + if length < 0 && start > 0 { + req.Header.Set("Range", fmt.Sprintf("bytes=%d-", start)) + } else if length > 0 { + // The end character isn't affected by how many bytes we've seen. 
+ req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, offset+length-1)) } - if res.StatusCode < 200 || res.StatusCode > 299 { - body, _ := ioutil.ReadAll(res.Body) - res.Body.Close() - return &googleapi.Error{ - Code: res.StatusCode, - Header: res.Header, - Body: string(body), + var res *http.Response + err = runWithRetry(ctx, func() error { + res, err = o.c.hc.Do(req) + if err != nil { + return err } + if res.StatusCode == http.StatusNotFound { + res.Body.Close() + return ErrObjectNotExist + } + if res.StatusCode < 200 || res.StatusCode > 299 { + body, _ := ioutil.ReadAll(res.Body) + res.Body.Close() + return &googleapi.Error{ + Code: res.StatusCode, + Header: res.Header, + Body: string(body), + } + } + if start > 0 && length != 0 && res.StatusCode != http.StatusPartialContent { + res.Body.Close() + return errors.New("storage: partial request not satisfied") + } + return nil + }) + if err != nil { + return nil, err } - if offset > 0 && length != 0 && res.StatusCode != http.StatusPartialContent { - res.Body.Close() - return errors.New("storage: partial request not satisfied") - } - return nil - }) + return res, nil + } + + res, err := reopen(0) if err != nil { return nil, err } - var size int64 // total size of object, even if a range was requested. if res.StatusCode == http.StatusPartialContent { cr := strings.TrimSpace(res.Header.Get("Content-Range")) if !strings.HasPrefix(cr, "bytes ") || !strings.Contains(cr, "/") { + return nil, fmt.Errorf("storage: invalid Content-Range %q", cr) } size, err = strconv.ParseInt(cr[strings.LastIndex(cr, "/")+1:], 10, 64) @@ -155,6 +169,7 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64) cacheControl: res.Header.Get("Cache-Control"), wantCRC: crc, checkCRC: checkCRC, + reopen: reopen, }, nil } @@ -180,15 +195,16 @@ var emptyBody = ioutil.NopCloser(strings.NewReader("")) // the stored CRC, returning an error from Read if there is a mismatch. 
// This integrity check is skipped if transcoding occurs.
// See https://cloud.google.com/storage/docs/transcoding.
type Reader struct {
	// body is the response body currently being read. readWithRetry swaps it
	// for a fresh body after a retryable read error.
	body io.ReadCloser

	// seen is the number of bytes obtained from body reads so far. It is handed
	// to reopen so that a retried request can resume after those bytes.
	seen int64

	remain, size    int64 // Read decrements remain unless it is -1
	contentType     string
	contentEncoding string
	cacheControl    string
	checkCRC        bool   // should we check the CRC?
	wantCRC         uint32 // the CRC32c value the server sent in the header
	gotCRC          uint32 // running crc
	checkedCRC      bool   // did we check the CRC? (For tests.)

	// reopen issues a new request for the same object range, skipping the first
	// seen bytes, and returns its response. It may be nil, in which case read
	// errors are never retried.
	reopen func(seen int64) (*http.Response, error)
}

// readWithRetry reads from r.body into p and is what Read calls in place of a
// direct r.body.Read.
//
// When a body read fails with an error that shouldRetryRead accepts, the
// bytes already obtained are kept, a new response is requested through
// r.reopen starting after r.seen bytes, the old body is closed, and reading
// continues into the unfilled remainder of p. Any other outcome of a body
// read (success, io.EOF, a non-retryable error) is returned immediately
// together with the number of bytes placed in p so far.
//
// If r.reopen is nil, or reopen itself fails, the bytes read so far are
// returned with the corresponding error.
func (r *Reader) readWithRetry(p []byte) (int, error) {
	n := 0
	for n < len(p) {
		m, err := r.body.Read(p[n:])
		n += m
		r.seen += int64(m)
		if !shouldRetryRead(err) {
			return n, err
		}
		if r.reopen == nil {
			// There is no way to resume the stream; report the read error
			// instead of panicking on a nil function call.
			return n, err
		}
		// Read failed, but we will try again. Send a ranged read request that
		// takes into account the number of bytes we've already seen.
		res, err := r.reopen(r.seen)
		if err != nil {
			// No further attempts here: the reopen installed by
			// NewRangeReader issues its request through runWithRetry.
			return n, err
		}
		// The old body is being abandoned, so its Close error is deliberately
		// ignored.
		r.body.Close()
		r.body = res.Body
	}
	return n, nil
}

// shouldRetryRead reports whether err, returned by a body read, is worth
// retrying: its message must end in "INTERNAL_ERROR" and the name of its
// dynamic type must contain "http2".
//
// NOTE(review): the match is on the type name rather than a type assertion,
// presumably because the HTTP/2 error types are not exported to this
// package; confirm before tightening.
func shouldRetryRead(err error) bool {
	if err == nil {
		return false
	}
	if !strings.HasSuffix(err.Error(), "INTERNAL_ERROR") {
		return false
	}
	return strings.Contains(reflect.TypeOf(err).String(), "http2")
}
diff --git a/storage/reader_test.go b/storage/reader_test.go index de734c636745..b93286d8a275 100644 --- a/storage/reader_test.go +++ b/storage/reader_test.go @@ -15,7 +15,9 @@ package storage import ( + "errors" "fmt" + "io" "io/ioutil" "net/http" "strconv" @@ -110,3 +112,180 @@ func handleRangeRead(w http.ResponseWriter, r *http.Request) { panic(err) } } + +func TestRangeReaderRetry(t *testing.T) { + retryErr := errors.New("blah blah INTERNAL_ERROR") + readBytes := []byte(readData) + hc, close := newTestServer(handleRangeRead) + defer close() + ctx := context.Background() + c, err := NewClient(ctx, option.WithHTTPClient(hc)) + if err != nil { + t.Fatal(err) + } + + obj := c.Bucket("b").Object("o") + for i, test := range []struct { + offset, length int64 + bodies []fakeReadCloser + want string + }{ + { + offset: 0, + length: -1, + bodies: []fakeReadCloser{ + {data: readBytes, counts: []int{10}, err: io.EOF}, + }, + want: readData, + }, + { + offset: 0, + length: -1, + bodies: []fakeReadCloser{ + {data: readBytes, counts: []int{3}, err: retryErr}, + {data: readBytes[3:], counts: []int{5, 2}, err: io.EOF}, + }, + want: readData, + }, + { + offset: 0, + length: -1, + bodies: []fakeReadCloser{ + {data: readBytes, counts: []int{5}, err: retryErr}, + {data: readBytes[5:], counts: []int{1, 3}, err: retryErr}, + {data: readBytes[9:], counts: []int{1}, err: io.EOF}, + }, + want: readData, + }, + { + offset: 0, + length: 5, + bodies: []fakeReadCloser{ + {data: readBytes, counts: []int{3}, err: retryErr}, + {data: readBytes[3:], counts: []int{2}, err: io.EOF}, + }, + want: readData[:5], + }, + { + offset: 1, + length: 5, + bodies: []fakeReadCloser{ + {data: readBytes, counts: []int{3}, err: retryErr}, + {data: readBytes[3:], counts: []int{2}, err: io.EOF}, + }, + want: readData[:5], + }, + { + offset: 1, + length: 3, + bodies: []fakeReadCloser{ + {data: readBytes[1:], counts: []int{1}, err: retryErr}, + {data: readBytes[2:], counts: []int{2}, err: io.EOF}, + }, + want: 
readData[1:4], + }, + { + offset: 4, + length: -1, + bodies: []fakeReadCloser{ + {data: readBytes[4:], counts: []int{1}, err: retryErr}, + {data: readBytes[5:], counts: []int{4}, err: retryErr}, + {data: readBytes[9:], counts: []int{1}, err: io.EOF}, + }, + want: readData[4:], + }, + } { + r, err := obj.NewRangeReader(ctx, test.offset, test.length) + if err != nil { + t.Errorf("#%d: %v", i, err) + continue + } + r.body = &test.bodies[0] + b := 0 + r.reopen = func(int64) (*http.Response, error) { + b++ + return &http.Response{Body: &test.bodies[b]}, nil + } + buf := make([]byte, len(readData)/2) + var gotb []byte + for { + n, err := r.Read(buf) + gotb = append(gotb, buf[:n]...) + if err == io.EOF { + break + } + if err != nil { + t.Fatalf("#%d: %v", i, err) + } + } + if err != nil { + t.Errorf("#%d: %v", i, err) + continue + } + if got := string(gotb); got != test.want { + t.Errorf("#%d: got %q, want %q", i, got, test.want) + } + } +} + +type fakeReadCloser struct { + data []byte + counts []int // how much of data to deliver on each read + err error // error to return with last count + + d int // current position in data + c int // current position in counts +} + +func (f *fakeReadCloser) Close() error { + return nil +} + +func (f *fakeReadCloser) Read(buf []byte) (int, error) { + i := f.c + n := 0 + if i < len(f.counts) { + n = f.counts[i] + } + var err error + if i >= len(f.counts)-1 { + err = f.err + } + copy(buf, f.data[f.d:f.d+n]) + if len(buf) < n { + n = len(buf) + f.counts[i] -= n + err = nil + } else { + f.c++ + } + f.d += n + return n, err +} + +func TestFakeReadCloser(t *testing.T) { + e := errors.New("") + f := &fakeReadCloser{ + data: []byte(readData), + counts: []int{1, 2, 3}, + err: e, + } + wants := []string{"0", "12", "345"} + buf := make([]byte, 10) + for i := 0; i < 3; i++ { + n, err := f.Read(buf) + if got, want := n, f.counts[i]; got != want { + t.Fatalf("i=%d: got %d, want %d", i, got, want) + } + var wantErr error + if i == 2 { + wantErr = e + 
} + if err != wantErr { + t.Fatalf("i=%d: got error %v, want %v", i, err, wantErr) + } + if got, want := string(buf[:n]), wants[i]; got != want { + t.Fatalf("i=%d: got %q, want %q", i, got, want) + } + } +}