Skip to content

Commit

Permalink
Add cgroupv2 support for log collector (Azure#3188)
Browse files Browse the repository at this point in the history
* Lc v2 implementation branch (#18)

* memory experimentation changes

* Initial changes

* obvious issues

* Fix e2e test

* First round of unit test fixes

* Fix existing unit tests

* Remove unneeded cpu files

* Get memory usage should return tuple

* Fix log for tracking cgroup

* Add unit tests

* Add unit tests

* Address pylint comments

* Clean up code

* clean up code

* Fix unit tests (#19)

* Fix unit tests

* Fix unit tests

* Revisions (#20)

* Respond to comments

* Test failures

* Fix type issue

* Revisions

* Additional revisions (#21)

* Revisions

* Remove unit test for sending telem

* final fixes

* add config flag

* Fix e2e tests
  • Loading branch information
maddieford authored Aug 26, 2024
1 parent fc7644a commit 0681f14
Show file tree
Hide file tree
Showing 57 changed files with 2,354 additions and 1,043 deletions.
37 changes: 27 additions & 10 deletions azurelinuxagent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from __future__ import print_function

import json
import os
import re
import subprocess
Expand All @@ -31,7 +32,8 @@

from azurelinuxagent.common.exception import CGroupsException
from azurelinuxagent.ga import logcollector, cgroupconfigurator
from azurelinuxagent.ga.controllermetrics import AGENT_LOG_COLLECTOR, CpuMetrics
from azurelinuxagent.ga.cgroupcontroller import AGENT_LOG_COLLECTOR
from azurelinuxagent.ga.cpucontroller import _CpuController
from azurelinuxagent.ga.cgroupapi import get_cgroup_api, log_cgroup_warning, InvalidCgroupMountpointException

import azurelinuxagent.common.conf as conf
Expand Down Expand Up @@ -208,7 +210,7 @@ def collect_logs(self, is_full_mode):

# Check the cgroups unit
log_collector_monitor = None
tracked_metrics = []
tracked_controllers = []
if CollectLogsHandler.is_enabled_monitor_cgroups_check():
try:
cgroup_api = get_cgroup_api()
Expand All @@ -220,10 +222,10 @@ def collect_logs(self, is_full_mode):
sys.exit(logcollector.INVALID_CGROUPS_ERRCODE)

log_collector_cgroup = cgroup_api.get_process_cgroup(process_id="self", cgroup_name=AGENT_LOG_COLLECTOR)
tracked_metrics = log_collector_cgroup.get_controller_metrics()
tracked_controllers = log_collector_cgroup.get_controllers()

if len(tracked_metrics) != len(log_collector_cgroup.get_supported_controllers()):
log_cgroup_warning("At least one required controller is missing. The following controllers are required for the log collector to run: {0}".format(log_collector_cgroup.get_supported_controllers()))
if len(tracked_controllers) != len(log_collector_cgroup.get_supported_controller_names()):
log_cgroup_warning("At least one required controller is missing. The following controllers are required for the log collector to run: {0}".format(log_collector_cgroup.get_supported_controller_names()))
sys.exit(logcollector.INVALID_CGROUPS_ERRCODE)

if not log_collector_cgroup.check_in_expected_slice(cgroupconfigurator.LOGCOLLECTOR_SLICE):
Expand All @@ -235,15 +237,30 @@ def collect_logs(self, is_full_mode):
# Run log collector resource monitoring only if the agent starts the log collector.
# If the log collector is started by any other means, it will not be monitored.
if CollectLogsHandler.is_enabled_monitor_cgroups_check():
for metric in tracked_metrics:
if isinstance(metric, CpuMetrics):
metric.initialize_cpu_usage()
for controller in tracked_controllers:
if isinstance(controller, _CpuController):
controller.initialize_cpu_usage()
break
log_collector_monitor = get_log_collector_monitor_handler(tracked_metrics)
log_collector_monitor = get_log_collector_monitor_handler(tracked_controllers)
log_collector_monitor.run()
archive = log_collector.collect_logs_and_get_archive()

archive, total_uncompressed_size = log_collector.collect_logs_and_get_archive()
logger.info("Log collection successfully completed. Archive can be found at {0} "
"and detailed log output can be found at {1}".format(archive, OUTPUT_RESULTS_FILE_PATH))

if log_collector_monitor is not None:
log_collector_monitor.stop()
try:
metrics_summary = log_collector_monitor.get_max_recorded_metrics()
metrics_summary['Total Uncompressed File Size (B)'] = total_uncompressed_size
msg = json.dumps(metrics_summary)
logger.info(msg)
event.add_event(op=event.WALAEventOperation.LogCollection, message=msg, log_event=False)
except Exception as e:
msg = "An error occurred while reporting log collector resource usage summary: {0}".format(ustr(e))
logger.warn(msg)
event.add_event(op=event.WALAEventOperation.LogCollection, is_success=False, message=msg, log_event=False)

except Exception as e:
logger.error("Log collection completed unsuccessfully. Error: {0}".format(ustr(e)))
logger.info("Detailed log output can be found at {0}".format(OUTPUT_RESULTS_FILE_PATH))
Expand Down
23 changes: 21 additions & 2 deletions azurelinuxagent/common/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ def load_conf_from_file(conf_file_path, conf=__conf__):
"Debug.CgroupDisableOnQuotaCheckFailure": True,
"Debug.EnableAgentMemoryUsageCheck": False,
"Debug.EnableFastTrack": True,
"Debug.EnableGAVersioning": True
"Debug.EnableGAVersioning": True,
"Debug.EnableCgroupV2ResourceLimiting": False
}


Expand Down Expand Up @@ -200,7 +201,8 @@ def load_conf_from_file(conf_file_path, conf=__conf__):
"Debug.EtpCollectionPeriod": 300,
"Debug.AutoUpdateHotfixFrequency": 14400,
"Debug.AutoUpdateNormalFrequency": 86400,
"Debug.FirewallRulesLogPeriod": 86400
"Debug.FirewallRulesLogPeriod": 86400,
"Debug.LogCollectorInitialDelay": 5 * 60
}


Expand Down Expand Up @@ -680,3 +682,20 @@ def get_firewall_rules_log_period(conf=__conf__):
NOTE: This option is experimental and may be removed in later versions of the Agent.
"""
return conf.get_int("Debug.FirewallRulesLogPeriod", 86400)


def get_enable_cgroup_v2_resource_limiting(conf=__conf__):
    """
    Return the 'Debug.EnableCgroupV2ResourceLimiting' switch; False when it is not set.

    If True, the agent will enable resource monitoring and enforcement for the log collector
    on machines using cgroup v2.

    NOTE: This option is experimental and may be removed in later versions of the Agent.
    """
    option_name = "Debug.EnableCgroupV2ResourceLimiting"
    is_enabled = conf.get_switch(option_name, False)
    return is_enabled


def get_log_collector_initial_delay(conf=__conf__):
    """
    Return the 'Debug.LogCollectorInitialDelay' setting as an int; 5 * 60 when it is not set.

    This is the initial delay at service start before the first periodic log collection.

    NOTE: This option is experimental and may be removed in later versions of the Agent.
    """
    option_name = "Debug.LogCollectorInitialDelay"
    default_delay = 5 * 60
    return conf.get_int(option_name, default_delay)
113 changes: 73 additions & 40 deletions azurelinuxagent/ga/cgroupapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@

from azurelinuxagent.common import logger
from azurelinuxagent.common.event import WALAEventOperation, add_event
from azurelinuxagent.ga.controllermetrics import CpuMetrics, MemoryMetrics
from azurelinuxagent.ga.cgroupstelemetry import CGroupsTelemetry
from azurelinuxagent.ga.cpucontroller import _CpuController, CpuControllerV1, CpuControllerV2
from azurelinuxagent.ga.memorycontroller import MemoryControllerV1, MemoryControllerV2
from azurelinuxagent.common.conf import get_agent_pid_file_path
from azurelinuxagent.common.exception import CGroupsException, ExtensionErrorCodes, ExtensionError, \
ExtensionOperationError
Expand Down Expand Up @@ -292,7 +293,7 @@ def _get_controller_mountpoints():
if match is not None:
path = match.group('path')
controller = match.group('controller')
if controller is not None and path is not None and controller in CgroupV1.get_supported_controllers():
if controller is not None and path is not None and controller in CgroupV1.get_supported_controller_names():
mount_points[controller] = path
return mount_points

Expand Down Expand Up @@ -335,7 +336,7 @@ def _get_process_relative_controller_paths(process_id):
if match is not None:
controller = match.group('controller')
path = match.group('path').lstrip('/') if match.group('path') != '/' else None
if path is not None and controller in CgroupV1.get_supported_controllers():
if path is not None and controller in CgroupV1.get_supported_controller_names():
conroller_relative_paths[controller] = path

return conroller_relative_paths
Expand Down Expand Up @@ -371,7 +372,7 @@ def get_process_cgroup(self, process_id, cgroup_name):
controller_paths=process_controller_paths)

def log_root_paths(self):
for controller in CgroupV1.get_supported_controllers():
for controller in CgroupV1.get_supported_controller_names():
mount_point = self._cgroup_mountpoints.get(controller)
if mount_point is None:
log_cgroup_info("The {0} controller is not mounted".format(controller), send_event=False)
Expand Down Expand Up @@ -402,14 +403,14 @@ def start_extension_command(self, extension_name, command, cmd_name, timeout, sh

log_cgroup_info("Started extension in unit '{0}'".format(scope_name), send_event=False)

cpu_metrics = None
cpu_controller = None
try:
cgroup_relative_path = os.path.join('azure.slice/azure-vmextensions.slice', extension_slice_name)
cgroup = self.get_cgroup_from_relative_path(cgroup_relative_path, extension_name)
for metrics in cgroup.get_controller_metrics():
if isinstance(metrics, CpuMetrics):
cpu_metrics = metrics
CGroupsTelemetry.track_cgroup(metrics)
for controller in cgroup.get_controllers():
if isinstance(controller, _CpuController):
cpu_controller = controller
CGroupsTelemetry.track_cgroup_controller(controller)

except IOError as e:
if e.errno == 2: # 'No such file or directory'
Expand All @@ -421,7 +422,7 @@ def start_extension_command(self, extension_name, command, cmd_name, timeout, sh
# Wait for process completion or timeout
try:
return handle_process_completion(process=process, command=command, timeout=timeout, stdout=stdout,
stderr=stderr, error_code=error_code, cpu_metrics=cpu_metrics)
stderr=stderr, error_code=error_code, cpu_controller=cpu_controller)
except ExtensionError as e:
# The extension didn't terminate successfully. Determine whether it was due to systemd errors or
# extension errors.
Expand Down Expand Up @@ -498,7 +499,7 @@ def _get_controllers_enabled_at_root(root_cgroup_path):
enabled_controllers_file = os.path.join(root_cgroup_path, 'cgroup.subtree_control')
if os.path.exists(enabled_controllers_file):
controllers_enabled_at_root = fileutil.read_file(enabled_controllers_file).rstrip().split()
return list(set(controllers_enabled_at_root) & set(CgroupV2.get_supported_controllers()))
return list(set(controllers_enabled_at_root) & set(CgroupV2.get_supported_controller_names()))
return []

@staticmethod
Expand Down Expand Up @@ -546,7 +547,7 @@ def get_process_cgroup(self, process_id, cgroup_name):

def log_root_paths(self):
log_cgroup_info("The root cgroup path is {0}".format(self._root_cgroup_path), send_event=False)
for controller in CgroupV2.get_supported_controllers():
for controller in CgroupV2.get_supported_controller_names():
if controller in self._controllers_enabled_at_root:
log_cgroup_info("The {0} controller is enabled at the root cgroup".format(controller), send_event=False)
else:
Expand All @@ -564,9 +565,9 @@ def __init__(self, cgroup_name):
self._cgroup_name = cgroup_name

@staticmethod
def get_supported_controllers():
def get_supported_controller_names():
"""
Cgroup version specific. Returns a list of the controllers which the agent supports.
Cgroup version specific. Returns a list of the controllers which the agent supports as strings.
"""
raise NotImplementedError()

Expand All @@ -578,12 +579,12 @@ def check_in_expected_slice(self, expected_slice):
"""
raise NotImplementedError()

def get_controller_metrics(self, expected_relative_path=None):
def get_controllers(self, expected_relative_path=None):
"""
Cgroup version specific. Returns a list of the metrics for the agent supported controllers which are
mounted/enabled for the cgroup.
Cgroup version specific. Returns a list of the agent supported controllers which are mounted/enabled for the cgroup.
:param expected_relative_path: The expected relative path of the cgroup. If provided, only metrics for controllers at this expected path will be returned.
:param expected_relative_path: The expected relative path of the cgroup. If provided, only controllers mounted
at this expected path will be returned.
"""
raise NotImplementedError()

Expand All @@ -608,7 +609,7 @@ def __init__(self, cgroup_name, controller_mountpoints, controller_paths):
self._controller_paths = controller_paths

@staticmethod
def get_supported_controllers():
def get_supported_controller_names():
return [CgroupV1.CPU_CONTROLLER, CgroupV1.MEMORY_CONTROLLER]

def check_in_expected_slice(self, expected_slice):
Expand All @@ -620,39 +621,39 @@ def check_in_expected_slice(self, expected_slice):

return in_expected_slice

def get_controller_metrics(self, expected_relative_path=None):
metrics = []
def get_controllers(self, expected_relative_path=None):
controllers = []

for controller in self.get_supported_controllers():
controller_metrics = None
controller_path = self._controller_paths.get(controller)
controller_mountpoint = self._controller_mountpoints.get(controller)
for supported_controller_name in self.get_supported_controller_names():
controller = None
controller_path = self._controller_paths.get(supported_controller_name)
controller_mountpoint = self._controller_mountpoints.get(supported_controller_name)

if controller_mountpoint is None:
log_cgroup_warning("{0} controller is not mounted; will not track metrics".format(controller), send_event=False)
log_cgroup_warning("{0} controller is not mounted; will not track".format(supported_controller_name), send_event=False)
continue

if controller_path is None:
log_cgroup_warning("{0} is not mounted for the {1} cgroup; will not track metrics".format(controller, self._cgroup_name), send_event=False)
log_cgroup_warning("{0} is not mounted for the {1} cgroup; will not track".format(supported_controller_name, self._cgroup_name), send_event=False)
continue

if expected_relative_path is not None:
expected_path = os.path.join(controller_mountpoint, expected_relative_path)
if controller_path != expected_path:
log_cgroup_warning("The {0} controller is not mounted at the expected path for the {1} cgroup; will not track metrics. Actual cgroup path:[{2}] Expected:[{3}]".format(controller, self._cgroup_name, controller_path, expected_path), send_event=False)
log_cgroup_warning("The {0} controller is not mounted at the expected path for the {1} cgroup; will not track. Actual cgroup path:[{2}] Expected:[{3}]".format(supported_controller_name, self._cgroup_name, controller_path, expected_path), send_event=False)
continue

if controller == self.CPU_CONTROLLER:
controller_metrics = CpuMetrics(self._cgroup_name, controller_path)
elif controller == self.MEMORY_CONTROLLER:
controller_metrics = MemoryMetrics(self._cgroup_name, controller_path)
if supported_controller_name == self.CPU_CONTROLLER:
controller = CpuControllerV1(self._cgroup_name, controller_path)
elif supported_controller_name == self.MEMORY_CONTROLLER:
controller = MemoryControllerV1(self._cgroup_name, controller_path)

if controller_metrics is not None:
msg = "{0} metrics for cgroup: {1}".format(controller, controller_metrics)
if controller is not None:
msg = "{0} controller for cgroup: {1}".format(supported_controller_name, controller)
log_cgroup_info(msg, send_event=False)
metrics.append(controller_metrics)
controllers.append(controller)

return metrics
return controllers

def get_controller_procs_path(self, controller):
controller_path = self._controller_paths.get(controller)
Expand Down Expand Up @@ -687,7 +688,7 @@ def __init__(self, cgroup_name, root_cgroup_path, cgroup_path, enabled_controlle
self._enabled_controllers = enabled_controllers

@staticmethod
def get_supported_controllers():
def get_supported_controller_names():
return [CgroupV2.CPU_CONTROLLER, CgroupV2.MEMORY_CONTROLLER]

def check_in_expected_slice(self, expected_slice):
Expand All @@ -697,9 +698,41 @@ def check_in_expected_slice(self, expected_slice):

return True

def get_controller_metrics(self, expected_relative_path=None):
# TODO - Implement controller metrics for cgroup v2
raise NotImplementedError()
def get_controllers(self, expected_relative_path=None):
    """
    Returns a list with one controller object per agent-supported controller that can be tracked for this cgroup.

    A supported controller is skipped (and a warning is logged) when:
        * it is not in the enabled controllers of this cgroup,
        * the path of this cgroup is empty, or
        * expected_relative_path is given and the path of this cgroup differs from
          <root cgroup path>/<expected_relative_path>.

    :param expected_relative_path: The expected path of the cgroup, relative to the root cgroup path. If None,
                                   the path of the cgroup is not verified.
    """
    tracked = []

    for name in self.get_supported_controller_names():
        if name not in self._enabled_controllers:
            log_cgroup_warning("{0} controller is not enabled; will not track".format(name), send_event=False)
            continue

        if self._cgroup_path == "":
            log_cgroup_warning("Cgroup path for {0} cannot be determined; will not track".format(self._cgroup_name), send_event=False)
            continue

        if expected_relative_path is not None:
            expected_path = os.path.join(self._root_cgroup_path, expected_relative_path)
            if self._cgroup_path != expected_path:
                mismatch = "The {0} cgroup is not mounted at the expected path; will not track. Actual cgroup path:[{1}] Expected:[{2}]".format(
                    self._cgroup_name, self._cgroup_path, expected_path)
                log_cgroup_warning(mismatch, send_event=False)
                continue

        # Pick the cgroup v2 wrapper class that corresponds to the controller name.
        if name == self.CPU_CONTROLLER:
            controller_class = CpuControllerV2
        elif name == self.MEMORY_CONTROLLER:
            controller_class = MemoryControllerV2
        else:
            # A supported name without a matching wrapper class is not tracked.
            continue

        controller = controller_class(self._cgroup_name, self._cgroup_path)
        log_cgroup_info("{0} controller for cgroup: {1}".format(name, controller), send_event=False)
        tracked.append(controller)

    return tracked

def get_procs_path(self):
if self._cgroup_path != "":
Expand Down
Loading

0 comments on commit 0681f14

Please sign in to comment.