diff --git a/rest/request.go b/rest/request.go index acf3113615..a1f0a591d3 100644 --- a/rest/request.go +++ b/rest/request.go @@ -34,6 +34,7 @@ import ( "time" "golang.org/x/net/http2" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -116,8 +117,11 @@ type Request struct { subresource string // output - err error - body io.Reader + err error + + // only one of body / bodyBytes may be set. requests using body are not retriable. + body io.Reader + bodyBytes []byte retryFn requestRetryFunc } @@ -443,12 +447,15 @@ func (r *Request) Body(obj interface{}) *Request { return r } glogBody("Request Body", data) - r.body = bytes.NewReader(data) + r.body = nil + r.bodyBytes = data case []byte: glogBody("Request Body", t) - r.body = bytes.NewReader(t) + r.body = nil + r.bodyBytes = t case io.Reader: r.body = t + r.bodyBytes = nil case runtime.Object: // callers may pass typed interface pointers, therefore we must check nil with reflection if reflect.ValueOf(t).IsNil() { @@ -465,7 +472,8 @@ func (r *Request) Body(obj interface{}) *Request { return r } glogBody("Request Body", data) - r.body = bytes.NewReader(data) + r.body = nil + r.bodyBytes = data r.SetHeader("Content-Type", r.c.content.ContentType) default: r.err = fmt.Errorf("unknown type used for body: %+v", obj) @@ -825,9 +833,7 @@ func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) { if err != nil { return nil, err } - if r.body != nil { - req.Body = ioutil.NopCloser(r.body) - } + resp, err := client.Do(req) updateURLMetrics(ctx, r, resp, err) retry.After(ctx, r, resp, err) @@ -889,8 +895,20 @@ func (r *Request) requestPreflightCheck() error { } func (r *Request) newHTTPRequest(ctx context.Context) (*http.Request, error) { + var body io.Reader + switch { + case r.body != nil && r.bodyBytes != nil: + return nil, fmt.Errorf("cannot set both body and bodyBytes") + case r.body != nil: + body = r.body + case r.bodyBytes != nil: + // Create a new reader specifically for this request. + // Giving each request a dedicated reader allows retries to avoid races resetting the request body. + body = bytes.NewReader(r.bodyBytes) + } + url := r.URL().String() - req, err := http.NewRequest(r.verb, url, r.body) + req, err := http.NewRequest(r.verb, url, body) if err != nil { return nil, err } diff --git a/rest/request_test.go b/rest/request_test.go index 2773d6aa35..31a3e496a1 100644 --- a/rest/request_test.go +++ b/rest/request_test.go @@ -1123,42 +1123,6 @@ func TestRequestWatch(t *testing.T) { }, Empty: true, }, - { - name: "max retries 1, server returns a retry-after response, request body seek error", - Request: &Request{ - body: &readSeeker{err: io.EOF}, - c: &RESTClient{ - base: &url.URL{}, - }, - }, - maxRetries: 1, - attemptsExpected: 1, - serverReturns: []responseErr{ - {response: retryAfterResponse(), err: nil}, - }, - Err: true, - ErrFn: func(err error) bool { - return !apierrors.IsInternalError(err) && strings.Contains(err.Error(), "failed to reset the request body while retrying a request: EOF") - }, - }, - { - name: "max retries 1, server returns a retryable error, request body seek error", - Request: &Request{ - body: &readSeeker{err: io.EOF}, - c: &RESTClient{ - base: &url.URL{}, - }, - }, - maxRetries: 1, - attemptsExpected: 1, - serverReturns: []responseErr{ - {response: nil, err: io.EOF}, - }, - Err: true, - ErrFn: func(err error) bool { - return !apierrors.IsInternalError(err) - }, - }, { name: "max retries 2, server always returns a response with Retry-After header", Request: &Request{ @@ -1320,7 +1284,7 @@ func TestRequestStream(t *testing.T) { }, }, { - name: "max retries 1, server returns a retry-after response, request body seek error", + name: "max retries 1, server returns a retry-after response, non-bytes request, no retry", Request: &Request{ body: &readSeeker{err: io.EOF}, c: &RESTClient{ @@ -1333,9 +1297,6 @@ func TestRequestStream(t *testing.T) { {response: retryAfterResponse(), err: nil}, }, Err: true, - ErrFn: func(err error) bool { - return !apierrors.IsInternalError(err) && strings.Contains(err.Error(), "failed to reset the request body while retrying a request: EOF") - }, }, { name: "max retries 2, server always returns a response with Retry-After header", @@ -2017,20 +1978,24 @@ func TestBody(t *testing.T) { } } - if r.body == nil { + req, err := r.newHTTPRequest(context.Background()) + if err != nil { + t.Fatal(err) + } + if req.Body == nil { if len(tt.expected) != 0 { - t.Errorf("%d: r.body = %q; want %q", i, r.body, tt.expected) + t.Errorf("%d: req.Body = %q; want %q", i, req.Body, tt.expected) } continue } buf := make([]byte, len(tt.expected)) - if _, err := r.body.Read(buf); err != nil { - t.Errorf("%d: r.body.Read error: %v", i, err) + if _, err := req.Body.Read(buf); err != nil { + t.Errorf("%d: req.Body.Read error: %v", i, err) continue } body := string(buf) if body != tt.expected { - t.Errorf("%d: r.body = %q; want %q", i, body, tt.expected) + t.Errorf("%d: req.Body = %q; want %q", i, body, tt.expected) } } } @@ -2641,6 +2606,7 @@ func TestRequestWithRetry(t *testing.T) { tests := []struct { name string body io.Reader + bodyBytes []byte serverReturns responseErr errExpected error errContains string @@ -2648,52 +2614,52 @@ func TestRequestWithRetry(t *testing.T) { roundTripInvokedExpected int }{ { - name: "server returns retry-after response, request body is not io.Seeker, retry goes ahead", - body: ioutil.NopCloser(bytes.NewReader([]byte{})), + name: "server returns retry-after response, no request body, retry goes ahead", + bodyBytes: nil, serverReturns: responseErr{response: retryAfterResponse(), err: nil}, errExpected: nil, transformFuncInvokedExpected: 1, roundTripInvokedExpected: 2, }, { - name: "server returns retry-after response, request body Seek returns error, retry aborted", - body: &readSeeker{err: io.EOF}, + name: "server returns retry-after response, bytes request body, retry goes ahead", + bodyBytes: []byte{}, serverReturns: responseErr{response: retryAfterResponse(), err: nil}, errExpected: nil, - transformFuncInvokedExpected: 0, - roundTripInvokedExpected: 1, + transformFuncInvokedExpected: 1, + roundTripInvokedExpected: 2, }, { - name: "server returns retry-after response, request body Seek returns no error, retry goes ahead", - body: &readSeeker{err: nil}, + name: "server returns retry-after response, opaque request body, retry aborted", + body: &readSeeker{}, serverReturns: responseErr{response: retryAfterResponse(), err: nil}, errExpected: nil, transformFuncInvokedExpected: 1, - roundTripInvokedExpected: 2, + roundTripInvokedExpected: 1, }, { - name: "server returns retryable err, request body is not io.Seek, retry goes ahead", - body: ioutil.NopCloser(bytes.NewReader([]byte{})), + name: "server returns retryable err, no request body, retry goes ahead", + bodyBytes: nil, serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF}, errExpected: io.ErrUnexpectedEOF, transformFuncInvokedExpected: 0, roundTripInvokedExpected: 2, }, { - name: "server returns retryable err, request body Seek returns error, retry aborted", - body: &readSeeker{err: io.EOF}, + name: "server returns retryable err, bytes request body, retry goes ahead", + bodyBytes: []byte{}, serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF}, - errContains: "failed to reset the request body while retrying a request: EOF", + errExpected: io.ErrUnexpectedEOF, transformFuncInvokedExpected: 0, - roundTripInvokedExpected: 1, + roundTripInvokedExpected: 2, }, { - name: "server returns retryable err, request body Seek returns no err, retry goes ahead", - body: &readSeeker{err: nil}, + name: "server returns retryable err, opaque request body, retry aborted", + body: &readSeeker{}, serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF}, errExpected: io.ErrUnexpectedEOF, transformFuncInvokedExpected: 0, - roundTripInvokedExpected: 2, + roundTripInvokedExpected: 1, }, } @@ -2865,7 +2831,8 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont tests := []struct { name string verb string - body func() io.Reader + body io.Reader + bodyBytes []byte maxRetries int serverReturns []responseErr @@ -2875,7 +2842,7 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont { name: "server always returns retry-after response", verb: "GET", - body: func() io.Reader { return bytes.NewReader([]byte{}) }, + bodyBytes: []byte{}, maxRetries: 2, serverReturns: []responseErr{ {response: retryAfterResponse(), err: nil}, @@ -2903,7 +2870,7 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont { name: "server always returns retryable error", verb: "GET", - body: func() io.Reader { return bytes.NewReader([]byte{}) }, + bodyBytes: []byte{}, maxRetries: 2, serverReturns: []responseErr{ {response: nil, err: io.EOF}, @@ -2932,7 +2899,7 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont { name: "server returns success on the final retry", verb: "GET", - body: func() io.Reader { return bytes.NewReader([]byte{}) }, + bodyBytes: []byte{}, maxRetries: 2, serverReturns: []responseErr{ {response: retryAfterResponse(), err: nil}, @@ -2979,13 +2946,10 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont return resp, test.serverReturns[attempts].err }) - reqCountGot := newCount() - reqRecorder := newReadTracker(reqCountGot) - reqRecorder.delegated = test.body() - req := &Request{ - verb: test.verb, - body: reqRecorder, + verb: test.verb, + body: test.body, + bodyBytes: test.bodyBytes, c: &RESTClient{ content: defaultContentConfig(), Client: client, @@ -3005,9 +2969,6 @@ func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Cont t.Errorf("Expected retries: %d, but got: %d", expected.attempts, attempts) } - if !reflect.DeepEqual(expected.reqCount.seeks, reqCountGot.seeks) { - t.Errorf("Expected request body to have seek invocation: %v, but got: %v", expected.reqCount.seeks, reqCountGot.seeks) - } if expected.respCount.closes != respCountGot.getCloseCount() { t.Errorf("Expected response body Close to be invoked %d times, but got: %d", expected.respCount.closes, respCountGot.getCloseCount()) } @@ -3264,8 +3225,8 @@ func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc t.Fatalf("Wrong test setup - did not find expected for: %s", key) } req := &Request{ - verb: "GET", - body: bytes.NewReader([]byte{}), + verb: "GET", + bodyBytes: []byte{}, c: &RESTClient{ base: base, content: defaultContentConfig(), @@ -3400,8 +3361,8 @@ func testWithRetryInvokeOrder(t *testing.T, key string, doFunc func(ctx context. t.Fatalf("Wrong test setup - did not find expected for: %s", key) } req := &Request{ - verb: "GET", - body: bytes.NewReader([]byte{}), + verb: "GET", + bodyBytes: []byte{}, c: &RESTClient{ base: base, content: defaultContentConfig(), @@ -3575,8 +3536,8 @@ func testWithWrapPreviousError(t *testing.T, doFunc func(ctx context.Context, r t.Fatalf("Failed to create new HTTP request - %v", err) } req := &Request{ - verb: "GET", - body: bytes.NewReader([]byte{}), + verb: "GET", + bodyBytes: []byte{}, c: &RESTClient{ base: base, content: defaultContentConfig(), @@ -3811,104 +3772,3 @@ func TestTransportConcurrency(t *testing.T) { }) } } - -// TODO: see if we can consolidate the other trackers into one. -type requestBodyTracker struct { - io.ReadSeeker - f func(string) -} - -func (t *requestBodyTracker) Read(p []byte) (int, error) { - t.f("Request.Body.Read") - return t.ReadSeeker.Read(p) -} - -func (t *requestBodyTracker) Seek(offset int64, whence int) (int64, error) { - t.f("Request.Body.Seek") - return t.ReadSeeker.Seek(offset, whence) -} - -type responseBodyTracker struct { - io.ReadCloser - f func(string) -} - -func (t *responseBodyTracker) Read(p []byte) (int, error) { - t.f("Response.Body.Read") - return t.ReadCloser.Read(p) -} - -func (t *responseBodyTracker) Close() error { - t.f("Response.Body.Close") - return t.ReadCloser.Close() -} - -type recorder struct { - order []string -} - -func (r *recorder) record(call string) { - r.order = append(r.order, call) -} - -func TestRequestBodyResetOrder(t *testing.T) { - recorder := &recorder{} - respBodyTracker := &responseBodyTracker{ - ReadCloser: nil, // the server will fill it - f: recorder.record, - } - - var attempts int - client := clientForFunc(func(req *http.Request) (*http.Response, error) { - defer func() { - attempts++ - }() - - // read the request body. - ioutil.ReadAll(req.Body) - - // first attempt, we send a retry-after - if attempts == 0 { - resp := retryAfterResponse() - respBodyTracker.ReadCloser = ioutil.NopCloser(bytes.NewReader([]byte{})) - resp.Body = respBodyTracker - return resp, nil - } - - return &http.Response{StatusCode: http.StatusOK}, nil - }) - - reqBodyTracker := &requestBodyTracker{ - ReadSeeker: bytes.NewReader([]byte{}), // empty body ensures one Read operation at most. - f: recorder.record, - } - req := &Request{ - verb: "POST", - body: reqBodyTracker, - c: &RESTClient{ - content: defaultContentConfig(), - Client: client, - }, - backoff: &noSleepBackOff{}, - maxRetries: 1, - retryFn: defaultRequestRetryFn, - } - - req.Do(context.Background()) - - expected := []string{ - // 1st attempt: the server handler reads the request body - "Request.Body.Read", - // the server sends a retry-after, client reads the - // response body, and closes it - "Response.Body.Read", - "Response.Body.Close", - // client retry logic seeks to the beginning of the request body - "Request.Body.Seek", - // 2nd attempt: the server reads the request body - "Request.Body.Read", - } - if !reflect.DeepEqual(expected, recorder.order) { - t.Errorf("Expected invocation request and response body operations for retry do not match: %s", cmp.Diff(expected, recorder.order)) - } -} diff --git a/rest/with_retry.go b/rest/with_retry.go index bdcc6f3a98..ab26947540 100644 --- a/rest/with_retry.go +++ b/rest/with_retry.go @@ -154,6 +154,11 @@ func (r *withRetry) IsNextRetry(ctx context.Context, restReq *Request, httpReq * return false } + if restReq.body != nil { + // we have an opaque reader, we can't safely reset it + return false + } + r.attempts++ r.retryAfter = &RetryAfter{Attempt: r.attempts} if r.attempts > r.maxRetries { @@ -210,18 +215,6 @@ func (r *withRetry) Before(ctx context.Context, request *Request) error { return nil } - // At this point we've made atleast one attempt, post which the response - // body should have been fully read and closed in order for it to be safe - // to reset the request body before we reconnect, in order for us to reuse - // the same TCP connection. - if seeker, ok := request.body.(io.Seeker); ok && request.body != nil { - if _, err := seeker.Seek(0, io.SeekStart); err != nil { - err = fmt.Errorf("failed to reset the request body while retrying a request: %v", err) - r.trackPreviousError(err) - return err - } - } - // if we are here, we have made attempt(s) at least once before. if request.backoff != nil { delay := request.backoff.CalculateBackoff(url) diff --git a/rest/with_retry_test.go b/rest/with_retry_test.go index 4dc029b5d8..e9cdd9e547 100644 --- a/rest/with_retry_test.go +++ b/rest/with_retry_test.go @@ -17,7 +17,6 @@ limitations under the License. package rest import ( - "bytes" "context" "errors" "fmt" @@ -212,7 +211,7 @@ func TestIsNextRetry(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { restReq := &Request{ - body: bytes.NewReader([]byte{}), + bodyBytes: []byte{}, c: &RESTClient{ base: &url.URL{}, },