diff --git a/CHANGELOG.md b/CHANGELOG.md index af2c04603..15f99fe16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Adds support for OpenSearch 2.14 ([#552](https://github.com/opensearch-project/opensearch-go/pull/552)) - Adds the `Caches` field to Node stats ([#572](https://github.com/opensearch-project/opensearch-go/pull/572)) - Adds the `SeqNo` and `PrimaryTerm` fields in `SearchHit` ([#574](https://github.com/opensearch-project/opensearch-go/pull/574)) +- Adds guide on configuring the client with retry and backoff ([#540](https://github.com/opensearch-project/opensearch-go/pull/540)) ### Changed - Security roles get response struct has its own sub structs without omitempty ([#572](https://github.com/opensearch-project/opensearch-go/pull/572)) @@ -26,6 +27,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Adjust security Role struct for FLS from string to []string ([#572](https://github.com/opensearch-project/opensearch-go/pull/572)) - Fixes wrong response parsing for indices mapping and recovery ([#572](https://github.com/opensearch-project/opensearch-go/pull/572)) - Fixes wrong response parsing for security get requests ([#572](https://github.com/opensearch-project/opensearch-go/pull/572)) +- Fixes opensearchtransport ignores request context cancellation when `retryBackoff` is configured ([#540](https://github.com/opensearch-project/opensearch-go/pull/540)) +- Fixes opensearchtransport sleeps unexpectedly after the last retry ([#540](https://github.com/opensearch-project/opensearch-go/pull/540)) ### Security diff --git a/USER_GUIDE.md b/USER_GUIDE.md index 012ccc9a4..03294ea4f 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -396,3 +396,4 @@ func getCredentialProvider(accessKey, secretAccessKey, token string) aws.Credent - [Advanced Index Actions](guides/advanced_index_actions.md) - [Index Templates](guides/index_template.md) - [Data Streams](guides/data_streams.md) +- [Retry and Backoff](guides/retry_backoff.md) diff --git a/guides/retry_backoff.md b/guides/retry_backoff.md new file mode 100644 index 000000000..aa11f5664 --- /dev/null +++ b/guides/retry_backoff.md @@ -0,0 +1,53 @@ +# Configure the client with retry and backoff + +The OpenSearch client will retry on certain errors, such as `503 Service Unavailable`. And it will retry right after receiving the error. You can customize the retry behavior. + +## Setup + +Let's create a client instance: + +```go +package main + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/opensearch-project/opensearch-go/v4/opensearchapi" +) + +func main() { + if err := example(); err != nil { + fmt.Println(fmt.Sprintf("Error: %s", err)) + os.Exit(1) + } +} + +func example() error { + client, err := opensearchapi.NewClient(opensearchapi.Config{ + // Retry on 429 TooManyRequests statuses as well (502, 503, 504 are default values) + RetryOnStatus: []int{502, 503, 504, 429}, + + // A simple incremental backoff function + RetryBackoff: func(i int) time.Duration { return time.Duration(i) * 100 * time.Millisecond }, + + // Retry up to 5 attempts (1 initial + 4 retries) + MaxRetries: 4, + }) + if err != nil { + return err + } +``` + +If you do not want to wait too long when the server is not responsive, then control the total duration of the requests with a context. The on-going request and the backoff will be canceled when the context is canceled. + +```go + rootCtx := context.Background() + ctx := context.WithTimeout(rootCtx, time.Second) + + infoResp, err := client.Info(ctx, nil) + return nil +} +``` diff --git a/opensearchtransport/opensearchtransport.go b/opensearchtransport/opensearchtransport.go index c93b2cd5c..8e492596c 100644 --- a/opensearchtransport/opensearchtransport.go +++ b/opensearchtransport/opensearchtransport.go @@ -382,8 +382,19 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { } // Delay the retry if a backoff function is configured - if c.retryBackoff != nil { - time.Sleep(c.retryBackoff(i + 1)) + if c.retryBackoff != nil && i < c.maxRetries { + var cancelled bool + timer := time.NewTimer(c.retryBackoff(i + 1)) + select { + case <-req.Context().Done(): + timer.Stop() + err = req.Context().Err() + cancelled = true + case <-timer.C: + } + if cancelled { + break + } } } // Read, close and replace the http response body to close the connection diff --git a/opensearchtransport/opensearchtransport_internal_test.go b/opensearchtransport/opensearchtransport_internal_test.go index 6daaeba06..e04a0ab58 100644 --- a/opensearchtransport/opensearchtransport_internal_test.go +++ b/opensearchtransport/opensearchtransport_internal_test.go @@ -31,6 +31,8 @@ package opensearchtransport import ( "bytes" "compress/gzip" + "context" + "errors" "fmt" "io" "math/rand" @@ -848,6 +850,86 @@ func TestTransportPerformRetries(t *testing.T) { t.Errorf("Unexpected duration, want=>%s, got=%s", expectedDuration, end) } }) + + t.Run("Delay the retry with retry on timeout and context deadline", func(t *testing.T) { + var i int + u, _ := url.Parse("http://foo.bar") + tp, _ := New(Config{ + EnableRetryOnTimeout: true, + MaxRetries: 100, + RetryBackoff: func(i int) time.Duration { return time.Hour }, + URLs: []*url.URL{u}, + Transport: &mockTransp{ + RoundTripFunc: func(req *http.Request) (*http.Response, error) { + i++ + <-req.Context().Done() + return nil, req.Context().Err() + }, + }, + }) + + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) + ctx, cancel := context.WithTimeout(req.Context(), 50*time.Millisecond) + defer cancel() + req = req.WithContext(ctx) + + //nolint:bodyclose // Mock response does not have a body to close + _, err := tp.Perform(req) + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("expected context.DeadlineExceeded, got %s", err) + } + if i != 1 { + t.Fatalf("unexpected number of requests: expected 1, got %d", i) + } + }) + + t.Run("Don't backoff after the last retry", func(t *testing.T) { + var ( + i int + j int + numReqs = 5 + numRetries = numReqs - 1 + ) + + u, _ := url.Parse("http://foo.bar") + tp, _ := New(Config{ + MaxRetries: numRetries, + URLs: []*url.URL{u, u, u}, + Transport: &mockTransp{ + RoundTripFunc: func(req *http.Request) (*http.Response, error) { + i++ + fmt.Printf("Request #%d", i) + fmt.Print(": ERR\n") + return nil, &mockNetError{error: fmt.Errorf("Mock network error (%d)", i)} + }, + }, + + // A simple incremental backoff function + // + RetryBackoff: func(i int) time.Duration { + j++ + d := time.Millisecond + fmt.Printf("Attempt: %d | Sleeping for %s...\n", i, d) + return d + }, + }) + + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) + + //nolint:bodyclose // Mock response does not have a body to close + _, err := tp.Perform(req) + if err == nil { + t.Fatalf("Expected error, got: %v", err) + } + + if i != numReqs { + t.Errorf("Unexpected number of requests, want=%d, got=%d", numReqs, i) + } + + if j != numRetries { + t.Errorf("Unexpected number of backoffs, want=>%d, got=%d", numRetries, j) + } + }) } func TestURLs(t *testing.T) {