From d150b4544a4701d0c110dccff4512de67eba9875 Mon Sep 17 00:00:00 2001
From: Robson Sutton <88885489+RobsonSutton@users.noreply.github.com>
Date: Thu, 10 Nov 2022 06:26:42 +0000
Subject: [PATCH] [ISSUE-112] Add total_shards_per_node setting to allocate in
 ILM (#120)

* resolve typo in var name

* add total_shards_per_node setting

* re-format (never-ending spaces vs tabs fiasco)

* remove index_test entry

* re-add test for setting via index

* setting only supported from ES v7.16

* update docs

* Test total_shards_per_node only when supported

* Specify the default for total_shards_per_node in read when it's not present in the API response

Co-authored-by: Toby Brain
---
 CHANGELOG.md                                 |  1 +
 docs/resources/elasticsearch_index.md        |  7 +-
 .../elasticsearch_index_lifecycle.md         |  5 +-
 .../resource.tf                              |  7 +-
 .../resource.tf                              |  3 +-
 internal/clients/api_client.go               | 28 ++++++-
 internal/elasticsearch/index/ilm.go          | 82 +++++++++++++------
 internal/elasticsearch/index/ilm_test.go     | 67 +++++++++++++++
 internal/elasticsearch/index/index_test.go   |  6 +-
 internal/elasticsearch/security/user_test.go | 10 ++-
 10 files changed, 182 insertions(+), 34 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3f6cd81b7..f7323f1ed 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
 - Add `elasticstack_elasticsearch_script` resource ([#173](https://github.com/elastic/terraform-provider-elasticstack/pull/173))
 - Add `elasticstack_elasticsearch_security_role` data source ([#177](https://github.com/elastic/terraform-provider-elasticstack/pull/177))
 - Add `elasticstack_elasticsearch_security_role_mapping` data source ([#178](https://github.com/elastic/terraform-provider-elasticstack/pull/178))
+- Apply `total_shards_per_node` setting in `allocate` action in ILM. Supported from Elasticsearch version **7.16** ([#112](https://github.com/elastic/terraform-provider-elasticstack/issues/112))
 
 ### Fixed
 - Remove unnecessary unsetting id on delete ([#174](https://github.com/elastic/terraform-provider-elasticstack/pull/174))
diff --git a/docs/resources/elasticsearch_index.md b/docs/resources/elasticsearch_index.md
index 1b4d734e9..781fe4759 100644
--- a/docs/resources/elasticsearch_index.md
+++ b/docs/resources/elasticsearch_index.md
@@ -44,9 +44,10 @@ resource "elasticstack_elasticsearch_index" "my_index" {
     }
   })
 
-  number_of_shards   = 1
-  number_of_replicas = 2
-  search_idle_after  = "20s"
+  number_of_shards      = 1
+  number_of_replicas    = 2
+  search_idle_after     = "20s"
+  total_shards_per_node = 200
 }
 ```
 
diff --git a/docs/resources/elasticsearch_index_lifecycle.md b/docs/resources/elasticsearch_index_lifecycle.md
index 4fd43d19a..6449042f9 100644
--- a/docs/resources/elasticsearch_index_lifecycle.md
+++ b/docs/resources/elasticsearch_index_lifecycle.md
@@ -41,7 +41,8 @@ resource "elasticstack_elasticsearch_index_lifecycle" "my_ilm" {
       exclude = jsonencode({
         box_type = "hot"
       })
-      number_of_replicas = 1
+      number_of_replicas    = 1
+      total_shards_per_node = 200
     }
   }
 
@@ -97,6 +98,7 @@ Optional:
 - `include` (String) Assigns an index to nodes that have at least one of the specified custom attributes. Must be valid JSON document.
 - `number_of_replicas` (Number) Number of replicas to assign to the index. Default: `0`
 - `require` (String) Assigns an index to nodes that have all of the specified custom attributes. Must be valid JSON document.
+- `total_shards_per_node` (Number) The maximum number of shards for the index on a single Elasticsearch node. Defaults to `-1` (unlimited). Supported from Elasticsearch version **7.16**
 
@@ -319,6 +321,7 @@ Optional:
 - `include` (String) Assigns an index to nodes that have at least one of the specified custom attributes. Must be valid JSON document.
 - `number_of_replicas` (Number) Number of replicas to assign to the index. Default: `0`
 - `require` (String) Assigns an index to nodes that have all of the specified custom attributes. Must be valid JSON document.
+- `total_shards_per_node` (Number) The maximum number of shards for the index on a single Elasticsearch node. Defaults to `-1` (unlimited). Supported from Elasticsearch version **7.16**
 
diff --git a/examples/resources/elasticstack_elasticsearch_index/resource.tf b/examples/resources/elasticstack_elasticsearch_index/resource.tf
index 3b5801672..19198ffb6 100644
--- a/examples/resources/elasticstack_elasticsearch_index/resource.tf
+++ b/examples/resources/elasticstack_elasticsearch_index/resource.tf
@@ -29,7 +29,8 @@ resource "elasticstack_elasticsearch_index" "my_index" {
     }
   })
 
-  number_of_shards   = 1
-  number_of_replicas = 2
-  search_idle_after  = "20s"
+  number_of_shards      = 1
+  number_of_replicas    = 2
+  search_idle_after     = "20s"
+  total_shards_per_node = 200
 }
diff --git a/examples/resources/elasticstack_elasticsearch_index_lifecycle/resource.tf b/examples/resources/elasticstack_elasticsearch_index_lifecycle/resource.tf
index c573856cf..70d9915b4 100644
--- a/examples/resources/elasticstack_elasticsearch_index_lifecycle/resource.tf
+++ b/examples/resources/elasticstack_elasticsearch_index_lifecycle/resource.tf
@@ -26,7 +26,8 @@ resource "elasticstack_elasticsearch_index_lifecycle" "my_ilm" {
       exclude = jsonencode({
         box_type = "hot"
       })
-      number_of_replicas = 1
+      number_of_replicas    = 1
+      total_shards_per_node = 200
     }
   }
 
diff --git a/internal/clients/api_client.go b/internal/clients/api_client.go
index 845171ffe..1657b9b2b 100644
--- a/internal/clients/api_client.go
+++ b/internal/clients/api_client.go
@@ -11,6 +11,7 @@ import (
 
 	"github.com/elastic/go-elasticsearch/v7"
 	"github.com/elastic/terraform-provider-elasticstack/internal/utils"
+	"github.com/hashicorp/go-version"
 	"github.com/hashicorp/terraform-plugin-log/tflog"
 	"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
 	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/logging"
@@ -198,7 +199,7 @@ func (a *ApiClient) ID(ctx context.Context, resourceId string) (*CompositeId, di
 	return &CompositeId{*clusterId, resourceId}, diags
 }
 
-func (a *ApiClient) ClusterID(ctx context.Context) (*string, diag.Diagnostics) {
+func (a *ApiClient) serverInfo(ctx context.Context) (map[string]interface{}, diag.Diagnostics) {
 	var diags diag.Diagnostics
 	res, err := a.es.Info(a.es.Info.WithContext(ctx))
 	if err != nil {
@@ -213,6 +214,31 @@ func (a *ApiClient) ClusterID(ctx context.Context) (*string, diag.Diagnostics) {
 	if err := json.NewDecoder(res.Body).Decode(&info); err != nil {
 		return nil, diag.FromErr(err)
 	}
+
+	return info, diags
+}
+
+func (a *ApiClient) ServerVersion(ctx context.Context) (*version.Version, diag.Diagnostics) {
+	info, diags := a.serverInfo(ctx)
+	if diags.HasError() {
+		return nil, diags
+	}
+
+	rawVersion := info["version"].(map[string]interface{})["number"].(string)
+	serverVersion, err := version.NewVersion(rawVersion)
+	if err != nil {
+		return nil, diag.FromErr(err)
+	}
+
+	return serverVersion, nil
+}
+
+func (a *ApiClient) ClusterID(ctx context.Context) (*string, diag.Diagnostics) {
+	info, diags := a.serverInfo(ctx)
+	if diags.HasError() {
+		return nil, diags
+	}
+
 	if uuid := info["cluster_uuid"].(string); uuid != "" && uuid != "_na_" {
 		tflog.Trace(ctx, fmt.Sprintf("cluster UUID: %s", uuid))
 		return &uuid, diags
diff --git a/internal/elasticsearch/index/ilm.go b/internal/elasticsearch/index/ilm.go
index e7a551b8f..b4ba7b84a 100644
--- a/internal/elasticsearch/index/ilm.go
+++ b/internal/elasticsearch/index/ilm.go
@@ -9,6 +9,7 @@ import (
 	"github.com/elastic/terraform-provider-elasticstack/internal/clients"
 	"github.com/elastic/terraform-provider-elasticstack/internal/models"
 	"github.com/elastic/terraform-provider-elasticstack/internal/utils"
+	"github.com/hashicorp/go-version"
 	"github.com/hashicorp/terraform-plugin-log/tflog"
 	"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
 	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
@@ -112,7 +113,7 @@ func ResourceIlm() *schema.Resource {
 	}
 }
 
-var suportedActions = map[string]*schema.Schema{
+var supportedActions = map[string]*schema.Schema{
 	"allocate": {
 		Description: "Updates the index settings to change which nodes are allowed to host the index shards and change the number of replicas.",
 		Type:        schema.TypeList,
@@ -126,6 +127,12 @@ var suportedActions = map[string]*schema.Schema{
 					Optional:    true,
 					Default:     0,
 				},
+				"total_shards_per_node": {
+					Description: "The maximum number of shards for the index on a single Elasticsearch node. Defaults to `-1` (unlimited). Supported from Elasticsearch version **7.16**",
+					Type:        schema.TypeInt,
+					Optional:    true,
+					Default:     -1,
+				},
 				"include": {
 					Description: "Assigns an index to nodes that have at least one of the specified custom attributes. Must be valid JSON document.",
 					Type:        schema.TypeString,
@@ -361,7 +368,7 @@ var suportedActions = map[string]*schema.Schema{
 func getSchema(actions ...string) map[string]*schema.Schema {
 	sch := make(map[string]*schema.Schema)
 	for _, a := range actions {
-		if action, ok := suportedActions[a]; ok {
+		if action, ok := supportedActions[a]; ok {
 			sch[a] = action
 		}
 	}
@@ -387,7 +394,12 @@ func resourceIlmPut(ctx context.Context, d *schema.ResourceData, meta interface{
 		return diags
 	}
 
-	policy, diags := expandIlmPolicy(d)
+	serverVersion, diags := client.ServerVersion(ctx)
+	if diags.HasError() {
+		return diags
+	}
+
+	policy, diags := expandIlmPolicy(d, serverVersion)
 	if diags.HasError() {
 		return diags
 	}
@@ -401,7 +413,7 @@ func resourceIlmPut(ctx context.Context, d *schema.ResourceData, meta interface{
 	return resourceIlmRead(ctx, d, meta)
 }
 
-func expandIlmPolicy(d *schema.ResourceData) (*models.Policy, diag.Diagnostics) {
+func expandIlmPolicy(d *schema.ResourceData, serverVersion *version.Version) (*models.Policy, diag.Diagnostics) {
 	var diags diag.Diagnostics
 	var policy models.Policy
 	phases := make(map[string]models.Phase)
@@ -418,7 +430,7 @@ func expandIlmPolicy(d *schema.ResourceData) (*models.Policy, diag.Diagnostics)
 
 	for _, ph := range supportedIlmPhases {
 		if v, ok := d.GetOk(ph); ok {
-			phase, diags := expandPhase(v.([]interface{})[0].(map[string]interface{}), d)
+			phase, diags := expandPhase(v.([]interface{})[0].(map[string]interface{}), d, serverVersion)
 			if diags.HasError() {
 				return nil, diags
 			}
@@ -430,7 +442,7 @@ func expandIlmPolicy(d *schema.ResourceData) (*models.Policy, diag.Diagnostics)
 	return &policy, diags
 }
 
-func expandPhase(p map[string]interface{}, d *schema.ResourceData) (*models.Phase, diag.Diagnostics) {
+func expandPhase(p map[string]interface{}, d *schema.ResourceData, serverVersion *version.Version) (*models.Phase, diag.Diagnostics) {
 	var diags diag.Diagnostics
 	var phase models.Phase
 
@@ -444,44 +456,44 @@ func expandPhase(p map[string]interface{}, d *schema.ResourceData) (*models.Phas
 		if a := action.([]interface{}); len(a) > 0 {
 			switch actionName {
 			case "allocate":
-				actions[actionName], diags = expandAction(a, "number_of_replicas", "include", "exclude", "require")
+				actions[actionName], diags = expandAction(a, serverVersion, "number_of_replicas", "total_shards_per_node", "include", "exclude", "require")
 			case "delete":
-				actions[actionName], diags = expandAction(a, "delete_searchable_snapshot")
+				actions[actionName], diags = expandAction(a, serverVersion, "delete_searchable_snapshot")
 			case "forcemerge":
-				actions[actionName], diags = expandAction(a, "max_num_segments", "index_codec")
+				actions[actionName], diags = expandAction(a, serverVersion, "max_num_segments", "index_codec")
 			case "freeze":
 				if a[0] != nil {
 					ac := a[0].(map[string]interface{})
 					if ac["enabled"].(bool) {
-						actions[actionName], diags = expandAction(a)
+						actions[actionName], diags = expandAction(a, serverVersion)
 					}
 				}
 			case "migrate":
-				actions[actionName], diags = expandAction(a, "enabled")
+				actions[actionName], diags = expandAction(a, serverVersion, "enabled")
 			case "readonly":
 				if a[0] != nil {
 					ac := a[0].(map[string]interface{})
 					if ac["enabled"].(bool) {
-						actions[actionName], diags = expandAction(a)
+						actions[actionName], diags = expandAction(a, serverVersion)
 					}
 				}
 			case "rollover":
-				actions[actionName], diags = expandAction(a, "max_age", "max_docs", "max_size", "max_primary_shard_size")
+				actions[actionName], diags = expandAction(a, serverVersion, "max_age", "max_docs", "max_size", "max_primary_shard_size")
 			case "searchable_snapshot":
-				actions[actionName], diags = expandAction(a, "snapshot_repository", "force_merge_index")
+				actions[actionName], diags = expandAction(a, serverVersion, "snapshot_repository", "force_merge_index")
 			case "set_priority":
-				actions[actionName], diags = expandAction(a, "priority")
+				actions[actionName], diags = expandAction(a, serverVersion, "priority")
 			case "shrink":
-				actions[actionName], diags = expandAction(a, "number_of_shards", "max_primary_shard_size")
+				actions[actionName], diags = expandAction(a, serverVersion, "number_of_shards", "max_primary_shard_size")
 			case "unfollow":
 				if a[0] != nil {
 					ac := a[0].(map[string]interface{})
 					if ac["enabled"].(bool) {
-						actions[actionName], diags = expandAction(a)
+						actions[actionName], diags = expandAction(a, serverVersion)
 					}
 				}
 			case "wait_for_snapshot":
-				actions[actionName], diags = expandAction(a, "policy")
+				actions[actionName], diags = expandAction(a, serverVersion, "policy")
 			default:
 				diags = append(diags, diag.Diagnostic{
 					Severity: diag.Error,
@@ -497,17 +509,35 @@ func expandPhase(p map[string]interface{}, d *schema.ResourceData) (*models.Phas
 	return &phase, diags
 }
 
-func expandAction(a []interface{}, settings ...string) (map[string]interface{}, diag.Diagnostics) {
+var ilmActionSettingOptions = map[string]struct {
+	skipEmptyCheck bool
+	def            interface{}
+	minVersion     *version.Version
+}{
+	"number_of_replicas":    {skipEmptyCheck: true},
+	"total_shards_per_node": {skipEmptyCheck: true, def: -1, minVersion: version.Must(version.NewVersion("7.16.0"))},
+	"priority":              {skipEmptyCheck: true},
+}
+
+func expandAction(a []interface{}, serverVersion *version.Version, settings ...string) (map[string]interface{}, diag.Diagnostics) {
 	var diags diag.Diagnostics
 	def := make(map[string]interface{})
 
-	// can be zero, so we must skip the empty check
-	settingsToSkip := map[string]struct{}{"number_of_replicas": {}, "priority": {}}
-
 	if action := a[0]; action != nil {
 		for _, setting := range settings {
 			if v, ok := action.(map[string]interface{})[setting]; ok && v != nil {
-				if _, ok := settingsToSkip[setting]; ok || !utils.IsEmpty(v) {
+				options := ilmActionSettingOptions[setting]
+
+				if options.minVersion != nil && options.minVersion.GreaterThan(serverVersion) {
+					if v != options.def {
+						return nil, diag.Errorf("[%s] is not supported in the target Elasticsearch server. Remove the setting from your module definition or set it to the default [%s] value", setting, options.def)
+					}
+
+					// This setting is not supported, and shouldn't be set in the ILM policy object
+					continue
+				}
+
+				if options.skipEmptyCheck || !utils.IsEmpty(v) {
 					// these 3 fields must be treated as JSON objects
 					if setting == "include" || setting == "exclude" || setting == "require" {
 						res := make(map[string]interface{})
@@ -618,6 +648,12 @@ func flattenPhase(phaseName string, p models.Phase, d *schema.ResourceData) (int
 			if v, ok := action["number_of_replicas"]; ok {
 				allocateAction["number_of_replicas"] = v
 			}
+			if v, ok := action["total_shards_per_node"]; ok {
+				allocateAction["total_shards_per_node"] = v
+			} else {
+				// Specify the default for total_shards_per_node. This avoids an endless diff loop for ES 7.15 or lower which don't support this setting
+				allocateAction["total_shards_per_node"] = -1
+			}
 			for _, f := range []string{"include", "require", "exclude"} {
 				if v, ok := action[f]; ok {
 					res, err := json.Marshal(v)
diff --git a/internal/elasticsearch/index/ilm_test.go b/internal/elasticsearch/index/ilm_test.go
index bdf104781..248c70c75 100644
--- a/internal/elasticsearch/index/ilm_test.go
+++ b/internal/elasticsearch/index/ilm_test.go
@@ -1,16 +1,20 @@
 package index_test
 
 import (
+	"context"
 	"fmt"
 	"testing"
 
 	"github.com/elastic/terraform-provider-elasticstack/internal/acctest"
 	"github.com/elastic/terraform-provider-elasticstack/internal/clients"
+	"github.com/hashicorp/go-version"
 	sdkacctest "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest"
 	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
 	"github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
 )
 
+var totalShardsPerNodeVersionLimit = version.Must(version.NewVersion("7.16.0"))
+
 func TestAccResourceILM(t *testing.T) {
 	// generate a random policy name
 	policyName := sdkacctest.RandStringFromCharSet(10, sdkacctest.CharSetAlphaNum)
@@ -56,9 +60,32 @@ func TestAccResourceILM(t *testing.T) {
 					resource.TestCheckResourceAttr("elasticstack_elasticsearch_index_lifecycle.test", "frozen.#", "0"),
 				),
 			},
+			{
+				SkipFunc: serverVersionLessThanTotalShardsPerNodeLimit,
+				Config:   testAccResourceILMTotalShardsPerNode(policyName),
+				Check: resource.ComposeTestCheckFunc(
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_index_lifecycle.test", "name", policyName),
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_index_lifecycle.test", "warm.#", "1"),
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_index_lifecycle.test", "warm.0.min_age", "0ms"),
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_index_lifecycle.test", "warm.0.set_priority.0.priority", "60"),
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_index_lifecycle.test", "warm.0.readonly.#", "1"),
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_index_lifecycle.test", "warm.0.allocate.#", "1"),
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_index_lifecycle.test", "warm.0.allocate.0.number_of_replicas", "1"),
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_index_lifecycle.test", "warm.0.allocate.0.total_shards_per_node", "200"),
+				),
+			},
 		},
 	})
 }
+func serverVersionLessThanTotalShardsPerNodeLimit() (bool, error) {
+	client := acctest.ApiClient()
+	serverVersion, diags := client.ServerVersion(context.Background())
+	if diags.HasError() {
+		return false, fmt.Errorf("failed to parse the elasticsearch version %v", diags)
+	}
+
+	return serverVersion.LessThan(totalShardsPerNodeVersionLimit), nil
+}
 
 func testAccResourceILMCreate(name string) string {
 	return fmt.Sprintf(`
@@ -129,6 +156,46 @@ resource "elasticstack_elasticsearch_index_lifecycle" "test" {
 `, name)
 }
 
+func testAccResourceILMTotalShardsPerNode(name string) string {
+	return fmt.Sprintf(`
+provider "elasticstack" {
+  elasticsearch {}
+}
+
+resource "elasticstack_elasticsearch_index_lifecycle" "test" {
+  name = "%s"
+
+  hot {
+    min_age = "1h"
+
+    set_priority {
+      priority = 0
+    }
+
+    rollover {
+      max_age = "2d"
+    }
+  }
+
+  warm {
+    min_age = "0ms"
+    set_priority {
+      priority = 60
+    }
+    readonly {}
+    allocate {
+      exclude = jsonencode({
+        box_type = "hot"
+      })
+      number_of_replicas    = 1
+      total_shards_per_node = 200
+    }
+  }
+
+}
+`, name)
+}
+
 func checkResourceILMDestroy(s *terraform.State) error {
 	client := acctest.Provider.Meta().(*clients.ApiClient)
 
diff --git a/internal/elasticsearch/index/index_test.go b/internal/elasticsearch/index/index_test.go
index bf51092bb..a7a5c504f 100644
--- a/internal/elasticsearch/index/index_test.go
+++ b/internal/elasticsearch/index/index_test.go
@@ -27,7 +27,7 @@ func TestAccResourceIndex(t *testing.T) {
 					resource.TestCheckResourceAttr("elasticstack_elasticsearch_index.test", "alias.0.name", "test_alias_1"),
 					resource.TestCheckResourceAttr("elasticstack_elasticsearch_index.test", "alias.1.name", "test_alias_2"),
 					resource.TestCheckResourceAttr("elasticstack_elasticsearch_index.test", "alias.#", "2"),
-					resource.TestCheckResourceAttr("elasticstack_elasticsearch_index.test", "settings.0.setting.#", "2"),
+					resource.TestCheckResourceAttr("elasticstack_elasticsearch_index.test", "settings.0.setting.#", "3"),
 				),
 			},
 			{
@@ -168,6 +168,10 @@ resource "elasticstack_elasticsearch_index" "test" {
     name  = "index.number_of_replicas"
     value = "2"
   }
+  setting {
+    name  = "index.routing.allocation.total_shards_per_node"
+    value = "200"
+  }
   setting {
     name  = "index.search.idle.after"
     value = "20s"
diff --git a/internal/elasticsearch/security/user_test.go b/internal/elasticsearch/security/user_test.go
index 583bba74d..b5e5674b5 100644
--- a/internal/elasticsearch/security/user_test.go
+++ b/internal/elasticsearch/security/user_test.go
@@ -58,9 +58,11 @@ func TestAccImportedUserDoesNotResetPassword(t *testing.T) {
 
 			resp, err := client.GetESClient().Security.PutUser(username, strings.NewReader(body))
 			if err != nil {
-				return false, nil
+				return false, err
 			}
 
+			defer resp.Body.Close()
+
 			if resp.IsError() {
 				body, err := io.ReadAll(resp.Body)
 				return false, fmt.Errorf("failed to manually create import test user [%s] %s %s", username, body, err)
@@ -121,6 +123,8 @@ func TestAccImportedUserDoesNotResetPassword(t *testing.T) {
 				return false, nil
 			}
 
+			defer resp.Body.Close()
+
 			if resp.IsError() {
 				body, err := io.ReadAll(resp.Body)
 				return false, fmt.Errorf("failed to manually change import test user password [%s] %s %s", username, body, err)
@@ -151,6 +155,8 @@ func checkUserCanAuthenticate(username string, password string) func(*terraform.
 		return err
 	}
 
+	defer resp.Body.Close()
+
 	if resp.IsError() {
 		body, err := io.ReadAll(resp.Body)
 
@@ -220,6 +226,8 @@ func checkResourceSecurityUserDestroy(s *terraform.State) error {
 		return err
 	}
 
+	defer res.Body.Close()
+
 	if res.StatusCode != 404 {
 		return fmt.Errorf("User (%s) still exists", compId.ResourceId)
 	}
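The heart of this patch is the version gate in expandAction: a setting with a minVersion is silently dropped from the ILM policy body when the target server is older than that version and the value is still the schema default, and the apply fails fast when the user set it to anything else. The following standalone Go program is a minimal sketch of that gate, reduced to the single total_shards_per_node setting; the helper name gateTotalShardsPerNode and the main driver are illustrative assumptions for this note, not code from the commit.

// Standalone sketch (assumed names) of the version gate the patch adds to
// expandAction, specialized to total_shards_per_node.
package main

import (
	"fmt"

	"github.com/hashicorp/go-version"
)

// Minimum server version supporting total_shards_per_node in the ILM
// allocate action, per the patch.
var totalShardsPerNodeMinVersion = version.Must(version.NewVersion("7.16.0"))

// gateTotalShardsPerNode reports whether the setting should be written into
// the ILM policy body, or returns an error when an unsupported non-default
// value was requested. -1 is the schema default ("unlimited").
func gateTotalShardsPerNode(serverVersion *version.Version, value int) (bool, error) {
	const defaultValue = -1
	if totalShardsPerNodeMinVersion.GreaterThan(serverVersion) {
		if value != defaultValue {
			return false, fmt.Errorf(
				"[total_shards_per_node] is not supported by Elasticsearch %s; remove it or set it to %d",
				serverVersion, defaultValue)
		}
		// Unsupported but left at the default: omit it from the policy body
		// so pre-7.16 clusters keep working with the same configuration.
		return false, nil
	}
	return true, nil
}

func main() {
	for _, raw := range []string{"7.15.2", "7.16.0", "8.5.0"} {
		sv := version.Must(version.NewVersion(raw))
		include, err := gateTotalShardsPerNode(sv, 200)
		fmt.Printf("server %-7s include=%-5v err=%v\n", raw, include, err)
	}
}

This write-side gate pairs with the read-side change in flattenPhase: when the API response lacks the total_shards_per_node key, the provider writes the -1 default back into state, which is what prevents the endless plan diff on clusters running 7.15 or lower.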