SRE-1150 Additional Monitoring for ES-serviced #3824

Open
wants to merge 4 commits into base: develop
Changes from all commits
isvcs/es.go: 64 changes (54 additions, 10 deletions)
@@ -29,6 +29,7 @@ const (
ESRed ESHealth = iota
ESYellow
ESGreen
NODES_HEALTH_CHECK_NAME = "ESNodesHealth"
)

type ESHealth int
@@ -81,9 +82,16 @@ func initElasticSearch() {
Timeout: DEFAULT_HEALTHCHECK_TIMEOUT,
}

nodesHealthCheck := healthCheckDefinition{
healthCheck: esNodesHealthCheck(getHostIp(elasticsearch_servicedPortBinding), 9200, ESYellow),
Interval: DEFAULT_HEALTHCHECK_INTERVAL,
Timeout: DEFAULT_HEALTHCHECK_TIMEOUT,
}

healthChecks := []map[string]healthCheckDefinition{
{
DEFAULT_HEALTHCHECK_NAME: defaultHealthCheck,
NODES_HEALTH_CHECK_NAME: nodesHealthCheck,
},
}

@@ -241,6 +249,37 @@ func getESHealth(url string) <-chan esres {
return esresC
}

// esNodesHealthCheck returns a HealthCheckFunction that polls /_nodes/stats
// once per second until no node reports as failed, or until the check is
// canceled.
func esNodesHealthCheck(host string, port int, minHealth ESHealth) HealthCheckFunction {
return func(cancel <-chan struct{}) error {
url := fmt.Sprintf("http://%s:%d/_nodes/stats", host, port)
log := log.WithFields(logrus.Fields{
"url": url,
"minhealth": minHealth,
})
var r esres
for {
select {
case r = <-getESHealth(url):
if r.err != nil {
log.WithError(r.err).Debugf("Unable to check Elastic Nodes health: %s", r.err)
break
}
if status := int(r.response["_nodes"].(map[string]interface{})["failed"].(float64)); status > 0 {
log.WithFields(logrus.Fields{
"_nodes": r.response,
}).Warn("Elastic Nodes health reported below minimum")
break
}
return nil
case <-cancel:
log.Debug("Canceled health check for Elastic Nodes")
return nil
}
time.Sleep(time.Second)
}
}
}
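
For context, /_nodes/stats wraps its payload in a _nodes envelope carrying total, successful, and failed counts, which is what the chained type assertions above unpack; those assertions panic if the response shape ever differs. A minimal defensive sketch under the same assumptions (the failedNodeCount helper is hypothetical, not part of this PR):

// failedNodeCount extracts _nodes.failed from a /_nodes/stats response,
// returning ok=false instead of panicking when the JSON shape is unexpected.
func failedNodeCount(response map[string]interface{}) (count int, ok bool) {
    nodes, ok := response["_nodes"].(map[string]interface{})
    if !ok {
        return 0, false
    }
    failed, ok := nodes["failed"].(float64)
    if !ok {
        return 0, false
    }
    return int(failed), true
}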

func esHealthCheck(host string, port int, minHealth ESHealth) HealthCheckFunction {
return func(cancel <-chan struct{}) error {
url := fmt.Sprintf("http://%s:%d/_cluster/health", host, port)
@@ -258,16 +297,21 @@ func esHealthCheck(host string, port int, minHealth ESHealth) HealthCheckFunction {
}
if status := GetHealth(r.response["status"].(string)); status < minHealth {
log.WithFields(logrus.Fields{
"reported": r.response["status"],
"cluster_name": r.response["cluster_name"],
"timed_out": r.response["timed_out"],
"number_of_nodes": r.response["number_of_nodes"],
"number_of_data_nodes": r.response["number_of_data_nodes"],
"active_primary_shards": r.response["active_primary_shards"],
"active_shards": r.response["active_shards"],
"relocating_shards": r.response["relocating_shards"],
"initializing_shards": r.response["initializing_shards"],
"unassigned_shards": r.response["unassigned_shards"],
"reported": r.response["status"],
"cluster_name": r.response["cluster_name"],
"timed_out": r.response["timed_out"],
"number_of_nodes": r.response["number_of_nodes"],
"number_of_data_nodes": r.response["number_of_data_nodes"],
"active_primary_shards": r.response["active_primary_shards"],
"active_shards": r.response["active_shards"],
"relocating_shards": r.response["relocating_shards"],
"initializing_shards": r.response["initializing_shards"],
"unassigned_shards": r.response["unassigned_shards"],
"delayed_unassigned_shards": r.response["delayed_unassigned_shards"],
"number_of_pending_tasks": r.response["number_of_pending_tasks"],
"number_of_in_flight_fetch": r.response["number_of_in_flight_fetch"],
"task_max_waiting_in_queue_millis": r.response["task_max_waiting_in_queue_millis"],
"active_shards_percent_as_number": r.response["active_shards_percent_as_number"],
}).Warn("Elastic health reported below minimum")
break
}
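
The five added log fields, delayed_unassigned_shards through active_shards_percent_as_number, are standard members of the _cluster/health response. As an alternative to repeated map lookups and type assertions, the body could be decoded into a typed struct; a sketch of that approach (the clusterHealth type is hypothetical, not part of this PR):

// clusterHealth mirrors the /_cluster/health fields logged above, so
// json.Unmarshal can fill it directly from the response body.
type clusterHealth struct {
    Status                      string  `json:"status"`
    ClusterName                 string  `json:"cluster_name"`
    TimedOut                    bool    `json:"timed_out"`
    NumberOfNodes               int     `json:"number_of_nodes"`
    NumberOfDataNodes           int     `json:"number_of_data_nodes"`
    ActivePrimaryShards         int     `json:"active_primary_shards"`
    ActiveShards                int     `json:"active_shards"`
    RelocatingShards            int     `json:"relocating_shards"`
    InitializingShards          int     `json:"initializing_shards"`
    UnassignedShards            int     `json:"unassigned_shards"`
    DelayedUnassignedShards     int     `json:"delayed_unassigned_shards"`
    NumberOfPendingTasks        int     `json:"number_of_pending_tasks"`
    NumberOfInFlightFetch       int     `json:"number_of_in_flight_fetch"`
    TaskMaxWaitingInQueueMillis int     `json:"task_max_waiting_in_queue_millis"`
    ActiveShardsPercentAsNumber float64 `json:"active_shards_percent_as_number"`
}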
isvcs/esstats.go: 128 changes (121 additions, 7 deletions)
@@ -29,12 +29,21 @@ import (
var esStore = NewElasticSearchStatsStore()

// ElasticSearchStats holds counters for one Elasticsearch address, summed
// across every node in its /_nodes stats response.
type ElasticSearchStats struct {
Address string
gc_young_count int
gc_young_time float64
gc_old_count int
gc_old_time float64
threads int
thread_write_queue int
thread_write_rejected int
process_open_file_descriptors int
process_max_file_descriptors int
fs_io_stats_operations int
fs_io_stats_read_kilobytes int
fs_io_stats_read_operations int
fs_io_stats_write_kilobytes int
fs_io_stats_write_operations int
}

type ElasticSearchStatsCache struct {
@@ -178,6 +187,88 @@ func writeESToOpenTSDB(stats []ElasticSearchStats) {
)

metrics = append(metrics, gcYoungCountMetric)

thread_write_queueMetric := newElasticSearchMetric(
"isvcs.thread.write.queue",
strconv.Itoa(s.thread_write_queue),
t.Unix(),
s.Address,
)

metrics = append(metrics, thread_write_queueMetric)

thread_write_rejectedMetric := newElasticSearchMetric(
"isvcs.thread.write.rejected",
strconv.Itoa(s.thread_write_rejected),
t.Unix(),
s.Address,
)

metrics = append(metrics, thread_write_rejectedMetric)

process_open_file_descriptorsMetric := newElasticSearchMetric(
"isvcs.process.open_file_descriptors",
strconv.Itoa(s.process_open_file_descriptors),
t.Unix(),
s.Address,
)

metrics = append(metrics, process_open_file_descriptorsMetric)

process_max_file_descriptorsMetric := newElasticSearchMetric(
"isvcs.process.max_file_descriptors",
strconv.Itoa(s.process_max_file_descriptors),
t.Unix(),
s.Address,
)

metrics = append(metrics, process_max_file_descriptorsMetric)

fs_io_stats_operationsMetric := newElasticSearchMetric(
"isvcs.fs.io_stats.operations",
strconv.Itoa(s.fs_io_stats_operations),
t.Unix(),
s.Address,
)

metrics = append(metrics, fs_io_stats_operationsMetric)

fs_io_stats_read_kilobytesMetric := newElasticSearchMetric(
"isvcs.fs.io_stats.read_kilobytes",
strconv.Itoa(s.fs_io_stats_read_kilobytes),
t.Unix(),
s.Address,
)

metrics = append(metrics, fs_io_stats_read_kilobytesMetric)

fs_io_stats_read_operationsMetric := newElasticSearchMetric(
"isvcs.fs.io_stats.read_operations",
strconv.Itoa(s.fs_io_stats_read_operations),
t.Unix(),
s.Address,
)

metrics = append(metrics, fs_io_stats_read_operationsMetric)

fs_io_stats_write_kilobytesMetric := newElasticSearchMetric(
"isvcs.fs.io_stats.write_kilobytes",
strconv.Itoa(s.fs_io_stats_write_kilobytes),
t.Unix(),
s.Address,
)

metrics = append(metrics, fs_io_stats_write_kilobytesMetric)

fs_io_stats_write_operationsMetric := newElasticSearchMetric(
"isvcs.fs.io_stats.write_operations",
strconv.Itoa(s.fs_io_stats_write_operations),
t.Unix(),
s.Address,
)

metrics = append(metrics, fs_io_stats_write_operationsMetric)

}
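
The nine metric blocks added above follow an identical pattern. A table-driven sketch of the same emission logic, which would replace those blocks inside the stats loop (a hypothetical refactor, not part of this PR):

// Pair each OpenTSDB metric name with its aggregated counter, then emit
// them in one pass; s and t are the loop variables used above.
for _, m := range []struct {
    name  string
    value int
}{
    {"isvcs.thread.write.queue", s.thread_write_queue},
    {"isvcs.thread.write.rejected", s.thread_write_rejected},
    {"isvcs.process.open_file_descriptors", s.process_open_file_descriptors},
    {"isvcs.process.max_file_descriptors", s.process_max_file_descriptors},
    {"isvcs.fs.io_stats.operations", s.fs_io_stats_operations},
    {"isvcs.fs.io_stats.read_kilobytes", s.fs_io_stats_read_kilobytes},
    {"isvcs.fs.io_stats.read_operations", s.fs_io_stats_read_operations},
    {"isvcs.fs.io_stats.write_kilobytes", s.fs_io_stats_write_kilobytes},
    {"isvcs.fs.io_stats.write_operations", s.fs_io_stats_write_operations},
} {
    metrics = append(metrics, newElasticSearchMetric(m.name, strconv.Itoa(m.value), t.Unix(), s.Address))
}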

err := postDataToOpenTSDB(metrics)
@@ -192,7 +283,7 @@ func queryElasticSearchStats(address string) ElasticSearchStats {
logger := log.WithField("elasticsearch_address", address)
stats := ElasticSearchStats{Address: address}

resp, err := http.Get(address + "/_nodes/_all/stats")
if err != nil {
logger.WithError(err).Warn("Unable to get ElasticSearch stats")
return stats
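
This change broadens the stats query from /_nodes/stats/jvm to /_nodes/_all/stats, which returns every stats section for every node. Elasticsearch's node-stats API also accepts a comma-separated metric filter, so if payload size ever matters, the request could be narrowed to just the sections this collector reads (an alternative sketch, not part of this PR):

// Request only the jvm, process, fs, and thread_pool sections that
// queryElasticSearchStats actually consumes.
resp, err := http.Get(address + "/_nodes/stats/jvm,process,fs,thread_pool")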
@@ -208,6 +299,29 @@ func queryElasticSearchStats(address string) ElasticSearchStats {
json.Unmarshal([]byte(body), &result)
nodes := result["nodes"].(map[string]interface{})
for _, value := range nodes {
// FS io_stats
fs := value.(map[string]interface{})["fs"].(map[string]interface{})
if io_stats, found := fs["io_stats"]; found {
total := io_stats.(map[string]interface{})["total"].(map[string]interface{})
stats.fs_io_stats_operations += int(total["operations"].(float64))
stats.fs_io_stats_read_kilobytes += int(total["read_kilobytes"].(float64))
stats.fs_io_stats_read_operations += int(total["read_operations"].(float64))
stats.fs_io_stats_write_kilobytes += int(total["write_kilobytes"].(float64))
stats.fs_io_stats_write_operations += int(total["write_operations"].(float64))
}

// open_file_descriptors & max_file_descriptors
process := value.(map[string]interface{})["process"].(map[string]interface{})
stats.process_open_file_descriptors += int(process["open_file_descriptors"].(float64))
stats.process_max_file_descriptors += int(process["max_file_descriptors"].(float64))

// thread_pool['write'] queue & rejected
thread_pool := value.(map[string]interface{})["thread_pool"].(map[string]interface{})
if _, found := thread_pool["write"]; found {
stats.thread_write_queue += int(thread_pool["write"].(map[string]interface{})["queue"].(float64))
stats.thread_write_rejected += int(thread_pool["write"].(map[string]interface{})["rejected"].(float64))
}

jvm_metrics := value.(map[string]interface{})["jvm"].(map[string]interface{})
thread_count := jvm_metrics["threads"].(map[string]interface{})["count"].(float64)
stats.threads += int(thread_count)
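
Two notes on the guards above: Elasticsearch only reports fs.io_stats on Linux hosts, and the write thread pool only exists on newer Elasticsearch releases (older ones exposed bulk and index pools instead), so the found checks keep the collector from panicking where those sections are absent. The remaining unguarded assertion chains could get the same protection from a small helper; a sketch (the nestedFloat helper is hypothetical, not part of this PR):

// nestedFloat walks a decoded JSON document by key and returns the float64
// leaf, reporting ok=false when any level is missing or has the wrong type.
func nestedFloat(doc interface{}, keys ...string) (float64, bool) {
    cur := doc
    for _, k := range keys {
        m, ok := cur.(map[string]interface{})
        if !ok {
            return 0, false
        }
        if cur, ok = m[k]; !ok {
            return 0, false
        }
    }
    f, ok := cur.(float64)
    return f, ok
}

// Example use inside the node loop above:
//     if q, ok := nestedFloat(value, "thread_pool", "write", "queue"); ok {
//         stats.thread_write_queue += int(q)
//     }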