Commit
chg: containers Plugin - basic pod support
StatsFetcher -> StatsStreamer
RazCrimson committed Feb 18, 2023
1 parent 928752a commit 77759ea
Showing 5 changed files with 300 additions and 151 deletions.
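The central change in this file: DockerStatsFetcher no longer subclasses StatsFetcher; it now owns a StatsStreamer that consumes Docker's blocking container.stats(decode=True) generator on a background thread. The streamer implementation is in glances/plugins/containers/stats_streamer.py (one of the five changed files, not expanded on this page), so the following is a minimal hypothetical sketch of just the interface this diff relies on — stats, result_lock, last_update_time and stop():

import threading
import time


class StatsStreamer:
    """Minimal sketch (not the actual implementation) of the interface
    glances_docker.py uses: consume a blocking stats iterable on a
    daemon thread and expose the latest value."""

    def __init__(self, iterable, initial_stream_value=None):
        self._iterable = iterable
        self.stats = initial_stream_value  # latest raw stats from the stream
        self.last_update_time = 0  # epoch seconds of the last refresh
        self.result_lock = threading.Lock()  # readers never see a half-written dict
        self._stop_requested = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        for new_stats in self._iterable:
            if self._stop_requested.is_set():
                break
            with self.result_lock:
                self.stats = new_stats
                self.last_update_time = time.time()

    def stop(self):
        self._stop_requested.set()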
85 changes: 36 additions & 49 deletions glances/plugins/containers/glances_docker.py
@@ -1,10 +1,9 @@
"""Docker Extension unit for Glances' Containers plugin."""
import threading
import time

from glances.compat import iterkeys, itervalues, nativestr, pretty_date
from glances.logger import logger
from glances.plugins.containers.stats_fetcher import StatsFetcher
from glances.plugins.containers.stats_streamer import StatsStreamer

# Docker-py library (optional and Linux-only)
# https://github.com/docker/docker-py
@@ -19,48 +18,43 @@
import_docker_error_tag = False


class DockerStatsFetcher(StatsFetcher):
class DockerStatsFetcher:
MANDATORY_MEMORY_FIELDS = ['usage', 'limit']

def __init__(self, container):
super().__init__(container)
# Lock to avoid the daemon thread updating stats when main thread reads the stats
self._stats_lock = threading.Lock()
self._container = container

# Previously computed stats are stored in the self._old_computed_stats variable
# By storing time data we enable IoR/s and IoW/s calculations in the XML/RPC API, which would otherwise
# be overly difficult work for users of the API
# We store time data to enable IoR/s & IoW/s calculations without extra complexity for consumers of the exposed APIs.
self._old_computed_stats = {}

# Last time when output stats (results) were computed
self._last_stats_output_time = 0
# Last time when the raw_stats were updated by worker thread
self._last_raws_stats_update_time = 1
self._last_stats_computed_time = 0

# Threaded Streamer
stats_iterable = container.stats(decode=True)
self._streamer = StatsStreamer(stats_iterable, initial_stream_value={})

def _log_debug(self, msg, exception=None):
logger.debug("containers (Docker) ID: {} - {} ({}) ".format(self._container.id, msg, exception))
logger.debug(self._streamer.stats)

def stop(self):
self._streamer.stop()

@property
def activity_stats(self):
"""Activity Stats
Each successive access of activity_stats will cause computation of activity_stats from raw_stats
Each successive access of activity_stats will cause computation of activity_stats
"""
computed_activity_stats = self._compute_activity_stats()
self._old_computed_stats = computed_activity_stats
self._last_stats_output_time = time.time()
self._last_stats_computed_time = time.time()
return computed_activity_stats

def _pre_raw_stats_update_hook(self):
self._stats_lock.acquire()

def _post_raw_stats_update_hook(self):
self._last_raws_stats_update_time = time.time()
self._stats_lock.release()

@property
def time_since_update(self):
return self._last_raws_stats_update_time - self._last_stats_output_time

def _compute_activity_stats(self):
with self._stats_lock:
with self._streamer.result_lock:
io_stats = self._get_io_stats()
cpu_stats = self._get_cpu_stats()
memory_stats = self._get_memory_stats()
@@ -74,6 +68,11 @@ def _compute_activity_stats(self):
}
return computed_stats

@property
def time_since_update(self):
# In case of no update, default to 1
return max(1, self._streamer.last_update_time - self._last_stats_computed_time)

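The max(1, ...) floor above guards the rate math against a zero or negative interval when no fresh raw stats have arrived yet. A hypothetical consumer-side calculation (sample values; the field name follows the cumulative_ior/cumulative_iow keys produced by _get_io_stats below):

old = {'cumulative_ior': 1_000_000}  # bytes read at the previous computation
new = {'cumulative_ior': 3_000_000}  # bytes read now
time_since_update = 2                # seconds; floored at 1 by the property above

# IoR/s = counter delta divided by the elapsed interval
ior_per_second = (new['cumulative_ior'] - old['cumulative_ior']) / time_since_update
print(ior_per_second)  # 1000000.0 bytes/s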
def _get_cpu_stats(self):
"""Return the container CPU usage.
@@ -82,8 +81,8 @@ def _get_cpu_stats(self):
stats = {'total': 0.0}

try:
cpu_stats = self.stats['cpu_stats']
precpu_stats = self.stats['precpu_stats']
cpu_stats = self._streamer.stats['cpu_stats']
precpu_stats = self._streamer.stats['precpu_stats']
cpu = {'system': cpu_stats['system_cpu_usage'], 'total': cpu_stats['cpu_usage']['total_usage']}
precpu = {'system': precpu_stats['system_cpu_usage'], 'total': precpu_stats['cpu_usage']['total_usage']}

@@ -93,8 +92,7 @@ def _get_cpu_stats(self):
# the corresponding cpu_usage.percpu_usage array should be used.
cpu['nb_core'] = cpu_stats.get('online_cpus') or len(cpu_stats['cpu_usage']['percpu_usage'] or [])
except KeyError as e:
logger.debug("containers plugin - Can't grab CPU stat for container {} ({})".format(self._container.id, e))
logger.debug(self.stats)
self._log_debug("Can't grab CPU stats", e)
return None

try:
@@ -103,9 +101,7 @@ def _get_cpu_stats(self):
# CPU usage % = (cpu_delta / system_cpu_delta) * number_cpus * 100.0
stats['total'] = (cpu_delta / system_cpu_delta) * cpu['nb_core'] * 100.0
except TypeError as e:
msg = "containers plugin - Can't compute CPU usage for container {} ({})".format(self._container.id, e)
logger.debug(msg)
logger.debug(self.stats)
self._log_debug("Can't compute CPU usage", e)
return None

# Return the stats
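Plugging sample numbers into the formula above makes the scaling clearer (illustrative values; both counters are the cumulative nanosecond totals Docker reports):

# CPU usage % = (cpu_delta / system_cpu_delta) * number_cpus * 100.0
cpu = {'total': 500_000_000, 'system': 10_000_000_000, 'nb_core': 4}
precpu = {'total': 400_000_000, 'system': 9_000_000_000}

cpu_delta = cpu['total'] - precpu['total']           # 100_000_000 ns used by the container
system_cpu_delta = cpu['system'] - precpu['system']  # 1_000_000_000 ns elapsed on the host
total = (cpu_delta / system_cpu_delta) * cpu['nb_core'] * 100.0
print(total)  # 40.0, i.e. the container consumed the equivalent of 0.4 cores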
@@ -116,12 +112,11 @@ def _get_memory_stats(self):
Output: a dict {'rss': 1015808, 'cache': 356352, 'usage': ..., 'max_usage': ...}
"""
memory_stats = self.stats.get('memory_stats')
memory_stats = self._streamer.stats.get('memory_stats')

# Checks for memory_stats & mandatory fields
if not memory_stats or any(field not in memory_stats for field in self.MANDATORY_MEMORY_FIELDS):
logger.debug("containers plugin - Missing MEM usage fields for container {}".format(self._container.id))
logger.debug(self.stats)
self._log_debug("Missing MEM usage fields")
return None

stats = {field: memory_stats[field] for field in self.MANDATORY_MEMORY_FIELDS}
@@ -132,9 +127,7 @@ def _get_memory_stats(self):
stats['max_usage'] = detailed_stats.get('max_usage')
stats['cache'] = detailed_stats.get('cache')
except (KeyError, TypeError) as e:
# self.stats do not have MEM information
logger.debug("containers plugin - Can't grab MEM usage for container {} ({})".format(self._container.id, e))
logger.debug(self.stats)
self._log_debug("Can't grab MEM usage", e) # stats do not have MEM information
return None

# Return the stats
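For reference, a trimmed-down memory_stats payload of the shape the guards above expect (illustrative values; this assumes the collapsed line assigns detailed_stats from the nested 'stats' dict, and the .get() calls let optional fields fall back to None on hosts that omit them):

memory_stats = {
    'usage': 1_015_808,
    'limit': 2_147_483_648,
    'stats': {'cache': 356_352},  # detailed stats; 'max_usage' absent in this sample
}

MANDATORY_MEMORY_FIELDS = ['usage', 'limit']
if memory_stats and all(f in memory_stats for f in MANDATORY_MEMORY_FIELDS):
    stats = {f: memory_stats[f] for f in MANDATORY_MEMORY_FIELDS}
    detailed_stats = memory_stats.get('stats', {})
    stats['max_usage'] = detailed_stats.get('max_usage')  # None when not reported
    stats['cache'] = detailed_stats.get('cache')
    print(stats)  # {'usage': 1015808, 'limit': 2147483648, 'max_usage': None, 'cache': 356352}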
@@ -149,12 +142,11 @@ def _get_network_stats(self):
rx: Number of bytes received
tx: Number of bytes transmitted
"""
eth0_stats = self.stats.get('networks', {}).get('eth0')
eth0_stats = self._streamer.stats.get('networks', {}).get('eth0')

# Checks for net_stats & mandatory fields
if not eth0_stats or any(field not in eth0_stats for field in ['rx_bytes', 'tx_bytes']):
logger.debug("containers plugin - Missing Network usage fields for container {}".format(self._container.id))
logger.debug(self.stats)
self._log_debug("Missing Network usage fields")
return None

# Read the rx/tx stats (in bytes)
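A trimmed sample of the 'networks' section and the guard-then-read pattern used above (illustrative values):

raw_stats = {'networks': {'eth0': {'rx_bytes': 987_654, 'tx_bytes': 123_456}}}

eth0_stats = raw_stats.get('networks', {}).get('eth0')
if eth0_stats and all(field in eth0_stats for field in ['rx_bytes', 'tx_bytes']):
    rx, tx = eth0_stats['rx_bytes'], eth0_stats['tx_bytes']
    print(rx, tx)  # 987654 123456 -> cumulative bytes received/transmitted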
@@ -179,12 +171,11 @@ def _get_io_stats(self):
ior: Number of bytes read
iow: Number of bytes written
"""
io_service_bytes_recursive = self.stats.get('blkio_stats', {}).get('io_service_bytes_recursive')
io_service_bytes_recursive = self._streamer.stats.get('blkio_stats', {}).get('io_service_bytes_recursive')

# Checks for io stats
if not io_service_bytes_recursive:
logger.debug("containers plugin - Missing blockIO usage fields for container {}".format(self._container.id))
logger.debug(self.stats)
self._log_debug("Missing blockIO usage fields")
return None

# Read the ior/iow stats (in bytes)
@@ -193,11 +184,7 @@ def _get_io_stats(self):
cumulative_ior = [i for i in io_service_bytes_recursive if i['op'].lower() == 'read'][0]['value']
cumulative_iow = [i for i in io_service_bytes_recursive if i['op'].lower() == 'write'][0]['value']
except (TypeError, IndexError, KeyError, AttributeError) as e:
# self.stats do not have io information
logger.debug(
"containers plugin - Can't grab blockIO usage for container {} ({})".format(self._container.id, e)
)
logger.debug(self.stats)
self._log_debug("Can't grab blockIO usage", e) # stats do not have io information
return None

stats = {'cumulative_ior': cumulative_ior, 'cumulative_iow': cumulative_iow}
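A trimmed sample of blkio_stats.io_service_bytes_recursive showing what the two list comprehensions above extract (illustrative values):

io_service_bytes_recursive = [
    {'major': 8, 'minor': 0, 'op': 'Read', 'value': 4_096_000},
    {'major': 8, 'minor': 0, 'op': 'Write', 'value': 1_024_000},
]

# First matching entry per operation, exactly as in _get_io_stats
cumulative_ior = [i for i in io_service_bytes_recursive if i['op'].lower() == 'read'][0]['value']
cumulative_iow = [i for i in io_service_bytes_recursive if i['op'].lower() == 'write'][0]['value']
print(cumulative_ior, cumulative_iow)  # 4096000 1024000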
