Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removing info call before performing requests #219

Merged
merged 3 commits into from
Jan 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Deprecated

### Removed
- Remove info call before performing every request ([#219](https://github.com/opensearch-project/opensearch-go/pull/219))

### Fixed
- Renamed the sequence number struct tag to if_seq_no to fix optimistic concurrency control ([#166](https://github.com/opensearch-project/opensearch-go/pull/166))
Expand Down
2 changes: 1 addition & 1 deletion internal/build/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ replace github.com/opensearch-project/opensearch-go/v2 => ../../
require (
github.com/alecthomas/chroma v0.8.2
github.com/kr/pretty v0.1.0 // indirect
github.com/opensearch-project/opensearch-go/v2 v2.1.0
github.com/opensearch-project/opensearch-go/v2 v2.2.0
github.com/spf13/cobra v1.6.1
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519
golang.org/x/tools v0.1.12
Expand Down
2 changes: 1 addition & 1 deletion internal/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ package version

// Client returns the client version as a string.
//
const Client = "2.1.0"
const Client = "2.2.0"
89 changes: 2 additions & 87 deletions opensearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,14 @@
package opensearch

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/opensearch-project/opensearch-go/v2/signer"
Expand Down Expand Up @@ -99,8 +94,6 @@ type Config struct {
EnableMetrics bool // Enable the metrics collection.
EnableDebugLogger bool // Enable the debug logging.

UseResponseCheckOnly bool

RetryBackoff func(attempt int) time.Duration // Optional backoff duration. Default: nil.

Transport http.RoundTripper // The HTTP transport object.
Expand All @@ -116,10 +109,6 @@ type Config struct {
type Client struct {
*opensearchapi.API // Embeds the API methods
Transport opensearchtransport.Interface
useResponseCheckOnly bool

productCheckMu sync.RWMutex
productCheckSuccess bool
}

type esVersion struct {
Expand Down Expand Up @@ -217,7 +206,7 @@ func NewClient(cfg Config) (*Client, error) {
return nil, fmt.Errorf("error creating transport: %s", err)
}

client := &Client{Transport: tp, useResponseCheckOnly: cfg.UseResponseCheckOnly}
client := &Client{Transport: tp}
client.API = opensearchapi.New(client)

if cfg.DiscoverNodesOnStart {
Expand Down Expand Up @@ -274,84 +263,10 @@ func ParseVersion(version string) (int64, int64, int64, error) {
// Perform delegates to Transport to execute a request and return a response.
//
func (c *Client) Perform(req *http.Request) (*http.Response, error) {
if !c.useResponseCheckOnly {
// Launch product check, request info, check header then payload.
if err := c.doProductCheck(c.productCheck); err != nil {
return nil, err
}
}

// Retrieve the original request.
// Perform the original request.
return c.Transport.Perform(req)
}

// doProductCheck calls f if there as not been a prior successful call to doProductCheck,
// returning nil otherwise.
func (c *Client) doProductCheck(f func() error) error {
c.productCheckMu.RLock()
productCheckSuccess := c.productCheckSuccess
c.productCheckMu.RUnlock()

if productCheckSuccess {
return nil
}

c.productCheckMu.Lock()
defer c.productCheckMu.Unlock()

if c.productCheckSuccess {
return nil
}

if err := f(); err != nil {
return err
}

c.productCheckSuccess = true

return nil
}

// productCheck runs an opensearchapi.Info query to retrieve information of the current cluster
// decodes the response and decides if the cluster can be supported or not.
func (c *Client) productCheck() error {
req := opensearchapi.InfoRequest{}
res, err := req.Do(context.Background(), c.Transport)
if err != nil {
return err
}
defer res.Body.Close()

if res.IsError() {
_, err = io.Copy(ioutil.Discard, res.Body)
if err != nil {
return err
}
switch res.StatusCode {
case http.StatusUnauthorized:
return nil
case http.StatusForbidden:
return nil
default:
return fmt.Errorf("cannot retrieve information from OpenSearch")
}
}

var info info
contentType := res.Header.Get("Content-Type")
if strings.Contains(contentType, "json") {
err = json.NewDecoder(res.Body).Decode(&info)
if err != nil {
return fmt.Errorf("error decoding OpenSearch informations: %s", err)
}
}

if info.Version.Number != "" {
return checkCompatibleInfo(info)
}
return nil
}

// Metrics returns the client metrics.
//
func (c *Client) Metrics() (opensearchtransport.Metrics, error) {
Expand Down
125 changes: 0 additions & 125 deletions opensearch_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ import (
"errors"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"os"
"reflect"
"regexp"
"strings"
"testing"
Expand Down Expand Up @@ -446,126 +444,3 @@ func TestGenuineCheckInfo(t *testing.T) {
})
}
}

func TestResponseCheckOnly(t *testing.T) {
tests := []struct {
name string
useResponseCheckOnly bool
response *http.Response
requestErr error
wantErr bool
}{
{
name: "Valid answer without header",
useResponseCheckOnly: false,
response: &http.Response{
Body: ioutil.NopCloser(strings.NewReader("{}")),
},
wantErr: false,
},
{
name: "Valid answer and response check",
useResponseCheckOnly: true,
response: &http.Response{
Body: ioutil.NopCloser(strings.NewReader("{}")),
},
wantErr: false,
},
{
name: "Request failed",
useResponseCheckOnly: true,
response: nil,
requestErr: errors.New("request failed"),
wantErr: true,
},
{
name: "Valid request, 500 response",
useResponseCheckOnly: false,
response: &http.Response{
StatusCode: http.StatusInternalServerError,
Body: ioutil.NopCloser(strings.NewReader("")),
},
requestErr: nil,
wantErr: true,
},
{
name: "Valid request, 404 response",
useResponseCheckOnly: false,
response: &http.Response{
StatusCode: http.StatusNotFound,
Body: ioutil.NopCloser(strings.NewReader("")),
},
requestErr: nil,
wantErr: true,
},
{
name: "Valid request, 403 response",
useResponseCheckOnly: false,
response: &http.Response{
StatusCode: http.StatusForbidden,
Body: ioutil.NopCloser(strings.NewReader("")),
},
requestErr: nil,
wantErr: false,
},
{
name: "Valid request, 401 response",
useResponseCheckOnly: false,
response: &http.Response{
StatusCode: http.StatusUnauthorized,
Body: ioutil.NopCloser(strings.NewReader("")),
},
requestErr: nil,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c, _ := NewClient(Config{
Transport: &mockTransp{RoundTripFunc: func(request *http.Request) (*http.Response, error) {
return tt.response, tt.requestErr
}},
UseResponseCheckOnly: tt.useResponseCheckOnly,
})
_, err := c.Cat.Indices()
if (err != nil) != tt.wantErr {
t.Errorf("Unexpected error, got %v, wantErr %v", err, tt.wantErr)
}
})
}
}

func TestProductCheckError(t *testing.T) {
var requestPaths []string
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestPaths = append(requestPaths, r.URL.Path)
if len(requestPaths) == 1 {
// Simulate transient error from a proxy on the first request.
// This must not be cached by the client.
w.WriteHeader(http.StatusBadGateway)
return
}
w.Write([]byte("{}"))
}))
defer server.Close()

c, _ := NewClient(Config{Addresses: []string{server.URL}, DisableRetry: true})
if _, err := c.Cat.Indices(); err == nil {
t.Fatal("expected error")
}
if c.productCheckSuccess {
t.Fatalf("product check should be invalid, got %v", c.productCheckSuccess)
}
if _, err := c.Cat.Indices(); err != nil {
t.Fatalf("unexpected error: %s", err)
}
if n := len(requestPaths); n != 3 {
t.Fatalf("expected 3 requests, got %d", n)
}
if !reflect.DeepEqual(requestPaths, []string{"/", "/", "/_cat/indices"}) {
t.Fatalf("unexpected request paths: %s", requestPaths)
}
if !c.productCheckSuccess {
t.Fatalf("product check should be valid, got : %v", c.productCheckSuccess)
}
}
2 changes: 1 addition & 1 deletion opensearchapi/test/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.11
replace github.com/opensearch-project/opensearch-go/v2 => ../../

require (
github.com/opensearch-project/opensearch-go/v2 v2.1.0
github.com/opensearch-project/opensearch-go/v2 v2.2.0

gopkg.in/yaml.v2 v2.4.0
)