From 064bc068f70bacec13af02f6ab74180186a98356 Mon Sep 17 00:00:00 2001 From: Ganga Mahesh Siddem Date: Wed, 16 Dec 2020 15:22:13 -0800 Subject: [PATCH] Gangams/fix rs ooming (#473) * optimize kpi * optimize kube node inventory * add flags for events, deployments and hpa * have separate function parseNodeLimits * refactor code * fix crash * fix bug with service name * fix bugs related to get service name * update oom fix test agent * debug logs * fix service label issue * update to latest agent and enable ephemeral annotation * change stream size to 200 from 250 * update yaml * adjust chunksizes * add ruby gc env * yaml changes for cioomtest11282020-3 * telemetry to track pods latency * service count telemetry * rename variables * wip * nodes inventory telemetry * configmap changes * add emit streams in configmap * yaml updates * fix copy and paste bug * add todo comments * fix node latency telemetry bug * update yaml with latest test image * fix bug * upping rs memory change * fix mdm bug with final emit stream * update to latest image * fix pr feedback * fix pr feedback * rename health config to agent config * fix max allowed hpa chunk size * update to use 1k pod chunk since validated on 1.18+ * remove debug logs * minor updates * move defaults to common place * chart updates * final oomfix agent * update to use prod image so that can be validated with build pipeline * fix typo in comment --- .../installer/datafiles/base_container.data | 2 +- .../scripts/tomlparser-agent-config.rb | 172 +++++ .../scripts/tomlparser-health-config.rb | 73 -- .../templates/omsagent-rs-configmap.yaml | 32 +- charts/azuremonitor-containers/values.yaml | 9 + kubernetes/linux/Dockerfile | 1 + kubernetes/linux/main.sh | 16 +- kubernetes/omsagent.yaml | 18 +- source/plugins/ruby/KubernetesApiClient.rb | 387 +++++----- source/plugins/ruby/in_kube_events.rb | 18 +- source/plugins/ruby/in_kube_nodes.rb | 410 ++++++---- source/plugins/ruby/in_kube_podinventory.rb | 717 ++++++++++-------- .../plugins/ruby/in_kubestate_deployments.rb | 424 ++++++----- source/plugins/ruby/in_kubestate_hpa.rb | 421 +++++----- 14 files changed, 1534 insertions(+), 1166 deletions(-) create mode 100644 build/linux/installer/scripts/tomlparser-agent-config.rb delete mode 100644 build/linux/installer/scripts/tomlparser-health-config.rb diff --git a/build/linux/installer/datafiles/base_container.data b/build/linux/installer/datafiles/base_container.data index ec42d5967..c680f0eea 100644 --- a/build/linux/installer/datafiles/base_container.data +++ b/build/linux/installer/datafiles/base_container.data @@ -123,7 +123,7 @@ MAINTAINER: 'Microsoft Corporation' /opt/tomlparser-mdm-metrics-config.rb; build/linux/installer/scripts/tomlparser-mdm-metrics-config.rb; 755; root; root /opt/tomlparser-metric-collection-config.rb; build/linux/installer/scripts/tomlparser-metric-collection-config.rb; 755; root; root -/opt/tomlparser-health-config.rb; build/linux/installer/scripts/tomlparser-health-config.rb; 755; root; root +/opt/tomlparser-agent-config.rb; build/linux/installer/scripts/tomlparser-agent-config.rb; 755; root; root /opt/tomlparser.rb; build/common/installer/scripts/tomlparser.rb; 755; root; root /opt/td-agent-bit-conf-customizer.rb; build/common/installer/scripts/td-agent-bit-conf-customizer.rb; 755; root; root /opt/ConfigParseErrorLogger.rb; build/common/installer/scripts/ConfigParseErrorLogger.rb; 755; root; root diff --git a/build/linux/installer/scripts/tomlparser-agent-config.rb b/build/linux/installer/scripts/tomlparser-agent-config.rb new 
file mode 100644 index 000000000..87c5194ed --- /dev/null +++ b/build/linux/installer/scripts/tomlparser-agent-config.rb @@ -0,0 +1,172 @@ +#!/usr/local/bin/ruby + +#this should be require relative in Linux and require in windows, since it is a gem install on windows +@os_type = ENV["OS_TYPE"] +if !@os_type.nil? && !@os_type.empty? && @os_type.strip.casecmp("windows") == 0 + require "tomlrb" +else + require_relative "tomlrb" +end + +require_relative "ConfigParseErrorLogger" + +@configMapMountPath = "/etc/config/settings/agent-settings" +@configSchemaVersion = "" +@enable_health_model = false + +# 250 Node items (15KB per node) account to approximately 4MB +@nodesChunkSize = 250 +# 1000 pods (10KB per pod) account to approximately 10MB +@podsChunkSize = 1000 +# 4000 events (1KB per event) account to approximately 4MB +@eventsChunkSize = 4000 +# roughly each deployment is 8k +# 500 deployments account to approximately 4MB +@deploymentsChunkSize = 500 +# roughly each HPA is 3k +# 2000 HPAs account to approximately 6-7MB +@hpaChunkSize = 2000 +# stream batch sizes to avoid large file writes +# too low will consume higher disk iops +@podsEmitStreamBatchSize = 200 +@nodesEmitStreamBatchSize = 100 + +# higher the chunk size rs pod memory consumption higher and lower api latency +# similarly lower the value, helps on the memory consumption but incurrs additional round trip latency +# these needs to be tuned be based on the workload +# nodes +@nodesChunkSizeMin = 100 +@nodesChunkSizeMax = 400 +# pods +@podsChunkSizeMin = 250 +@podsChunkSizeMax = 1500 +# events +@eventsChunkSizeMin = 2000 +@eventsChunkSizeMax = 10000 +# deployments +@deploymentsChunkSizeMin = 500 +@deploymentsChunkSizeMax = 1000 +# hpa +@hpaChunkSizeMin = 500 +@hpaChunkSizeMax = 2000 + +# emit stream sizes to prevent lower values which costs disk i/o +# max will be upto the chunk size +@podsEmitStreamBatchSizeMin = 50 +@nodesEmitStreamBatchSizeMin = 50 + +def is_number?(value) + true if Integer(value) rescue false +end + +# Use parser to parse the configmap toml file to a ruby structure +def parseConfigMap + begin + # Check to see if config map is created + if (File.file?(@configMapMountPath)) + puts "config::configmap container-azm-ms-agentconfig for agent settings mounted, parsing values" + parsedConfig = Tomlrb.load_file(@configMapMountPath, symbolize_keys: true) + puts "config::Successfully parsed mounted config map" + return parsedConfig + else + puts "config::configmap container-azm-ms-agentconfig for agent settings not mounted, using defaults" + return nil + end + rescue => errorStr + ConfigParseErrorLogger.logError("Exception while parsing config map for agent settings : #{errorStr}, using defaults, please check config map for errors") + return nil + end +end + +# Use the ruby structure created after config parsing to set the right values to be used as environment variables +def populateSettingValuesFromConfigMap(parsedConfig) + begin + if !parsedConfig.nil? && !parsedConfig[:agent_settings].nil? + if !parsedConfig[:agent_settings][:health_model].nil? && !parsedConfig[:agent_settings][:health_model][:enabled].nil? + @enable_health_model = parsedConfig[:agent_settings][:health_model][:enabled] + puts "enable_health_model = #{@enable_health_model}" + end + chunk_config = parsedConfig[:agent_settings][:chunk_config] + if !chunk_config.nil? + nodesChunkSize = chunk_config[:NODES_CHUNK_SIZE] + if !nodesChunkSize.nil? 
&& is_number?(nodesChunkSize) && (@nodesChunkSizeMin..@nodesChunkSizeMax) === nodesChunkSize.to_i + @nodesChunkSize = nodesChunkSize.to_i + puts "Using config map value: NODES_CHUNK_SIZE = #{@nodesChunkSize}" + end + + podsChunkSize = chunk_config[:PODS_CHUNK_SIZE] + if !podsChunkSize.nil? && is_number?(podsChunkSize) && (@podsChunkSizeMin..@podsChunkSizeMax) === podsChunkSize.to_i + @podsChunkSize = podsChunkSize.to_i + puts "Using config map value: PODS_CHUNK_SIZE = #{@podsChunkSize}" + end + + eventsChunkSize = chunk_config[:EVENTS_CHUNK_SIZE] + if !eventsChunkSize.nil? && is_number?(eventsChunkSize) && (@eventsChunkSizeMin..@eventsChunkSizeMax) === eventsChunkSize.to_i + @eventsChunkSize = eventsChunkSize.to_i + puts "Using config map value: EVENTS_CHUNK_SIZE = #{@eventsChunkSize}" + end + + deploymentsChunkSize = chunk_config[:DEPLOYMENTS_CHUNK_SIZE] + if !deploymentsChunkSize.nil? && is_number?(deploymentsChunkSize) && (@deploymentsChunkSizeMin..@deploymentsChunkSizeMax) === deploymentsChunkSize.to_i + @deploymentsChunkSize = deploymentsChunkSize.to_i + puts "Using config map value: DEPLOYMENTS_CHUNK_SIZE = #{@deploymentsChunkSize}" + end + + hpaChunkSize = chunk_config[:HPA_CHUNK_SIZE] + if !hpaChunkSize.nil? && is_number?(hpaChunkSize) && (@hpaChunkSizeMin..@hpaChunkSizeMax) === hpaChunkSize.to_i + @hpaChunkSize = hpaChunkSize.to_i + puts "Using config map value: HPA_CHUNK_SIZE = #{@hpaChunkSize}" + end + + podsEmitStreamBatchSize = chunk_config[:PODS_EMIT_STREAM_BATCH_SIZE] + if !podsEmitStreamBatchSize.nil? && is_number?(podsEmitStreamBatchSize) && + podsEmitStreamBatchSize.to_i <= @podsChunkSize && podsEmitStreamBatchSize.to_i >= @podsEmitStreamBatchSizeMin + @podsEmitStreamBatchSize = podsEmitStreamBatchSize.to_i + puts "Using config map value: PODS_EMIT_STREAM_BATCH_SIZE = #{@podsEmitStreamBatchSize}" + end + nodesEmitStreamBatchSize = chunk_config[:NODES_EMIT_STREAM_BATCH_SIZE] + if !nodesEmitStreamBatchSize.nil? && is_number?(nodesEmitStreamBatchSize) && + nodesEmitStreamBatchSize.to_i <= @nodesChunkSize && nodesEmitStreamBatchSize.to_i >= @nodesEmitStreamBatchSizeMin + @nodesEmitStreamBatchSize = nodesEmitStreamBatchSize.to_i + puts "Using config map value: NODES_EMIT_STREAM_BATCH_SIZE = #{@nodesEmitStreamBatchSize}" + end + end + end + rescue => errorStr + puts "config::error:Exception while reading config settings for agent configuration setting - #{errorStr}, using defaults" + @enable_health_model = false + end +end + +@configSchemaVersion = ENV["AZMON_AGENT_CFG_SCHEMA_VERSION"] +puts "****************Start Config Processing********************" +if !@configSchemaVersion.nil? && !@configSchemaVersion.empty? && @configSchemaVersion.strip.casecmp("v1") == 0 #note v1 is the only supported schema version , so hardcoding it + configMapSettings = parseConfigMap + if !configMapSettings.nil? + populateSettingValuesFromConfigMap(configMapSettings) + end +else + if (File.file?(@configMapMountPath)) + ConfigParseErrorLogger.logError("config::unsupported/missing config schema version - '#{@configSchemaVersion}' , using defaults, please use supported schema version") + end + @enable_health_model = false +end + +# Write the settings to file, so that they can be set as environment variables +file = File.open("agent_config_env_var", "w") + +if !file.nil? 
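For reference, the agent-settings portion of the container-azm-ms-agentconfig configmap that this parser consumes would look roughly like the sketch below. The section and key names are taken from the parser above; the values are only illustrative, not recommended settings.

require "tomlrb"

# Illustrative agent-settings TOML in the shape tomlparser-agent-config.rb expects;
# the values here are examples, not defaults being recommended.
sample_settings = <<~TOML
  [agent_settings.health_model]
    enabled = false

  [agent_settings.chunk_config]
    NODES_CHUNK_SIZE = 250
    PODS_CHUNK_SIZE = 1000
    EVENTS_CHUNK_SIZE = 4000
    DEPLOYMENTS_CHUNK_SIZE = 500
    HPA_CHUNK_SIZE = 2000
    PODS_EMIT_STREAM_BATCH_SIZE = 200
    NODES_EMIT_STREAM_BATCH_SIZE = 100
TOML

parsed = Tomlrb.parse(sample_settings, symbolize_keys: true)
puts parsed[:agent_settings][:chunk_config][:PODS_CHUNK_SIZE] # => 1000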
+ file.write("export AZMON_CLUSTER_ENABLE_HEALTH_MODEL=#{@enable_health_model}\n") + file.write("export NODES_CHUNK_SIZE=#{@nodesChunkSize}\n") + file.write("export PODS_CHUNK_SIZE=#{@podsChunkSize}\n") + file.write("export EVENTS_CHUNK_SIZE=#{@eventsChunkSize}\n") + file.write("export DEPLOYMENTS_CHUNK_SIZE=#{@deploymentsChunkSize}\n") + file.write("export HPA_CHUNK_SIZE=#{@hpaChunkSize}\n") + file.write("export PODS_EMIT_STREAM_BATCH_SIZE=#{@podsEmitStreamBatchSize}\n") + file.write("export NODES_EMIT_STREAM_BATCH_SIZE=#{@nodesEmitStreamBatchSize}\n") + # Close file after writing all environment variables + file.close +else + puts "Exception while opening file for writing config environment variables" + puts "****************End Config Processing********************" +end diff --git a/build/linux/installer/scripts/tomlparser-health-config.rb b/build/linux/installer/scripts/tomlparser-health-config.rb deleted file mode 100644 index 14c8bdb44..000000000 --- a/build/linux/installer/scripts/tomlparser-health-config.rb +++ /dev/null @@ -1,73 +0,0 @@ -#!/usr/local/bin/ruby - -#this should be require relative in Linux and require in windows, since it is a gem install on windows -@os_type = ENV["OS_TYPE"] -if !@os_type.nil? && !@os_type.empty? && @os_type.strip.casecmp("windows") == 0 - require "tomlrb" -else - require_relative "tomlrb" -end - -require_relative "ConfigParseErrorLogger" - -@configMapMountPath = "/etc/config/settings/agent-settings" -@configSchemaVersion = "" -@enable_health_model = false - -# Use parser to parse the configmap toml file to a ruby structure -def parseConfigMap - begin - # Check to see if config map is created - if (File.file?(@configMapMountPath)) - puts "config::configmap container-azm-ms-agentconfig for agent health settings mounted, parsing values" - parsedConfig = Tomlrb.load_file(@configMapMountPath, symbolize_keys: true) - puts "config::Successfully parsed mounted config map" - return parsedConfig - else - puts "config::configmap container-azm-ms-agentconfig for agent health settings not mounted, using defaults" - return nil - end - rescue => errorStr - ConfigParseErrorLogger.logError("Exception while parsing config map for enabling health: #{errorStr}, using defaults, please check config map for errors") - return nil - end -end - -# Use the ruby structure created after config parsing to set the right values to be used as environment variables -def populateSettingValuesFromConfigMap(parsedConfig) - begin - if !parsedConfig.nil? && !parsedConfig[:agent_settings].nil? && !parsedConfig[:agent_settings][:health_model].nil? && !parsedConfig[:agent_settings][:health_model][:enabled].nil? - @enable_health_model = parsedConfig[:agent_settings][:health_model][:enabled] - puts "enable_health_model = #{@enable_health_model}" - end - rescue => errorStr - puts "config::error:Exception while reading config settings for health_model enabled setting - #{errorStr}, using defaults" - @enable_health_model = false - end -end - -@configSchemaVersion = ENV["AZMON_AGENT_CFG_SCHEMA_VERSION"] -puts "****************Start Config Processing********************" -if !@configSchemaVersion.nil? && !@configSchemaVersion.empty? && @configSchemaVersion.strip.casecmp("v1") == 0 #note v1 is the only supported schema version , so hardcoding it - configMapSettings = parseConfigMap - if !configMapSettings.nil? 
- populateSettingValuesFromConfigMap(configMapSettings) - end -else - if (File.file?(@configMapMountPath)) - ConfigParseErrorLogger.logError("config::unsupported/missing config schema version - '#{@configSchemaVersion}' , using defaults, please use supported schema version") - end - @enable_health_model = false -end - -# Write the settings to file, so that they can be set as environment variables -file = File.open("health_config_env_var", "w") - -if !file.nil? - file.write("export AZMON_CLUSTER_ENABLE_HEALTH_MODEL=#{@enable_health_model}\n") - # Close file after writing all environment variables - file.close -else - puts "Exception while opening file for writing config environment variables" - puts "****************End Config Processing********************" -end \ No newline at end of file diff --git a/charts/azuremonitor-containers/templates/omsagent-rs-configmap.yaml b/charts/azuremonitor-containers/templates/omsagent-rs-configmap.yaml index baeedf1be..fc7c471f8 100644 --- a/charts/azuremonitor-containers/templates/omsagent-rs-configmap.yaml +++ b/charts/azuremonitor-containers/templates/omsagent-rs-configmap.yaml @@ -95,7 +95,7 @@ data: type out_oms log_level debug - num_threads 5 + num_threads 2 buffer_chunk_limit 4m buffer_type file buffer_path %STATE_DIR_WS%/out_oms_kubepods*.buffer @@ -108,24 +108,24 @@ data: - type out_oms - log_level debug - num_threads 5 - buffer_chunk_limit 4m - buffer_type file - buffer_path %STATE_DIR_WS%/state/out_oms_kubepv*.buffer - buffer_queue_limit 20 - buffer_queue_full_action drop_oldest_chunk - flush_interval 20s - retry_limit 10 - retry_wait 5s - max_retry_wait 5m + type out_oms + log_level debug + num_threads 5 + buffer_chunk_limit 4m + buffer_type file + buffer_path %STATE_DIR_WS%/state/out_oms_kubepv*.buffer + buffer_queue_limit 20 + buffer_queue_full_action drop_oldest_chunk + flush_interval 20s + retry_limit 10 + retry_wait 5s + max_retry_wait 5m type out_oms log_level debug - num_threads 5 + num_threads 2 buffer_chunk_limit 4m buffer_type file buffer_path %STATE_DIR_WS%/out_oms_kubeevents*.buffer @@ -155,7 +155,7 @@ data: type out_oms log_level debug - num_threads 5 + num_threads 2 buffer_chunk_limit 4m buffer_type file buffer_path %STATE_DIR_WS%/state/out_oms_kubenodes*.buffer @@ -184,7 +184,7 @@ data: type out_oms log_level debug - num_threads 5 + num_threads 2 buffer_chunk_limit 4m buffer_type file buffer_path %STATE_DIR_WS%/out_oms_kubeperf*.buffer diff --git a/charts/azuremonitor-containers/values.yaml b/charts/azuremonitor-containers/values.yaml index e8acda20e..907e315d1 100644 --- a/charts/azuremonitor-containers/values.yaml +++ b/charts/azuremonitor-containers/values.yaml @@ -81,6 +81,15 @@ omsagent: deployment: affinity: nodeAffinity: + # affinity to schedule on to ephemeral os node if its available + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 1 + preference: + matchExpressions: + - key: storageprofile + operator: NotIn + values: + - managed requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - labelSelector: diff --git a/kubernetes/linux/Dockerfile b/kubernetes/linux/Dockerfile index d04e86128..34ab133da 100644 --- a/kubernetes/linux/Dockerfile +++ b/kubernetes/linux/Dockerfile @@ -15,6 +15,7 @@ ENV HOST_VAR /hostfs/var ENV AZMON_COLLECT_ENV False ENV KUBE_CLIENT_BACKOFF_BASE 1 ENV KUBE_CLIENT_BACKOFF_DURATION 0 +ENV RUBY_GC_HEAP_OLDOBJECT_LIMIT_FACTOR 0.9 RUN /usr/bin/apt-get update && /usr/bin/apt-get install -y libc-bin wget openssl curl sudo python-ctypes init-system-helpers net-tools rsyslog cron 
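The new RUBY_GC_HEAP_OLDOBJECT_LIMIT_FACTOR=0.9 setting in the Dockerfile tunes MRI's generational GC: the factor scales the old-object threshold that triggers the next major collection (the MRI default is 2.0), so a value below 1.0 makes major collections fire much more often and keeps the ruby plugins' resident memory lower at some CPU cost. A standalone way to watch the related counters, assuming nothing about the agent code itself:

# Standalone sketch: print the GC counters that RUBY_GC_HEAP_OLDOBJECT_LIMIT_FACTOR influences.
before = GC.stat
puts "major_gc_count=#{before[:major_gc_count]} old_objects=#{before[:old_objects]} old_objects_limit=#{before[:old_objects_limit]}"

# retain enough objects for some of them to be promoted to the old generation
retained = Array.new(200_000) { "x" * 64 }
GC.start(full_mark: true, immediate_sweep: true)

after = GC.stat
puts "major_gc_count=#{after[:major_gc_count]} old_objects=#{after[:old_objects]} old_objects_limit=#{after[:old_objects_limit]}"
# with a factor of 0.9 the old_objects_limit stays close to old_objects, so major
# collections run frequently instead of letting the old generation double first
retained.clear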
vim dmidecode apt-transport-https gnupg && rm -rf /var/lib/apt/lists/* COPY setup.sh main.sh defaultpromenvvariables defaultpromenvvariables-rs mdsd.xml envmdsd $tmpdir/ WORKDIR ${tmpdir} diff --git a/kubernetes/linux/main.sh b/kubernetes/linux/main.sh index a2ba6a1d1..ed16d3e32 100644 --- a/kubernetes/linux/main.sh +++ b/kubernetes/linux/main.sh @@ -171,14 +171,14 @@ done source config_env_var -#Parse the configmap to set the right environment variables for health feature. -/opt/microsoft/omsagent/ruby/bin/ruby tomlparser-health-config.rb +#Parse the configmap to set the right environment variables for agent config. +/opt/microsoft/omsagent/ruby/bin/ruby tomlparser-agent-config.rb -cat health_config_env_var | while read line; do +cat agent_config_env_var | while read line; do #echo $line echo $line >> ~/.bashrc done -source health_config_env_var +source agent_config_env_var #Parse the configmap to set the right environment variables for network policy manager (npm) integration. /opt/microsoft/omsagent/ruby/bin/ruby tomlparser-npm-config.rb @@ -429,7 +429,7 @@ echo "export DOCKER_CIMPROV_VERSION=$DOCKER_CIMPROV_VERSION" >> ~/.bashrc #region check to auto-activate oneagent, to route container logs, #Intent is to activate one agent routing for all managed clusters with region in the regionllist, unless overridden by configmap -# AZMON_CONTAINER_LOGS_ROUTE will have route (if any) specified in the config map +# AZMON_CONTAINER_LOGS_ROUTE will have route (if any) specified in the config map # AZMON_CONTAINER_LOGS_EFFECTIVE_ROUTE will have the final route that we compute & set, based on our region list logic echo "************start oneagent log routing checks************" # by default, use configmap route for safer side @@ -462,9 +462,9 @@ else echo "current region is not in oneagent regions..." fi -if [ "$isoneagentregion" = true ]; then +if [ "$isoneagentregion" = true ]; then #if configmap has a routing for logs, but current region is in the oneagent region list, take the configmap route - if [ ! -z $AZMON_CONTAINER_LOGS_ROUTE ]; then + if [ ! -z $AZMON_CONTAINER_LOGS_ROUTE ]; then AZMON_CONTAINER_LOGS_EFFECTIVE_ROUTE=$AZMON_CONTAINER_LOGS_ROUTE echo "oneagent region is true for current region:$currentregion and config map logs route is not empty. so using config map logs route as effective route:$AZMON_CONTAINER_LOGS_EFFECTIVE_ROUTE" else #there is no configmap route, so route thru oneagent @@ -511,7 +511,7 @@ if [ ! -e "/etc/config/kube.conf" ]; then echo "starting mdsd ..." 
mdsd -l -e ${MDSD_LOG}/mdsd.err -w ${MDSD_LOG}/mdsd.warn -o ${MDSD_LOG}/mdsd.info -q ${MDSD_LOG}/mdsd.qos & - + touch /opt/AZMON_CONTAINER_LOGS_EFFECTIVE_ROUTE_V2 fi fi diff --git a/kubernetes/omsagent.yaml b/kubernetes/omsagent.yaml index 26c7ae9a0..013e2a6c0 100644 --- a/kubernetes/omsagent.yaml +++ b/kubernetes/omsagent.yaml @@ -143,7 +143,7 @@ data: type out_oms log_level debug - num_threads 5 + num_threads 2 buffer_chunk_limit 4m buffer_type file buffer_path %STATE_DIR_WS%/out_oms_kubepods*.buffer @@ -173,7 +173,7 @@ data: type out_oms log_level debug - num_threads 5 + num_threads 2 buffer_chunk_limit 4m buffer_type file buffer_path %STATE_DIR_WS%/out_oms_kubeevents*.buffer @@ -203,7 +203,7 @@ data: type out_oms log_level debug - num_threads 5 + num_threads 2 buffer_chunk_limit 4m buffer_type file buffer_path %STATE_DIR_WS%/state/out_oms_kubenodes*.buffer @@ -232,7 +232,7 @@ data: type out_oms log_level debug - num_threads 5 + num_threads 2 buffer_chunk_limit 4m buffer_type file buffer_path %STATE_DIR_WS%/out_oms_kubeperf*.buffer @@ -533,7 +533,6 @@ spec: cpu: 150m memory: 250Mi env: - # azure devops pipeline uses AKS_RESOURCE_ID and AKS_REGION hence ensure to uncomment these - name: AKS_RESOURCE_ID value: "VALUE_AKS_RESOURCE_ID_VALUE" - name: AKS_REGION @@ -588,6 +587,15 @@ spec: periodSeconds: 60 affinity: nodeAffinity: + # affinity to schedule on to ephemeral os node if its available + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 1 + preference: + matchExpressions: + - key: storageprofile + operator: NotIn + values: + - managed requiredDuringSchedulingIgnoredDuringExecution: nodeSelectorTerms: - labelSelector: diff --git a/source/plugins/ruby/KubernetesApiClient.rb b/source/plugins/ruby/KubernetesApiClient.rb index 073eb0417..aca2142a0 100644 --- a/source/plugins/ruby/KubernetesApiClient.rb +++ b/source/plugins/ruby/KubernetesApiClient.rb @@ -172,6 +172,10 @@ def isAROV3Cluster return @@IsAROV3Cluster end + def isAROv3MasterOrInfraPod(nodeName) + return isAROV3Cluster() && (!nodeName.nil? && (nodeName.downcase.start_with?("infra-") || nodeName.downcase.start_with?("master-"))) + end + def isNodeMaster return @@IsNodeMaster if !@@IsNodeMaster.nil? @@IsNodeMaster = false @@ -276,7 +280,8 @@ def getPods(namespace) def getWindowsNodes winNodes = [] begin - resourceUri = getNodesResourceUri("nodes") + # get only windows nodes + resourceUri = getNodesResourceUri("nodes?labelSelector=kubernetes.io%2Fos%3Dwindows") nodeInventory = JSON.parse(getKubeResourceInfo(resourceUri).body) @Log.info "KubernetesAPIClient::getWindowsNodes : Got nodes from kube api" # Resetting the windows node cache @@ -396,42 +401,67 @@ def getPodUid(podNameSpace, podMetadata) return podUid end - def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601) + def getContainerResourceRequestsAndLimits(pod, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601) metricItems = [] begin clusterId = getClusterId - metricInfo = metricJSON - metricInfo["items"].each do |pod| - podNameSpace = pod["metadata"]["namespace"] - podUid = getPodUid(podNameSpace, pod["metadata"]) - if podUid.nil? - next - end - - # For ARO, skip the pods scheduled on to master or infra nodes to ingest - if isAROV3Cluster() && !pod["spec"].nil? && !pod["spec"]["nodeName"].nil? 
&& - (pod["spec"]["nodeName"].downcase.start_with?("infra-") || - pod["spec"]["nodeName"].downcase.start_with?("master-")) - next - end + podNameSpace = pod["metadata"]["namespace"] + podUid = getPodUid(podNameSpace, pod["metadata"]) + if podUid.nil? + return metricItems + end - podContainers = [] - if !pod["spec"]["containers"].nil? && !pod["spec"]["containers"].empty? - podContainers = podContainers + pod["spec"]["containers"] - end - # Adding init containers to the record list as well. - if !pod["spec"]["initContainers"].nil? && !pod["spec"]["initContainers"].empty? - podContainers = podContainers + pod["spec"]["initContainers"] - end + nodeName = "" + #for unscheduled (non-started) pods nodeName does NOT exist + if !pod["spec"]["nodeName"].nil? + nodeName = pod["spec"]["nodeName"] + end + # For ARO, skip the pods scheduled on to master or infra nodes to ingest + if isAROv3MasterOrInfraPod(nodeName) + return metricItems + end - if (!podContainers.nil? && !podContainers.empty? && !pod["spec"]["nodeName"].nil?) - nodeName = pod["spec"]["nodeName"] - podContainers.each do |container| - containerName = container["name"] - #metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z - if (!container["resources"].nil? && !container["resources"].empty? && !container["resources"][metricCategory].nil? && !container["resources"][metricCategory][metricNameToCollect].nil?) - metricValue = getMetricNumericValue(metricNameToCollect, container["resources"][metricCategory][metricNameToCollect]) + podContainers = [] + if !pod["spec"]["containers"].nil? && !pod["spec"]["containers"].empty? + podContainers = podContainers + pod["spec"]["containers"] + end + # Adding init containers to the record list as well. + if !pod["spec"]["initContainers"].nil? && !pod["spec"]["initContainers"].empty? + podContainers = podContainers + pod["spec"]["initContainers"] + end + if (!podContainers.nil? && !podContainers.empty? && !pod["spec"]["nodeName"].nil?) + podContainers.each do |container| + containerName = container["name"] + #metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z + if (!container["resources"].nil? && !container["resources"].empty? && !container["resources"][metricCategory].nil? && !container["resources"][metricCategory][metricNameToCollect].nil?) 
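With this refactor getContainerResourceRequestsAndLimits (and its InsightsMetrics variant below) take a single pod item rather than the whole pod list, so the caller can build and emit perf records chunk by chunk instead of holding metrics for the entire inventory in memory. The calling shape is roughly the following; the loop, the podList/batchTime variables and the counter names are assumptions about the in_kube_podinventory caller, only the method name and argument order come from this file:

# Assumed caller-side shape (in_kube_podinventory); only the KubernetesApiClient
# method and its argument order are taken from this patch.
kubePerfRecords = []
podList["items"].each do |pod|
  kubePerfRecords.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(pod, "requests", "cpu", "cpuRequestNanoCores", batchTime))
  kubePerfRecords.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(pod, "requests", "memory", "memoryRequestBytes", batchTime))
  kubePerfRecords.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(pod, "limits", "cpu", "cpuLimitNanoCores", batchTime))
  kubePerfRecords.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(pod, "limits", "memory", "memoryLimitBytes", batchTime))
end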
+ metricValue = getMetricNumericValue(metricNameToCollect, container["resources"][metricCategory][metricNameToCollect]) + + metricItem = {} + metricItem["DataItems"] = [] + + metricProps = {} + metricProps["Timestamp"] = metricTime + metricProps["Host"] = nodeName + # Adding this so that it is not set by base omsagent since it was not set earlier and being set by base omsagent + metricProps["Computer"] = nodeName + metricProps["ObjectName"] = "K8SContainer" + metricProps["InstanceName"] = clusterId + "/" + podUid + "/" + containerName + + metricProps["Collections"] = [] + metricCollections = {} + metricCollections["CounterName"] = metricNametoReturn + metricCollections["Value"] = metricValue + + metricProps["Collections"].push(metricCollections) + metricItem["DataItems"].push(metricProps) + metricItems.push(metricItem) + #No container level limit for the given metric, so default to node level limit + else + nodeMetricsHashKey = clusterId + "/" + nodeName + "_" + "allocatable" + "_" + metricNameToCollect + if (metricCategory == "limits" && @@NodeMetrics.has_key?(nodeMetricsHashKey)) + metricValue = @@NodeMetrics[nodeMetricsHashKey] + #@Log.info("Limits not set for container #{clusterId + "/" + podUid + "/" + containerName} using node level limits: #{nodeMetricsHashKey}=#{metricValue} ") metricItem = {} metricItem["DataItems"] = [] @@ -451,32 +481,6 @@ def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricName metricProps["Collections"].push(metricCollections) metricItem["DataItems"].push(metricProps) metricItems.push(metricItem) - #No container level limit for the given metric, so default to node level limit - else - nodeMetricsHashKey = clusterId + "/" + nodeName + "_" + "allocatable" + "_" + metricNameToCollect - if (metricCategory == "limits" && @@NodeMetrics.has_key?(nodeMetricsHashKey)) - metricValue = @@NodeMetrics[nodeMetricsHashKey] - #@Log.info("Limits not set for container #{clusterId + "/" + podUid + "/" + containerName} using node level limits: #{nodeMetricsHashKey}=#{metricValue} ") - metricItem = {} - metricItem["DataItems"] = [] - - metricProps = {} - metricProps["Timestamp"] = metricTime - metricProps["Host"] = nodeName - # Adding this so that it is not set by base omsagent since it was not set earlier and being set by base omsagent - metricProps["Computer"] = nodeName - metricProps["ObjectName"] = "K8SContainer" - metricProps["InstanceName"] = clusterId + "/" + podUid + "/" + containerName - - metricProps["Collections"] = [] - metricCollections = {} - metricCollections["CounterName"] = metricNametoReturn - metricCollections["Value"] = metricValue - - metricProps["Collections"].push(metricCollections) - metricItem["DataItems"].push(metricProps) - metricItems.push(metricItem) - end end end end @@ -488,78 +492,74 @@ def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricName return metricItems end #getContainerResourceRequestAndLimits - def getContainerResourceRequestsAndLimitsAsInsightsMetrics(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601) + def getContainerResourceRequestsAndLimitsAsInsightsMetrics(pod, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601) metricItems = [] begin clusterId = getClusterId clusterName = getClusterName - - metricInfo = metricJSON - metricInfo["items"].each do |pod| - podNameSpace = pod["metadata"]["namespace"] - if podNameSpace.eql?("kube-system") && !pod["metadata"].key?("ownerReferences") - # The above case 
seems to be the only case where you have horizontal scaling of pods - # but no controller, in which case cAdvisor picks up kubernetes.io/config.hash - # instead of the actual poduid. Since this uid is not being surface into the UX - # its ok to use this. - # Use kubernetes.io/config.hash to be able to correlate with cadvisor data - if pod["metadata"]["annotations"].nil? - next - else - podUid = pod["metadata"]["annotations"]["kubernetes.io/config.hash"] - end + podNameSpace = pod["metadata"]["namespace"] + if podNameSpace.eql?("kube-system") && !pod["metadata"].key?("ownerReferences") + # The above case seems to be the only case where you have horizontal scaling of pods + # but no controller, in which case cAdvisor picks up kubernetes.io/config.hash + # instead of the actual poduid. Since this uid is not being surface into the UX + # its ok to use this. + # Use kubernetes.io/config.hash to be able to correlate with cadvisor data + if pod["metadata"]["annotations"].nil? + return metricItems else - podUid = pod["metadata"]["uid"] + podUid = pod["metadata"]["annotations"]["kubernetes.io/config.hash"] end + else + podUid = pod["metadata"]["uid"] + end - podContainers = [] - if !pod["spec"]["containers"].nil? && !pod["spec"]["containers"].empty? - podContainers = podContainers + pod["spec"]["containers"] - end - # Adding init containers to the record list as well. - if !pod["spec"]["initContainers"].nil? && !pod["spec"]["initContainers"].empty? - podContainers = podContainers + pod["spec"]["initContainers"] - end + podContainers = [] + if !pod["spec"]["containers"].nil? && !pod["spec"]["containers"].empty? + podContainers = podContainers + pod["spec"]["containers"] + end + # Adding init containers to the record list as well. + if !pod["spec"]["initContainers"].nil? && !pod["spec"]["initContainers"].empty? + podContainers = podContainers + pod["spec"]["initContainers"] + end - if (!podContainers.nil? && !podContainers.empty?) - if (!pod["spec"]["nodeName"].nil?) - nodeName = pod["spec"]["nodeName"] + if (!podContainers.nil? && !podContainers.empty?) + if (!pod["spec"]["nodeName"].nil?) + nodeName = pod["spec"]["nodeName"] + else + nodeName = "" #unscheduled pod. We still want to collect limits & requests for GPU + end + podContainers.each do |container| + metricValue = nil + containerName = container["name"] + #metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z + if (!container["resources"].nil? && !container["resources"].empty? && !container["resources"][metricCategory].nil? && !container["resources"][metricCategory][metricNameToCollect].nil?) + metricValue = getMetricNumericValue(metricNameToCollect, container["resources"][metricCategory][metricNameToCollect]) else - nodeName = "" #unscheduled pod. We still want to collect limits & requests for GPU - end - podContainers.each do |container| - metricValue = nil - containerName = container["name"] - #metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z - if (!container["resources"].nil? && !container["resources"].empty? && !container["resources"][metricCategory].nil? && !container["resources"][metricCategory][metricNameToCollect].nil?) 
- metricValue = getMetricNumericValue(metricNameToCollect, container["resources"][metricCategory][metricNameToCollect]) - else - #No container level limit for the given metric, so default to node level limit for non-gpu metrics - if (metricNameToCollect.downcase != "nvidia.com/gpu") && (metricNameToCollect.downcase != "amd.com/gpu") - nodeMetricsHashKey = clusterId + "/" + nodeName + "_" + "allocatable" + "_" + metricNameToCollect - metricValue = @@NodeMetrics[nodeMetricsHashKey] - end - end - if (!metricValue.nil?) - metricItem = {} - metricItem["CollectionTime"] = metricTime - metricItem["Computer"] = nodeName - metricItem["Name"] = metricNametoReturn - metricItem["Value"] = metricValue - metricItem["Origin"] = Constants::INSIGHTSMETRICS_TAGS_ORIGIN - metricItem["Namespace"] = Constants::INSIGHTSMETRICS_TAGS_GPU_NAMESPACE - - metricTags = {} - metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERID] = clusterId - metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERNAME] = clusterName - metricTags[Constants::INSIGHTSMETRICS_TAGS_CONTAINER_NAME] = podUid + "/" + containerName - #metricTags[Constants::INSIGHTSMETRICS_TAGS_K8SNAMESPACE] = podNameSpace - - metricItem["Tags"] = metricTags - - metricItems.push(metricItem) + #No container level limit for the given metric, so default to node level limit for non-gpu metrics + if (metricNameToCollect.downcase != "nvidia.com/gpu") && (metricNameToCollect.downcase != "amd.com/gpu") + nodeMetricsHashKey = clusterId + "/" + nodeName + "_" + "allocatable" + "_" + metricNameToCollect + metricValue = @@NodeMetrics[nodeMetricsHashKey] end end + if (!metricValue.nil?) + metricItem = {} + metricItem["CollectionTime"] = metricTime + metricItem["Computer"] = nodeName + metricItem["Name"] = metricNametoReturn + metricItem["Value"] = metricValue + metricItem["Origin"] = Constants::INSIGHTSMETRICS_TAGS_ORIGIN + metricItem["Namespace"] = Constants::INSIGHTSMETRICS_TAGS_GPU_NAMESPACE + + metricTags = {} + metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERID] = clusterId + metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERNAME] = clusterName + metricTags[Constants::INSIGHTSMETRICS_TAGS_CONTAINER_NAME] = podUid + "/" + containerName + #metricTags[Constants::INSIGHTSMETRICS_TAGS_K8SNAMESPACE] = podNameSpace + + metricItem["Tags"] = metricTags + + metricItems.push(metricItem) + end end end rescue => error @@ -578,32 +578,9 @@ def parseNodeLimits(metricJSON, metricCategory, metricNameToCollect, metricNamet #if we are coming up with the time it should be same for all nodes #metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z metricInfo["items"].each do |node| - if (!node["status"][metricCategory].nil?) 
- - # metricCategory can be "capacity" or "allocatable" and metricNameToCollect can be "cpu" or "memory" - metricValue = getMetricNumericValue(metricNameToCollect, node["status"][metricCategory][metricNameToCollect]) - - metricItem = {} - metricItem["DataItems"] = [] - metricProps = {} - metricProps["Timestamp"] = metricTime - metricProps["Host"] = node["metadata"]["name"] - # Adding this so that it is not set by base omsagent since it was not set earlier and being set by base omsagent - metricProps["Computer"] = node["metadata"]["name"] - metricProps["ObjectName"] = "K8SNode" - metricProps["InstanceName"] = clusterId + "/" + node["metadata"]["name"] - metricProps["Collections"] = [] - metricCollections = {} - metricCollections["CounterName"] = metricNametoReturn - metricCollections["Value"] = metricValue - - metricProps["Collections"].push(metricCollections) - metricItem["DataItems"].push(metricProps) + metricItem = parseNodeLimitsFromNodeItem(node, metricCategory, metricNameToCollect, metricNametoReturn, metricTime) + if !metricItem.nil? && !metricItem.empty? metricItems.push(metricItem) - #push node level metrics to a inmem hash so that we can use it looking up at container level. - #Currently if container level cpu & memory limits are not defined we default to node level limits - @@NodeMetrics[clusterId + "/" + node["metadata"]["name"] + "_" + metricCategory + "_" + metricNameToCollect] = metricValue - #@Log.info ("Node metric hash: #{@@NodeMetrics}") end end rescue => error @@ -612,49 +589,82 @@ def parseNodeLimits(metricJSON, metricCategory, metricNameToCollect, metricNamet return metricItems end #parseNodeLimits - def parseNodeLimitsAsInsightsMetrics(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601) - metricItems = [] + def parseNodeLimitsFromNodeItem(node, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601) + metricItem = {} begin - metricInfo = metricJSON clusterId = getClusterId - clusterName = getClusterName #Since we are getting all node data at the same time and kubernetes doesnt specify a timestamp for the capacity and allocation metrics, #if we are coming up with the time it should be same for all nodes #metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z - metricInfo["items"].each do |node| - if (!node["status"][metricCategory].nil?) && (!node["status"][metricCategory][metricNameToCollect].nil?) - - # metricCategory can be "capacity" or "allocatable" and metricNameToCollect can be "cpu" or "memory" or "amd.com/gpu" or "nvidia.com/gpu" - metricValue = getMetricNumericValue(metricNameToCollect, node["status"][metricCategory][metricNameToCollect]) - - metricItem = {} - metricItem["CollectionTime"] = metricTime - metricItem["Computer"] = node["metadata"]["name"] - metricItem["Name"] = metricNametoReturn - metricItem["Value"] = metricValue - metricItem["Origin"] = Constants::INSIGHTSMETRICS_TAGS_ORIGIN - metricItem["Namespace"] = Constants::INSIGHTSMETRICS_TAGS_GPU_NAMESPACE - - metricTags = {} - metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERID] = clusterId - metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERNAME] = clusterName - metricTags[Constants::INSIGHTSMETRICS_TAGS_GPU_VENDOR] = metricNameToCollect - - metricItem["Tags"] = metricTags + if (!node["status"][metricCategory].nil?) && (!node["status"][metricCategory][metricNameToCollect].nil?) 
+ # metricCategory can be "capacity" or "allocatable" and metricNameToCollect can be "cpu" or "memory" + metricValue = getMetricNumericValue(metricNameToCollect, node["status"][metricCategory][metricNameToCollect]) + + metricItem["DataItems"] = [] + metricProps = {} + metricProps["Timestamp"] = metricTime + metricProps["Host"] = node["metadata"]["name"] + # Adding this so that it is not set by base omsagent since it was not set earlier and being set by base omsagent + metricProps["Computer"] = node["metadata"]["name"] + metricProps["ObjectName"] = "K8SNode" + metricProps["InstanceName"] = clusterId + "/" + node["metadata"]["name"] + metricProps["Collections"] = [] + metricCollections = {} + metricCollections["CounterName"] = metricNametoReturn + metricCollections["Value"] = metricValue + + metricProps["Collections"].push(metricCollections) + metricItem["DataItems"].push(metricProps) + + #push node level metrics to a inmem hash so that we can use it looking up at container level. + #Currently if container level cpu & memory limits are not defined we default to node level limits + @@NodeMetrics[clusterId + "/" + node["metadata"]["name"] + "_" + metricCategory + "_" + metricNameToCollect] = metricValue + #@Log.info ("Node metric hash: #{@@NodeMetrics}") + end + rescue => error + @Log.warn("parseNodeLimitsFromNodeItem failed: #{error} for metric #{metricCategory} #{metricNameToCollect}") + end + return metricItem + end #parseNodeLimitsFromNodeItem - metricItems.push(metricItem) - #push node level metrics (except gpu ones) to a inmem hash so that we can use it looking up at container level. - #Currently if container level cpu & memory limits are not defined we default to node level limits - if (metricNameToCollect.downcase != "nvidia.com/gpu") && (metricNameToCollect.downcase != "amd.com/gpu") - @@NodeMetrics[clusterId + "/" + node["metadata"]["name"] + "_" + metricCategory + "_" + metricNameToCollect] = metricValue - #@Log.info ("Node metric hash: #{@@NodeMetrics}") - end + def parseNodeLimitsAsInsightsMetrics(node, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601) + metricItem = {} + begin + #Since we are getting all node data at the same time and kubernetes doesnt specify a timestamp for the capacity and allocation metrics, + #if we are coming up with the time it should be same for all nodes + #metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z + if (!node["status"][metricCategory].nil?) && (!node["status"][metricCategory][metricNameToCollect].nil?) 
+ clusterId = getClusterId + clusterName = getClusterName + + # metricCategory can be "capacity" or "allocatable" and metricNameToCollect can be "cpu" or "memory" or "amd.com/gpu" or "nvidia.com/gpu" + metricValue = getMetricNumericValue(metricNameToCollect, node["status"][metricCategory][metricNameToCollect]) + + metricItem["CollectionTime"] = metricTime + metricItem["Computer"] = node["metadata"]["name"] + metricItem["Name"] = metricNametoReturn + metricItem["Value"] = metricValue + metricItem["Origin"] = Constants::INSIGHTSMETRICS_TAGS_ORIGIN + metricItem["Namespace"] = Constants::INSIGHTSMETRICS_TAGS_GPU_NAMESPACE + + metricTags = {} + metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERID] = clusterId + metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERNAME] = clusterName + metricTags[Constants::INSIGHTSMETRICS_TAGS_GPU_VENDOR] = metricNameToCollect + + metricItem["Tags"] = metricTags + + #push node level metrics (except gpu ones) to a inmem hash so that we can use it looking up at container level. + #Currently if container level cpu & memory limits are not defined we default to node level limits + if (metricNameToCollect.downcase != "nvidia.com/gpu") && (metricNameToCollect.downcase != "amd.com/gpu") + @@NodeMetrics[clusterId + "/" + node["metadata"]["name"] + "_" + metricCategory + "_" + metricNameToCollect] = metricValue + #@Log.info ("Node metric hash: #{@@NodeMetrics}") end end rescue => error @Log.warn("parseNodeLimitsAsInsightsMetrics failed: #{error} for metric #{metricCategory} #{metricNameToCollect}") end - return metricItems + return metricItem end def getMetricNumericValue(metricName, metricVal) @@ -777,5 +787,32 @@ def getKubeAPIServerUrl end return apiServerUrl end + + def getKubeServicesInventoryRecords(serviceList, batchTime = Time.utc.iso8601) + kubeServiceRecords = [] + begin + if (!serviceList.nil? && !serviceList.empty?) 
+ servicesCount = serviceList["items"].length + @Log.info("KubernetesApiClient::getKubeServicesInventoryRecords : number of services in serviceList #{servicesCount} @ #{Time.now.utc.iso8601}") + serviceList["items"].each do |item| + kubeServiceRecord = {} + kubeServiceRecord["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + kubeServiceRecord["ServiceName"] = item["metadata"]["name"] + kubeServiceRecord["Namespace"] = item["metadata"]["namespace"] + kubeServiceRecord["SelectorLabels"] = [item["spec"]["selector"]] + # added these before emit to avoid memory foot print + # kubeServiceRecord["ClusterId"] = KubernetesApiClient.getClusterId + # kubeServiceRecord["ClusterName"] = KubernetesApiClient.getClusterName + kubeServiceRecord["ClusterIP"] = item["spec"]["clusterIP"] + kubeServiceRecord["ServiceType"] = item["spec"]["type"] + kubeServiceRecords.push(kubeServiceRecord.dup) + end + end + rescue => errorStr + @Log.warn "KubernetesApiClient::getKubeServicesInventoryRecords:Failed with an error : #{errorStr}" + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end + return kubeServiceRecords + end end end diff --git a/source/plugins/ruby/in_kube_events.rb b/source/plugins/ruby/in_kube_events.rb index 6f59a3fc1..4f6017cc5 100644 --- a/source/plugins/ruby/in_kube_events.rb +++ b/source/plugins/ruby/in_kube_events.rb @@ -17,8 +17,9 @@ def initialize require_relative "omslog" require_relative "ApplicationInsightsUtility" - # 30000 events account to approximately 5MB - @EVENTS_CHUNK_SIZE = 30000 + # refer tomlparser-agent-config for defaults + # this configurable via configmap + @EVENTS_CHUNK_SIZE = 0 # Initializing events count for telemetry @eventsCount = 0 @@ -36,6 +37,15 @@ def configure(conf) def start if @run_interval + if !ENV["EVENTS_CHUNK_SIZE"].nil? && !ENV["EVENTS_CHUNK_SIZE"].empty? && ENV["EVENTS_CHUNK_SIZE"].to_i > 0 + @EVENTS_CHUNK_SIZE = ENV["EVENTS_CHUNK_SIZE"].to_i + else + # this shouldnt happen just setting default here as safe guard + $log.warn("in_kube_events::start: setting to default value since got EVENTS_CHUNK_SIZE nil or empty") + @EVENTS_CHUNK_SIZE = 4000 + end + $log.info("in_kube_events::start : EVENTS_CHUNK_SIZE @ #{@EVENTS_CHUNK_SIZE}") + @finished = false @condition = ConditionVariable.new @mutex = Mutex.new @@ -82,6 +92,8 @@ def enumerate end $log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}") if (!eventList.nil? && !eventList.empty? && eventList.key?("items") && !eventList["items"].nil? && !eventList["items"].empty?) + eventsCount = eventList["items"].length + $log.info "in_kube_events::enumerate:Received number of events in eventList is #{eventsCount} @ #{Time.now.utc.iso8601}" newEventQueryState = parse_and_emit_records(eventList, eventQueryState, newEventQueryState, batchTime) else $log.warn "in_kube_events::enumerate:Received empty eventList" @@ -91,6 +103,8 @@ def enumerate while (!continuationToken.nil? && !continuationToken.empty?) continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?fieldSelector=type!=Normal&limit=#{@EVENTS_CHUNK_SIZE}&continue=#{continuationToken}") if (!eventList.nil? && !eventList.empty? && eventList.key?("items") && !eventList["items"].nil? && !eventList["items"].empty?) 
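Coming back to the getKubeServicesInventoryRecords helper added to KubernetesApiClient above: each record carries the service's selector labels, which lets the pod inventory plugin resolve a pod's owning service in memory by standard Kubernetes selector matching (every selector key/value must appear in the pod's labels) instead of re-walking the service list from the API. The lookup below is a sketch of that assumed caller-side use; only the record fields (ServiceName, Namespace, SelectorLabels) come from this patch:

# Assumed caller-side lookup; the record layout comes from getKubeServicesInventoryRecords,
# the matching logic is plain Kubernetes selector semantics.
def find_service_name(pod_namespace, pod_labels, service_records)
  return "" if pod_labels.nil? || pod_labels.empty?
  service_records.each do |svc|
    next unless svc["Namespace"] == pod_namespace
    selector = (svc["SelectorLabels"] || []).first
    next if selector.nil? || selector.empty?
    # a service selects the pod only when every selector key/value matches a pod label
    return svc["ServiceName"] if selector.all? { |key, value| pod_labels[key] == value }
  end
  ""
end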
+ eventsCount = eventList["items"].length + $log.info "in_kube_events::enumerate:Received number of events in eventList is #{eventsCount} @ #{Time.now.utc.iso8601}" newEventQueryState = parse_and_emit_records(eventList, eventQueryState, newEventQueryState, batchTime) else $log.warn "in_kube_events::enumerate:Received empty eventList" diff --git a/source/plugins/ruby/in_kube_nodes.rb b/source/plugins/ruby/in_kube_nodes.rb index 4d58382f5..e7c5060a5 100644 --- a/source/plugins/ruby/in_kube_nodes.rb +++ b/source/plugins/ruby/in_kube_nodes.rb @@ -32,7 +32,12 @@ def initialize require_relative "ApplicationInsightsUtility" require_relative "oms_common" require_relative "omslog" - @NODES_CHUNK_SIZE = "400" + # refer tomlparser-agent-config for the defaults + @NODES_CHUNK_SIZE = 0 + @NODES_EMIT_STREAM_BATCH_SIZE = 0 + + @nodeInventoryE2EProcessingLatencyMs = 0 + @nodesAPIE2ELatencyMs = 0 require_relative "constants" end @@ -45,11 +50,30 @@ def configure(conf) def start if @run_interval + if !ENV["NODES_CHUNK_SIZE"].nil? && !ENV["NODES_CHUNK_SIZE"].empty? && ENV["NODES_CHUNK_SIZE"].to_i > 0 + @NODES_CHUNK_SIZE = ENV["NODES_CHUNK_SIZE"].to_i + else + # this shouldnt happen just setting default here as safe guard + $log.warn("in_kube_nodes::start: setting to default value since got NODES_CHUNK_SIZE nil or empty") + @NODES_CHUNK_SIZE = 250 + end + $log.info("in_kube_nodes::start : NODES_CHUNK_SIZE @ #{@NODES_CHUNK_SIZE}") + + if !ENV["NODES_EMIT_STREAM_BATCH_SIZE"].nil? && !ENV["NODES_EMIT_STREAM_BATCH_SIZE"].empty? && ENV["NODES_EMIT_STREAM_BATCH_SIZE"].to_i > 0 + @NODES_EMIT_STREAM_BATCH_SIZE = ENV["NODES_EMIT_STREAM_BATCH_SIZE"].to_i + else + # this shouldnt happen just setting default here as safe guard + $log.warn("in_kube_nodes::start: setting to default value since got NODES_EMIT_STREAM_BATCH_SIZE nil or empty") + @NODES_EMIT_STREAM_BATCH_SIZE = 100 + end + $log.info("in_kube_nodes::start : NODES_EMIT_STREAM_BATCH_SIZE @ #{@NODES_EMIT_STREAM_BATCH_SIZE}") + @finished = false @condition = ConditionVariable.new @mutex = Mutex.new @thread = Thread.new(&method(:run_periodic)) @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i + @@nodeInventoryLatencyTelemetryTimeTracker = DateTime.now.to_time.to_i end end @@ -69,14 +93,20 @@ def enumerate currentTime = Time.now batchTime = currentTime.utc.iso8601 + @nodesAPIE2ELatencyMs = 0 + @nodeInventoryE2EProcessingLatencyMs = 0 + nodeInventoryStartTime = (Time.now.to_f * 1000).to_i + nodesAPIChunkStartTime = (Time.now.to_f * 1000).to_i # Initializing continuation token to nil continuationToken = nil $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") resourceUri = KubernetesApiClient.getNodesResourceUri("nodes?limit=#{@NODES_CHUNK_SIZE}") continuationToken, nodeInventory = KubernetesApiClient.getResourcesAndContinuationToken(resourceUri) - $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") + nodesAPIChunkEndTime = (Time.now.to_f * 1000).to_i + @nodesAPIE2ELatencyMs = (nodesAPIChunkEndTime - nodesAPIChunkStartTime) if (!nodeInventory.nil? && !nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].nil? && !nodeInventory["items"].empty?) 
+ $log.info("in_kube_nodes::enumerate : number of node items :#{nodeInventory["items"].length} from Kube API @ #{Time.now.utc.iso8601}") parse_and_emit_records(nodeInventory, batchTime) else $log.warn "in_kube_nodes::enumerate:Received empty nodeInventory" @@ -84,14 +114,26 @@ def enumerate #If we receive a continuation token, make calls, process and flush data until we have processed all data while (!continuationToken.nil? && !continuationToken.empty?) + nodesAPIChunkStartTime = (Time.now.to_f * 1000).to_i continuationToken, nodeInventory = KubernetesApiClient.getResourcesAndContinuationToken(resourceUri + "&continue=#{continuationToken}") + nodesAPIChunkEndTime = (Time.now.to_f * 1000).to_i + @nodesAPIE2ELatencyMs = @nodesAPIE2ELatencyMs + (nodesAPIChunkEndTime - nodesAPIChunkStartTime) if (!nodeInventory.nil? && !nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].nil? && !nodeInventory["items"].empty?) + $log.info("in_kube_nodes::enumerate : number of node items :#{nodeInventory["items"].length} from Kube API @ #{Time.now.utc.iso8601}") parse_and_emit_records(nodeInventory, batchTime) else $log.warn "in_kube_nodes::enumerate:Received empty nodeInventory" end end + @nodeInventoryE2EProcessingLatencyMs = ((Time.now.to_f * 1000).to_i - nodeInventoryStartTime) + timeDifference = (DateTime.now.to_time.to_i - @@nodeInventoryLatencyTelemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference / 60 + if (timeDifferenceInMinutes >= Constants::TELEMETRY_FLUSH_INTERVAL_IN_MINUTES) + ApplicationInsightsUtility.sendMetricTelemetry("NodeInventoryE2EProcessingLatencyMs", @nodeInventoryE2EProcessingLatencyMs, {}) + ApplicationInsightsUtility.sendMetricTelemetry("NodesAPIE2ELatencyMs", @nodesAPIE2ELatencyMs, {}) + @@nodeInventoryLatencyTelemetryTimeTracker = DateTime.now.to_time.to_i + end # Setting this to nil so that we dont hold memory until GC kicks in nodeInventory = nil rescue => errorStr @@ -109,77 +151,32 @@ def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) eventStream = MultiEventStream.new containerNodeInventoryEventStream = MultiEventStream.new insightsMetricsEventStream = MultiEventStream.new + kubePerfEventStream = MultiEventStream.new @@istestvar = ENV["ISTEST"] #get node inventory - nodeInventory["items"].each do |items| - record = {} - # Sending records for ContainerNodeInventory - containerNodeInventoryRecord = {} - containerNodeInventoryRecord["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated - containerNodeInventoryRecord["Computer"] = items["metadata"]["name"] - - record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated - record["Computer"] = items["metadata"]["name"] - record["ClusterName"] = KubernetesApiClient.getClusterName - record["ClusterId"] = KubernetesApiClient.getClusterId - record["CreationTimeStamp"] = items["metadata"]["creationTimestamp"] - record["Labels"] = [items["metadata"]["labels"]] - record["Status"] = "" - - if !items["spec"]["providerID"].nil? && !items["spec"]["providerID"].empty? 
- if File.file?(@@AzStackCloudFileName) # existence of this file indicates agent running on azstack - record["KubernetesProviderID"] = "azurestack" - else - #Multicluster kusto query is filtering after splitting by ":" to the left, so do the same here - #https://msazure.visualstudio.com/One/_git/AzureUX-Monitoring?path=%2Fsrc%2FMonitoringExtension%2FClient%2FInfraInsights%2FData%2FQueryTemplates%2FMultiClusterKustoQueryTemplate.ts&_a=contents&version=GBdev - provider = items["spec"]["providerID"].split(":")[0] - if !provider.nil? && !provider.empty? - record["KubernetesProviderID"] = provider - else - record["KubernetesProviderID"] = items["spec"]["providerID"] - end - end - else - record["KubernetesProviderID"] = "onprem" - end - - # Refer to https://kubernetes.io/docs/concepts/architecture/nodes/#condition for possible node conditions. - # We check the status of each condition e.g. {"type": "OutOfDisk","status": "False"} . Based on this we - # populate the KubeNodeInventory Status field. A possible value for this field could be "Ready OutofDisk" - # implying that the node is ready for hosting pods, however its out of disk. - - if items["status"].key?("conditions") && !items["status"]["conditions"].empty? - allNodeConditions = "" - items["status"]["conditions"].each do |condition| - if condition["status"] == "True" - if !allNodeConditions.empty? - allNodeConditions = allNodeConditions + "," + condition["type"] - else - allNodeConditions = condition["type"] - end - end - #collect last transition to/from ready (no matter ready is true/false) - if condition["type"] == "Ready" && !condition["lastTransitionTime"].nil? - record["LastTransitionTimeReady"] = condition["lastTransitionTime"] - end - end - if !allNodeConditions.empty? - record["Status"] = allNodeConditions + nodeInventory["items"].each do |item| + # node inventory + nodeInventoryRecord = getNodeInventoryRecord(item, batchTime) + wrapper = { + "DataType" => "KUBE_NODE_INVENTORY_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [nodeInventoryRecord.each { |k, v| nodeInventoryRecord[k] = v }], + } + eventStream.add(emitTime, wrapper) if wrapper + if @NODES_EMIT_STREAM_BATCH_SIZE > 0 && eventStream.count >= @NODES_EMIT_STREAM_BATCH_SIZE + $log.info("in_kube_node::parse_and_emit_records: number of node inventory records emitted #{@NODES_EMIT_STREAM_BATCH_SIZE} @ #{Time.now.utc.iso8601}") + router.emit_stream(@tag, eventStream) if eventStream + $log.info("in_kube_node::parse_and_emit_records: number of mdm node inventory records emitted #{@NODES_EMIT_STREAM_BATCH_SIZE} @ #{Time.now.utc.iso8601}") + router.emit_stream(@@MDMKubeNodeInventoryTag, eventStream) if eventStream + + if (!@@istestvar.nil? && !@@istestvar.empty? 
&& @@istestvar.casecmp("true") == 0) + $log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") end + eventStream = MultiEventStream.new end - nodeInfo = items["status"]["nodeInfo"] - record["KubeletVersion"] = nodeInfo["kubeletVersion"] - record["KubeProxyVersion"] = nodeInfo["kubeProxyVersion"] - containerNodeInventoryRecord["OperatingSystem"] = nodeInfo["osImage"] - containerRuntimeVersion = nodeInfo["containerRuntimeVersion"] - if containerRuntimeVersion.downcase.start_with?("docker://") - containerNodeInventoryRecord["DockerVersion"] = containerRuntimeVersion.split("//")[1] - else - # using containerRuntimeVersion as DockerVersion as is for non docker runtimes - containerNodeInventoryRecord["DockerVersion"] = containerRuntimeVersion - end - # ContainerNodeInventory data for docker version and operating system. + # container node inventory + containerNodeInventoryRecord = getContainerNodeInventoryRecord(item, batchTime) containerNodeInventoryWrapper = { "DataType" => "CONTAINER_NODE_INVENTORY_BLOB", "IPName" => "ContainerInsights", @@ -187,33 +184,81 @@ def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) } containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryWrapper) if containerNodeInventoryWrapper - wrapper = { - "DataType" => "KUBE_NODE_INVENTORY_BLOB", - "IPName" => "ContainerInsights", - "DataItems" => [record.each { |k, v| record[k] = v }], - } - eventStream.add(emitTime, wrapper) if wrapper + if @NODES_EMIT_STREAM_BATCH_SIZE > 0 && containerNodeInventoryEventStream.count >= @NODES_EMIT_STREAM_BATCH_SIZE + $log.info("in_kube_node::parse_and_emit_records: number of container node inventory records emitted #{@NODES_EMIT_STREAM_BATCH_SIZE} @ #{Time.now.utc.iso8601}") + router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream + containerNodeInventoryEventStream = MultiEventStream.new + end + + # node metrics records + nodeMetricRecords = [] + nodeMetricRecord = KubernetesApiClient.parseNodeLimitsFromNodeItem(item, "allocatable", "cpu", "cpuAllocatableNanoCores", batchTime) + if !nodeMetricRecord.nil? && !nodeMetricRecord.empty? + nodeMetricRecords.push(nodeMetricRecord) + end + nodeMetricRecord = KubernetesApiClient.parseNodeLimitsFromNodeItem(item, "allocatable", "memory", "memoryAllocatableBytes", batchTime) + if !nodeMetricRecord.nil? && !nodeMetricRecord.empty? + nodeMetricRecords.push(nodeMetricRecord) + end + nodeMetricRecord = KubernetesApiClient.parseNodeLimitsFromNodeItem(item, "capacity", "cpu", "cpuCapacityNanoCores", batchTime) + if !nodeMetricRecord.nil? && !nodeMetricRecord.empty? + nodeMetricRecords.push(nodeMetricRecord) + end + nodeMetricRecord = KubernetesApiClient.parseNodeLimitsFromNodeItem(item, "capacity", "memory", "memoryCapacityBytes", batchTime) + if !nodeMetricRecord.nil? && !nodeMetricRecord.empty? 
+ nodeMetricRecords.push(nodeMetricRecord) + end + nodeMetricRecords.each do |metricRecord| + metricRecord["DataType"] = "LINUX_PERF_BLOB" + metricRecord["IPName"] = "LogManagement" + kubePerfEventStream.add(emitTime, metricRecord) if metricRecord + end + if @NODES_EMIT_STREAM_BATCH_SIZE > 0 && kubePerfEventStream.count >= @NODES_EMIT_STREAM_BATCH_SIZE + $log.info("in_kube_nodes::parse_and_emit_records: number of node perf metric records emitted #{@NODES_EMIT_STREAM_BATCH_SIZE} @ #{Time.now.utc.iso8601}") + router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream + kubePerfEventStream = MultiEventStream.new + end + + # node GPU metrics record + nodeGPUInsightsMetricsRecords = [] + insightsMetricsRecord = KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(item, "allocatable", "nvidia.com/gpu", "nodeGpuAllocatable", batchTime) + if !insightsMetricsRecord.nil? && !insightsMetricsRecord.empty? + nodeGPUInsightsMetricsRecords.push(insightsMetricsRecord) + end + insightsMetricsRecord = KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(item, "capacity", "nvidia.com/gpu", "nodeGpuCapacity", batchTime) + if !insightsMetricsRecord.nil? && !insightsMetricsRecord.empty? + nodeGPUInsightsMetricsRecords.push(insightsMetricsRecord) + end + insightsMetricsRecord = KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(item, "allocatable", "amd.com/gpu", "nodeGpuAllocatable", batchTime) + if !insightsMetricsRecord.nil? && !insightsMetricsRecord.empty? + nodeGPUInsightsMetricsRecords.push(insightsMetricsRecord) + end + insightsMetricsRecord = KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(item, "capacity", "amd.com/gpu", "nodeGpuCapacity", batchTime) + if !insightsMetricsRecord.nil? && !insightsMetricsRecord.empty? + nodeGPUInsightsMetricsRecords.push(insightsMetricsRecord) + end + nodeGPUInsightsMetricsRecords.each do |insightsMetricsRecord| + wrapper = { + "DataType" => "INSIGHTS_METRICS_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [insightsMetricsRecord.each { |k, v| insightsMetricsRecord[k] = v }], + } + insightsMetricsEventStream.add(emitTime, wrapper) if wrapper + end + if @NODES_EMIT_STREAM_BATCH_SIZE > 0 && insightsMetricsEventStream.count >= @NODES_EMIT_STREAM_BATCH_SIZE + $log.info("in_kube_nodes::parse_and_emit_records: number of GPU node perf metric records emitted #{@NODES_EMIT_STREAM_BATCH_SIZE} @ #{Time.now.utc.iso8601}") + router.emit_stream(Constants::INSIGHTSMETRICS_FLUENT_TAG, insightsMetricsEventStream) if insightsMetricsEventStream + insightsMetricsEventStream = MultiEventStream.new + end # Adding telemetry to send node telemetry every 10 minutes timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs timeDifferenceInMinutes = timeDifference / 60 if (timeDifferenceInMinutes >= Constants::TELEMETRY_FLUSH_INTERVAL_IN_MINUTES) - properties = {} - properties["Computer"] = record["Computer"] - properties["KubeletVersion"] = record["KubeletVersion"] - properties["OperatingSystem"] = nodeInfo["operatingSystem"] - # DockerVersion field holds docker version if runtime is docker/moby else :// - if containerRuntimeVersion.downcase.start_with?("docker://") - properties["DockerVersion"] = containerRuntimeVersion.split("//")[1] - else - properties["DockerVersion"] = containerRuntimeVersion - end - properties["KubernetesProviderID"] = record["KubernetesProviderID"] - properties["KernelVersion"] = nodeInfo["kernelVersion"] - properties["OSImage"] = nodeInfo["osImage"] + properties = getNodeTelemetryProps(item) + 
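# Note on the restructuring above: allocatable/capacity metrics are now produced one node at
# a time via KubernetesApiClient.parseNodeLimitsFromNodeItem (and the GPU records via
# parseNodeLimitsAsInsightsMetrics called with a single item), replacing the older
# parseNodeLimits calls that walked the entire nodeInventory at once (removed further below),
# so perf records can be flushed in NODES_EMIT_STREAM_BATCH_SIZE sized batches.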
properties["KubernetesProviderID"] = nodeInventoryRecord["KubernetesProviderID"] + capacityInfo = item["status"]["capacity"] - capacityInfo = items["status"]["capacity"] ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"], properties) - begin if (!capacityInfo["nvidia.com/gpu"].nil?) && (!capacityInfo["nvidia.com/gpu"].empty?) properties["nvigpus"] = capacityInfo["nvidia.com/gpu"] @@ -247,72 +292,32 @@ def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) telemetrySent = true end end - router.emit_stream(@tag, eventStream) if eventStream - router.emit_stream(@@MDMKubeNodeInventoryTag, eventStream) if eventStream - router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream if telemetrySent == true @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i end - - if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0) - $log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") + if eventStream.count > 0 + $log.info("in_kube_node::parse_and_emit_records: number of node inventory records emitted #{eventStream.count} @ #{Time.now.utc.iso8601}") + router.emit_stream(@tag, eventStream) if eventStream + $log.info("in_kube_node::parse_and_emit_records: number of mdm node inventory records emitted #{eventStream.count} @ #{Time.now.utc.iso8601}") + router.emit_stream(@@MDMKubeNodeInventoryTag, eventStream) if eventStream + eventStream = nil end - #:optimize:kubeperf merge - begin - #if(!nodeInventory.empty?) - nodeMetricDataItems = [] - #allocatable metrics @ node level - nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "cpu", "cpuAllocatableNanoCores", batchTime)) - nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "memory", "memoryAllocatableBytes", batchTime)) - #capacity metrics @ node level - nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores", batchTime)) - nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes", batchTime)) - - kubePerfEventStream = MultiEventStream.new - - nodeMetricDataItems.each do |record| - record["DataType"] = "LINUX_PERF_BLOB" - record["IPName"] = "LogManagement" - kubePerfEventStream.add(emitTime, record) if record - end - #end - router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream - - #start GPU InsightsMetrics items - begin - nodeGPUInsightsMetricsDataItems = [] - nodeGPUInsightsMetricsDataItems.concat(KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(nodeInventory, "allocatable", "nvidia.com/gpu", "nodeGpuAllocatable", batchTime)) - nodeGPUInsightsMetricsDataItems.concat(KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(nodeInventory, "capacity", "nvidia.com/gpu", "nodeGpuCapacity", batchTime)) - - nodeGPUInsightsMetricsDataItems.concat(KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(nodeInventory, "allocatable", "amd.com/gpu", "nodeGpuAllocatable", batchTime)) - nodeGPUInsightsMetricsDataItems.concat(KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(nodeInventory, "capacity", "amd.com/gpu", "nodeGpuCapacity", batchTime)) - - nodeGPUInsightsMetricsDataItems.each do |insightsMetricsRecord| - wrapper = { - "DataType" => "INSIGHTS_METRICS_BLOB", - "IPName" => "ContainerInsights", - "DataItems" => [insightsMetricsRecord.each { |k, v| 
insightsMetricsRecord[k] = v }], - } - insightsMetricsEventStream.add(emitTime, wrapper) if wrapper - end - - router.emit_stream(Constants::INSIGHTSMETRICS_FLUENT_TAG, insightsMetricsEventStream) if insightsMetricsEventStream - if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && insightsMetricsEventStream.count > 0) - $log.info("kubeNodeInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}") - end - rescue => errorStr - $log.warn "Failed when processing GPU metrics in_kube_nodes : #{errorStr}" - $log.debug_backtrace(errorStr.backtrace) - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end - #end GPU InsightsMetrics items - rescue => errorStr - $log.warn "Failed in enumerate for KubePerf from in_kube_nodes : #{errorStr}" - $log.debug_backtrace(errorStr.backtrace) - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + if containerNodeInventoryEventStream.count > 0 + $log.info("in_kube_node::parse_and_emit_records: number of container node inventory records emitted #{containerNodeInventoryEventStream.count} @ #{Time.now.utc.iso8601}") + router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream + containerNodeInventoryEventStream = nil end - #:optimize:end kubeperf merge + if kubePerfEventStream.count > 0 + $log.info("in_kube_nodes::parse_and_emit_records: number of node perf metric records emitted #{kubePerfEventStream.count} @ #{Time.now.utc.iso8601}") + router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream + kubePerfEventStream = nil + end + if insightsMetricsEventStream.count > 0 + $log.info("in_kube_nodes::parse_and_emit_records: number of GPU node perf metric records emitted #{insightsMetricsEventStream.count} @ #{Time.now.utc.iso8601}") + router.emit_stream(Constants::INSIGHTSMETRICS_FLUENT_TAG, insightsMetricsEventStream) if insightsMetricsEventStream + insightsMetricsEventStream = nil + end rescue => errorStr $log.warn "Failed to retrieve node inventory: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) @@ -352,5 +357,112 @@ def run_periodic end @mutex.unlock end + + # TODO - move this method to KubernetesClient or helper class + def getNodeInventoryRecord(item, batchTime = Time.utc.iso8601) + record = {} + begin + record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + record["Computer"] = item["metadata"]["name"] + record["ClusterName"] = KubernetesApiClient.getClusterName + record["ClusterId"] = KubernetesApiClient.getClusterId + record["CreationTimeStamp"] = item["metadata"]["creationTimestamp"] + record["Labels"] = [item["metadata"]["labels"]] + record["Status"] = "" + + if !item["spec"]["providerID"].nil? && !item["spec"]["providerID"].empty? + if File.file?(@@AzStackCloudFileName) # existence of this file indicates agent running on azstack + record["KubernetesProviderID"] = "azurestack" + else + #Multicluster kusto query is filtering after splitting by ":" to the left, so do the same here + #https://msazure.visualstudio.com/One/_git/AzureUX-Monitoring?path=%2Fsrc%2FMonitoringExtension%2FClient%2FInfraInsights%2FData%2FQueryTemplates%2FMultiClusterKustoQueryTemplate.ts&_a=contents&version=GBdev + provider = item["spec"]["providerID"].split(":")[0] + if !provider.nil? && !provider.empty? 
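# (illustrative sketch, not part of the patch) The emit logic above follows one pattern
# throughout: records are added to a Fluentd MultiEventStream, and the stream is flushed and
# replaced whenever the configured batch size is reached, with a final flush for the remainder,
# so memory stays bounded by the batch size instead of the full inventory. The helper name
# below is hypothetical; the plugins inline this logic per stream/tag.
def emit_in_batches(records, tag, batch_size, emit_time)
  stream = MultiEventStream.new
  records.each do |record|
    stream.add(emit_time, record)
    if batch_size > 0 && stream.count >= batch_size
      router.emit_stream(tag, stream)
      stream = MultiEventStream.new # start a fresh stream so the emitted one can be GC'd
    end
  end
  router.emit_stream(tag, stream) if stream.count > 0 # flush whatever is left
end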
+ record["KubernetesProviderID"] = provider + else + record["KubernetesProviderID"] = item["spec"]["providerID"] + end + end + else + record["KubernetesProviderID"] = "onprem" + end + + # Refer to https://kubernetes.io/docs/concepts/architecture/nodes/#condition for possible node conditions. + # We check the status of each condition e.g. {"type": "OutOfDisk","status": "False"} . Based on this we + # populate the KubeNodeInventory Status field. A possible value for this field could be "Ready OutofDisk" + # implying that the node is ready for hosting pods, however it's out of disk. + if item["status"].key?("conditions") && !item["status"]["conditions"].empty? + allNodeConditions = "" + item["status"]["conditions"].each do |condition| + if condition["status"] == "True" + if !allNodeConditions.empty? + allNodeConditions = allNodeConditions + "," + condition["type"] + else + allNodeConditions = condition["type"] + end + end + #collect last transition to/from ready (no matter ready is true/false) + if condition["type"] == "Ready" && !condition["lastTransitionTime"].nil? + record["LastTransitionTimeReady"] = condition["lastTransitionTime"] + end + end + if !allNodeConditions.empty? + record["Status"] = allNodeConditions + end + end + nodeInfo = item["status"]["nodeInfo"] + record["KubeletVersion"] = nodeInfo["kubeletVersion"] + record["KubeProxyVersion"] = nodeInfo["kubeProxyVersion"] + rescue => errorStr + $log.warn "in_kube_nodes::getNodeInventoryRecord:Failed: #{errorStr}" + end + return record + end + + # TODO - move this method to KubernetesClient or helper class + def getContainerNodeInventoryRecord(item, batchTime = Time.utc.iso8601) + containerNodeInventoryRecord = {} + begin + containerNodeInventoryRecord["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + containerNodeInventoryRecord["Computer"] = item["metadata"]["name"] + nodeInfo = item["status"]["nodeInfo"] + containerNodeInventoryRecord["OperatingSystem"] = nodeInfo["osImage"] + containerRuntimeVersion = nodeInfo["containerRuntimeVersion"] + if containerRuntimeVersion.downcase.start_with?("docker://") + containerNodeInventoryRecord["DockerVersion"] = containerRuntimeVersion.split("//")[1] + else + # using containerRuntimeVersion as DockerVersion as is for non docker runtimes + containerNodeInventoryRecord["DockerVersion"] = containerRuntimeVersion + end + rescue => errorStr + $log.warn "in_kube_nodes::getContainerNodeInventoryRecord:Failed: #{errorStr}" + end + return containerNodeInventoryRecord + end + + # TODO - move this method to KubernetesClient or helper class + def getNodeTelemetryProps(item) + properties = {} + begin + properties["Computer"] = item["metadata"]["name"] + nodeInfo = item["status"]["nodeInfo"] + properties["KubeletVersion"] = nodeInfo["kubeletVersion"] + properties["OperatingSystem"] = nodeInfo["osImage"] + properties["KernelVersion"] = nodeInfo["kernelVersion"] + properties["OSImage"] = nodeInfo["osImage"] + containerRuntimeVersion = nodeInfo["containerRuntimeVersion"] + if containerRuntimeVersion.downcase.start_with?("docker://") + properties["DockerVersion"] = containerRuntimeVersion.split("//")[1] + else + # using containerRuntimeVersion as DockerVersion as is for non docker runtimes + properties["DockerVersion"] = containerRuntimeVersion + end + properties["NODES_CHUNK_SIZE"] = @NODES_CHUNK_SIZE + properties["NODES_EMIT_STREAM_BATCH_SIZE"] = @NODES_EMIT_STREAM_BATCH_SIZE + rescue => errorStr + $log.warn
"in_kube_nodes::getNodeTelemetryProps:Failed: #{errorStr}" + return properties + end end # Kube_Node_Input end # module diff --git a/source/plugins/ruby/in_kube_podinventory.rb b/source/plugins/ruby/in_kube_podinventory.rb index bba3e920f..0cff2eefe 100644 --- a/source/plugins/ruby/in_kube_podinventory.rb +++ b/source/plugins/ruby/in_kube_podinventory.rb @@ -2,7 +2,7 @@ # frozen_string_literal: true module Fluent - require_relative "podinventory_to_mdm" + require_relative "podinventory_to_mdm" class Kube_PodInventory_Input < Input Plugin.register_input("kubepodinventory", self) @@ -19,7 +19,7 @@ def initialize require "yajl" require "set" require "time" - + require_relative "kubernetes_container_inventory" require_relative "KubernetesApiClient" require_relative "ApplicationInsightsUtility" @@ -27,11 +27,18 @@ def initialize require_relative "omslog" require_relative "constants" - @PODS_CHUNK_SIZE = "1500" + # refer tomlparser-agent-config for updating defaults + # this is configurable via configmap + @PODS_CHUNK_SIZE = 0 + @PODS_EMIT_STREAM_BATCH_SIZE = 0 + @podCount = 0 + @serviceCount = 0 @controllerSet = Set.new [] @winContainerCount = 0 @controllerData = {} + @podInventoryE2EProcessingLatencyMs = 0 + @podsAPIE2ELatencyMs = 0 end config_param :run_interval, :time, :default => 60 @@ -44,6 +51,24 @@ def configure(conf) def start if @run_interval + if !ENV["PODS_CHUNK_SIZE"].nil? && !ENV["PODS_CHUNK_SIZE"].empty? && ENV["PODS_CHUNK_SIZE"].to_i > 0 + @PODS_CHUNK_SIZE = ENV["PODS_CHUNK_SIZE"].to_i + else + # this shouldn't happen; just setting the default here as a safeguard + $log.warn("in_kube_podinventory::start: setting to default value since got PODS_CHUNK_SIZE nil or empty") + @PODS_CHUNK_SIZE = 1000 + end + $log.info("in_kube_podinventory::start : PODS_CHUNK_SIZE @ #{@PODS_CHUNK_SIZE}") + + if !ENV["PODS_EMIT_STREAM_BATCH_SIZE"].nil? && !ENV["PODS_EMIT_STREAM_BATCH_SIZE"].empty?
&& ENV["PODS_EMIT_STREAM_BATCH_SIZE"].to_i > 0 + @PODS_EMIT_STREAM_BATCH_SIZE = ENV["PODS_EMIT_STREAM_BATCH_SIZE"].to_i + else + # this shouldnt happen just setting default here as safe guard + $log.warn("in_kube_podinventory::start: setting to default value since got PODS_EMIT_STREAM_BATCH_SIZE nil or empty") + @PODS_EMIT_STREAM_BATCH_SIZE = 200 + end + $log.info("in_kube_podinventory::start : PODS_EMIT_STREAM_BATCH_SIZE @ #{@PODS_EMIT_STREAM_BATCH_SIZE}") + @finished = false @condition = ConditionVariable.new @mutex = Mutex.new @@ -67,12 +92,15 @@ def enumerate(podList = nil) podInventory = podList telemetryFlush = false @podCount = 0 + @serviceCount = 0 @controllerSet = Set.new [] @winContainerCount = 0 @controllerData = {} currentTime = Time.now batchTime = currentTime.utc.iso8601 - + serviceRecords = [] + @podInventoryE2EProcessingLatencyMs = 0 + podInventoryStartTime = (Time.now.to_f * 1000).to_i # Get services first so that we dont need to make a call for very chunk $log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}") serviceInfo = KubernetesApiClient.getKubeResourceInfo("services") @@ -84,32 +112,48 @@ def enumerate(podList = nil) serviceList = Yajl::Parser.parse(StringIO.new(serviceInfo.body)) $log.info("in_kube_podinventory::enumerate:End:Parsing services data using yajl @ #{Time.now.utc.iso8601}") serviceInfo = nil + # service inventory records much smaller and fixed size compared to serviceList + serviceRecords = KubernetesApiClient.getKubeServicesInventoryRecords(serviceList, batchTime) + # updating for telemetry + @serviceCount += serviceRecords.length + serviceList = nil end + # to track e2e processing latency + @podsAPIE2ELatencyMs = 0 + podsAPIChunkStartTime = (Time.now.to_f * 1000).to_i # Initializing continuation token to nil continuationToken = nil $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}") continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}") $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") + podsAPIChunkEndTime = (Time.now.to_f * 1000).to_i + @podsAPIE2ELatencyMs = (podsAPIChunkEndTime - podsAPIChunkStartTime) if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].nil? && !podInventory["items"].empty?) - parse_and_emit_records(podInventory, serviceList, continuationToken, batchTime) + $log.info("in_kube_podinventory::enumerate : number of pod items :#{podInventory["items"].length} from Kube API @ #{Time.now.utc.iso8601}") + parse_and_emit_records(podInventory, serviceRecords, continuationToken, batchTime) else $log.warn "in_kube_podinventory::enumerate:Received empty podInventory" end #If we receive a continuation token, make calls, process and flush data until we have processed all data while (!continuationToken.nil? && !continuationToken.empty?) + podsAPIChunkStartTime = (Time.now.to_f * 1000).to_i continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") + podsAPIChunkEndTime = (Time.now.to_f * 1000).to_i + @podsAPIE2ELatencyMs = @podsAPIE2ELatencyMs + (podsAPIChunkEndTime - podsAPIChunkStartTime) if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].nil? && !podInventory["items"].empty?) 
- parse_and_emit_records(podInventory, serviceList, continuationToken, batchTime) + $log.info("in_kube_podinventory::enumerate : number of pod items :#{podInventory["items"].length} from Kube API @ #{Time.now.utc.iso8601}") + parse_and_emit_records(podInventory, serviceRecords, continuationToken, batchTime) else $log.warn "in_kube_podinventory::enumerate:Received empty podInventory" end end + @podInventoryE2EProcessingLatencyMs = ((Time.now.to_f * 1000).to_i - podInventoryStartTime) # Setting these to nil so that we dont hold memory until GC kicks in podInventory = nil - serviceList = nil + serviceRecords = nil # Adding telemetry to send pod telemetry every 5 minutes timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs @@ -122,14 +166,19 @@ def enumerate(podList = nil) if telemetryFlush == true telemetryProperties = {} telemetryProperties["Computer"] = @@hostName + telemetryProperties["PODS_CHUNK_SIZE"] = @PODS_CHUNK_SIZE + telemetryProperties["PODS_EMIT_STREAM_BATCH_SIZE"] = @PODS_EMIT_STREAM_BATCH_SIZE ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties) ApplicationInsightsUtility.sendMetricTelemetry("PodCount", @podCount, {}) + ApplicationInsightsUtility.sendMetricTelemetry("ServiceCount", @serviceCount, {}) telemetryProperties["ControllerData"] = @controllerData.to_json ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", @controllerSet.length, telemetryProperties) if @winContainerCount > 0 telemetryProperties["ClusterWideWindowsContainersCount"] = @winContainerCount ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties) end + ApplicationInsightsUtility.sendMetricTelemetry("PodInventoryE2EProcessingLatencyMs", @podInventoryE2EProcessingLatencyMs, telemetryProperties) + ApplicationInsightsUtility.sendMetricTelemetry("PodsAPIE2ELatencyMs", @podsAPIE2ELatencyMs, telemetryProperties) @@podTelemetryTimeTracker = DateTime.now.to_time.to_i end rescue => errorStr @@ -137,260 +186,138 @@ def enumerate(podList = nil) $log.debug_backtrace(errorStr.backtrace) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end - end + end - def parse_and_emit_records(podInventory, serviceList, continuationToken, batchTime = Time.utc.iso8601) + def parse_and_emit_records(podInventory, serviceRecords, continuationToken, batchTime = Time.utc.iso8601) currentTime = Time.now emitTime = currentTime.to_f #batchTime = currentTime.utc.iso8601 eventStream = MultiEventStream.new + kubePerfEventStream = MultiEventStream.new + insightsMetricsEventStream = MultiEventStream.new @@istestvar = ENV["ISTEST"] begin #begin block start # Getting windows nodes from kubeapi winNodes = KubernetesApiClient.getWindowsNodesArray - - podInventory["items"].each do |items| #podInventory block start - containerInventoryRecords = [] - records = [] - record = {} - record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated - record["Name"] = items["metadata"]["name"] - podNameSpace = items["metadata"]["namespace"] - - # For ARO v3 cluster, skip the pods scheduled on to master or infra nodes - if KubernetesApiClient.isAROV3Cluster && !items["spec"].nil? && !items["spec"]["nodeName"].nil? && - (items["spec"]["nodeName"].downcase.start_with?("infra-") || - items["spec"]["nodeName"].downcase.start_with?("master-")) - next - end - - podUid = KubernetesApiClient.getPodUid(podNameSpace, items["metadata"]) - if podUid.nil? 
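# (illustrative sketch, not part of the patch) The PODS_CHUNK_SIZE / PODS_EMIT_STREAM_BATCH_SIZE
# handling in start above reduces to "use the value the tomlparser/configmap exported into the
# environment, otherwise fall back to a safe default". A hypothetical helper capturing that:
def read_positive_int_env(name, default)
  value = ENV[name]
  return default if value.nil? || value.empty? || value.to_i <= 0
  value.to_i
end
# e.g. @PODS_CHUNK_SIZE = read_positive_int_env("PODS_CHUNK_SIZE", 1000)
#      @PODS_EMIT_STREAM_BATCH_SIZE = read_positive_int_env("PODS_EMIT_STREAM_BATCH_SIZE", 200)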
- next - end - record["PodUid"] = podUid - record["PodLabel"] = [items["metadata"]["labels"]] - record["Namespace"] = podNameSpace - record["PodCreationTimeStamp"] = items["metadata"]["creationTimestamp"] - #for unscheduled (non-started) pods startTime does NOT exist - if !items["status"]["startTime"].nil? - record["PodStartTime"] = items["status"]["startTime"] - else - record["PodStartTime"] = "" - end - #podStatus - # the below is for accounting 'NodeLost' scenario, where-in the pod(s) in the lost node is still being reported as running - podReadyCondition = true - if !items["status"]["reason"].nil? && items["status"]["reason"] == "NodeLost" && !items["status"]["conditions"].nil? - items["status"]["conditions"].each do |condition| - if condition["type"] == "Ready" && condition["status"] == "False" - podReadyCondition = false - break - end + podInventory["items"].each do |item| #podInventory block start + # pod inventory records + podInventoryRecords = getPodInventoryRecords(item, serviceRecords, batchTime) + podInventoryRecords.each do |record| + if !record.nil? + wrapper = { + "DataType" => "KUBE_POD_INVENTORY_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [record.each { |k, v| record[k] = v }], + } + eventStream.add(emitTime, wrapper) if wrapper + @inventoryToMdmConvertor.process_pod_inventory_record(wrapper) end end - - if podReadyCondition == false - record["PodStatus"] = "Unknown" - # ICM - https://portal.microsofticm.com/imp/v3/incidents/details/187091803/home - elsif !items["metadata"]["deletionTimestamp"].nil? && !items["metadata"]["deletionTimestamp"].empty? - record["PodStatus"] = Constants::POD_STATUS_TERMINATING - else - record["PodStatus"] = items["status"]["phase"] - end - #for unscheduled (non-started) pods podIP does NOT exist - if !items["status"]["podIP"].nil? - record["PodIp"] = items["status"]["podIP"] - else - record["PodIp"] = "" - end - #for unscheduled (non-started) pods nodeName does NOT exist - if !items["spec"]["nodeName"].nil? - record["Computer"] = items["spec"]["nodeName"] - else - record["Computer"] = "" - end - # Setting this flag to true so that we can send ContainerInventory records for containers # on windows nodes and parse environment variables for these containers if winNodes.length > 0 - if (!record["Computer"].empty? && (winNodes.include? record["Computer"])) + nodeName = "" + if !item["spec"]["nodeName"].nil? + nodeName = item["spec"]["nodeName"] + end + if (!nodeName.empty? && (winNodes.include? nodeName)) clusterCollectEnvironmentVar = ENV["AZMON_CLUSTER_COLLECT_ENV_VAR"] #Generate ContainerInventory records for windows nodes so that we can get image and image tag in property panel - containerInventoryRecordsInPodItem = KubernetesContainerInventory.getContainerInventoryRecords(items, batchTime, clusterCollectEnvironmentVar, true) - containerInventoryRecordsInPodItem.each do |containerRecord| - containerInventoryRecords.push(containerRecord) - end + containerInventoryRecords = KubernetesContainerInventory.getContainerInventoryRecords(item, batchTime, clusterCollectEnvironmentVar, true) + # Send container inventory records for containers on windows nodes + @winContainerCount += containerInventoryRecords.length + containerInventoryRecords.each do |cirecord| + if !cirecord.nil? 
+ ciwrapper = { + "DataType" => "CONTAINER_INVENTORY_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [cirecord.each { |k, v| cirecord[k] = v }], + } + eventStream.add(emitTime, ciwrapper) if ciwrapper + end + end end end - record["ClusterId"] = KubernetesApiClient.getClusterId - record["ClusterName"] = KubernetesApiClient.getClusterName - record["ServiceName"] = getServiceNameFromLabels(items["metadata"]["namespace"], items["metadata"]["labels"], serviceList) - - if !items["metadata"]["ownerReferences"].nil? - record["ControllerKind"] = items["metadata"]["ownerReferences"][0]["kind"] - record["ControllerName"] = items["metadata"]["ownerReferences"][0]["name"] - @controllerSet.add(record["ControllerKind"] + record["ControllerName"]) - #Adding controller kind to telemetry ro information about customer workload - if (@controllerData[record["ControllerKind"]].nil?) - @controllerData[record["ControllerKind"]] = 1 - else - controllerValue = @controllerData[record["ControllerKind"]] - @controllerData[record["ControllerKind"]] += 1 + if @PODS_EMIT_STREAM_BATCH_SIZE > 0 && eventStream.count >= @PODS_EMIT_STREAM_BATCH_SIZE + $log.info("in_kube_podinventory::parse_and_emit_records: number of pod inventory records emitted #{@PODS_EMIT_STREAM_BATCH_SIZE} @ #{Time.now.utc.iso8601}") + if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0) + $log.info("kubePodInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") end + router.emit_stream(@tag, eventStream) if eventStream + eventStream = MultiEventStream.new end - podRestartCount = 0 - record["PodRestartCount"] = 0 - #Invoke the helper method to compute ready/not ready mdm metric - @inventoryToMdmConvertor.process_record_for_pods_ready_metric(record["ControllerName"], record["Namespace"], items["status"]["conditions"]) + #container perf records + containerMetricDataItems = [] + containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(item, "requests", "cpu", "cpuRequestNanoCores", batchTime)) + containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(item, "requests", "memory", "memoryRequestBytes", batchTime)) + containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(item, "limits", "cpu", "cpuLimitNanoCores", batchTime)) + containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(item, "limits", "memory", "memoryLimitBytes", batchTime)) - podContainers = [] - if items["status"].key?("containerStatuses") && !items["status"]["containerStatuses"].empty? - podContainers = podContainers + items["status"]["containerStatuses"] - end - # Adding init containers to the record list as well. - if items["status"].key?("initContainerStatuses") && !items["status"]["initContainerStatuses"].empty? - podContainers = podContainers + items["status"]["initContainerStatuses"] + containerMetricDataItems.each do |record| + record["DataType"] = "LINUX_PERF_BLOB" + record["IPName"] = "LogManagement" + kubePerfEventStream.add(emitTime, record) if record end - # if items["status"].key?("containerStatuses") && !items["status"]["containerStatuses"].empty? #container status block start - if !podContainers.empty? 
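# Note on the restructuring above: container requests/limits perf records are now built per
# pod item inside the loop (KubernetesApiClient.getContainerResourceRequestsAndLimits is called
# with a single pod item), rather than once over the whole podInventory after the loop as
# before, so the perf stream can also be flushed in PODS_EMIT_STREAM_BATCH_SIZE sized batches
# and memory stays proportional to one chunk of pods.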
#container status block start - podContainers.each do |container| - containerRestartCount = 0 - lastFinishedTime = nil - # Need this flag to determine if we need to process container data for mdm metrics like oomkilled and container restart - #container Id is of the form - #docker://dfd9da983f1fd27432fb2c1fe3049c0a1d25b1c697b2dc1a530c986e58b16527 - if !container["containerID"].nil? - record["ContainerID"] = container["containerID"].split("//")[1] - else - # for containers that have image issues (like invalid image/tag etc..) this will be empty. do not make it all 0 - record["ContainerID"] = "" - end - #keeping this as which is same as InstanceName in perf table - if podUid.nil? || container["name"].nil? - next - else - record["ContainerName"] = podUid + "/" + container["name"] - end - #Pod restart count is a sumtotal of restart counts of individual containers - #within the pod. The restart count of a container is maintained by kubernetes - #itself in the form of a container label. - containerRestartCount = container["restartCount"] - record["ContainerRestartCount"] = containerRestartCount - - containerStatus = container["state"] - record["ContainerStatusReason"] = "" - # state is of the following form , so just picking up the first key name - # "state": { - # "waiting": { - # "reason": "CrashLoopBackOff", - # "message": "Back-off 5m0s restarting failed container=metrics-server pod=metrics-server-2011498749-3g453_kube-system(5953be5f-fcae-11e7-a356-000d3ae0e432)" - # } - # }, - # the below is for accounting 'NodeLost' scenario, where-in the containers in the lost node/pod(s) is still being reported as running - if podReadyCondition == false - record["ContainerStatus"] = "Unknown" - else - record["ContainerStatus"] = containerStatus.keys[0] - end - #TODO : Remove ContainerCreationTimeStamp from here since we are sending it as a metric - #Picking up both container and node start time from cAdvisor to be consistent - if containerStatus.keys[0] == "running" - record["ContainerCreationTimeStamp"] = container["state"]["running"]["startedAt"] - else - if !containerStatus[containerStatus.keys[0]]["reason"].nil? && !containerStatus[containerStatus.keys[0]]["reason"].empty? - record["ContainerStatusReason"] = containerStatus[containerStatus.keys[0]]["reason"] - end - # Process the record to see if job was completed 6 hours ago. If so, send metric to mdm - if !record["ControllerKind"].nil? && record["ControllerKind"].downcase == Constants::CONTROLLER_KIND_JOB - @inventoryToMdmConvertor.process_record_for_terminated_job_metric(record["ControllerName"], record["Namespace"], containerStatus) - end - end - - # Record the last state of the container. This may have information on why a container was killed. - begin - if !container["lastState"].nil? && container["lastState"].keys.length == 1 - lastStateName = container["lastState"].keys[0] - lastStateObject = container["lastState"][lastStateName] - if !lastStateObject.is_a?(Hash) - raise "expected a hash object. 
This could signify a bug or a kubernetes API change" - end - - if lastStateObject.key?("reason") && lastStateObject.key?("startedAt") && lastStateObject.key?("finishedAt") - newRecord = Hash.new - newRecord["lastState"] = lastStateName # get the name of the last state (ex: terminated) - lastStateReason = lastStateObject["reason"] - # newRecord["reason"] = lastStateObject["reason"] # (ex: OOMKilled) - newRecord["reason"] = lastStateReason # (ex: OOMKilled) - newRecord["startedAt"] = lastStateObject["startedAt"] # (ex: 2019-07-02T14:58:51Z) - lastFinishedTime = lastStateObject["finishedAt"] - newRecord["finishedAt"] = lastFinishedTime # (ex: 2019-07-02T14:58:52Z) - - # only write to the output field if everything previously ran without error - record["ContainerLastStatus"] = newRecord - - #Populate mdm metric for OOMKilled container count if lastStateReason is OOMKilled - if lastStateReason.downcase == Constants::REASON_OOM_KILLED - @inventoryToMdmConvertor.process_record_for_oom_killed_metric(record["ControllerName"], record["Namespace"], lastFinishedTime) - end - lastStateReason = nil - else - record["ContainerLastStatus"] = Hash.new - end - else - record["ContainerLastStatus"] = Hash.new - end - - #Populate mdm metric for container restart count if greater than 0 - if (!containerRestartCount.nil? && (containerRestartCount.is_a? Integer) && containerRestartCount > 0) - @inventoryToMdmConvertor.process_record_for_container_restarts_metric(record["ControllerName"], record["Namespace"], lastFinishedTime) - end - rescue => errorStr - $log.warn "Failed in parse_and_emit_record pod inventory while processing ContainerLastStatus: #{errorStr}" - $log.debug_backtrace(errorStr.backtrace) - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - record["ContainerLastStatus"] = Hash.new - end + if @PODS_EMIT_STREAM_BATCH_SIZE > 0 && kubePerfEventStream.count >= @PODS_EMIT_STREAM_BATCH_SIZE + $log.info("in_kube_podinventory::parse_and_emit_records: number of container perf records emitted #{@PODS_EMIT_STREAM_BATCH_SIZE} @ #{Time.now.utc.iso8601}") + router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream + kubePerfEventStream = MultiEventStream.new + end - podRestartCount += containerRestartCount - records.push(record.dup) - end - else # for unscheduled pods there are no status.containerStatuses, in this case we still want the pod - records.push(record) - end #container status block end - records.each do |record| - if !record.nil? 
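# For reference, the ContainerLastStatus field assembled from container["lastState"] ends up
# shaped like this (values are the illustrative ones from the comments):
#   { "lastState" => "terminated", "reason" => "OOMKilled",
#     "startedAt" => "2019-07-02T14:58:51Z", "finishedAt" => "2019-07-02T14:58:52Z" }
# and an OOMKilled reason additionally feeds the oom-killed MDM metric.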
- record["PodRestartCount"] = podRestartCount - wrapper = { - "DataType" => "KUBE_POD_INVENTORY_BLOB", - "IPName" => "ContainerInsights", - "DataItems" => [record.each { |k, v| record[k] = v }], - } - eventStream.add(emitTime, wrapper) if wrapper - @inventoryToMdmConvertor.process_pod_inventory_record(wrapper) - end + # container GPU records + containerGPUInsightsMetricsDataItems = [] + containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(item, "requests", "nvidia.com/gpu", "containerGpuRequests", batchTime)) + containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(item, "limits", "nvidia.com/gpu", "containerGpuLimits", batchTime)) + containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(item, "requests", "amd.com/gpu", "containerGpuRequests", batchTime)) + containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(item, "limits", "amd.com/gpu", "containerGpuLimits", batchTime)) + containerGPUInsightsMetricsDataItems.each do |insightsMetricsRecord| + wrapper = { + "DataType" => "INSIGHTS_METRICS_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [insightsMetricsRecord.each { |k, v| insightsMetricsRecord[k] = v }], + } + insightsMetricsEventStream.add(emitTime, wrapper) if wrapper end - # Send container inventory records for containers on windows nodes - @winContainerCount += containerInventoryRecords.length - containerInventoryRecords.each do |cirecord| - if !cirecord.nil? - ciwrapper = { - "DataType" => "CONTAINER_INVENTORY_BLOB", - "IPName" => "ContainerInsights", - "DataItems" => [cirecord.each { |k, v| cirecord[k] = v }], - } - eventStream.add(emitTime, ciwrapper) if ciwrapper + + if @PODS_EMIT_STREAM_BATCH_SIZE > 0 && insightsMetricsEventStream.count >= @PODS_EMIT_STREAM_BATCH_SIZE + $log.info("in_kube_podinventory::parse_and_emit_records: number of GPU insights metrics records emitted #{@PODS_EMIT_STREAM_BATCH_SIZE} @ #{Time.now.utc.iso8601}") + if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0) + $log.info("kubePodInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}") end + router.emit_stream(Constants::INSIGHTSMETRICS_FLUENT_TAG, insightsMetricsEventStream) if insightsMetricsEventStream + insightsMetricsEventStream = MultiEventStream.new end end #podInventory block end - router.emit_stream(@tag, eventStream) if eventStream + if eventStream.count > 0 + $log.info("in_kube_podinventory::parse_and_emit_records: number of pod inventory records emitted #{eventStream.count} @ #{Time.now.utc.iso8601}") + router.emit_stream(@tag, eventStream) if eventStream + if (!@@istestvar.nil? && !@@istestvar.empty? 
&& @@istestvar.casecmp("true") == 0) + $log.info("kubePodInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") + end + eventStream = nil + end + + if kubePerfEventStream.count > 0 + $log.info("in_kube_podinventory::parse_and_emit_records: number of perf records emitted #{kubePerfEventStream.count} @ #{Time.now.utc.iso8601}") + router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream + kubePerfEventStream = nil + end + + if insightsMetricsEventStream.count > 0 + $log.info("in_kube_podinventory::parse_and_emit_records: number of insights metrics records emitted #{insightsMetricsEventStream.count} @ #{Time.now.utc.iso8601}") + router.emit_stream(Constants::INSIGHTSMETRICS_FLUENT_TAG, insightsMetricsEventStream) if insightsMetricsEventStream + if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0) + $log.info("kubePodInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}") + end + insightsMetricsEventStream = nil + end - if continuationToken.nil? #no more chunks in this batch to be sent, get all pod inventory records to send + if continuationToken.nil? #no more chunks in this batch to be sent, get all mdm pod inventory records to send @log.info "Sending pod inventory mdm records to out_mdm" pod_inventory_mdm_records = @inventoryToMdmConvertor.get_pod_inventory_mdm_records(batchTime) @log.info "pod_inventory_mdm_records.size #{pod_inventory_mdm_records.size}" @@ -401,101 +328,36 @@ def parse_and_emit_records(podInventory, serviceList, continuationToken, batchTi router.emit_stream(@@MDMKubePodInventoryTag, mdm_pod_inventory_es) if mdm_pod_inventory_es end - #:optimize:kubeperf merge - begin - #if(!podInventory.empty?) - containerMetricDataItems = [] - #hostName = (OMS::Common.get_hostname) - containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "cpu", "cpuRequestNanoCores", batchTime)) - containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "memory", "memoryRequestBytes", batchTime)) - containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "cpu", "cpuLimitNanoCores", batchTime)) - containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "memory", "memoryLimitBytes", batchTime)) - - kubePerfEventStream = MultiEventStream.new - insightsMetricsEventStream = MultiEventStream.new - - containerMetricDataItems.each do |record| - record["DataType"] = "LINUX_PERF_BLOB" - record["IPName"] = "LogManagement" - kubePerfEventStream.add(emitTime, record) if record - end - #end - router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream - - begin - #start GPU InsightsMetrics items - - containerGPUInsightsMetricsDataItems = [] - containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(podInventory, "requests", "nvidia.com/gpu", "containerGpuRequests", batchTime)) - containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(podInventory, "limits", "nvidia.com/gpu", "containerGpuLimits", batchTime)) - - containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(podInventory, "requests", "amd.com/gpu", "containerGpuRequests", batchTime)) - 
containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(podInventory, "limits", "amd.com/gpu", "containerGpuLimits", batchTime)) - - containerGPUInsightsMetricsDataItems.each do |insightsMetricsRecord| - wrapper = { - "DataType" => "INSIGHTS_METRICS_BLOB", - "IPName" => "ContainerInsights", - "DataItems" => [insightsMetricsRecord.each { |k, v| insightsMetricsRecord[k] = v }], - } - insightsMetricsEventStream.add(emitTime, wrapper) if wrapper - - if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && insightsMetricsEventStream.count > 0) - $log.info("kubePodInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}") - end - end - - router.emit_stream(Constants::INSIGHTSMETRICS_FLUENT_TAG, insightsMetricsEventStream) if insightsMetricsEventStream - #end GPU InsightsMetrics items - rescue => errorStr - $log.warn "Failed when processing GPU metrics in_kube_podinventory : #{errorStr}" - $log.debug_backtrace(errorStr.backtrace) - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end - rescue => errorStr - $log.warn "Failed in parse_and_emit_record for KubePerf from in_kube_podinventory : #{errorStr}" - $log.debug_backtrace(errorStr.backtrace) - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end - #:optimize:end kubeperf merge - - #:optimize:start kubeservices merge - begin - if (!serviceList.nil? && !serviceList.empty?) - kubeServicesEventStream = MultiEventStream.new - serviceList["items"].each do |items| - kubeServiceRecord = {} - kubeServiceRecord["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated - kubeServiceRecord["ServiceName"] = items["metadata"]["name"] - kubeServiceRecord["Namespace"] = items["metadata"]["namespace"] - kubeServiceRecord["SelectorLabels"] = [items["spec"]["selector"]] + if continuationToken.nil? # sending kube services inventory records + kubeServicesEventStream = MultiEventStream.new + serviceRecords.each do |kubeServiceRecord| + if !kubeServiceRecord.nil? 
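# The serviceRecords cached at the start of enumerate are emitted here only when
# continuationToken is nil, i.e. once per collection cycle on the final pod chunk, so services
# are not re-sent for every pod chunk. They are wrapped as KUBE_SERVICES_BLOB items and flushed
# with the same PODS_EMIT_STREAM_BATCH_SIZE threshold as the pod records; ClusterId/ClusterName
# are attached only at emit time to keep the cached records small.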
+ # adding before emit to reduce memory foot print kubeServiceRecord["ClusterId"] = KubernetesApiClient.getClusterId kubeServiceRecord["ClusterName"] = KubernetesApiClient.getClusterName - kubeServiceRecord["ClusterIP"] = items["spec"]["clusterIP"] - kubeServiceRecord["ServiceType"] = items["spec"]["type"] - # : Add ports and status fields kubeServicewrapper = { "DataType" => "KUBE_SERVICES_BLOB", "IPName" => "ContainerInsights", "DataItems" => [kubeServiceRecord.each { |k, v| kubeServiceRecord[k] = v }], } kubeServicesEventStream.add(emitTime, kubeServicewrapper) if kubeServicewrapper + if @PODS_EMIT_STREAM_BATCH_SIZE > 0 && kubeServicesEventStream.count >= @PODS_EMIT_STREAM_BATCH_SIZE + $log.info("in_kube_podinventory::parse_and_emit_records: number of service records emitted #{@PODS_EMIT_STREAM_BATCH_SIZE} @ #{Time.now.utc.iso8601}") + router.emit_stream(@@kubeservicesTag, kubeServicesEventStream) if kubeServicesEventStream + kubeServicesEventStream = MultiEventStream.new + end end + end + + if kubeServicesEventStream.count > 0 + $log.info("in_kube_podinventory::parse_and_emit_records : number of service records emitted #{kubeServicesEventStream.count} @ #{Time.now.utc.iso8601}") router.emit_stream(@@kubeservicesTag, kubeServicesEventStream) if kubeServicesEventStream end - rescue => errorStr - $log.warn "Failed in parse_and_emit_record for KubeServices from in_kube_podinventory : #{errorStr}" - $log.debug_backtrace(errorStr.backtrace) - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + kubeServicesEventStream = nil end - #:optimize:end kubeservices merge #Updating value for AppInsights telemetry @podCount += podInventory["items"].length - - if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0) - $log.info("kubePodInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") - end rescue => errorStr $log.warn "Failed in parse_and_emit_record pod inventory: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) @@ -535,25 +397,238 @@ def run_periodic @mutex.unlock end - def getServiceNameFromLabels(namespace, labels, serviceList) + # TODO - move this method to KubernetesClient or helper class + def getPodInventoryRecords(item, serviceRecords, batchTime = Time.utc.iso8601) + records = [] + record = {} + + begin + record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + record["Name"] = item["metadata"]["name"] + podNameSpace = item["metadata"]["namespace"] + podUid = KubernetesApiClient.getPodUid(podNameSpace, item["metadata"]) + if podUid.nil? + return records + end + + nodeName = "" + #for unscheduled (non-started) pods nodeName does NOT exist + if !item["spec"]["nodeName"].nil? + nodeName = item["spec"]["nodeName"] + end + # For ARO v3 cluster, skip the pods scheduled on to master or infra nodes + if KubernetesApiClient.isAROv3MasterOrInfraPod(nodeName) + return records + end + + record["PodUid"] = podUid + record["PodLabel"] = [item["metadata"]["labels"]] + record["Namespace"] = podNameSpace + record["PodCreationTimeStamp"] = item["metadata"]["creationTimestamp"] + #for unscheduled (non-started) pods startTime does NOT exist + if !item["status"]["startTime"].nil? + record["PodStartTime"] = item["status"]["startTime"] + else + record["PodStartTime"] = "" + end + #podStatus + # the below is for accounting 'NodeLost' scenario, where-in the pod(s) in the lost node is still being reported as running + podReadyCondition = true + if !item["status"]["reason"].nil? 
&& item["status"]["reason"] == "NodeLost" && !item["status"]["conditions"].nil? + item["status"]["conditions"].each do |condition| + if condition["type"] == "Ready" && condition["status"] == "False" + podReadyCondition = false + break + end + end + end + if podReadyCondition == false + record["PodStatus"] = "Unknown" + # ICM - https://portal.microsofticm.com/imp/v3/incidents/details/187091803/home + elsif !item["metadata"]["deletionTimestamp"].nil? && !item["metadata"]["deletionTimestamp"].empty? + record["PodStatus"] = Constants::POD_STATUS_TERMINATING + else + record["PodStatus"] = item["status"]["phase"] + end + #for unscheduled (non-started) pods podIP does NOT exist + if !item["status"]["podIP"].nil? + record["PodIp"] = item["status"]["podIP"] + else + record["PodIp"] = "" + end + + record["Computer"] = nodeName + record["ClusterId"] = KubernetesApiClient.getClusterId + record["ClusterName"] = KubernetesApiClient.getClusterName + record["ServiceName"] = getServiceNameFromLabels(item["metadata"]["namespace"], item["metadata"]["labels"], serviceRecords) + + if !item["metadata"]["ownerReferences"].nil? + record["ControllerKind"] = item["metadata"]["ownerReferences"][0]["kind"] + record["ControllerName"] = item["metadata"]["ownerReferences"][0]["name"] + @controllerSet.add(record["ControllerKind"] + record["ControllerName"]) + #Adding controller kind to telemetry ro information about customer workload + if (@controllerData[record["ControllerKind"]].nil?) + @controllerData[record["ControllerKind"]] = 1 + else + controllerValue = @controllerData[record["ControllerKind"]] + @controllerData[record["ControllerKind"]] += 1 + end + end + podRestartCount = 0 + record["PodRestartCount"] = 0 + + #Invoke the helper method to compute ready/not ready mdm metric + @inventoryToMdmConvertor.process_record_for_pods_ready_metric(record["ControllerName"], record["Namespace"], item["status"]["conditions"]) + + podContainers = [] + if item["status"].key?("containerStatuses") && !item["status"]["containerStatuses"].empty? + podContainers = podContainers + item["status"]["containerStatuses"] + end + # Adding init containers to the record list as well. + if item["status"].key?("initContainerStatuses") && !item["status"]["initContainerStatuses"].empty? + podContainers = podContainers + item["status"]["initContainerStatuses"] + end + # if items["status"].key?("containerStatuses") && !items["status"]["containerStatuses"].empty? #container status block start + if !podContainers.empty? #container status block start + podContainers.each do |container| + containerRestartCount = 0 + lastFinishedTime = nil + # Need this flag to determine if we need to process container data for mdm metrics like oomkilled and container restart + #container Id is of the form + #docker://dfd9da983f1fd27432fb2c1fe3049c0a1d25b1c697b2dc1a530c986e58b16527 + if !container["containerID"].nil? + record["ContainerID"] = container["containerID"].split("//")[1] + else + # for containers that have image issues (like invalid image/tag etc..) this will be empty. do not make it all 0 + record["ContainerID"] = "" + end + #keeping this as which is same as InstanceName in perf table + if podUid.nil? || container["name"].nil? + next + else + record["ContainerName"] = podUid + "/" + container["name"] + end + #Pod restart count is a sumtotal of restart counts of individual containers + #within the pod. The restart count of a container is maintained by kubernetes + #itself in the form of a container label. 
+ containerRestartCount = container["restartCount"] + record["ContainerRestartCount"] = containerRestartCount + + containerStatus = container["state"] + record["ContainerStatusReason"] = "" + # state is of the following form , so just picking up the first key name + # "state": { + # "waiting": { + # "reason": "CrashLoopBackOff", + # "message": "Back-off 5m0s restarting failed container=metrics-server pod=metrics-server-2011498749-3g453_kube-system(5953be5f-fcae-11e7-a356-000d3ae0e432)" + # } + # }, + # the below is for accounting 'NodeLost' scenario, where-in the containers in the lost node/pod(s) is still being reported as running + if podReadyCondition == false + record["ContainerStatus"] = "Unknown" + else + record["ContainerStatus"] = containerStatus.keys[0] + end + #TODO : Remove ContainerCreationTimeStamp from here since we are sending it as a metric + #Picking up both container and node start time from cAdvisor to be consistent + if containerStatus.keys[0] == "running" + record["ContainerCreationTimeStamp"] = container["state"]["running"]["startedAt"] + else + if !containerStatus[containerStatus.keys[0]]["reason"].nil? && !containerStatus[containerStatus.keys[0]]["reason"].empty? + record["ContainerStatusReason"] = containerStatus[containerStatus.keys[0]]["reason"] + end + # Process the record to see if job was completed 6 hours ago. If so, send metric to mdm + if !record["ControllerKind"].nil? && record["ControllerKind"].downcase == Constants::CONTROLLER_KIND_JOB + @inventoryToMdmConvertor.process_record_for_terminated_job_metric(record["ControllerName"], record["Namespace"], containerStatus) + end + end + + # Record the last state of the container. This may have information on why a container was killed. + begin + if !container["lastState"].nil? && container["lastState"].keys.length == 1 + lastStateName = container["lastState"].keys[0] + lastStateObject = container["lastState"][lastStateName] + if !lastStateObject.is_a?(Hash) + raise "expected a hash object. This could signify a bug or a kubernetes API change" + end + + if lastStateObject.key?("reason") && lastStateObject.key?("startedAt") && lastStateObject.key?("finishedAt") + newRecord = Hash.new + newRecord["lastState"] = lastStateName # get the name of the last state (ex: terminated) + lastStateReason = lastStateObject["reason"] + # newRecord["reason"] = lastStateObject["reason"] # (ex: OOMKilled) + newRecord["reason"] = lastStateReason # (ex: OOMKilled) + newRecord["startedAt"] = lastStateObject["startedAt"] # (ex: 2019-07-02T14:58:51Z) + lastFinishedTime = lastStateObject["finishedAt"] + newRecord["finishedAt"] = lastFinishedTime # (ex: 2019-07-02T14:58:52Z) + + # only write to the output field if everything previously ran without error + record["ContainerLastStatus"] = newRecord + + #Populate mdm metric for OOMKilled container count if lastStateReason is OOMKilled + if lastStateReason.downcase == Constants::REASON_OOM_KILLED + @inventoryToMdmConvertor.process_record_for_oom_killed_metric(record["ControllerName"], record["Namespace"], lastFinishedTime) + end + lastStateReason = nil + else + record["ContainerLastStatus"] = Hash.new + end + else + record["ContainerLastStatus"] = Hash.new + end + + #Populate mdm metric for container restart count if greater than 0 + if (!containerRestartCount.nil? && (containerRestartCount.is_a? 
Integer) && containerRestartCount > 0) + @inventoryToMdmConvertor.process_record_for_container_restarts_metric(record["ControllerName"], record["Namespace"], lastFinishedTime) + end + rescue => errorStr + $log.warn "Failed in parse_and_emit_record pod inventory while processing ContainerLastStatus: #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + record["ContainerLastStatus"] = Hash.new + end + + podRestartCount += containerRestartCount + records.push(record.dup) + end + else # for unscheduled pods there are no status.containerStatuses, in this case we still want the pod + records.push(record) + end #container status block end + + records.each do |record| + if !record.nil? + record["PodRestartCount"] = podRestartCount + end + end + rescue => error + $log.warn("getPodInventoryRecords failed: #{error}") + end + return records + end + + # TODO - move this method to KubernetesClient or helper class + def getServiceNameFromLabels(namespace, labels, serviceRecords) serviceName = "" begin if !labels.nil? && !labels.empty? - if (!serviceList.nil? && !serviceList.empty? && serviceList.key?("items") && !serviceList["items"].empty?) - serviceList["items"].each do |item| - found = 0 - if !item["spec"].nil? && !item["spec"]["selector"].nil? && item["metadata"]["namespace"] == namespace - selectorLabels = item["spec"]["selector"] - if !selectorLabels.empty? - selectorLabels.each do |key, value| - if !(labels.select { |k, v| k == key && v == value }.length > 0) - break - end - found = found + 1 + serviceRecords.each do |kubeServiceRecord| + found = 0 + if kubeServiceRecord["Namespace"] == namespace + selectorLabels = {} + # selector labels wrapped in array in kube service records so unwrapping here + if !kubeServiceRecord["SelectorLabels"].nil? && kubeServiceRecord["SelectorLabels"].length > 0 + selectorLabels = kubeServiceRecord["SelectorLabels"][0] + end + if !selectorLabels.nil? && !selectorLabels.empty? 
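# (illustrative sketch, not part of the patch) getServiceNameFromLabels implements the standard
# selector rule: a service matches a pod when every key/value pair in its (non-empty) selector
# appears in the pod's labels. A hypothetical helper expressing the same check:
def selector_matches?(selector_labels, pod_labels)
  return false if selector_labels.nil? || selector_labels.empty?
  selector_labels.all? { |key, value| pod_labels[key] == value }
end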
+ selectorLabels.each do |key, value| + if !(labels.select { |k, v| k == key && v == value }.length > 0) + break end + found = found + 1 end + # service can have no selectors if found == selectorLabels.length - return item["metadata"]["name"] + return kubeServiceRecord["ServiceName"] end end end diff --git a/source/plugins/ruby/in_kubestate_deployments.rb b/source/plugins/ruby/in_kubestate_deployments.rb index bcf397150..27e4709a2 100644 --- a/source/plugins/ruby/in_kubestate_deployments.rb +++ b/source/plugins/ruby/in_kubestate_deployments.rb @@ -2,230 +2,238 @@ # frozen_string_literal: true module Fluent - class Kube_Kubestate_Deployments_Input < Input - Plugin.register_input("kubestatedeployments", self) - @@istestvar = ENV["ISTEST"] - # telemetry - To keep telemetry cost reasonable, we keep track of the max deployments over a period of 15m - @@deploymentsCount = 0 - - - - def initialize - super - require "yajl/json_gem" - require "yajl" - require "date" - require "time" - - require_relative "KubernetesApiClient" - require_relative "oms_common" - require_relative "omslog" - require_relative "ApplicationInsightsUtility" - require_relative "constants" - - # roughly each deployment is 8k - # 1000 deployments account to approximately 8MB - @DEPLOYMENTS_CHUNK_SIZE = 1000 - @DEPLOYMENTS_API_GROUP = "apps" - @@telemetryLastSentTime = DateTime.now.to_time.to_i - - - @deploymentsRunningTotal = 0 - - @NodeName = OMS::Common.get_hostname - @ClusterId = KubernetesApiClient.getClusterId - @ClusterName = KubernetesApiClient.getClusterName - end - - config_param :run_interval, :time, :default => 60 - config_param :tag, :string, :default => Constants::INSIGHTSMETRICS_FLUENT_TAG - - def configure(conf) - super - end - - def start - if @run_interval - @finished = false - @condition = ConditionVariable.new - @mutex = Mutex.new - @thread = Thread.new(&method(:run_periodic)) + class Kube_Kubestate_Deployments_Input < Input + Plugin.register_input("kubestatedeployments", self) + @@istestvar = ENV["ISTEST"] + # telemetry - To keep telemetry cost reasonable, we keep track of the max deployments over a period of 15m + @@deploymentsCount = 0 + + def initialize + super + require "yajl/json_gem" + require "yajl" + require "date" + require "time" + + require_relative "KubernetesApiClient" + require_relative "oms_common" + require_relative "omslog" + require_relative "ApplicationInsightsUtility" + require_relative "constants" + + # refer tomlparser-agent-config for defaults + # this configurable via configmap + @DEPLOYMENTS_CHUNK_SIZE = 0 + + @DEPLOYMENTS_API_GROUP = "apps" + @@telemetryLastSentTime = DateTime.now.to_time.to_i + + @deploymentsRunningTotal = 0 + + @NodeName = OMS::Common.get_hostname + @ClusterId = KubernetesApiClient.getClusterId + @ClusterName = KubernetesApiClient.getClusterName + end + + config_param :run_interval, :time, :default => 60 + config_param :tag, :string, :default => Constants::INSIGHTSMETRICS_FLUENT_TAG + + def configure(conf) + super + end + + def start + if @run_interval + if !ENV["DEPLOYMENTS_CHUNK_SIZE"].nil? && !ENV["DEPLOYMENTS_CHUNK_SIZE"].empty? 
&& ENV["DEPLOYMENTS_CHUNK_SIZE"].to_i > 0 + @DEPLOYMENTS_CHUNK_SIZE = ENV["DEPLOYMENTS_CHUNK_SIZE"].to_i + else + # this shouldnt happen just setting default here as safe guard + $log.warn("in_kubestate_deployments::start: setting to default value since got DEPLOYMENTS_CHUNK_SIZE nil or empty") + @DEPLOYMENTS_CHUNK_SIZE = 500 end + $log.info("in_kubestate_deployments::start : DEPLOYMENTS_CHUNK_SIZE @ #{@DEPLOYMENTS_CHUNK_SIZE}") + + @finished = false + @condition = ConditionVariable.new + @mutex = Mutex.new + @thread = Thread.new(&method(:run_periodic)) end - - def shutdown - if @run_interval - @mutex.synchronize { - @finished = true - @condition.signal - } - @thread.join - end + end + + def shutdown + if @run_interval + @mutex.synchronize { + @finished = true + @condition.signal + } + @thread.join end - - def enumerate - begin - deploymentList = nil - currentTime = Time.now - batchTime = currentTime.utc.iso8601 - - #set the running total for this batch to 0 - @deploymentsRunningTotal = 0 - - # Initializing continuation token to nil - continuationToken = nil - $log.info("in_kubestate_deployments::enumerate : Getting deployments from Kube API @ #{Time.now.utc.iso8601}") - continuationToken, deploymentList = KubernetesApiClient.getResourcesAndContinuationToken("deployments?limit=#{@DEPLOYMENTS_CHUNK_SIZE}", api_group: @DEPLOYMENTS_API_GROUP) - $log.info("in_kubestate_deployments::enumerate : Done getting deployments from Kube API @ #{Time.now.utc.iso8601}") + end + + def enumerate + begin + deploymentList = nil + currentTime = Time.now + batchTime = currentTime.utc.iso8601 + + #set the running total for this batch to 0 + @deploymentsRunningTotal = 0 + + # Initializing continuation token to nil + continuationToken = nil + $log.info("in_kubestate_deployments::enumerate : Getting deployments from Kube API @ #{Time.now.utc.iso8601}") + continuationToken, deploymentList = KubernetesApiClient.getResourcesAndContinuationToken("deployments?limit=#{@DEPLOYMENTS_CHUNK_SIZE}", api_group: @DEPLOYMENTS_API_GROUP) + $log.info("in_kubestate_deployments::enumerate : Done getting deployments from Kube API @ #{Time.now.utc.iso8601}") + if (!deploymentList.nil? && !deploymentList.empty? && deploymentList.key?("items") && !deploymentList["items"].nil? && !deploymentList["items"].empty?) + $log.info("in_kubestate_deployments::enumerate : number of deployment items :#{deploymentList["items"].length} from Kube API @ #{Time.now.utc.iso8601}") + parse_and_emit_records(deploymentList, batchTime) + else + $log.warn "in_kubestate_deployments::enumerate:Received empty deploymentList" + end + + #If we receive a continuation token, make calls, process and flush data until we have processed all data + while (!continuationToken.nil? && !continuationToken.empty?) + continuationToken, deploymentList = KubernetesApiClient.getResourcesAndContinuationToken("deployments?limit=#{@DEPLOYMENTS_CHUNK_SIZE}&continue=#{continuationToken}", api_group: @DEPLOYMENTS_API_GROUP) if (!deploymentList.nil? && !deploymentList.empty? && deploymentList.key?("items") && !deploymentList["items"].nil? && !deploymentList["items"].empty?) 
+ $log.info("in_kubestate_deployments::enumerate : number of deployment items :#{deploymentList["items"].length} from Kube API @ #{Time.now.utc.iso8601}") parse_and_emit_records(deploymentList, batchTime) else $log.warn "in_kubestate_deployments::enumerate:Received empty deploymentList" end - - #If we receive a continuation token, make calls, process and flush data until we have processed all data - while (!continuationToken.nil? && !continuationToken.empty?) - continuationToken, deploymentList = KubernetesApiClient.getResourcesAndContinuationToken("deployments?limit=#{@DEPLOYMENTS_CHUNK_SIZE}&continue=#{continuationToken}", api_group: @DEPLOYMENTS_API_GROUP) - if (!deploymentList.nil? && !deploymentList.empty? && deploymentList.key?("items") && !deploymentList["items"].nil? && !deploymentList["items"].empty?) - parse_and_emit_records(deploymentList, batchTime) - else - $log.warn "in_kubestate_deployments::enumerate:Received empty deploymentList" - end + end + + # Setting this to nil so that we dont hold memory until GC kicks in + deploymentList = nil + + $log.info("successfully emitted a total of #{@deploymentsRunningTotal} kube_state_deployment metrics") + # Flush AppInsights telemetry once all the processing is done, only if the number of events flushed is greater than 0 + if (@deploymentsRunningTotal > @@deploymentsCount) + @@deploymentsCount = @deploymentsRunningTotal + end + if (((DateTime.now.to_time.to_i - @@telemetryLastSentTime).abs) / 60) >= Constants::KUBE_STATE_TELEMETRY_FLUSH_INTERVAL_IN_MINUTES + #send telemetry + $log.info "sending deployemt telemetry..." + ApplicationInsightsUtility.sendMetricTelemetry("MaxDeploymentCount", @@deploymentsCount, {}) + #reset last sent value & time + @@deploymentsCount = 0 + @@telemetryLastSentTime = DateTime.now.to_time.to_i + end + rescue => errorStr + $log.warn "in_kubestate_deployments::enumerate:Failed in enumerate: #{errorStr}" + ApplicationInsightsUtility.sendExceptionTelemetry("in_kubestate_deployments::enumerate:Failed in enumerate: #{errorStr}") + end + end # end enumerate + + def parse_and_emit_records(deployments, batchTime = Time.utc.iso8601) + metricItems = [] + insightsMetricsEventStream = MultiEventStream.new + begin + metricInfo = deployments + metricInfo["items"].each do |deployment| + deploymentName = deployment["metadata"]["name"] + deploymentNameSpace = deployment["metadata"]["namespace"] + deploymentCreatedTime = "" + if !deployment["metadata"]["creationTimestamp"].nil? + deploymentCreatedTime = deployment["metadata"]["creationTimestamp"] + end + deploymentStrategy = "RollingUpdate" #default when not specified as per spec + if !deployment["spec"]["strategy"].nil? && !deployment["spec"]["strategy"]["type"].nil? + deploymentStrategy = deployment["spec"]["strategy"]["type"] end - - # Setting this to nil so that we dont hold memory until GC kicks in - deploymentList = nil - - $log.info("successfully emitted a total of #{@deploymentsRunningTotal} kube_state_deployment metrics") - # Flush AppInsights telemetry once all the processing is done, only if the number of events flushed is greater than 0 - if (@deploymentsRunningTotal > @@deploymentsCount) - @@deploymentsCount = @deploymentsRunningTotal + deploymentSpecReplicas = 1 #default is 1 as per k8s spec + if !deployment["spec"]["replicas"].nil? 
+ deploymentSpecReplicas = deployment["spec"]["replicas"] end - if (((DateTime.now.to_time.to_i - @@telemetryLastSentTime).abs)/60 ) >= Constants::KUBE_STATE_TELEMETRY_FLUSH_INTERVAL_IN_MINUTES - #send telemetry - $log.info "sending deployemt telemetry..." - ApplicationInsightsUtility.sendMetricTelemetry("MaxDeploymentCount", @@deploymentsCount, {}) - #reset last sent value & time - @@deploymentsCount = 0 - @@telemetryLastSentTime = DateTime.now.to_time.to_i + deploymentStatusReadyReplicas = 0 + if !deployment["status"]["readyReplicas"].nil? + deploymentStatusReadyReplicas = deployment["status"]["readyReplicas"] end - rescue => errorStr - $log.warn "in_kubestate_deployments::enumerate:Failed in enumerate: #{errorStr}" - ApplicationInsightsUtility.sendExceptionTelemetry("in_kubestate_deployments::enumerate:Failed in enumerate: #{errorStr}") + deploymentStatusUpToDateReplicas = 0 + if !deployment["status"]["updatedReplicas"].nil? + deploymentStatusUpToDateReplicas = deployment["status"]["updatedReplicas"] + end + deploymentStatusAvailableReplicas = 0 + if !deployment["status"]["availableReplicas"].nil? + deploymentStatusAvailableReplicas = deployment["status"]["availableReplicas"] + end + + metricItem = {} + metricItem["CollectionTime"] = batchTime + metricItem["Computer"] = @NodeName + metricItem["Name"] = Constants::INSIGHTSMETRICS_METRIC_NAME_KUBE_STATE_DEPLOYMENT_STATE + metricItem["Value"] = deploymentStatusReadyReplicas + metricItem["Origin"] = Constants::INSIGHTSMETRICS_TAGS_ORIGIN + metricItem["Namespace"] = Constants::INSIGHTSMETRICS_TAGS_KUBESTATE_NAMESPACE + + metricTags = {} + metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERID] = @ClusterId + metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERNAME] = @ClusterName + metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_NAME] = deploymentName + metricTags[Constants::INSIGHTSMETRICS_TAGS_K8SNAMESPACE] = deploymentNameSpace + metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_STRATEGY] = deploymentStrategy + metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_CREATIONTIME] = deploymentCreatedTime + metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_SPEC_REPLICAS] = deploymentSpecReplicas + metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_STATUS_REPLICAS_UPDATED] = deploymentStatusUpToDateReplicas + metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_STATUS_REPLICAS_AVAILABLE] = deploymentStatusAvailableReplicas + + metricItem["Tags"] = metricTags + + metricItems.push(metricItem) end - end # end enumerate - - def parse_and_emit_records(deployments, batchTime = Time.utc.iso8601) - metricItems = [] - insightsMetricsEventStream = MultiEventStream.new - begin - metricInfo = deployments - metricInfo["items"].each do |deployment| - deploymentName = deployment["metadata"]["name"] - deploymentNameSpace = deployment["metadata"]["namespace"] - deploymentCreatedTime = "" - if !deployment["metadata"]["creationTimestamp"].nil? - deploymentCreatedTime = deployment["metadata"]["creationTimestamp"] - end - deploymentStrategy = "RollingUpdate" #default when not specified as per spec - if !deployment["spec"]["strategy"].nil? && !deployment["spec"]["strategy"]["type"].nil? - deploymentStrategy = deployment["spec"]["strategy"]["type"] - end - deploymentSpecReplicas = 1 #default is 1 as per k8s spec - if !deployment["spec"]["replicas"].nil? 
- deploymentSpecReplicas = deployment["spec"]["replicas"] - end - deploymentStatusReadyReplicas = 0 - if !deployment["status"]["readyReplicas"].nil? - deploymentStatusReadyReplicas = deployment["status"]["readyReplicas"] - end - deploymentStatusUpToDateReplicas = 0 - if !deployment["status"]["updatedReplicas"].nil? - deploymentStatusUpToDateReplicas = deployment["status"]["updatedReplicas"] - end - deploymentStatusAvailableReplicas = 0 - if !deployment["status"]["availableReplicas"].nil? - deploymentStatusAvailableReplicas = deployment["status"]["availableReplicas"] - end - - metricItem = {} - metricItem["CollectionTime"] = batchTime - metricItem["Computer"] = @NodeName - metricItem["Name"] = Constants::INSIGHTSMETRICS_METRIC_NAME_KUBE_STATE_DEPLOYMENT_STATE - metricItem["Value"] = deploymentStatusReadyReplicas - metricItem["Origin"] = Constants::INSIGHTSMETRICS_TAGS_ORIGIN - metricItem["Namespace"] = Constants::INSIGHTSMETRICS_TAGS_KUBESTATE_NAMESPACE - - metricTags = {} - metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERID] = @ClusterId - metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERNAME] = @ClusterName - metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_NAME] = deploymentName - metricTags[Constants::INSIGHTSMETRICS_TAGS_K8SNAMESPACE] = deploymentNameSpace - metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_STRATEGY ] = deploymentStrategy - metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_CREATIONTIME] = deploymentCreatedTime - metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_SPEC_REPLICAS] = deploymentSpecReplicas - metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_STATUS_REPLICAS_UPDATED] = deploymentStatusUpToDateReplicas - metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_DEPLOYMENT_STATUS_REPLICAS_AVAILABLE] = deploymentStatusAvailableReplicas - - - metricItem["Tags"] = metricTags - - metricItems.push(metricItem) - end - - time = Time.now.to_f - metricItems.each do |insightsMetricsRecord| - wrapper = { - "DataType" => "INSIGHTS_METRICS_BLOB", - "IPName" => "ContainerInsights", - "DataItems" => [insightsMetricsRecord.each { |k, v| insightsMetricsRecord[k] = v }], - } - insightsMetricsEventStream.add(time, wrapper) if wrapper - end - - router.emit_stream(Constants::INSIGHTSMETRICS_FLUENT_TAG, insightsMetricsEventStream) if insightsMetricsEventStream - $log.info("successfully emitted #{metricItems.length()} kube_state_deployment metrics") - @deploymentsRunningTotal = @deploymentsRunningTotal + metricItems.length() - if (!@@istestvar.nil? && !@@istestvar.empty? 
&& @@istestvar.casecmp("true") == 0 && insightsMetricsEventStream.count > 0) - $log.info("kubestatedeploymentsInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}") - end - rescue => error - $log.warn("in_kubestate_deployments::parse_and_emit_records failed: #{error} ") - ApplicationInsightsUtility.sendExceptionTelemetry("in_kubestate_deployments::parse_and_emit_records failed: #{error}") + + time = Time.now.to_f + metricItems.each do |insightsMetricsRecord| + wrapper = { + "DataType" => "INSIGHTS_METRICS_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [insightsMetricsRecord.each { |k, v| insightsMetricsRecord[k] = v }], + } + insightsMetricsEventStream.add(time, wrapper) if wrapper + end + + router.emit_stream(Constants::INSIGHTSMETRICS_FLUENT_TAG, insightsMetricsEventStream) if insightsMetricsEventStream + $log.info("successfully emitted #{metricItems.length()} kube_state_deployment metrics") + + @deploymentsRunningTotal = @deploymentsRunningTotal + metricItems.length() + if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && insightsMetricsEventStream.count > 0) + $log.info("kubestatedeploymentsInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}") end - + rescue => error + $log.warn("in_kubestate_deployments::parse_and_emit_records failed: #{error} ") + ApplicationInsightsUtility.sendExceptionTelemetry("in_kubestate_deployments::parse_and_emit_records failed: #{error}") end - - def run_periodic - @mutex.lock + end + + def run_periodic + @mutex.lock + done = @finished + @nextTimeToRun = Time.now + @waitTimeout = @run_interval + until done + @nextTimeToRun = @nextTimeToRun + @run_interval + @now = Time.now + if @nextTimeToRun <= @now + @waitTimeout = 1 + @nextTimeToRun = @now + else + @waitTimeout = @nextTimeToRun - @now + end + @condition.wait(@mutex, @waitTimeout) done = @finished - @nextTimeToRun = Time.now - @waitTimeout = @run_interval - until done - @nextTimeToRun = @nextTimeToRun + @run_interval - @now = Time.now - if @nextTimeToRun <= @now - @waitTimeout = 1 - @nextTimeToRun = @now - else - @waitTimeout = @nextTimeToRun - @now - end - @condition.wait(@mutex, @waitTimeout) - done = @finished - @mutex.unlock - if !done - begin - $log.info("in_kubestate_deployments::run_periodic.enumerate.start @ #{Time.now.utc.iso8601}") - enumerate - $log.info("in_kubestate_deployments::run_periodic.enumerate.end @ #{Time.now.utc.iso8601}") - rescue => errorStr - $log.warn "in_kubestate_deployments::run_periodic: enumerate Failed to retrieve kube deployments: #{errorStr}" - ApplicationInsightsUtility.sendExceptionTelemetry("in_kubestate_deployments::run_periodic: enumerate Failed to retrieve kube deployments: #{errorStr}") - end + @mutex.unlock + if !done + begin + $log.info("in_kubestate_deployments::run_periodic.enumerate.start @ #{Time.now.utc.iso8601}") + enumerate + $log.info("in_kubestate_deployments::run_periodic.enumerate.end @ #{Time.now.utc.iso8601}") + rescue => errorStr + $log.warn "in_kubestate_deployments::run_periodic: enumerate Failed to retrieve kube deployments: #{errorStr}" + ApplicationInsightsUtility.sendExceptionTelemetry("in_kubestate_deployments::run_periodic: enumerate Failed to retrieve kube deployments: #{errorStr}") end - @mutex.lock end - @mutex.unlock + @mutex.lock end + @mutex.unlock end -end \ No newline at end of file + end +end diff --git a/source/plugins/ruby/in_kubestate_hpa.rb b/source/plugins/ruby/in_kubestate_hpa.rb index 3ce63a75a..afecf8e3b 100644 --- a/source/plugins/ruby/in_kubestate_hpa.rb +++ 
b/source/plugins/ruby/in_kubestate_hpa.rb @@ -2,231 +2,236 @@ # frozen_string_literal: true module Fluent - class Kube_Kubestate_HPA_Input < Input - Plugin.register_input("kubestatehpa", self) - @@istestvar = ENV["ISTEST"] - - - def initialize - super - require "yajl/json_gem" - require "yajl" - require "time" - - require_relative "KubernetesApiClient" - require_relative "oms_common" - require_relative "omslog" - require_relative "ApplicationInsightsUtility" - require_relative "constants" - - # roughly each HPA is 3k - # 2000 HPAs account to approximately 6-7MB - @HPA_CHUNK_SIZE = 2000 - @HPA_API_GROUP = "autoscaling" - - # telemetry - @hpaCount = 0 - - @NodeName = OMS::Common.get_hostname - @ClusterId = KubernetesApiClient.getClusterId - @ClusterName = KubernetesApiClient.getClusterName - end - - config_param :run_interval, :time, :default => 60 - config_param :tag, :string, :default => Constants::INSIGHTSMETRICS_FLUENT_TAG - - def configure(conf) - super - end - - def start - if @run_interval - @finished = false - @condition = ConditionVariable.new - @mutex = Mutex.new - @thread = Thread.new(&method(:run_periodic)) + class Kube_Kubestate_HPA_Input < Input + Plugin.register_input("kubestatehpa", self) + @@istestvar = ENV["ISTEST"] + + def initialize + super + require "yajl/json_gem" + require "yajl" + require "time" + + require_relative "KubernetesApiClient" + require_relative "oms_common" + require_relative "omslog" + require_relative "ApplicationInsightsUtility" + require_relative "constants" + + # refer tomlparser-agent-config for defaults + # this configurable via configmap + @HPA_CHUNK_SIZE = 0 + + @HPA_API_GROUP = "autoscaling" + + # telemetry + @hpaCount = 0 + + @NodeName = OMS::Common.get_hostname + @ClusterId = KubernetesApiClient.getClusterId + @ClusterName = KubernetesApiClient.getClusterName + end + + config_param :run_interval, :time, :default => 60 + config_param :tag, :string, :default => Constants::INSIGHTSMETRICS_FLUENT_TAG + + def configure(conf) + super + end + + def start + if @run_interval + if !ENV["HPA_CHUNK_SIZE"].nil? && !ENV["HPA_CHUNK_SIZE"].empty? 
&& ENV["HPA_CHUNK_SIZE"].to_i > 0 + @HPA_CHUNK_SIZE = ENV["HPA_CHUNK_SIZE"].to_i + else + # this shouldnt happen just setting default here as safe guard + $log.warn("in_kubestate_hpa::start: setting to default value since got HPA_CHUNK_SIZE nil or empty") + @HPA_CHUNK_SIZE = 2000 end + $log.info("in_kubestate_hpa::start : HPA_CHUNK_SIZE @ #{@HPA_CHUNK_SIZE}") + + @finished = false + @condition = ConditionVariable.new + @mutex = Mutex.new + @thread = Thread.new(&method(:run_periodic)) end - - def shutdown - if @run_interval - @mutex.synchronize { - @finished = true - @condition.signal - } - @thread.join - end + end + + def shutdown + if @run_interval + @mutex.synchronize { + @finished = true + @condition.signal + } + @thread.join end - - def enumerate - begin - hpaList = nil - currentTime = Time.now - batchTime = currentTime.utc.iso8601 - - @hpaCount = 0 - - # Initializing continuation token to nil - continuationToken = nil - $log.info("in_kubestate_hpa::enumerate : Getting HPAs from Kube API @ #{Time.now.utc.iso8601}") - continuationToken, hpaList = KubernetesApiClient.getResourcesAndContinuationToken("horizontalpodautoscalers?limit=#{@HPA_CHUNK_SIZE}", api_group: @HPA_API_GROUP) - $log.info("in_kubestate_hpa::enumerate : Done getting HPAs from Kube API @ #{Time.now.utc.iso8601}") + end + + def enumerate + begin + hpaList = nil + currentTime = Time.now + batchTime = currentTime.utc.iso8601 + + @hpaCount = 0 + + # Initializing continuation token to nil + continuationToken = nil + $log.info("in_kubestate_hpa::enumerate : Getting HPAs from Kube API @ #{Time.now.utc.iso8601}") + continuationToken, hpaList = KubernetesApiClient.getResourcesAndContinuationToken("horizontalpodautoscalers?limit=#{@HPA_CHUNK_SIZE}", api_group: @HPA_API_GROUP) + $log.info("in_kubestate_hpa::enumerate : Done getting HPAs from Kube API @ #{Time.now.utc.iso8601}") + if (!hpaList.nil? && !hpaList.empty? && hpaList.key?("items") && !hpaList["items"].nil? && !hpaList["items"].empty?) + parse_and_emit_records(hpaList, batchTime) + else + $log.warn "in_kubestate_hpa::enumerate:Received empty hpaList" + end + + #If we receive a continuation token, make calls, process and flush data until we have processed all data + while (!continuationToken.nil? && !continuationToken.empty?) + continuationToken, hpaList = KubernetesApiClient.getResourcesAndContinuationToken("horizontalpodautoscalers?limit=#{@HPA_CHUNK_SIZE}&continue=#{continuationToken}", api_group: @HPA_API_GROUP) if (!hpaList.nil? && !hpaList.empty? && hpaList.key?("items") && !hpaList["items"].nil? && !hpaList["items"].empty?) parse_and_emit_records(hpaList, batchTime) else $log.warn "in_kubestate_hpa::enumerate:Received empty hpaList" end - - #If we receive a continuation token, make calls, process and flush data until we have processed all data - while (!continuationToken.nil? && !continuationToken.empty?) - continuationToken, hpaList = KubernetesApiClient.getResourcesAndContinuationToken("horizontalpodautoscalers?limit=#{@HPA_CHUNK_SIZE}&continue=#{continuationToken}", api_group: @HPA_API_GROUP) - if (!hpaList.nil? && !hpaList.empty? && hpaList.key?("items") && !hpaList["items"].nil? && !hpaList["items"].empty?) 
- parse_and_emit_records(hpaList, batchTime) - else - $log.warn "in_kubestate_hpa::enumerate:Received empty hpaList" + end + + # Setting this to nil so that we dont hold memory until GC kicks in + hpaList = nil + + # Flush AppInsights telemetry once all the processing is done, only if the number of events flushed is greater than 0 + if (@hpaCount > 0) + # this will not be a useful telemetry, as hpa counts will not be huge, just log for now + $log.info("in_kubestate_hpa::hpaCount= #{hpaCount}") + #ApplicationInsightsUtility.sendMetricTelemetry("HPACount", @hpaCount, {}) + end + rescue => errorStr + $log.warn "in_kubestate_hpa::enumerate:Failed in enumerate: #{errorStr}" + ApplicationInsightsUtility.sendExceptionTelemetry("in_kubestate_hpa::enumerate:Failed in enumerate: #{errorStr}") + end + end # end enumerate + + def parse_and_emit_records(hpas, batchTime = Time.utc.iso8601) + metricItems = [] + insightsMetricsEventStream = MultiEventStream.new + begin + metricInfo = hpas + metricInfo["items"].each do |hpa| + hpaName = hpa["metadata"]["name"] + hpaNameSpace = hpa["metadata"]["namespace"] + hpaCreatedTime = "" + if !hpa["metadata"]["creationTimestamp"].nil? + hpaCreatedTime = hpa["metadata"]["creationTimestamp"] + end + hpaSpecMinReplicas = 1 #default is 1 as per k8s spec + if !hpa["spec"]["minReplicas"].nil? + hpaSpecMinReplicas = hpa["spec"]["minReplicas"] + end + hpaSpecMaxReplicas = 0 + if !hpa["spec"]["maxReplicas"].nil? + hpaSpecMaxReplicas = hpa["spec"]["maxReplicas"] + end + hpaSpecScaleTargetKind = "" + hpaSpecScaleTargetName = "" + if !hpa["spec"]["scaleTargetRef"].nil? + if !hpa["spec"]["scaleTargetRef"]["kind"].nil? + hpaSpecScaleTargetKind = hpa["spec"]["scaleTargetRef"]["kind"] + end + if !hpa["spec"]["scaleTargetRef"]["name"].nil? + hpaSpecScaleTargetName = hpa["spec"]["scaleTargetRef"]["name"] end end - - # Setting this to nil so that we dont hold memory until GC kicks in - hpaList = nil - - # Flush AppInsights telemetry once all the processing is done, only if the number of events flushed is greater than 0 - if (@hpaCount > 0) - # this will not be a useful telemetry, as hpa counts will not be huge, just log for now - $log.info("in_kubestate_hpa::hpaCount= #{hpaCount}") - #ApplicationInsightsUtility.sendMetricTelemetry("HPACount", @hpaCount, {}) + hpaStatusCurrentReplicas = 0 + if !hpa["status"]["currentReplicas"].nil? + hpaStatusCurrentReplicas = hpa["status"]["currentReplicas"] end - rescue => errorStr - $log.warn "in_kubestate_hpa::enumerate:Failed in enumerate: #{errorStr}" - ApplicationInsightsUtility.sendExceptionTelemetry("in_kubestate_hpa::enumerate:Failed in enumerate: #{errorStr}") + hpaStatusDesiredReplicas = 0 + if !hpa["status"]["desiredReplicas"].nil? + hpaStatusDesiredReplicas = hpa["status"]["desiredReplicas"] + end + + hpaStatuslastScaleTime = "" + if !hpa["status"]["lastScaleTime"].nil? 
+ hpaStatuslastScaleTime = hpa["status"]["lastScaleTime"] + end + + metricItem = {} + metricItem["CollectionTime"] = batchTime + metricItem["Computer"] = @NodeName + metricItem["Name"] = Constants::INSIGHTSMETRICS_METRIC_NAME_KUBE_STATE_HPA_STATE + metricItem["Value"] = hpaStatusCurrentReplicas + metricItem["Origin"] = Constants::INSIGHTSMETRICS_TAGS_ORIGIN + metricItem["Namespace"] = Constants::INSIGHTSMETRICS_TAGS_KUBESTATE_NAMESPACE + + metricTags = {} + metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERID] = @ClusterId + metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERNAME] = @ClusterName + metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_HPA_NAME] = hpaName + metricTags[Constants::INSIGHTSMETRICS_TAGS_K8SNAMESPACE] = hpaNameSpace + metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_CREATIONTIME] = hpaCreatedTime + metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_HPA_SPEC_MIN_REPLICAS] = hpaSpecMinReplicas + metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_HPA_SPEC_MAX_REPLICAS] = hpaSpecMaxReplicas + metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_HPA_SPEC_SCALE_TARGET_KIND] = hpaSpecScaleTargetKind + metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_HPA_SPEC_SCALE_TARGET_NAME] = hpaSpecScaleTargetName + metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_HPA_STATUS_DESIRED_REPLICAS] = hpaStatusDesiredReplicas + metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_HPA_STATUS_LAST_SCALE_TIME] = hpaStatuslastScaleTime + + metricItem["Tags"] = metricTags + + metricItems.push(metricItem) end - end # end enumerate - - def parse_and_emit_records(hpas, batchTime = Time.utc.iso8601) - metricItems = [] - insightsMetricsEventStream = MultiEventStream.new - begin - metricInfo = hpas - metricInfo["items"].each do |hpa| - hpaName = hpa["metadata"]["name"] - hpaNameSpace = hpa["metadata"]["namespace"] - hpaCreatedTime = "" - if !hpa["metadata"]["creationTimestamp"].nil? - hpaCreatedTime = hpa["metadata"]["creationTimestamp"] - end - hpaSpecMinReplicas = 1 #default is 1 as per k8s spec - if !hpa["spec"]["minReplicas"].nil? - hpaSpecMinReplicas = hpa["spec"]["minReplicas"] - end - hpaSpecMaxReplicas = 0 - if !hpa["spec"]["maxReplicas"].nil? - hpaSpecMaxReplicas = hpa["spec"]["maxReplicas"] - end - hpaSpecScaleTargetKind = "" - hpaSpecScaleTargetName = "" - if !hpa["spec"]["scaleTargetRef"].nil? - if !hpa["spec"]["scaleTargetRef"]["kind"].nil? - hpaSpecScaleTargetKind = hpa["spec"]["scaleTargetRef"]["kind"] - end - if !hpa["spec"]["scaleTargetRef"]["name"].nil? - hpaSpecScaleTargetName = hpa["spec"]["scaleTargetRef"]["name"] - end - - end - hpaStatusCurrentReplicas = 0 - if !hpa["status"]["currentReplicas"].nil? - hpaStatusCurrentReplicas = hpa["status"]["currentReplicas"] - end - hpaStatusDesiredReplicas = 0 - if !hpa["status"]["desiredReplicas"].nil? - hpaStatusDesiredReplicas = hpa["status"]["desiredReplicas"] - end - - hpaStatuslastScaleTime = "" - if !hpa["status"]["lastScaleTime"].nil? 
- hpaStatuslastScaleTime = hpa["status"]["lastScaleTime"] - end - - - metricItem = {} - metricItem["CollectionTime"] = batchTime - metricItem["Computer"] = @NodeName - metricItem["Name"] = Constants::INSIGHTSMETRICS_METRIC_NAME_KUBE_STATE_HPA_STATE - metricItem["Value"] = hpaStatusCurrentReplicas - metricItem["Origin"] = Constants::INSIGHTSMETRICS_TAGS_ORIGIN - metricItem["Namespace"] = Constants::INSIGHTSMETRICS_TAGS_KUBESTATE_NAMESPACE - - metricTags = {} - metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERID] = @ClusterId - metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERNAME] = @ClusterName - metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_HPA_NAME] = hpaName - metricTags[Constants::INSIGHTSMETRICS_TAGS_K8SNAMESPACE] = hpaNameSpace - metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_CREATIONTIME] = hpaCreatedTime - metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_HPA_SPEC_MIN_REPLICAS] = hpaSpecMinReplicas - metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_HPA_SPEC_MAX_REPLICAS] = hpaSpecMaxReplicas - metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_HPA_SPEC_SCALE_TARGET_KIND] = hpaSpecScaleTargetKind - metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_HPA_SPEC_SCALE_TARGET_NAME] = hpaSpecScaleTargetName - metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_HPA_STATUS_DESIRED_REPLICAS] = hpaStatusDesiredReplicas - metricTags[Constants::INSIGHTSMETRICS_TAGS_KUBE_STATE_HPA_STATUS_LAST_SCALE_TIME] = hpaStatuslastScaleTime - - - metricItem["Tags"] = metricTags - - metricItems.push(metricItem) - end - time = Time.now.to_f - metricItems.each do |insightsMetricsRecord| - wrapper = { - "DataType" => "INSIGHTS_METRICS_BLOB", - "IPName" => "ContainerInsights", - "DataItems" => [insightsMetricsRecord.each { |k, v| insightsMetricsRecord[k] = v }], - } - insightsMetricsEventStream.add(time, wrapper) if wrapper - end - - router.emit_stream(Constants::INSIGHTSMETRICS_FLUENT_TAG, insightsMetricsEventStream) if insightsMetricsEventStream - $log.info("successfully emitted #{metricItems.length()} kube_state_hpa metrics") - if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && insightsMetricsEventStream.count > 0) - $log.info("kubestatehpaInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}") - end - rescue => error - $log.warn("in_kubestate_hpa::parse_and_emit_records failed: #{error} ") - ApplicationInsightsUtility.sendExceptionTelemetry("in_kubestate_hpa::parse_and_emit_records failed: #{error}") + time = Time.now.to_f + metricItems.each do |insightsMetricsRecord| + wrapper = { + "DataType" => "INSIGHTS_METRICS_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [insightsMetricsRecord.each { |k, v| insightsMetricsRecord[k] = v }], + } + insightsMetricsEventStream.add(time, wrapper) if wrapper + end + + router.emit_stream(Constants::INSIGHTSMETRICS_FLUENT_TAG, insightsMetricsEventStream) if insightsMetricsEventStream + $log.info("successfully emitted #{metricItems.length()} kube_state_hpa metrics") + if (!@@istestvar.nil? && !@@istestvar.empty? 
&& @@istestvar.casecmp("true") == 0 && insightsMetricsEventStream.count > 0) + $log.info("kubestatehpaInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}") end - + rescue => error + $log.warn("in_kubestate_hpa::parse_and_emit_records failed: #{error} ") + ApplicationInsightsUtility.sendExceptionTelemetry("in_kubestate_hpa::parse_and_emit_records failed: #{error}") end - - def run_periodic - @mutex.lock + end + + def run_periodic + @mutex.lock + done = @finished + @nextTimeToRun = Time.now + @waitTimeout = @run_interval + until done + @nextTimeToRun = @nextTimeToRun + @run_interval + @now = Time.now + if @nextTimeToRun <= @now + @waitTimeout = 1 + @nextTimeToRun = @now + else + @waitTimeout = @nextTimeToRun - @now + end + @condition.wait(@mutex, @waitTimeout) done = @finished - @nextTimeToRun = Time.now - @waitTimeout = @run_interval - until done - @nextTimeToRun = @nextTimeToRun + @run_interval - @now = Time.now - if @nextTimeToRun <= @now - @waitTimeout = 1 - @nextTimeToRun = @now - else - @waitTimeout = @nextTimeToRun - @now - end - @condition.wait(@mutex, @waitTimeout) - done = @finished - @mutex.unlock - if !done - begin - $log.info("in_kubestate_hpa::run_periodic.enumerate.start @ #{Time.now.utc.iso8601}") - enumerate - $log.info("in_kubestate_hpa::run_periodic.enumerate.end @ #{Time.now.utc.iso8601}") - rescue => errorStr - $log.warn "in_kubestate_hpa::run_periodic: enumerate Failed to retrieve kube hpas: #{errorStr}" - ApplicationInsightsUtility.sendExceptionTelemetry("in_kubestate_hpa::run_periodic: enumerate Failed to retrieve kube hpas: #{errorStr}") - end + @mutex.unlock + if !done + begin + $log.info("in_kubestate_hpa::run_periodic.enumerate.start @ #{Time.now.utc.iso8601}") + enumerate + $log.info("in_kubestate_hpa::run_periodic.enumerate.end @ #{Time.now.utc.iso8601}") + rescue => errorStr + $log.warn "in_kubestate_hpa::run_periodic: enumerate Failed to retrieve kube hpas: #{errorStr}" + ApplicationInsightsUtility.sendExceptionTelemetry("in_kubestate_hpa::run_periodic: enumerate Failed to retrieve kube hpas: #{errorStr}") end - @mutex.lock end - @mutex.unlock + @mutex.lock end + @mutex.unlock end -end \ No newline at end of file + end +end
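
Note: getServiceNameFromLabels now works against the pre-extracted kube service records (ServiceName, Namespace, SelectorLabels) rather than the raw service list: a service in the pod's namespace matches when every selector key/value pair is present in the pod's labels, and services without selectors are skipped. A small self-contained sketch of that matching rule, with made-up sample records (not taken from a live cluster):

# Sketch: resolve a pod's service name by matching service selector labels
# against the pod's labels (every selector pair must match).
def service_name_for(namespace, pod_labels, service_records)
  service_records.each do |svc|
    next unless svc["Namespace"] == namespace
    # selector labels are wrapped in an array in the service records
    selector = (svc["SelectorLabels"] || [{}]).first || {}
    next if selector.empty? # a service can have no selectors
    return svc["ServiceName"] if selector.all? { |k, v| pod_labels[k] == v }
  end
  ""
end

services = [{ "ServiceName" => "web-svc", "Namespace" => "default", "SelectorLabels" => [{ "app" => "web" }] }]
puts service_name_for("default", { "app" => "web", "tier" => "frontend" }, services) # => web-svc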
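
Note: the rewritten inputs no longer hard-code their chunk sizes; tomlparser-agent-config.rb is expected to export them as environment variables (for example DEPLOYMENTS_CHUNK_SIZE and HPA_CHUNK_SIZE), and each plugin's start method falls back to a built-in default when the variable is missing or invalid. A minimal standalone sketch of that pattern follows; the helper name resolve_chunk_size and the default constant are illustrative only, not part of the agent.

# Sketch: read a chunk-size setting from an environment variable with a safe default.
DEFAULT_CHUNK_SIZE = 500 # illustrative fallback; the real defaults live in tomlparser-agent-config.rb

def resolve_chunk_size(env_name, default)
  raw = ENV[env_name]
  # Guard against nil, empty, and non-positive values, mirroring the checks in start().
  return raw.to_i if !raw.nil? && !raw.empty? && raw.to_i > 0
  warn "#{env_name} not set or invalid; falling back to #{default}"
  default
end

chunk_size = resolve_chunk_size("DEPLOYMENTS_CHUNK_SIZE", DEFAULT_CHUNK_SIZE)
puts "deployments will be listed with ?limit=#{chunk_size}"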
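
Note: both enumerate methods page through the Kubernetes API with the limit/continue pattern so that only one chunk of the response is held in memory at a time; each chunk is parsed and emitted before the next request is made, which is the heart of the RS OOM fix. A rough self-contained illustration of that loop is below; fetch_chunk is a stand-in for KubernetesApiClient.getResourcesAndContinuationToken, not the real client.

# Sketch: list a resource chunk by chunk, handing each chunk to the caller
# before requesting the next one so memory stays bounded.
def fetch_chunk(path)
  # Stand-in for the real API call; returns [continuation_token_or_nil, response_hash].
  [nil, { "items" => [] }]
end

def each_chunk(resource, chunk_size)
  continuation_token, response = fetch_chunk("#{resource}?limit=#{chunk_size}")
  loop do
    yield response if !response.nil? && !response["items"].to_a.empty?
    break if continuation_token.nil? || continuation_token.empty?
    continuation_token, response = fetch_chunk("#{resource}?limit=#{chunk_size}&continue=#{continuation_token}")
  end
end

each_chunk("deployments", 500) { |chunk| puts "processing #{chunk["items"].length} items" }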