Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cgroupv2 support for log collector #3188

Merged
merged 6 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions azurelinuxagent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from __future__ import print_function

import json
import os
import re
import subprocess
Expand All @@ -31,7 +32,8 @@

from azurelinuxagent.common.exception import CGroupsException
from azurelinuxagent.ga import logcollector, cgroupconfigurator
from azurelinuxagent.ga.controllermetrics import AGENT_LOG_COLLECTOR, CpuMetrics
from azurelinuxagent.ga.cgroupcontroller import AGENT_LOG_COLLECTOR
from azurelinuxagent.ga.cpucontroller import _CpuController
from azurelinuxagent.ga.cgroupapi import get_cgroup_api, log_cgroup_warning, InvalidCgroupMountpointException

import azurelinuxagent.common.conf as conf
Expand Down Expand Up @@ -208,7 +210,7 @@ def collect_logs(self, is_full_mode):

# Check the cgroups unit
log_collector_monitor = None
tracked_metrics = []
tracked_controllers = []
if CollectLogsHandler.is_enabled_monitor_cgroups_check():
try:
cgroup_api = get_cgroup_api()
Expand All @@ -220,10 +222,10 @@ def collect_logs(self, is_full_mode):
sys.exit(logcollector.INVALID_CGROUPS_ERRCODE)

log_collector_cgroup = cgroup_api.get_process_cgroup(process_id="self", cgroup_name=AGENT_LOG_COLLECTOR)
tracked_metrics = log_collector_cgroup.get_controller_metrics()
tracked_controllers = log_collector_cgroup.get_controllers()

if len(tracked_metrics) != len(log_collector_cgroup.get_supported_controllers()):
log_cgroup_warning("At least one required controller is missing. The following controllers are required for the log collector to run: {0}".format(log_collector_cgroup.get_supported_controllers()))
if len(tracked_controllers) != len(log_collector_cgroup.get_supported_controller_names()):
log_cgroup_warning("At least one required controller is missing. The following controllers are required for the log collector to run: {0}".format(log_collector_cgroup.get_supported_controller_names()))
sys.exit(logcollector.INVALID_CGROUPS_ERRCODE)

if not log_collector_cgroup.check_in_expected_slice(cgroupconfigurator.LOGCOLLECTOR_SLICE):
Expand All @@ -235,15 +237,30 @@ def collect_logs(self, is_full_mode):
# Running log collector resource monitoring only if agent starts the log collector.
# If Log collector start by any other means, then it will not be monitored.
if CollectLogsHandler.is_enabled_monitor_cgroups_check():
for metric in tracked_metrics:
if isinstance(metric, CpuMetrics):
metric.initialize_cpu_usage()
for controller in tracked_controllers:
if isinstance(controller, _CpuController):
controller.initialize_cpu_usage()
break
log_collector_monitor = get_log_collector_monitor_handler(tracked_metrics)
log_collector_monitor = get_log_collector_monitor_handler(tracked_controllers)
log_collector_monitor.run()
archive = log_collector.collect_logs_and_get_archive()

archive, total_uncompressed_size = log_collector.collect_logs_and_get_archive()
logger.info("Log collection successfully completed. Archive can be found at {0} "
"and detailed log output can be found at {1}".format(archive, OUTPUT_RESULTS_FILE_PATH))

if log_collector_monitor is not None:
log_collector_monitor.stop()
try:
metrics_summary = log_collector_monitor.get_max_recorded_metrics()
metrics_summary['Total Uncompressed File Size (B)'] = total_uncompressed_size
msg = json.dumps(metrics_summary)
logger.info(msg)
event.add_event(op=event.WALAEventOperation.LogCollection, message=msg, log_event=False)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have split thought about whether summary is useful. This msg bunch of information, someone has to parse this msg to understand different metrics across the fleet. Do we already not capture as part of monitor thread?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't capture log collector metrics in telemetry right now. I'm not sure how we would determine if these new limits are appropriate without sending events for these metrics

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As part of LogCollectorMonitorHandler thread, when we poll for metrics, we send to telemetry as well. I think you used same data to evaluate limits. Now logging same data in different table is not useful I feel

                    metrics = self._poll_resource_usage()
                    self._send_telemetry(metrics)
                    self._verify_memory_limit(metrics)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How often is this telemetry sent? the log collector process is short in duration. if the polling period is too long, we won't capture enough info; if it is too short, we may be flooding telemetry. A summary seems appropriate.

As for the parsing, should we send this as JSON instead?

except Exception as e:
msg = "An error occurred while reporting log collector resource usage summary: {0}".format(ustr(e))
logger.warn(msg)
event.add_event(op=event.WALAEventOperation.LogCollection, is_success=False, message=msg, log_event=False)

except Exception as e:
logger.error("Log collection completed unsuccessfully. Error: {0}".format(ustr(e)))
logger.info("Detailed log output can be found at {0}".format(OUTPUT_RESULTS_FILE_PATH))
Expand Down
23 changes: 21 additions & 2 deletions azurelinuxagent/common/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ def load_conf_from_file(conf_file_path, conf=__conf__):
"Debug.CgroupDisableOnQuotaCheckFailure": True,
"Debug.EnableAgentMemoryUsageCheck": False,
"Debug.EnableFastTrack": True,
"Debug.EnableGAVersioning": True
"Debug.EnableGAVersioning": True,
"Debug.EnableCgroupV2ResourceLimiting": False
}


Expand Down Expand Up @@ -200,7 +201,8 @@ def load_conf_from_file(conf_file_path, conf=__conf__):
"Debug.EtpCollectionPeriod": 300,
"Debug.AutoUpdateHotfixFrequency": 14400,
"Debug.AutoUpdateNormalFrequency": 86400,
"Debug.FirewallRulesLogPeriod": 86400
"Debug.FirewallRulesLogPeriod": 86400,
"Debug.LogCollectorInitialDelay": 5 * 60
}


Expand Down Expand Up @@ -680,3 +682,20 @@ def get_firewall_rules_log_period(conf=__conf__):
NOTE: This option is experimental and may be removed in later versions of the Agent.
"""
return conf.get_int("Debug.FirewallRulesLogPeriod", 86400)


def get_enable_cgroup_v2_resource_limiting(conf=__conf__):
Copy link
Contributor Author

@maddieford maddieford Aug 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cgroup v2 log collector will be disabled by default. This conf option will be used to opt-in

"""
If True, the agent will enable resource monitoring and enforcement for the log collector on machines using cgroup v2.
NOTE: This option is experimental and may be removed in later versions of the Agent.
"""
return conf.get_switch("Debug.EnableCgroupV2ResourceLimiting", False)


def get_log_collector_initial_delay(conf=__conf__):
"""
Determine the initial delay at service start before the first periodic log collection.

NOTE: This option is experimental and may be removed in later versions of the Agent.
"""
return conf.get_int("Debug.LogCollectorInitialDelay", 5 * 60)
113 changes: 73 additions & 40 deletions azurelinuxagent/ga/cgroupapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@

from azurelinuxagent.common import logger
from azurelinuxagent.common.event import WALAEventOperation, add_event
from azurelinuxagent.ga.controllermetrics import CpuMetrics, MemoryMetrics
from azurelinuxagent.ga.cgroupstelemetry import CGroupsTelemetry
from azurelinuxagent.ga.cpucontroller import _CpuController, CpuControllerV1, CpuControllerV2
from azurelinuxagent.ga.memorycontroller import MemoryControllerV1, MemoryControllerV2
from azurelinuxagent.common.conf import get_agent_pid_file_path
from azurelinuxagent.common.exception import CGroupsException, ExtensionErrorCodes, ExtensionError, \
ExtensionOperationError
Expand Down Expand Up @@ -292,7 +293,7 @@ def _get_controller_mountpoints():
if match is not None:
path = match.group('path')
controller = match.group('controller')
if controller is not None and path is not None and controller in CgroupV1.get_supported_controllers():
if controller is not None and path is not None and controller in CgroupV1.get_supported_controller_names():
mount_points[controller] = path
return mount_points

Expand Down Expand Up @@ -335,7 +336,7 @@ def _get_process_relative_controller_paths(process_id):
if match is not None:
controller = match.group('controller')
path = match.group('path').lstrip('/') if match.group('path') != '/' else None
if path is not None and controller in CgroupV1.get_supported_controllers():
if path is not None and controller in CgroupV1.get_supported_controller_names():
conroller_relative_paths[controller] = path

return conroller_relative_paths
Expand Down Expand Up @@ -371,7 +372,7 @@ def get_process_cgroup(self, process_id, cgroup_name):
controller_paths=process_controller_paths)

def log_root_paths(self):
for controller in CgroupV1.get_supported_controllers():
for controller in CgroupV1.get_supported_controller_names():
mount_point = self._cgroup_mountpoints.get(controller)
if mount_point is None:
log_cgroup_info("The {0} controller is not mounted".format(controller), send_event=False)
Expand Down Expand Up @@ -402,14 +403,14 @@ def start_extension_command(self, extension_name, command, cmd_name, timeout, sh

log_cgroup_info("Started extension in unit '{0}'".format(scope_name), send_event=False)

cpu_metrics = None
cpu_controller = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this rename, the code cleaned up pretty nicely!

try:
cgroup_relative_path = os.path.join('azure.slice/azure-vmextensions.slice', extension_slice_name)
cgroup = self.get_cgroup_from_relative_path(cgroup_relative_path, extension_name)
for metrics in cgroup.get_controller_metrics():
if isinstance(metrics, CpuMetrics):
cpu_metrics = metrics
CGroupsTelemetry.track_cgroup(metrics)
for controller in cgroup.get_controllers():
if isinstance(controller, _CpuController):
cpu_controller = controller
CGroupsTelemetry.track_cgroup_controller(controller)

except IOError as e:
if e.errno == 2: # 'No such file or directory'
Expand All @@ -421,7 +422,7 @@ def start_extension_command(self, extension_name, command, cmd_name, timeout, sh
# Wait for process completion or timeout
try:
return handle_process_completion(process=process, command=command, timeout=timeout, stdout=stdout,
stderr=stderr, error_code=error_code, cpu_metrics=cpu_metrics)
stderr=stderr, error_code=error_code, cpu_controller=cpu_controller)
except ExtensionError as e:
# The extension didn't terminate successfully. Determine whether it was due to systemd errors or
# extension errors.
Expand Down Expand Up @@ -498,7 +499,7 @@ def _get_controllers_enabled_at_root(root_cgroup_path):
enabled_controllers_file = os.path.join(root_cgroup_path, 'cgroup.subtree_control')
if os.path.exists(enabled_controllers_file):
controllers_enabled_at_root = fileutil.read_file(enabled_controllers_file).rstrip().split()
return list(set(controllers_enabled_at_root) & set(CgroupV2.get_supported_controllers()))
return list(set(controllers_enabled_at_root) & set(CgroupV2.get_supported_controller_names()))
return []

@staticmethod
Expand Down Expand Up @@ -546,7 +547,7 @@ def get_process_cgroup(self, process_id, cgroup_name):

def log_root_paths(self):
log_cgroup_info("The root cgroup path is {0}".format(self._root_cgroup_path), send_event=False)
for controller in CgroupV2.get_supported_controllers():
for controller in CgroupV2.get_supported_controller_names():
if controller in self._controllers_enabled_at_root:
log_cgroup_info("The {0} controller is enabled at the root cgroup".format(controller), send_event=False)
else:
Expand All @@ -564,9 +565,9 @@ def __init__(self, cgroup_name):
self._cgroup_name = cgroup_name

@staticmethod
def get_supported_controllers():
def get_supported_controller_names():
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed to 'get_supported_controller_names' to reflect that this returns a list of strings, not controller objects

"""
Cgroup version specific. Returns a list of the controllers which the agent supports.
Cgroup version specific. Returns a list of the controllers which the agent supports as strings.
"""
raise NotImplementedError()

Expand All @@ -578,12 +579,12 @@ def check_in_expected_slice(self, expected_slice):
"""
raise NotImplementedError()

def get_controller_metrics(self, expected_relative_path=None):
def get_controllers(self, expected_relative_path=None):
"""
Cgroup version specific. Returns a list of the metrics for the agent supported controllers which are
mounted/enabled for the cgroup.
Cgroup version specific. Returns a list of the agent supported controllers which are mounted/enabled for the cgroup.

:param expected_relative_path: The expected relative path of the cgroup. If provided, only metrics for controllers at this expected path will be returned.
:param expected_relative_path: The expected relative path of the cgroup. If provided, only controllers mounted
at this expected path will be returned.
"""
raise NotImplementedError()

Expand All @@ -608,7 +609,7 @@ def __init__(self, cgroup_name, controller_mountpoints, controller_paths):
self._controller_paths = controller_paths

@staticmethod
def get_supported_controllers():
def get_supported_controller_names():
return [CgroupV1.CPU_CONTROLLER, CgroupV1.MEMORY_CONTROLLER]

def check_in_expected_slice(self, expected_slice):
Expand All @@ -620,39 +621,39 @@ def check_in_expected_slice(self, expected_slice):

return in_expected_slice

def get_controller_metrics(self, expected_relative_path=None):
metrics = []
def get_controllers(self, expected_relative_path=None):
controllers = []

for controller in self.get_supported_controllers():
controller_metrics = None
controller_path = self._controller_paths.get(controller)
controller_mountpoint = self._controller_mountpoints.get(controller)
for supported_controller_name in self.get_supported_controller_names():
controller = None
controller_path = self._controller_paths.get(supported_controller_name)
controller_mountpoint = self._controller_mountpoints.get(supported_controller_name)

if controller_mountpoint is None:
log_cgroup_warning("{0} controller is not mounted; will not track metrics".format(controller), send_event=False)
log_cgroup_warning("{0} controller is not mounted; will not track".format(supported_controller_name), send_event=False)
continue

if controller_path is None:
log_cgroup_warning("{0} is not mounted for the {1} cgroup; will not track metrics".format(controller, self._cgroup_name), send_event=False)
log_cgroup_warning("{0} is not mounted for the {1} cgroup; will not track".format(supported_controller_name, self._cgroup_name), send_event=False)
continue

if expected_relative_path is not None:
expected_path = os.path.join(controller_mountpoint, expected_relative_path)
if controller_path != expected_path:
log_cgroup_warning("The {0} controller is not mounted at the expected path for the {1} cgroup; will not track metrics. Actual cgroup path:[{2}] Expected:[{3}]".format(controller, self._cgroup_name, controller_path, expected_path), send_event=False)
log_cgroup_warning("The {0} controller is not mounted at the expected path for the {1} cgroup; will not track. Actual cgroup path:[{2}] Expected:[{3}]".format(supported_controller_name, self._cgroup_name, controller_path, expected_path), send_event=False)
continue

if controller == self.CPU_CONTROLLER:
controller_metrics = CpuMetrics(self._cgroup_name, controller_path)
elif controller == self.MEMORY_CONTROLLER:
controller_metrics = MemoryMetrics(self._cgroup_name, controller_path)
if supported_controller_name == self.CPU_CONTROLLER:
controller = CpuControllerV1(self._cgroup_name, controller_path)
elif supported_controller_name == self.MEMORY_CONTROLLER:
controller = MemoryControllerV1(self._cgroup_name, controller_path)

if controller_metrics is not None:
msg = "{0} metrics for cgroup: {1}".format(controller, controller_metrics)
if controller is not None:
msg = "{0} controller for cgroup: {1}".format(supported_controller_name, controller)
log_cgroup_info(msg, send_event=False)
metrics.append(controller_metrics)
controllers.append(controller)

return metrics
return controllers

def get_controller_procs_path(self, controller):
controller_path = self._controller_paths.get(controller)
Expand Down Expand Up @@ -687,7 +688,7 @@ def __init__(self, cgroup_name, root_cgroup_path, cgroup_path, enabled_controlle
self._enabled_controllers = enabled_controllers

@staticmethod
def get_supported_controllers():
def get_supported_controller_names():
return [CgroupV2.CPU_CONTROLLER, CgroupV2.MEMORY_CONTROLLER]

def check_in_expected_slice(self, expected_slice):
Expand All @@ -697,9 +698,41 @@ def check_in_expected_slice(self, expected_slice):

return True

def get_controller_metrics(self, expected_relative_path=None):
# TODO - Implement controller metrics for cgroup v2
raise NotImplementedError()
def get_controllers(self, expected_relative_path=None):
controllers = []

for supported_controller_name in self.get_supported_controller_names():
controller = None

if supported_controller_name not in self._enabled_controllers:
log_cgroup_warning("{0} controller is not enabled; will not track".format(supported_controller_name),
send_event=False)
continue

if self._cgroup_path == "":
log_cgroup_warning("Cgroup path for {0} cannot be determined; will not track".format(self._cgroup_name),
send_event=False)
continue

if expected_relative_path is not None:
expected_path = os.path.join(self._root_cgroup_path, expected_relative_path)
if self._cgroup_path != expected_path:
log_cgroup_warning(
"The {0} cgroup is not mounted at the expected path; will not track. Actual cgroup path:[{1}] Expected:[{2}]".format(
self._cgroup_name, self._cgroup_path, expected_path), send_event=False)
continue

if supported_controller_name == self.CPU_CONTROLLER:
controller = CpuControllerV2(self._cgroup_name, self._cgroup_path)
elif supported_controller_name == self.MEMORY_CONTROLLER:
controller = MemoryControllerV2(self._cgroup_name, self._cgroup_path)

if controller is not None:
msg = "{0} controller for cgroup: {1}".format(supported_controller_name, controller)
log_cgroup_info(msg, send_event=False)
controllers.append(controller)

return controllers

def get_procs_path(self):
if self._cgroup_path != "":
Expand Down
Loading
Loading