forked from elastic/cloud-on-k8s
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
212 lines (193 loc) · 7.76 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package client
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"net/http"
"time"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/version"
"github.com/elastic/cloud-on-k8s/pkg/utils/cryptutil"
"github.com/elastic/cloud-on-k8s/pkg/utils/net"
"go.elastic.co/apm/module/apmelasticsearch"
)
const (
// DefaultVotingConfigExclusionsTimeout is the default timeout for setting voting exclusions.
DefaultVotingConfigExclusionsTimeout = "30s"
// DefaultReqTimeout is the default timeout used when performing HTTP calls against Elasticsearch
DefaultReqTimeout = 3 * time.Minute
)
// UserAuth is authentication information for the Elasticsearch client.
type UserAuth struct {
Name string
Password string
}
// Role represents an Elasticsearch role.
type Role struct {
Cluster []string `json:"cluster,omitempty"`
/*Indices []struct {
Names []string `json:"names,omitempty"`
Privileges []string `json:",omitempty"`
} `json:"indices,omitempty"`
Applications []struct {
Application string `json:"application"`
Privileges []string `json:"privileges"`
Resources []string `json:"resources,omitempty"`
} `json:"applications,omitempty"`
RunAs []string `json:"run_as,omitempty"`
Metadata *struct {
Reserved bool `json:"_reserved"`
} `json:"metadata,omitempty"`
TransientMetadata *struct {
Enabled bool `json:"enabled"`
} `json:"transient_metadata,omitempty"`*/
}
// Client captures the information needed to interact with an Elasticsearch cluster via HTTP
type Client interface {
AllocationSetter
ShardLister
LicenseClient
// Close idle connections in the underlying http client.
Close()
// Equal returns true if other can be considered as the same client.
Equal(other Client) bool
// GetClusterInfo get the cluster information at /
GetClusterInfo(ctx context.Context) (Info, error)
// GetClusterRoutingAllocation retrieves the cluster routing allocation settings.
GetClusterRoutingAllocation(ctx context.Context) (ClusterRoutingAllocation, error)
// DisableReplicaShardsAllocation disables shards allocation on the cluster (only primaries are allocated).
DisableReplicaShardsAllocation(ctx context.Context) error
// EnableShardAllocation enables shards allocation on the cluster.
EnableShardAllocation(ctx context.Context) error
// SyncedFlush requests a synced flush on the cluster.
// This is "best-effort", see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-synced-flush.html.
SyncedFlush(ctx context.Context) error
// GetClusterHealth calls the _cluster/health api.
GetClusterHealth(ctx context.Context) (Health, error)
// SetMinimumMasterNodes sets the transient and persistent setting of the same name in cluster settings.
SetMinimumMasterNodes(ctx context.Context, n int) error
// ReloadSecureSettings will decrypt and re-read the entire keystore, on every cluster node,
// but only the reloadable secure settings will be applied
ReloadSecureSettings(ctx context.Context) error
// GetNodes calls the _nodes api to return a map(nodeName -> Node)
GetNodes(ctx context.Context) (Nodes, error)
// GetNodesStats calls the _nodes/stats api to return a map(nodeName -> NodeStats)
GetNodesStats(ctx context.Context) (NodesStats, error)
// ClusterBootstrappedForZen2 returns true if the cluster is relying on zen2 orchestration.
ClusterBootstrappedForZen2(ctx context.Context) (bool, error)
// UpdateRemoteClusterSettings updates the remote clusters of a cluster.
UpdateRemoteClusterSettings(ctx context.Context, settings RemoteClustersSettings) error
// AddVotingConfigExclusions sets the transient and persistent setting of the same name in cluster settings.
//
// If timeout is the empty string, the default is used.
//
// Introduced in: Elasticsearch 7.0.0
AddVotingConfigExclusions(ctx context.Context, nodeNames []string, timeout string) error
// DeleteVotingConfigExclusions sets the transient and persistent setting of the same name in cluster settings.
//
// Introduced in: Elasticsearch 7.0.0
DeleteVotingConfigExclusions(ctx context.Context, waitForRemoval bool) error
// Request exposes a low level interface to the underlying HTTP client e.g. for testing purposes.
// The Elasticsearch endpoint will be added automatically to the request URL which should therefore just be the path
// with a leading /
Request(ctx context.Context, r *http.Request) (*http.Response, error)
}
// NewElasticsearchClient creates a new client for the target cluster.
//
// If dialer is not nil, it will be used to create new TCP connections
func NewElasticsearchClient(
dialer net.Dialer,
esURL string,
esUser UserAuth,
v version.Version,
caCerts []*x509.Certificate,
) Client {
certPool := x509.NewCertPool()
for _, c := range caCerts {
certPool.AddCert(c)
}
transportConfig := http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: certPool,
// We use our own certificate verification because we permit users to provide their own certificates, which may not
// be valid for the k8s service URL (though our self-signed certificates are). For instance, users may use a certificate
// issued by a public CA for Elasticsearch. We opt to skip verifying here since we're not validating based on DNS names
// or IP addresses, which means we have to do our own verification in VerifyPeerCertificate instead.
// go requires either ServerName or InsecureSkipVerify (or both) when handshaking as a client since 1.3:
// https://github.com/golang/go/commit/fca335e91a915b6aae536936a7694c4a2a007a60
InsecureSkipVerify: true,
VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
return errors.New("tls: verify peer certificate not setup")
},
},
}
transportConfig.TLSClientConfig.VerifyPeerCertificate = func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
if verifiedChains != nil {
return errors.New("tls: non-nil verifiedChains argument breaks crypto/tls.Config.VerifyPeerCertificate contract")
}
_, _, err := cryptutil.VerifyCertificateExceptServerName(rawCerts, transportConfig.TLSClientConfig)
return err
}
// use the custom dialer if provided
if dialer != nil {
transportConfig.DialContext = dialer.DialContext
}
base := &baseClient{
Endpoint: esURL,
User: esUser,
caCerts: caCerts,
transport: &transportConfig,
HTTP: &http.Client{
Transport: apmelasticsearch.WrapRoundTripper(&transportConfig),
},
}
return versioned(base, v)
}
// APIError is a non 2xx response from the Elasticsearch API
type APIError struct {
response *http.Response
}
// Error() implements the error interface.
func (e *APIError) Error() string {
defer e.response.Body.Close()
reason := "unknown"
// Elasticsearch has a detailed error message in the response body
var errMsg ErrorResponse
err := json.NewDecoder(e.response.Body).Decode(&errMsg)
if err == nil {
reason = errMsg.Error.Reason
}
return fmt.Sprintf("%s: %s", e.response.Status, reason)
}
// IsNotFound checks whether the error was an HTTP 404 error.
func IsNotFound(err error) bool {
switch err := err.(type) {
case *APIError:
return err.response.StatusCode == http.StatusNotFound
default:
return false
}
}
// IsConflict checks whether the error was an HTTP 409 error.
func IsConflict(err error) bool {
switch err := err.(type) {
case *APIError:
return err.response.StatusCode == http.StatusConflict
default:
return false
}
}
// IsForbidden checks whether the error was an HTTP 403 error.
func IsForbidden(err error) bool {
switch err := err.(type) {
case *APIError:
return err.response.StatusCode == http.StatusForbidden
default:
return false
}
}