[ISSUE-112] Add total_shards_per_node setting to allocate in ILM (#120)
* resolve typo in var name

* add total_shards_per_node setting

* re-format (never-ending spaces vs tabs fiasco)

* remove index_test entry

* re-add test for setting via index

* setting only supported from ES v7.16

* update docs

* Test total_shards_per_node only when supported

* Specify the default for total_shards_per_node in read when it's not present in the API response

Co-authored-by: Toby Brain <[email protected]>
RobsonSutton and tobio authored Nov 10, 2022
1 parent 671c914 commit d150b45
Showing 10 changed files with 182 additions and 34 deletions.
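
At a glance: the `allocate` action in `elasticstack_elasticsearch_index_lifecycle` gains a `total_shards_per_node` argument (default `-1`, i.e. unlimited; requires Elasticsearch 7.16 or later). A minimal sketch of the new argument in use, adapted from the documentation examples updated below — the policy name and `min_age` are illustrative, not part of this commit:

```
resource "elasticstack_elasticsearch_index_lifecycle" "my_ilm" {
  name = "my_ilm_policy"

  warm {
    min_age = "10d"
    allocate {
      exclude = jsonencode({
        box_type = "hot"
      })
      number_of_replicas    = 1
      # New in this commit: cap how many shards of the index may sit on one node (ES 7.16+)
      total_shards_per_node = 200
    }
  }
}
```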
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
- Add `elasticstack_elasticsearch_script` resource ([#173](https://github.com/elastic/terraform-provider-elasticstack/pull/173))
- Add `elasticstack_elasticsearch_security_role` data source ([#177](https://github.com/elastic/terraform-provider-elasticstack/pull/177))
- Add `elasticstack_elasticsearch_security_role_mapping` data source ([#178](https://github.com/elastic/terraform-provider-elasticstack/pull/178))
- Apply `total_shards_per_node` setting in `allocate` action in ILM. Supported from Elasticsearch version **7.16** ([#112](https://github.com/elastic/terraform-provider-elasticstack/issues/112))

### Fixed
- Remove unnecessary unsetting id on delete ([#174](https://github.com/elastic/terraform-provider-elasticstack/pull/174))
7 changes: 4 additions & 3 deletions docs/resources/elasticsearch_index.md
@@ -44,9 +44,10 @@ resource "elasticstack_elasticsearch_index" "my_index" {
}
})
-  number_of_shards   = 1
-  number_of_replicas = 2
-  search_idle_after  = "20s"
+  number_of_shards      = 1
+  number_of_replicas    = 2
+  search_idle_after     = "20s"
+  total_shards_per_node = 200
}
```

5 changes: 4 additions & 1 deletion docs/resources/elasticsearch_index_lifecycle.md
@@ -41,7 +41,8 @@ resource "elasticstack_elasticsearch_index_lifecycle" "my_ilm" {
exclude = jsonencode({
box_type = "hot"
})
-      number_of_replicas = 1
+      number_of_replicas    = 1
+      total_shards_per_node = 200
}
}
@@ -97,6 +98,7 @@ Optional:
- `include` (String) Assigns an index to nodes that have at least one of the specified custom attributes. Must be valid JSON document.
- `number_of_replicas` (Number) Number of replicas to assign to the index. Default: `0`
- `require` (String) Assigns an index to nodes that have all of the specified custom attributes. Must be valid JSON document.
- `total_shards_per_node` (Number) The maximum number of shards for the index on a single Elasticsearch node. Defaults to `-1` (unlimited). Supported from Elasticsearch version **7.16**


<a id="nestedblock--cold--freeze"></a>
@@ -319,6 +321,7 @@ Optional:
- `include` (String) Assigns an index to nodes that have at least one of the specified custom attributes. Must be valid JSON document.
- `number_of_replicas` (Number) Number of replicas to assign to the index. Default: `0`
- `require` (String) Assigns an index to nodes that have all of the specified custom attributes. Must be valid JSON document.
- `total_shards_per_node` (Number) The maximum number of shards for the index on a single Elasticsearch node. Defaults to `-1` (unlimited). Supported from Elasticsearch version **7.16**


<a id="nestedblock--warm--forcemerge"></a>
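A compatibility note, reflecting the `expandAction` changes further down: against clusters older than 7.16 the provider now errors at apply time if `total_shards_per_node` is set to anything other than its default, and simply omits the setting when it is left at the default. A sketch of a policy that stays valid on older clusters (hypothetical resource and policy names):

```
resource "elasticstack_elasticsearch_index_lifecycle" "compat_ilm" {
  name = "compat_policy"

  warm {
    allocate {
      number_of_replicas = 1
      # total_shards_per_node is deliberately left unset: it defaults to -1 and
      # the provider then omits it from the policy sent to pre-7.16 clusters.
      # Any other value against ES < 7.16 fails with an error at apply time.
    }
  }
}
```
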
@@ -29,7 +29,8 @@ resource "elasticstack_elasticsearch_index" "my_index" {
}
})

-  number_of_shards   = 1
-  number_of_replicas = 2
-  search_idle_after  = "20s"
+  number_of_shards      = 1
+  number_of_replicas    = 2
+  search_idle_after     = "20s"
+  total_shards_per_node = 200
}
@@ -26,7 +26,8 @@ resource "elasticstack_elasticsearch_index_lifecycle" "my_ilm" {
exclude = jsonencode({
box_type = "hot"
})
-      number_of_replicas = 1
+      number_of_replicas    = 1
+      total_shards_per_node = 200
}
}

28 changes: 27 additions & 1 deletion internal/clients/api_client.go
@@ -11,6 +11,7 @@ import (

"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/terraform-provider-elasticstack/internal/utils"
"github.com/hashicorp/go-version"
"github.com/hashicorp/terraform-plugin-log/tflog"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/logging"
@@ -198,7 +199,7 @@ func (a *ApiClient) ID(ctx context.Context, resourceId string) (*CompositeId, diag.Diagnostics) {
  return &CompositeId{*clusterId, resourceId}, diags
}

- func (a *ApiClient) ClusterID(ctx context.Context) (*string, diag.Diagnostics) {
+ func (a *ApiClient) serverInfo(ctx context.Context) (map[string]interface{}, diag.Diagnostics) {
  var diags diag.Diagnostics
  res, err := a.es.Info(a.es.Info.WithContext(ctx))
  if err != nil {
@@ -213,6 +214,31 @@ func (a *ApiClient) ClusterID(ctx context.Context) (*string, diag.Diagnostics) {
  if err := json.NewDecoder(res.Body).Decode(&info); err != nil {
    return nil, diag.FromErr(err)
  }
+
+   return info, diags
+ }
+
+ func (a *ApiClient) ServerVersion(ctx context.Context) (*version.Version, diag.Diagnostics) {
+   info, diags := a.serverInfo(ctx)
+   if diags.HasError() {
+     return nil, diags
+   }
+
+   rawVersion := info["version"].(map[string]interface{})["number"].(string)
+   serverVersion, err := version.NewVersion(rawVersion)
+   if err != nil {
+     return nil, diag.FromErr(err)
+   }
+
+   return serverVersion, nil
+ }
+
+ func (a *ApiClient) ClusterID(ctx context.Context) (*string, diag.Diagnostics) {
+   info, diags := a.serverInfo(ctx)
+   if diags.HasError() {
+     return nil, diags
+   }
+
  if uuid := info["cluster_uuid"].(string); uuid != "" && uuid != "_na_" {
    tflog.Trace(ctx, fmt.Sprintf("cluster UUID: %s", uuid))
    return &uuid, diags
82 changes: 59 additions & 23 deletions internal/elasticsearch/index/ilm.go
@@ -9,6 +9,7 @@ import (
"github.com/elastic/terraform-provider-elasticstack/internal/clients"
"github.com/elastic/terraform-provider-elasticstack/internal/models"
"github.com/elastic/terraform-provider-elasticstack/internal/utils"
"github.com/hashicorp/go-version"
"github.com/hashicorp/terraform-plugin-log/tflog"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
@@ -112,7 +113,7 @@ func ResourceIlm() *schema.Resource {
  }
}

- var suportedActions = map[string]*schema.Schema{
+ var supportedActions = map[string]*schema.Schema{
  "allocate": {
    Description: "Updates the index settings to change which nodes are allowed to host the index shards and change the number of replicas.",
    Type:        schema.TypeList,
@@ -126,6 +127,12 @@ var suportedActions = map[string]*schema.Schema{
      Optional: true,
      Default:  0,
    },
+   "total_shards_per_node": {
+     Description: "The maximum number of shards for the index on a single Elasticsearch node. Defaults to `-1` (unlimited). Supported from Elasticsearch version **7.16**",
+     Type:        schema.TypeInt,
+     Optional:    true,
+     Default:     -1,
+   },
    "include": {
      Description: "Assigns an index to nodes that have at least one of the specified custom attributes. Must be valid JSON document.",
      Type:        schema.TypeString,
@@ -361,7 +368,7 @@ var suportedActions = map[string]*schema.Schema{
func getSchema(actions ...string) map[string]*schema.Schema {
  sch := make(map[string]*schema.Schema)
  for _, a := range actions {
-   if action, ok := suportedActions[a]; ok {
+   if action, ok := supportedActions[a]; ok {
      sch[a] = action
    }
  }
@@ -387,7 +394,12 @@ func resourceIlmPut(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
    return diags
  }

- policy, diags := expandIlmPolicy(d)
+ serverVersion, diags := client.ServerVersion(ctx)
+ if diags.HasError() {
+   return diags
+ }
+
+ policy, diags := expandIlmPolicy(d, serverVersion)
  if diags.HasError() {
    return diags
  }
@@ -401,7 +413,7 @@ func resourceIlmPut(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
  return resourceIlmRead(ctx, d, meta)
}

- func expandIlmPolicy(d *schema.ResourceData) (*models.Policy, diag.Diagnostics) {
+ func expandIlmPolicy(d *schema.ResourceData, serverVersion *version.Version) (*models.Policy, diag.Diagnostics) {
  var diags diag.Diagnostics
  var policy models.Policy
  phases := make(map[string]models.Phase)
@@ -418,7 +430,7 @@ func expandIlmPolicy(d *schema.ResourceData) (*models.Policy, diag.Diagnostics) {

  for _, ph := range supportedIlmPhases {
    if v, ok := d.GetOk(ph); ok {
-     phase, diags := expandPhase(v.([]interface{})[0].(map[string]interface{}), d)
+     phase, diags := expandPhase(v.([]interface{})[0].(map[string]interface{}), d, serverVersion)
      if diags.HasError() {
        return nil, diags
      }
@@ -430,7 +442,7 @@ func expandIlmPolicy(d *schema.ResourceData) (*models.Policy, diag.Diagnostics) {
  return &policy, diags
}

- func expandPhase(p map[string]interface{}, d *schema.ResourceData) (*models.Phase, diag.Diagnostics) {
+ func expandPhase(p map[string]interface{}, d *schema.ResourceData, serverVersion *version.Version) (*models.Phase, diag.Diagnostics) {
  var diags diag.Diagnostics
  var phase models.Phase

@@ -444,44 +456,44 @@ func expandPhase(p map[string]interface{}, d *schema.ResourceData) (*models.Phase, diag.Diagnostics) {
  if a := action.([]interface{}); len(a) > 0 {
    switch actionName {
    case "allocate":
-     actions[actionName], diags = expandAction(a, "number_of_replicas", "include", "exclude", "require")
+     actions[actionName], diags = expandAction(a, serverVersion, "number_of_replicas", "total_shards_per_node", "include", "exclude", "require")
    case "delete":
-     actions[actionName], diags = expandAction(a, "delete_searchable_snapshot")
+     actions[actionName], diags = expandAction(a, serverVersion, "delete_searchable_snapshot")
    case "forcemerge":
-     actions[actionName], diags = expandAction(a, "max_num_segments", "index_codec")
+     actions[actionName], diags = expandAction(a, serverVersion, "max_num_segments", "index_codec")
    case "freeze":
      if a[0] != nil {
        ac := a[0].(map[string]interface{})
        if ac["enabled"].(bool) {
-         actions[actionName], diags = expandAction(a)
+         actions[actionName], diags = expandAction(a, serverVersion)
        }
      }
    case "migrate":
-     actions[actionName], diags = expandAction(a, "enabled")
+     actions[actionName], diags = expandAction(a, serverVersion, "enabled")
    case "readonly":
      if a[0] != nil {
        ac := a[0].(map[string]interface{})
        if ac["enabled"].(bool) {
-         actions[actionName], diags = expandAction(a)
+         actions[actionName], diags = expandAction(a, serverVersion)
        }
      }
    case "rollover":
-     actions[actionName], diags = expandAction(a, "max_age", "max_docs", "max_size", "max_primary_shard_size")
+     actions[actionName], diags = expandAction(a, serverVersion, "max_age", "max_docs", "max_size", "max_primary_shard_size")
    case "searchable_snapshot":
-     actions[actionName], diags = expandAction(a, "snapshot_repository", "force_merge_index")
+     actions[actionName], diags = expandAction(a, serverVersion, "snapshot_repository", "force_merge_index")
    case "set_priority":
-     actions[actionName], diags = expandAction(a, "priority")
+     actions[actionName], diags = expandAction(a, serverVersion, "priority")
    case "shrink":
-     actions[actionName], diags = expandAction(a, "number_of_shards", "max_primary_shard_size")
+     actions[actionName], diags = expandAction(a, serverVersion, "number_of_shards", "max_primary_shard_size")
    case "unfollow":
      if a[0] != nil {
        ac := a[0].(map[string]interface{})
        if ac["enabled"].(bool) {
-         actions[actionName], diags = expandAction(a)
+         actions[actionName], diags = expandAction(a, serverVersion)
        }
      }
    case "wait_for_snapshot":
-     actions[actionName], diags = expandAction(a, "policy")
+     actions[actionName], diags = expandAction(a, serverVersion, "policy")
    default:
      diags = append(diags, diag.Diagnostic{
        Severity: diag.Error,
@@ -497,17 +509,35 @@ func expandPhase(p map[string]interface{}, d *schema.ResourceData) (*models.Phase, diag.Diagnostics) {
  return &phase, diags
}

- func expandAction(a []interface{}, settings ...string) (map[string]interface{}, diag.Diagnostics) {
+ var ilmActionSettingOptions = map[string]struct {
+   skipEmptyCheck bool
+   def            interface{}
+   minVersion     *version.Version
+ }{
+   "number_of_replicas":    {skipEmptyCheck: true},
+   "total_shards_per_node": {skipEmptyCheck: true, def: -1, minVersion: version.Must(version.NewVersion("7.16.0"))},
+   "priority":              {skipEmptyCheck: true},
+ }
+
+ func expandAction(a []interface{}, serverVersion *version.Version, settings ...string) (map[string]interface{}, diag.Diagnostics) {
  var diags diag.Diagnostics
  def := make(map[string]interface{})

- // can be zero, so we must skip the empty check
- settingsToSkip := map[string]struct{}{"number_of_replicas": {}, "priority": {}}
-
  if action := a[0]; action != nil {
    for _, setting := range settings {
      if v, ok := action.(map[string]interface{})[setting]; ok && v != nil {
-       if _, ok := settingsToSkip[setting]; ok || !utils.IsEmpty(v) {
+       options := ilmActionSettingOptions[setting]
+
+       if options.minVersion != nil && options.minVersion.GreaterThan(serverVersion) {
+         if v != options.def {
+           return nil, diag.Errorf("[%s] is not supported in the target Elasticsearch server. Remove the setting from your module definition or set it to the default [%s] value", setting, options.def)
+         }
+
+         // This setting is not supported, and shouldn't be set in the ILM policy object
+         continue
+       }
+
+       if options.skipEmptyCheck || !utils.IsEmpty(v) {
          // these 3 fields must be treated as JSON objects
          if setting == "include" || setting == "exclude" || setting == "require" {
            res := make(map[string]interface{})
@@ -618,6 +648,12 @@ func flattenPhase(phaseName string, p models.Phase, d *schema.ResourceData) (int
  if v, ok := action["number_of_replicas"]; ok {
    allocateAction["number_of_replicas"] = v
  }
+ if v, ok := action["total_shards_per_node"]; ok {
+   allocateAction["total_shards_per_node"] = v
+ } else {
+   // Specify the default for total_shards_per_node. This avoids an endless diff loop for ES 7.15 or lower which don't support this setting
+   allocateAction["total_shards_per_node"] = -1
+ }
  for _, f := range []string{"include", "require", "exclude"} {
    if v, ok := action[f]; ok {
      res, err := json.Marshal(v)