storage: retry reads
If we get a retryable error when reading an object, issue
another request for the object's data, using a range that
begins where we stopped reading.

For now, the only retryable error is the internal stream error
that has been seen in #784.

Change-Id: Idb10b1859bb276b301c3ccb93b0b8bfc84510354
Reviewed-on: https://code-review.googlesource.com/25630
Reviewed-by: Jean de Klerk <[email protected]>
Reviewed-by: Brad Fitzpatrick <[email protected]>
jba committed Apr 17, 2018
1 parent 08ad5b8 commit d19004d
Showing 2 changed files with 262 additions and 38 deletions.
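
The commit message above describes the core idea: when a read fails with a retryable error, reopen the request with a Range header that starts at the first unread byte. The standalone sketch below illustrates that idea in plain net/http terms; the URL, the retry predicate, and the omitted status/Content-Range checks are simplifying assumptions for illustration, not this package's actual code.

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// readAllWithResume reads the body at url. Whenever isRetryable reports a
// transient mid-stream error, it reissues a ranged request that begins at the
// byte after the last one successfully read and keeps appending to buf.
func readAllWithResume(url string, isRetryable func(error) bool) ([]byte, error) {
	var buf bytes.Buffer
	for {
		req, err := http.NewRequest("GET", url, nil)
		if err != nil {
			return nil, err
		}
		if buf.Len() > 0 {
			// Resume from where the previous attempt stopped.
			req.Header.Set("Range", fmt.Sprintf("bytes=%d-", buf.Len()))
		}
		res, err := http.DefaultClient.Do(req)
		if err != nil {
			return nil, err
		}
		// Status-code and Content-Range validation omitted for brevity.
		_, err = io.Copy(&buf, res.Body)
		res.Body.Close()
		if err == nil {
			return buf.Bytes(), nil // full body read
		}
		if !isRetryable(err) {
			return buf.Bytes(), err
		}
		// Transient failure mid-stream: loop and reopen with an updated Range.
	}
}

func main() {
	data, err := readAllWithResume("https://example.com/object", func(err error) bool {
		return strings.HasSuffix(err.Error(), "INTERNAL_ERROR")
	})
	fmt.Println(len(data), err)
}

The real change below is more careful: it preserves the caller's requested offset and length, tracks bytes already seen, and reuses the existing runWithRetry machinery for the reopen request itself.
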
121 changes: 83 additions & 38 deletions storage/reader.go
@@ -22,6 +22,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"reflect"
"strconv"
"strings"

@@ -74,11 +75,6 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64)
return nil, err
}
req = withContext(req, ctx)
if length < 0 && offset > 0 {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
} else if length > 0 {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
}
if o.userProject != "" {
req.Header.Set("X-Goog-User-Project", o.userProject)
}
@@ -88,39 +84,57 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64)
if err := setEncryptionHeaders(req.Header, o.encryptionKey, false); err != nil {
return nil, err
}
var res *http.Response
err = runWithRetry(ctx, func() error {
res, err = o.c.hc.Do(req)
if err != nil {
return err
}
if res.StatusCode == http.StatusNotFound {
res.Body.Close()
return ErrObjectNotExist

// Define a function that initiates a Read with offset and length, assuming we
// have already read seen bytes.
reopen := func(seen int64) (*http.Response, error) {
start := offset + seen
if length < 0 && start > 0 {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", start))
} else if length > 0 {
// The end character isn't affected by how many bytes we've seen.
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, offset+length-1))
}
if res.StatusCode < 200 || res.StatusCode > 299 {
body, _ := ioutil.ReadAll(res.Body)
res.Body.Close()
return &googleapi.Error{
Code: res.StatusCode,
Header: res.Header,
Body: string(body),
var res *http.Response
err = runWithRetry(ctx, func() error {
res, err = o.c.hc.Do(req)
if err != nil {
return err
}
if res.StatusCode == http.StatusNotFound {
res.Body.Close()
return ErrObjectNotExist
}
if res.StatusCode < 200 || res.StatusCode > 299 {
body, _ := ioutil.ReadAll(res.Body)
res.Body.Close()
return &googleapi.Error{
Code: res.StatusCode,
Header: res.Header,
Body: string(body),
}
}
if start > 0 && length != 0 && res.StatusCode != http.StatusPartialContent {
res.Body.Close()
return errors.New("storage: partial request not satisfied")
}
return nil
})
if err != nil {
return nil, err
}
if offset > 0 && length != 0 && res.StatusCode != http.StatusPartialContent {
res.Body.Close()
return errors.New("storage: partial request not satisfied")
}
return nil
})
return res, nil
}

res, err := reopen(0)
if err != nil {
return nil, err
}

var size int64 // total size of object, even if a range was requested.
if res.StatusCode == http.StatusPartialContent {
cr := strings.TrimSpace(res.Header.Get("Content-Range"))
if !strings.HasPrefix(cr, "bytes ") || !strings.Contains(cr, "/") {
return nil, fmt.Errorf("storage: invalid Content-Range %q", cr)
}
size, err = strconv.ParseInt(cr[strings.LastIndex(cr, "/")+1:], 10, 64)
@@ -155,6 +169,7 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64)
cacheControl: res.Header.Get("Cache-Control"),
wantCRC: crc,
checkCRC: checkCRC,
reopen: reopen,
}, nil
}

@@ -180,15 +195,16 @@ var emptyBody = ioutil.NopCloser(strings.NewReader(""))
// the stored CRC, returning an error from Read if there is a mismatch. This integrity check
// is skipped if transcoding occurs. See https://cloud.google.com/storage/docs/transcoding.
type Reader struct {
body io.ReadCloser
remain, size int64
contentType string
contentEncoding string
cacheControl string
checkCRC bool // should we check the CRC?
wantCRC uint32 // the CRC32c value the server sent in the header
gotCRC uint32 // running crc
checkedCRC bool // did we check the CRC? (For tests.)
body io.ReadCloser
seen, remain, size int64
contentType string
contentEncoding string
cacheControl string
checkCRC bool // should we check the CRC?
wantCRC uint32 // the CRC32c value the server sent in the header
gotCRC uint32 // running crc
checkedCRC bool // did we check the CRC? (For tests.)
reopen func(seen int64) (*http.Response, error)
}

// Close closes the Reader. It must be called when done reading.
@@ -197,7 +213,7 @@ func (r *Reader) Close() error {
}

func (r *Reader) Read(p []byte) (int, error) {
n, err := r.body.Read(p)
n, err := r.readWithRetry(p)
if r.remain != -1 {
r.remain -= int64(n)
}
@@ -217,6 +233,35 @@ func (r *Reader) Read(p []byte) (int, error) {
return n, err
}

func (r *Reader) readWithRetry(p []byte) (int, error) {
n := 0
for len(p[n:]) > 0 {
m, err := r.body.Read(p[n:])
n += m
r.seen += int64(m)
if !shouldRetryRead(err) {
return n, err
}
// Read failed, but we will try again. Send a ranged read request that takes
// into account the number of bytes we've already seen.
res, err := r.reopen(r.seen)
if err != nil {
// reopen already retries
return n, err
}
r.body.Close()
r.body = res.Body
}
return n, nil
}

func shouldRetryRead(err error) bool {
if err == nil {
return false
}
return strings.HasSuffix(err.Error(), "INTERNAL_ERROR") && strings.Contains(reflect.TypeOf(err).String(), "http2")
}

// Size returns the size of the object in bytes.
// The returned value is always the same and is not affected by
// calls to Read or Close.
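
For callers of the package, the retry is invisible: a read loop over a Reader from NewRangeReader is unchanged, and resumption happens inside Read. A minimal usage sketch follows; the bucket and object names are placeholders, and default credentials are assumed.

package main

import (
	"context"
	"fmt"
	"io/ioutil"
	"log"

	"cloud.google.com/go/storage"
)

func main() {
	ctx := context.Background()
	client, err := storage.NewClient(ctx) // assumes default credentials are available
	if err != nil {
		log.Fatal(err)
	}
	// Read 1 MiB starting at byte 100. With this change, a transient stream
	// error mid-read is handled inside Read by reopening the request at the
	// correct offset; the caller's loop does not change.
	r, err := client.Bucket("my-bucket").Object("my-object").NewRangeReader(ctx, 100, 1<<20)
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
	data, err := ioutil.ReadAll(r)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(len(data))
}
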
179 changes: 179 additions & 0 deletions storage/reader_test.go
@@ -15,7 +15,9 @@
package storage

import (
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
@@ -110,3 +112,180 @@ func handleRangeRead(w http.ResponseWriter, r *http.Request) {
panic(err)
}
}

func TestRangeReaderRetry(t *testing.T) {
retryErr := errors.New("blah blah INTERNAL_ERROR")
readBytes := []byte(readData)
hc, close := newTestServer(handleRangeRead)
defer close()
ctx := context.Background()
c, err := NewClient(ctx, option.WithHTTPClient(hc))
if err != nil {
t.Fatal(err)
}

obj := c.Bucket("b").Object("o")
for i, test := range []struct {
offset, length int64
bodies []fakeReadCloser
want string
}{
{
offset: 0,
length: -1,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{10}, err: io.EOF},
},
want: readData,
},
{
offset: 0,
length: -1,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{3}, err: retryErr},
{data: readBytes[3:], counts: []int{5, 2}, err: io.EOF},
},
want: readData,
},
{
offset: 0,
length: -1,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{5}, err: retryErr},
{data: readBytes[5:], counts: []int{1, 3}, err: retryErr},
{data: readBytes[9:], counts: []int{1}, err: io.EOF},
},
want: readData,
},
{
offset: 0,
length: 5,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{3}, err: retryErr},
{data: readBytes[3:], counts: []int{2}, err: io.EOF},
},
want: readData[:5],
},
{
offset: 1,
length: 5,
bodies: []fakeReadCloser{
{data: readBytes, counts: []int{3}, err: retryErr},
{data: readBytes[3:], counts: []int{2}, err: io.EOF},
},
want: readData[:5],
},
{
offset: 1,
length: 3,
bodies: []fakeReadCloser{
{data: readBytes[1:], counts: []int{1}, err: retryErr},
{data: readBytes[2:], counts: []int{2}, err: io.EOF},
},
want: readData[1:4],
},
{
offset: 4,
length: -1,
bodies: []fakeReadCloser{
{data: readBytes[4:], counts: []int{1}, err: retryErr},
{data: readBytes[5:], counts: []int{4}, err: retryErr},
{data: readBytes[9:], counts: []int{1}, err: io.EOF},
},
want: readData[4:],
},
} {
r, err := obj.NewRangeReader(ctx, test.offset, test.length)
if err != nil {
t.Errorf("#%d: %v", i, err)
continue
}
r.body = &test.bodies[0]
b := 0
r.reopen = func(int64) (*http.Response, error) {
b++
return &http.Response{Body: &test.bodies[b]}, nil
}
buf := make([]byte, len(readData)/2)
var gotb []byte
for {
n, err := r.Read(buf)
gotb = append(gotb, buf[:n]...)
if err == io.EOF {
break
}
if err != nil {
t.Fatalf("#%d: %v", i, err)
}
}
if err != nil {
t.Errorf("#%d: %v", i, err)
continue
}
if got := string(gotb); got != test.want {
t.Errorf("#%d: got %q, want %q", i, got, test.want)
}
}
}

type fakeReadCloser struct {
data []byte
counts []int // how much of data to deliver on each read
err error // error to return with last count

d int // current position in data
c int // current position in counts
}

func (f *fakeReadCloser) Close() error {
return nil
}

func (f *fakeReadCloser) Read(buf []byte) (int, error) {
i := f.c
n := 0
if i < len(f.counts) {
n = f.counts[i]
}
var err error
if i >= len(f.counts)-1 {
err = f.err
}
copy(buf, f.data[f.d:f.d+n])
if len(buf) < n {
n = len(buf)
f.counts[i] -= n
err = nil
} else {
f.c++
}
f.d += n
return n, err
}

func TestFakeReadCloser(t *testing.T) {
e := errors.New("")
f := &fakeReadCloser{
data: []byte(readData),
counts: []int{1, 2, 3},
err: e,
}
wants := []string{"0", "12", "345"}
buf := make([]byte, 10)
for i := 0; i < 3; i++ {
n, err := f.Read(buf)
if got, want := n, f.counts[i]; got != want {
t.Fatalf("i=%d: got %d, want %d", i, got, want)
}
var wantErr error
if i == 2 {
wantErr = e
}
if err != wantErr {
t.Fatalf("i=%d: got error %v, want %v", i, err, wantErr)
}
if got, want := string(buf[:n]), wants[i]; got != want {
t.Fatalf("i=%d: got %q, want %q", i, got, want)
}
}
}
