-
Notifications
You must be signed in to change notification settings - Fork 61
/
Copy pathresource_opensearch_cluster_settings.go
509 lines (479 loc) · 18.3 KB
/
resource_opensearch_cluster_settings.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
package provider
import (
"context"
"encoding/json"
"fmt"
"reflect"
"regexp"
"strconv"
"strings"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
elastic7 "github.com/olivere/elastic/v7"
)
// Names of the cluster settings this resource manages, grouped by the kind of
// value each one carries. The groups drive type conversion when settings are
// read back (see clusterResourceDataFromSettings); the matching Terraform
// attribute name is the setting name with every "." replaced by "_".
var (
// stringClusterSettings holds settings whose schema attribute is a string.
stringClusterSettings = []string{
"cluster.persistent_tasks.allocation.enable",
"cluster.persistent_tasks.allocation.recheck_interval",
"cluster.info.update.interval",
"cluster.routing.allocation.allow_rebalance",
"cluster.routing.allocation.awareness.attributes",
"cluster.routing.allocation.disk.watermark.high",
"cluster.routing.allocation.disk.watermark.low",
"cluster.routing.rebalance.enable",
"cluster.no_master_block",
"indices.breaker.fielddata.limit",
"indices.breaker.request.limit",
"indices.breaker.total.limit",
"indices.recovery.max_bytes_per_sec",
"network.breaker.inflight_requests.limit",
"script.max_compilations_rate",
"search.default_search_timeout",
"action.auto_create_index",
"cluster.routing.allocation.enable",
"cluster.search.request.slowlog.level",
"cluster.search.request.slowlog.threshold.warn",
"cluster.search.request.slowlog.threshold.info",
"cluster.search.request.slowlog.threshold.debug",
"cluster.search.request.slowlog.threshold.trace",
}
// intClusterSettings holds settings that are parsed with strconv.Atoi when
// the cluster returns them as strings.
intClusterSettings = []string{
"cluster.max_shards_per_node",
"cluster.max_shards_per_node.frozen",
"cluster.routing.allocation.cluster_concurrent_rebalance",
"cluster.routing.allocation.node_concurrent_incoming_recoveries",
"cluster.routing.allocation.node_concurrent_outgoing_recoveries",
"cluster.routing.allocation.node_concurrent_recoveries",
"cluster.routing.allocation.node_initial_primaries_recoveries",
"cluster.routing.allocation.total_shards_per_node",
}
// floatClusterSettings holds settings that are parsed with
// strconv.ParseFloat when the cluster returns them as strings.
floatClusterSettings = []string{
"cluster.routing.allocation.balance.index",
"cluster.routing.allocation.balance.shard",
"cluster.routing.allocation.balance.threshold",
"indices.breaker.fielddata.overhead",
"indices.breaker.request.overhead",
"network.breaker.inflight_requests.overhead",
}
// boolClusterSettings holds settings that are parsed with
// strconv.ParseBool when the cluster returns them as strings.
boolClusterSettings = []string{
"cluster.blocks.read_only",
"cluster.blocks.read_only_allow_delete",
"cluster.indices.close.enable",
"cluster.routing.allocation.disk.include_relocations",
"cluster.routing.allocation.disk.threshold_enabled",
"cluster.routing.allocation.same_shard.host",
"action.destructive_requires_name",
}
// typeListClusterSettings holds settings whose value is a list of strings
// (see isTypeListSetting).
typeListClusterSettings = []string{
"cluster.routing.allocation.awareness.force.zone.values",
}
// dynamicClusterSettings is every managed setting name: the groups above
// combined via concatStringSlice (defined elsewhere in the package). It is
// the list iterated when building and when reading back settings.
dynamicClusterSettings = concatStringSlice(stringClusterSettings, intClusterSettings, floatClusterSettings, boolClusterSettings, typeListClusterSettings)
)
// resourceOpensearchClusterSettings returns the schema.Resource that manages a
// cluster's persistent settings.
//
// Every attribute is optional and is named after the cluster setting it
// controls, with "." replaced by "_" (for example "cluster.max_shards_per_node"
// becomes "cluster_max_shards_per_node"). Create, Read, Update and Delete are
// wired to the resourceOpensearchClusterSettings* functions in this file, and
// import is a plain ID passthrough.
func resourceOpensearchClusterSettings() *schema.Resource {
	return &schema.Resource{
		Description: "Manages a cluster's (persistent) settings.",
		Create:      resourceOpensearchClusterSettingsCreate,
		Read:        resourceOpensearchClusterSettingsRead,
		Update:      resourceOpensearchClusterSettingsUpdate,
		Delete:      resourceOpensearchClusterSettingsDelete,
		Schema: map[string]*schema.Schema{
			"cluster_max_shards_per_node": {
				Type:        schema.TypeInt,
				Optional:    true,
				Description: "The total number of primary and replica shards for the cluster, this number is multiplied by the number of non-frozen data nodes; shards for closed indices do not count toward this limit",
			},
			"cluster_max_shards_per_node_frozen": {
				Type:        schema.TypeInt,
				Optional:    true,
				Description: "The total number of primary and replica frozen shards, for the cluster; shards for closed indices do not count toward this limit, a cluster with no frozen data nodes is unlimited.",
			},
			"cluster_persistent_tasks_allocation_enable": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "Whether allocation for persistent tasks is active (all, none)",
			},
			"cluster_persistent_tasks_allocation_recheck_interval": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "A time string controlling how often assignment checks are performed to react to whether persistent tasks can be assigned to nodes",
			},
			"cluster_blocks_read_only": {
				Type:        schema.TypeBool,
				Optional:    true,
				Description: "Make the whole cluster read only and metadata is not allowed to be modified",
			},
			"cluster_blocks_read_only_allow_delete": {
				Type:        schema.TypeBool,
				Optional:    true,
				Description: "Make the whole cluster read only, but allows to delete indices to free up resources",
			},
			"cluster_indices_close_enable": {
				Type:        schema.TypeBool,
				Optional:    true,
				Description: "If false, you cannot close open indices",
			},
			"cluster_info_update_interval": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "A time string controlling how often OpenSearch should check on disk usage for each node in the cluster",
			},
			"cluster_routing_allocation_allow_rebalance": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "Specify when shard rebalancing is allowed (always, indices_primaries_active, indices_all_active)",
			},
			"cluster_routing_allocation_awareness_attributes": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "Use custom node attributes to take hardware configuration into account when allocating shards",
			},
			"cluster_routing_allocation_awareness_force_zone_values": {
				Type:        schema.TypeList,
				Optional:    true,
				Description: "A list of zones for awareness allocation.",
				Elem: &schema.Schema{
					Type: schema.TypeString,
				},
			},
			"cluster_routing_allocation_balance_index": {
				Type:        schema.TypeFloat,
				Optional:    true,
				Description: "Weight factor for the number of shards per index allocated on a node, increasing this raises the tendency to equalize the number of shards per index across all nodes",
			},
			"cluster_routing_allocation_balance_shard": {
				Type:        schema.TypeFloat,
				Optional:    true,
				Description: "Weight factor for the total number of shards allocated on a node, increasing this raises the tendency to equalize the number of shards across all nodes",
			},
			"cluster_routing_allocation_balance_threshold": {
				Type:        schema.TypeFloat,
				Optional:    true,
				Description: "Minimal optimization value of operations that should be performed, raising this will cause the cluster to be less aggressive about optimizing the shard balance",
			},
			"cluster_routing_allocation_cluster_concurrent_rebalance": {
				Type:        schema.TypeInt,
				Optional:    true,
				Description: "How many concurrent shard rebalances are allowed cluster wide",
			},
			"cluster_routing_allocation_disk_include_relocations": {
				Type:        schema.TypeBool,
				Optional:    true,
				Description: "Whether the allocator will take into account shards that are currently being relocated to the target node when computing a node’s disk usage",
			},
			"cluster_routing_allocation_disk_threshold_enabled": {
				Type:        schema.TypeBool,
				Optional:    true,
				Description: "Whether the disk allocation decider is active",
			},
			"cluster_routing_allocation_disk_watermark_high": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "Allocator will attempt to relocate shards away from a node whose disk usage is above this percentage disk used",
			},
			"cluster_routing_allocation_disk_watermark_low": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "Allocator will not allocate shards to nodes that have more than this percentage disk used",
			},
			"cluster_routing_allocation_enable": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "Enable or disable allocation for specific kinds of shards (all, primaries, new_primaries, none)",
			},
			"cluster_search_request_slowlog_level": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "Log level for search requests slowlog (TRACE, DEBUG, INFO, WARN)",
			},
			"cluster_search_request_slowlog_threshold_warn": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "Slowlog threshold for WARN level search requests (e.g., 10s)",
			},
			"cluster_search_request_slowlog_threshold_info": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "Slowlog threshold for INFO level search requests (e.g., 5s)",
			},
			"cluster_search_request_slowlog_threshold_debug": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "Slowlog threshold for DEBUG level search requests (e.g., 2s)",
			},
			"cluster_search_request_slowlog_threshold_trace": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "Slowlog threshold for TRACE level search requests (e.g., 10ms)",
			},
			"cluster_routing_allocation_node_concurrent_incoming_recoveries": {
				Type:        schema.TypeInt,
				Optional:    true,
				Description: "How many incoming recoveries where the target shard (likely the replica unless a shard is relocating) are allocated on the node",
			},
			"cluster_routing_allocation_node_concurrent_outgoing_recoveries": {
				Type:        schema.TypeInt,
				Optional:    true,
				Description: "How many outgoing recoveries where the source shard (likely the primary unless a shard is relocating) are allocated on the node",
			},
			"cluster_routing_allocation_node_concurrent_recoveries": {
				Type:        schema.TypeInt,
				Optional:    true,
				Description: "A shortcut to set both incoming and outgoing recoveries",
			},
			"cluster_routing_allocation_node_initial_primaries_recoveries": {
				Type:        schema.TypeInt,
				Optional:    true,
				Description: "Set a (usually) higher rate for primary recovery on node restart (usually from disk, so fast)",
			},
			"cluster_routing_allocation_same_shard_host": {
				Type:        schema.TypeBool,
				Optional:    true,
				Description: "Perform a check to prevent allocation of multiple instances of the same shard on a single host, if multiple nodes are started on the host",
			},
			"cluster_routing_allocation_total_shards_per_node": {
				Type:        schema.TypeInt,
				Optional:    true,
				Description: "Maximum number of primary and replica shards allocated to each node",
			},
			"cluster_routing_rebalance_enable": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "Allow rebalancing for specific kinds of shards (all, primaries, replicas, none)",
			},
			"cluster_no_master_block": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "Specifies which operations are rejected when there is no active master in a cluster (all, write)",
			},
			"indices_breaker_fielddata_limit": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "The percentage of memory above which if loading a field into the field data cache would cause the cache to exceed this limit, an error is returned",
			},
			"indices_breaker_fielddata_overhead": {
				Type:        schema.TypeFloat,
				Optional:    true,
				Description: "A constant that all field data estimations are multiplied by",
			},
			"indices_breaker_request_limit": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "The percentage of memory above which per-request data structures (e.g. calculating aggregations) are prevented from exceeding",
			},
			"indices_breaker_request_overhead": {
				Type:        schema.TypeFloat,
				Optional:    true,
				Description: "A constant that all request estimations are multiplied by",
			},
			"indices_breaker_total_limit": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "The percentage of total amount of memory that can be used across all breakers",
			},
			"indices_recovery_max_bytes_per_sec": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "Maximum total inbound and outbound recovery traffic for each node, in mb",
			},
			"network_breaker_inflight_requests_limit": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "The percentage limit of memory usage on a node of all currently active incoming requests on transport or HTTP level",
			},
			"network_breaker_inflight_requests_overhead": {
				Type:        schema.TypeFloat,
				Optional:    true,
				Description: "A constant that all in flight requests estimations are multiplied by",
			},
			"script_max_compilations_rate": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "Limit for the number of unique dynamic scripts within a certain interval that are allowed to be compiled, expressed as compilations divided by a time string",
			},
			"search_default_search_timeout": {
				Type:        schema.TypeString,
				Optional:    true,
				Description: "A time string setting a cluster-wide default timeout for all search requests",
			},
			"action_auto_create_index": {
				Type:     schema.TypeString,
				Optional: true,
				// Accepts the literals true/false or a comma-separated list of
				// index patterns, each optionally prefixed with + or -.
				ValidateFunc: validation.StringMatch(regexp.MustCompile(`^(true|false|([-+]?[a-z0-9_*.,]+)+)$`), "expected value to be one of: true, false or comma-separated list"),
				Description:  "Whether to automatically create an index if it doesn’t already exist and apply any configured index template",
			},
			"action_destructive_requires_name": {
				Type:        schema.TypeBool,
				Optional:    true,
				Description: "When set to true, you must specify the index name to delete an index and it is not possible to delete all indices with _all or use wildcards",
			},
		},
		Importer: &schema.ResourceImporter{
			StateContext: schema.ImportStatePassthroughContext,
		},
	}
}
// resourceOpensearchClusterSettingsCreate sends the configured settings to the
// cluster, assigns the fixed resource ID "settings", and then refreshes the
// resource data by reading the settings back.
func resourceOpensearchClusterSettingsCreate(d *schema.ResourceData, meta interface{}) error {
	if putErr := resourceOpensearchPutClusterSettings(d, meta); putErr != nil {
		return putErr
	}

	d.SetId("settings")
	return resourceOpensearchClusterSettingsRead(d, meta)
}
// resourceOpensearchPutClusterSettings serialises the settings found in d as
// the "persistent" section of a cluster settings document and PUTs it to
// /_cluster/settings. It returns the first error from obtaining the client,
// encoding the body, or performing the request.
func resourceOpensearchPutClusterSettings(d *schema.ResourceData, meta interface{}) error {
	client, err := getClient(meta.(*ProviderConf))
	if err != nil {
		return err
	}

	payload, err := json.Marshal(map[string]interface{}{
		"persistent": clusterSettingsFromResourceData(d),
	})
	if err != nil {
		return err
	}

	// The client has no dedicated call for PUTing cluster settings
	// (https://github.com/olivere/elastic/issues/1274), so the request is
	// issued through the generic PerformRequest entry point.
	_, err = client.PerformRequest(context.TODO(), elastic7.PerformRequestOptions{
		Method: "PUT",
		Path:   "/_cluster/settings",
		Body:   string(payload),
	})
	return err
}
// resourceOpensearchClusterSettingsRead fetches the cluster settings and copies
// the managed entries of the "persistent" section into d.
//
// It returns an error, rather than panicking, when the response has no
// "persistent" entry or that entry is not a JSON object.
func resourceOpensearchClusterSettingsRead(d *schema.ResourceData, meta interface{}) error {
	settings, err := resourceOpensearchClusterSettingsGet(meta)
	if err != nil {
		return err
	}

	persistent, ok := settings["persistent"].(map[string]interface{})
	if !ok {
		return fmt.Errorf("unexpected cluster settings response: \"persistent\" is %T, want a JSON object", settings["persistent"])
	}
	return clusterResourceDataFromSettings(persistent, d)
}
// resourceOpensearchClusterSettingsUpdate re-sends the settings currently held
// in d to the cluster. Unlike create, it does not read the settings back.
func resourceOpensearchClusterSettingsUpdate(d *schema.ResourceData, meta interface{}) error {
	if err := resourceOpensearchPutClusterSettings(d, meta); err != nil {
		return err
	}
	return nil
}
// resourceOpensearchClusterSettingsDelete clears the persistent settings via
// clearAllSettings and, on success, blanks the resource ID.
func resourceOpensearchClusterSettingsDelete(d *schema.ResourceData, meta interface{}) error {
	if clearErr := clearAllSettings(meta); clearErr != nil {
		return clearErr
	}

	d.SetId("")
	return nil
}
// resourceOpensearchClusterSettingsGet issues
// GET /_cluster/settings?flat_settings=true and returns the decoded JSON
// document.
//
// On any failure the returned map is nil. A decoding failure is wrapped with
// %w so callers can still inspect the underlying error.
func resourceOpensearchClusterSettingsGet(meta interface{}) (map[string]interface{}, error) {
	osClient, err := getClient(meta.(*ProviderConf))
	if err != nil {
		return nil, err
	}

	res, err := osClient.PerformRequest(context.TODO(), elastic7.PerformRequestOptions{
		Method: "GET",
		Path:   "/_cluster/settings?flat_settings=true",
	})
	if err != nil {
		return nil, err
	}

	var settings map[string]interface{}
	if err := json.Unmarshal(res.Body, &settings); err != nil {
		return nil, fmt.Errorf("fail to unmarshal: %w", err)
	}
	return settings, nil
}
// clearAllSettings PUTs a document to /_cluster/settings that assigns null to
// the wildcard keys cluster.*, indices.*, action.*, script.*, network.*,
// search.* and plugins.* under "persistent".
//
// NOTE(review): per the cluster settings API a null value resets the matching
// settings; this also affects persistent settings that were not set through
// this resource — confirm that is intended.
func clearAllSettings(meta interface{}) error {
	client, err := getClient(meta.(*ProviderConf))
	if err != nil {
		return err
	}

	const resetBody = `{
"persistent" : {
"cluster.*": null,
"indices.*": null,
"action.*": null,
"script.*": null,
"network.*": null,
"search.*": null,
"plugins.*": null
}
}`

	_, err = client.PerformRequest(context.TODO(), elastic7.PerformRequestOptions{
		Method: "PUT",
		Path:   "/_cluster/settings",
		Body:   resetBody,
	})
	return err
}
// clusterSettingsFromResourceData collects, keyed by dotted setting name, every
// managed setting for which d.GetOk reports a value. List-typed settings are
// converted to []string; all other values are passed through unchanged.
//
// NOTE(review): GetOk is documented to report ok=false for zero values, so an
// explicit false / 0 / "" in configuration is presumably never sent — confirm
// against the SDK documentation.
func clusterSettingsFromResourceData(d *schema.ResourceData) map[string]interface{} {
	out := make(map[string]interface{})
	for _, name := range dynamicClusterSettings {
		// The schema attribute is the setting name with dots as underscores.
		attr := strings.ReplaceAll(name, ".", "_")

		raw, isSet := d.GetOk(attr)
		if !isSet {
			continue
		}

		if !isTypeListSetting(name) {
			out[name] = raw
			continue
		}

		// A list setting whose raw value is not a []interface{} is skipped.
		if items, isList := raw.([]interface{}); isList {
			out[name] = convertListToSlice(items)
		}
	}
	return out
}
// clusterResourceDataFromSettings copies every managed setting present in
// settings (keyed by dotted setting name) into d under the corresponding
// underscore-separated attribute name.
//
// Values that arrive as strings for int, float or bool settings are parsed to
// the matching Go type first. List settings are stored only when the value is
// a []interface{}; other shapes are skipped. A nil value is passed to d.Set
// unchanged instead of being inspected through reflection, which would panic.
//
// It returns the first conversion or d.Set error, wrapped with the setting or
// attribute name.
func clusterResourceDataFromSettings(settings map[string]interface{}, d *schema.ResourceData) error {
	for _, key := range dynamicClusterSettings {
		value, ok := settings[key]
		if !ok {
			continue
		}
		schemaName := strings.ReplaceAll(key, ".", "_")

		if isTypeListSetting(key) {
			list, isList := value.([]interface{})
			if !isList {
				continue
			}
			if err := d.Set(schemaName, list); err != nil {
				return fmt.Errorf("error setting %s: %w", schemaName, err)
			}
			continue
		}

		// reflect.TypeOf(nil) yields a nil Type, so guard against a null
		// setting value before asking for the type name.
		if value != nil && reflect.TypeOf(value).String() == "string" {
			raw := value.(string)
			var err error
			switch {
			case containsString(intClusterSettings, key):
				if value, err = strconv.Atoi(raw); err != nil {
					return fmt.Errorf("error converting %s to int: %w", key, err)
				}
			case containsString(floatClusterSettings, key):
				if value, err = strconv.ParseFloat(raw, 64); err != nil {
					return fmt.Errorf("error converting %s to float: %w", key, err)
				}
			case containsString(boolClusterSettings, key):
				if value, err = strconv.ParseBool(raw); err != nil {
					return fmt.Errorf("error converting %s to bool: %w", key, err)
				}
			}
		}

		if err := d.Set(schemaName, value); err != nil {
			return fmt.Errorf("error setting %s: %w", schemaName, err)
		}
	}
	return nil
}
// isTypeListSetting reports whether key is one of the list-valued settings
// named in typeListClusterSettings.
func isTypeListSetting(key string) bool {
	listed := containsString(typeListClusterSettings, key)
	return listed
}
// convertListToSlice returns the string elements of list in their original
// order. Elements of any other type are dropped, and the result is nil when
// list contains no strings.
func convertListToSlice(list []interface{}) []string {
	var out []string
	for i := range list {
		switch v := list[i].(type) {
		case string:
			out = append(out, v)
		}
	}
	return out
}