Skip to content

Commit

Permalink
Support slow logs in cluster settings (opensearch-project#211)
Browse files Browse the repository at this point in the history
Signed-off-by: Prudhvi Godithi <[email protected]>
  • Loading branch information
prudhvigodithi authored Sep 16, 2024
1 parent 3e84564 commit d86d6a3
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 25 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-go-
- name: Run Docker containers
run: docker-compose up --detach
run: docker compose up --detach
env:
OSS_IMAGE: "${{ matrix.oss-image }}:${{ matrix.version }}"
OS_COMMAND: "${{matrix.OS_COMMAND}}"
Expand Down
2 changes: 2 additions & 0 deletions docs/data-sources/host.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,5 @@ data "opensearch_host" "test" {

- `id` (String) The ID of this resource.
- `url` (String) the url of the active cluster


2 changes: 2 additions & 0 deletions docs/resources/anomaly_detection.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,5 @@ EOF
### Read-Only

- `id` (String) The ID of this resource.


8 changes: 8 additions & 0 deletions docs/resources/cluster_settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ resource "opensearch_cluster_settings" "global" {
- `cluster_persistent_tasks_allocation_recheck_interval` (String) A time string controling how often assignment checks are performed to react to whether persistent tasks can be assigned to nodes
- `cluster_routing_allocation_allow_rebalance` (String) Specify when shard rebalancing is allowed (always, indices_primaries_active, indices_all_active)
- `cluster_routing_allocation_awareness_attributes` (String) Use custom node attributes to take hardware configuration into account when allocating shards
- `cluster_routing_allocation_awareness_force_zone_values` (List of String) A list of zones for awareness allocation.
- `cluster_routing_allocation_balance_index` (Number) Weight factor for the number of shards per index allocated on a node, increasing this raises the tendency to equalize the number of shards per index across all nodes
- `cluster_routing_allocation_balance_shard` (Number) Weight factor for the total number of shards allocated on a node, increasing this raises the tendency to equalize the number of shards across all nodes
- `cluster_routing_allocation_balance_threshold` (Number) Minimal optimization value of operations that should be performed, raising this will cause the cluster to be less aggressive about optimizing the shard balance
Expand All @@ -53,6 +54,11 @@ resource "opensearch_cluster_settings" "global" {
- `cluster_routing_allocation_same_shard_host` (Boolean) Perform a check to prevent allocation of multiple instances of the same shard on a single host, if multiple nodes are started on the host
- `cluster_routing_allocation_total_shards_per_node` (Number) Maximum number of primary and replica shards allocated to each node
- `cluster_routing_rebalance_enable` (String) Allow rebalancing for specific kinds of shards (all, primaries, replicas, none)
- `cluster_search_request_slowlog_level` (String) Log level for search requests slowlog (TRACE, DEBUG, INFO, WARN)
- `cluster_search_request_slowlog_threshold_debug` (String) Slowlog threshold for DEBUG level search requests (e.g., 2s)
- `cluster_search_request_slowlog_threshold_info` (String) Slowlog threshold for INFO level search requests (e.g., 5s)
- `cluster_search_request_slowlog_threshold_trace` (String) Slowlog threshold for TRACE level search requests (e.g., 10ms)
- `cluster_search_request_slowlog_threshold_warn` (String) Slowlog threshold for WARN level search requests (e.g., 10s)
- `indices_breaker_fielddata_limit` (String) The percentage of memory above which if loading a field into the field data cache would cause the cache to exceed this limit, an error is returned
- `indices_breaker_fielddata_overhead` (Number) A constant that all field data estimations are multiplied by
- `indices_breaker_request_limit` (String) The percentabge of memory above which per-request data structures (e.g. calculating aggregations) are prevented from exceeding
Expand All @@ -67,3 +73,5 @@ resource "opensearch_cluster_settings" "global" {
### Read-Only

- `id` (String) The ID of this resource.


2 changes: 2 additions & 0 deletions docs/resources/dashboard_object.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,5 @@ EOF
### Read-Only

- `id` (String) The ID of this resource.


2 changes: 2 additions & 0 deletions docs/resources/data_stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ resource "opensearch_data_stream" "foo" {
### Read-Only

- `id` (String) The ID of this resource.


25 changes: 25 additions & 0 deletions examples/cluster-settings/provider.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Configure the OpenSearch provider
terraform {
required_providers {
opensearch = {
source = "registry.terraform.io/opensearch-project/opensearch"
}
}
}

provider "opensearch" {
url = "http://127.0.0.1:9200"
username = "admin"
password = "myStrongPassword123@456"
}

resource "opensearch_cluster_settings" "persistent" {
cluster_max_shards_per_node = 10
cluster_search_request_slowlog_level = "WARN"
cluster_search_request_slowlog_threshold_warn = "10s"
cluster_search_request_slowlog_threshold_info = "5s"
cluster_search_request_slowlog_threshold_debug = "2s"
cluster_search_request_slowlog_threshold_trace = "100ms"
cluster_routing_allocation_awareness_attributes = "zone"
cluster_routing_allocation_awareness_force_zone_values = ["zoneA", "zoneB"]
}
111 changes: 87 additions & 24 deletions provider/resource_opensearch_cluster_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"reflect"
"regexp"
"strconv"
Expand Down Expand Up @@ -36,6 +35,11 @@ var (
"search.default_search_timeout",
"action.auto_create_index",
"cluster.routing.allocation.enable",
"cluster.search.request.slowlog.level",
"cluster.search.request.slowlog.threshold.warn",
"cluster.search.request.slowlog.threshold.info",
"cluster.search.request.slowlog.threshold.debug",
"cluster.search.request.slowlog.threshold.trace",
}
intClusterSettings = []string{
"cluster.max_shards_per_node",
Expand Down Expand Up @@ -64,7 +68,10 @@ var (
"cluster.routing.allocation.same_shard.host",
"action.destructive_requires_name",
}
dynamicClusterSettings = concatStringSlice(stringClusterSettings, intClusterSettings, floatClusterSettings, boolClusterSettings)
typeListClusterSettings = []string{
"cluster.routing.allocation.awareness.force.zone.values",
}
dynamicClusterSettings = concatStringSlice(stringClusterSettings, intClusterSettings, floatClusterSettings, boolClusterSettings, typeListClusterSettings)
)

func resourceOpensearchClusterSettings() *schema.Resource {
Expand Down Expand Up @@ -125,6 +132,14 @@ func resourceOpensearchClusterSettings() *schema.Resource {
Optional: true,
Description: "Use custom node attributes to take hardware configuration into account when allocating shards",
},
"cluster_routing_allocation_awareness_force_zone_values": {
Type: schema.TypeList,
Optional: true,
Description: "A list of zones for awareness allocation.",
Elem: &schema.Schema{
Type: schema.TypeString,
},
},
"cluster_routing_allocation_balance_index": {
Type: schema.TypeFloat,
Optional: true,
Expand Down Expand Up @@ -170,6 +185,31 @@ func resourceOpensearchClusterSettings() *schema.Resource {
Optional: true,
Description: "Enable or disable allocation for specific kinds of shards (all, primaries, new_primaries, none)",
},
"cluster_search_request_slowlog_level": {
Type: schema.TypeString,
Optional: true,
Description: "Log level for search requests slowlog (TRACE, DEBUG, INFO, WARN)",
},
"cluster_search_request_slowlog_threshold_warn": {
Type: schema.TypeString,
Optional: true,
Description: "Slowlog threshold for WARN level search requests (e.g., 10s)",
},
"cluster_search_request_slowlog_threshold_info": {
Type: schema.TypeString,
Optional: true,
Description: "Slowlog threshold for INFO level search requests (e.g., 5s)",
},
"cluster_search_request_slowlog_threshold_debug": {
Type: schema.TypeString,
Optional: true,
Description: "Slowlog threshold for DEBUG level search requests (e.g., 2s)",
},
"cluster_search_request_slowlog_threshold_trace": {
Type: schema.TypeString,
Optional: true,
Description: "Slowlog threshold for TRACE level search requests (e.g., 10ms)",
},
"cluster_routing_allocation_node_concurrent_incoming_recoveries": {
Type: schema.TypeInt,
Optional: true,
Expand Down Expand Up @@ -401,46 +441,69 @@ func clusterSettingsFromResourceData(d *schema.ResourceData) map[string]interfac
for _, key := range dynamicClusterSettings {
schemaName := strings.Replace(key, ".", "_", -1)
if raw, ok := d.GetOk(schemaName); ok {
log.Printf("[INFO] clusterSettingsFromResourceData: key:%+v schemaName:%+v value:%+v, %+v", key, schemaName, raw, settings)
settings[key] = raw
if isTypeListSetting(key) {
if list, ok := raw.([]interface{}); ok {
settings[key] = convertListToSlice(list)
}
} else {
settings[key] = raw
}
}
}
return settings
}

func clusterResourceDataFromSettings(settings map[string]interface{}, d *schema.ResourceData) error {
log.Printf("[INFO] clusterResourceDataFromSettings: %+v", settings)
for _, key := range dynamicClusterSettings {
value, ok := settings[key]
if !ok {
continue
}

schemaName := strings.Replace(key, ".", "_", -1)
if containsString(intClusterSettings, key) && reflect.TypeOf(value).String() == "string" {
var err error
value, err = strconv.Atoi(value.(string))
if err != nil {
return err
if isTypeListSetting(key) {
if list, ok := value.([]interface{}); ok {
if err := d.Set(schemaName, list); err != nil {
return fmt.Errorf("error setting %s: %s", schemaName, err)
}
}
} else if containsString(floatClusterSettings, key) && reflect.TypeOf(value).String() == "string" {
} else {
var err error
value, err = strconv.ParseFloat(value.(string), 64)
if err != nil {
return err
if containsString(intClusterSettings, key) && reflect.TypeOf(value).String() == "string" {
value, err = strconv.Atoi(value.(string))
if err != nil {
return fmt.Errorf("error converting %s to int: %s", key, err)
}
} else if containsString(floatClusterSettings, key) && reflect.TypeOf(value).String() == "string" {
value, err = strconv.ParseFloat(value.(string), 64)
if err != nil {
return fmt.Errorf("error converting %s to float: %s", key, err)
}
} else if containsString(boolClusterSettings, key) && reflect.TypeOf(value).String() == "string" {
value, err = strconv.ParseBool(value.(string))
if err != nil {
return fmt.Errorf("error converting %s to bool: %s", key, err)
}
}
} else if containsString(boolClusterSettings, key) && reflect.TypeOf(value).String() == "string" {
var err error
value, err = strconv.ParseBool(value.(string))
if err != nil {
return err

if err := d.Set(schemaName, value); err != nil {
return fmt.Errorf("error setting %s: %s", schemaName, err)
}
}
err := d.Set(schemaName, value)
if err != nil {
log.Printf("[ERROR] clusterResourceDataFromSettings: %+v", err)
return err
}
}
return nil
}

func isTypeListSetting(key string) bool {
return containsString(typeListClusterSettings, key)
}

func convertListToSlice(list []interface{}) []string {
var slice []string
for _, item := range list {
if str, ok := item.(string); ok {
slice = append(slice, str)
}
}
return slice
}
46 changes: 46 additions & 0 deletions provider/resource_opensearch_cluster_settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,40 @@ func TestAccOpensearchClusterSettings(t *testing.T) {
})
}

func TestAccOpensearchClusterSettingsSlowLogs(t *testing.T) {
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: checkOpensearchClusterSettingsDestroy,
Steps: []resource.TestStep{
{
Config: testAccOpensearchClusterSettingsSlowLog,
Check: resource.ComposeTestCheckFunc(
testCheckOpensearchClusterSettingInState("opensearch_cluster_settings.global"),
testCheckOpensearchClusterSettingExists("cluster.search.request.slowlog.level"),
),
},
},
})
}

func TestAccOpensearchClusterSettingsTypeList(t *testing.T) {
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: checkOpensearchClusterSettingsDestroy,
Steps: []resource.TestStep{
{
Config: testAccOpensearchClusterSettingsTypeList,
Check: resource.ComposeTestCheckFunc(
testCheckOpensearchClusterSettingInState("opensearch_cluster_settings.global"),
testCheckOpensearchClusterSettingExists("cluster.routing.allocation.awareness.force.zone.values"),
),
},
},
})
}

func testCheckOpensearchClusterSettingInState(name string) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[name]
Expand Down Expand Up @@ -89,3 +123,15 @@ resource "opensearch_cluster_settings" "global" {
action_auto_create_index = "my-index-000001,index10,-index1*,+ind*,-.aws_cold_catalog*,+*"
}
`

var testAccOpensearchClusterSettingsSlowLog = `
resource "opensearch_cluster_settings" "global" {
cluster_search_request_slowlog_level = "WARN"
cluster_search_request_slowlog_threshold_warn = "10s"
}
`
var testAccOpensearchClusterSettingsTypeList = `
resource "opensearch_cluster_settings" "global" {
cluster_routing_allocation_awareness_force_zone_values = ["zone1", "zone2", "zone3"]
}
`

0 comments on commit d86d6a3

Please sign in to comment.