Skip to content

Commit

Permalink
Specify the default for total_shards_per_node in read when it's not p…
Browse files Browse the repository at this point in the history
…resent in the API response
  • Loading branch information
tobio committed Nov 10, 2022
1 parent 85a3ba3 commit 31edfe2
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 48 deletions.
28 changes: 27 additions & 1 deletion internal/clients/api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/terraform-provider-elasticstack/internal/utils"
"github.com/hashicorp/go-version"
"github.com/hashicorp/terraform-plugin-log/tflog"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/logging"
Expand Down Expand Up @@ -198,7 +199,7 @@ func (a *ApiClient) ID(ctx context.Context, resourceId string) (*CompositeId, di
return &CompositeId{*clusterId, resourceId}, diags
}

func (a *ApiClient) ClusterID(ctx context.Context) (*string, diag.Diagnostics) {
func (a *ApiClient) serverInfo(ctx context.Context) (map[string]interface{}, diag.Diagnostics) {
var diags diag.Diagnostics
res, err := a.es.Info(a.es.Info.WithContext(ctx))
if err != nil {
Expand All @@ -213,6 +214,31 @@ func (a *ApiClient) ClusterID(ctx context.Context) (*string, diag.Diagnostics) {
if err := json.NewDecoder(res.Body).Decode(&info); err != nil {
return nil, diag.FromErr(err)
}

return info, diags
}

func (a *ApiClient) ServerVersion(ctx context.Context) (*version.Version, diag.Diagnostics) {
info, diags := a.serverInfo(ctx)
if diags.HasError() {
return nil, diags
}

rawVersion := info["version"].(map[string]interface{})["number"].(string)
serverVersion, err := version.NewVersion(rawVersion)
if err != nil {
return nil, diag.FromErr(err)
}

return serverVersion, nil
}

func (a *ApiClient) ClusterID(ctx context.Context) (*string, diag.Diagnostics) {
info, diags := a.serverInfo(ctx)
if diags.HasError() {
return nil, diags
}

if uuid := info["cluster_uuid"].(string); uuid != "" && uuid != "_na_" {
tflog.Trace(ctx, fmt.Sprintf("cluster UUID: %s", uuid))
return &uuid, diags
Expand Down
84 changes: 62 additions & 22 deletions internal/elasticsearch/index/ilm.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/elastic/terraform-provider-elasticstack/internal/clients"
"github.com/elastic/terraform-provider-elasticstack/internal/models"
"github.com/elastic/terraform-provider-elasticstack/internal/utils"
"github.com/hashicorp/go-version"
"github.com/hashicorp/terraform-plugin-log/tflog"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
Expand Down Expand Up @@ -127,10 +128,23 @@ var supportedActions = map[string]*schema.Schema{
Default: 0,
},
"total_shards_per_node": {
Description: "The maximum number of shards for the index on a single Elasticsearch node. Defaults to `-1` (unlimited). Supported from Elasticsearch version **7.16**",
Description: "The maximum number of shards for the index on a single Elasticsearch node. Supported from Elasticsearch version **7.16**",
Type: schema.TypeInt,
Optional: true,
Default: -1,
// DiffSuppressFunc: func(k, oldValue, newValue string, d *schema.ResourceData) bool {
// oldInt, err := strconv.Atoi(oldValue)
// if err != nil {
// return false
// }

// newInt, err := strconv.Atoi(newValue)
// if err != nil {
// return false
// }

// return oldInt == -1 && newInt == 0
// },
},
"include": {
Description: "Assigns an index to nodes that have at least one of the specified custom attributes. Must be valid JSON document.",
Expand Down Expand Up @@ -393,7 +407,12 @@ func resourceIlmPut(ctx context.Context, d *schema.ResourceData, meta interface{
return diags
}

policy, diags := expandIlmPolicy(d)
serverVersion, diags := client.ServerVersion(ctx)
if diags.HasError() {
return diags
}

policy, diags := expandIlmPolicy(d, serverVersion)
if diags.HasError() {
return diags
}
Expand All @@ -407,7 +426,7 @@ func resourceIlmPut(ctx context.Context, d *schema.ResourceData, meta interface{
return resourceIlmRead(ctx, d, meta)
}

func expandIlmPolicy(d *schema.ResourceData) (*models.Policy, diag.Diagnostics) {
func expandIlmPolicy(d *schema.ResourceData, serverVersion *version.Version) (*models.Policy, diag.Diagnostics) {
var diags diag.Diagnostics
var policy models.Policy
phases := make(map[string]models.Phase)
Expand All @@ -424,7 +443,7 @@ func expandIlmPolicy(d *schema.ResourceData) (*models.Policy, diag.Diagnostics)

for _, ph := range supportedIlmPhases {
if v, ok := d.GetOk(ph); ok {
phase, diags := expandPhase(v.([]interface{})[0].(map[string]interface{}), d)
phase, diags := expandPhase(v.([]interface{})[0].(map[string]interface{}), d, serverVersion)
if diags.HasError() {
return nil, diags
}
Expand All @@ -436,7 +455,7 @@ func expandIlmPolicy(d *schema.ResourceData) (*models.Policy, diag.Diagnostics)
return &policy, diags
}

func expandPhase(p map[string]interface{}, d *schema.ResourceData) (*models.Phase, diag.Diagnostics) {
func expandPhase(p map[string]interface{}, d *schema.ResourceData, serverVersion *version.Version) (*models.Phase, diag.Diagnostics) {
var diags diag.Diagnostics
var phase models.Phase

Expand All @@ -450,44 +469,44 @@ func expandPhase(p map[string]interface{}, d *schema.ResourceData) (*models.Phas
if a := action.([]interface{}); len(a) > 0 {
switch actionName {
case "allocate":
actions[actionName], diags = expandAction(a, "number_of_replicas", "total_shards_per_node", "include", "exclude", "require")
actions[actionName], diags = expandAction(a, serverVersion, "number_of_replicas", "total_shards_per_node", "include", "exclude", "require")
case "delete":
actions[actionName], diags = expandAction(a, "delete_searchable_snapshot")
actions[actionName], diags = expandAction(a, serverVersion, "delete_searchable_snapshot")
case "forcemerge":
actions[actionName], diags = expandAction(a, "max_num_segments", "index_codec")
actions[actionName], diags = expandAction(a, serverVersion, "max_num_segments", "index_codec")
case "freeze":
if a[0] != nil {
ac := a[0].(map[string]interface{})
if ac["enabled"].(bool) {
actions[actionName], diags = expandAction(a)
actions[actionName], diags = expandAction(a, serverVersion)
}
}
case "migrate":
actions[actionName], diags = expandAction(a, "enabled")
actions[actionName], diags = expandAction(a, serverVersion, "enabled")
case "readonly":
if a[0] != nil {
ac := a[0].(map[string]interface{})
if ac["enabled"].(bool) {
actions[actionName], diags = expandAction(a)
actions[actionName], diags = expandAction(a, serverVersion)
}
}
case "rollover":
actions[actionName], diags = expandAction(a, "max_age", "max_docs", "max_size", "max_primary_shard_size")
actions[actionName], diags = expandAction(a, serverVersion, "max_age", "max_docs", "max_size", "max_primary_shard_size")
case "searchable_snapshot":
actions[actionName], diags = expandAction(a, "snapshot_repository", "force_merge_index")
actions[actionName], diags = expandAction(a, serverVersion, "snapshot_repository", "force_merge_index")
case "set_priority":
actions[actionName], diags = expandAction(a, "priority")
actions[actionName], diags = expandAction(a, serverVersion, "priority")
case "shrink":
actions[actionName], diags = expandAction(a, "number_of_shards", "max_primary_shard_size")
actions[actionName], diags = expandAction(a, serverVersion, "number_of_shards", "max_primary_shard_size")
case "unfollow":
if a[0] != nil {
ac := a[0].(map[string]interface{})
if ac["enabled"].(bool) {
actions[actionName], diags = expandAction(a)
actions[actionName], diags = expandAction(a, serverVersion)
}
}
case "wait_for_snapshot":
actions[actionName], diags = expandAction(a, "policy")
actions[actionName], diags = expandAction(a, serverVersion, "policy")
default:
diags = append(diags, diag.Diagnostic{
Severity: diag.Error,
Expand All @@ -503,17 +522,35 @@ func expandPhase(p map[string]interface{}, d *schema.ResourceData) (*models.Phas
return &phase, diags
}

func expandAction(a []interface{}, settings ...string) (map[string]interface{}, diag.Diagnostics) {
var ilmActionSettingOptions = map[string]struct {
skipEmptyCheck bool
def interface{}
minVersion *version.Version
}{
"number_of_replicas": {skipEmptyCheck: true},
"total_shards_per_node": {skipEmptyCheck: true, def: -1, minVersion: version.Must(version.NewVersion("7.16.0"))},
"priority": {skipEmptyCheck: true},
}

func expandAction(a []interface{}, serverVersion *version.Version, settings ...string) (map[string]interface{}, diag.Diagnostics) {
var diags diag.Diagnostics
def := make(map[string]interface{})

// can be zero, so we must skip the empty check
settingsToSkip := map[string]struct{}{"number_of_replicas": {}, "priority": {}, "total_shards_per_node": {}}

if action := a[0]; action != nil {
for _, setting := range settings {
if v, ok := action.(map[string]interface{})[setting]; ok && v != nil {
if _, ok := settingsToSkip[setting]; ok || !utils.IsEmpty(v) {
options := ilmActionSettingOptions[setting]

if options.minVersion != nil && options.minVersion.GreaterThan(serverVersion) {
if v != options.def {
return nil, diag.Errorf("[%s] is not supported in the target Elasticsearch server. Remove the setting from your module definition or set it to the default [%s] value", setting, options.def)
}

// This setting is not supported, and shouldn't be set in the ILM policy object
continue
}

if options.skipEmptyCheck || !utils.IsEmpty(v) {
// these 3 fields must be treated as JSON objects
if setting == "include" || setting == "exclude" || setting == "require" {
res := make(map[string]interface{})
Expand Down Expand Up @@ -626,6 +663,9 @@ func flattenPhase(phaseName string, p models.Phase, d *schema.ResourceData) (int
}
if v, ok := action["total_shards_per_node"]; ok {
allocateAction["total_shards_per_node"] = v
} else {
// Specify the default for total_shards_per_node. This avoids an endless diff loop for ES 7.15 or lower which don't support this setting
allocateAction["total_shards_per_node"] = -1
}
for _, f := range []string{"include", "require", "exclude"} {
if v, ok := action[f]; ok {
Expand Down
29 changes: 4 additions & 25 deletions internal/elasticsearch/index/ilm_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package index_test

import (
"encoding/json"
"context"
"fmt"
"io"
"testing"

"github.com/elastic/terraform-provider-elasticstack/internal/acctest"
Expand Down Expand Up @@ -80,29 +79,9 @@ func TestAccResourceILM(t *testing.T) {
}
func serverVersionLessThanTotalShardsPerNodeLimit() (bool, error) {
client := acctest.ApiClient()
res, err := client.GetESClient().Info()

if err != nil {
return false, err
}

defer res.Body.Close()

if res.IsError() {
body, err := io.ReadAll(res.Body)
return false, fmt.Errorf("failed to check elasticsearch version %s %s", err, body)
}

var body map[string]interface{}
// Deserialize the response into a map.
if err := json.NewDecoder(res.Body).Decode(&body); err != nil {
return false, fmt.Errorf("failed to parse the elasticsearch info body %w", err)
}

rawVersion := body["version"].(map[string]interface{})["number"].(string)
serverVersion, err := version.NewVersion(rawVersion)
if err != nil {
return false, fmt.Errorf("failed to parse the elasticsearch version %w", err)
serverVersion, diags := client.ServerVersion(context.Background())
if diags.HasError() {
return false, fmt.Errorf("failed to parse the elasticsearch version %v", diags)
}

return serverVersion.LessThan(totalShardsPerNodeVersionLimit), nil
Expand Down

0 comments on commit 31edfe2

Please sign in to comment.