From d9102e97d7819b20bde100d4700fc8547e78b1f9 Mon Sep 17 00:00:00 2001
From: nofaralfasi
Date: Wed, 23 Aug 2023 12:14:26 +0300
Subject: [PATCH] Fixes #36341 - Refactor AnsibleRunner and Introduce ArtifactsProcessor

Refactor AnsibleRunner and introduce ArtifactsProcessor, which reads the
ansible-runner job_events JSON artifacts and exposes them to the runner as
ArtifactEvent objects.
---
 .../artifacts_processor.rb   | 105 +++++++++++++++++
 .../configuration_loader.rb  |   1 +
 .../runner/ansible_runner.rb | 111 +++++++-----------
 3 files changed, 151 insertions(+), 66 deletions(-)
 create mode 100644 lib/smart_proxy_ansible/artifacts_processor.rb

diff --git a/lib/smart_proxy_ansible/artifacts_processor.rb b/lib/smart_proxy_ansible/artifacts_processor.rb
new file mode 100644
index 0000000..9c9f48c
--- /dev/null
+++ b/lib/smart_proxy_ansible/artifacts_processor.rb
@@ -0,0 +1,105 @@
+require 'json'
+
+module Proxy
+  module Ansible
+    # Reads ansible-runner job_events artifacts and wraps them in ArtifactEvent objects
+    class ArtifactsProcessor
+      ARTIFACTS_DIR = 'artifacts'.freeze
+
+      attr_reader :last_file_num
+
+      def initialize
+        @current_file_index = 1
+        @last_file_num = 0
+      end
+
+      def process_artifacts(root)
+        @events_files ||= [] # accumulate across polls; last_file_num indexes into this list
+        @uuid ||= find_uuid(root)
+        return 0 unless @uuid # no artifacts yet
+
+        job_event_dir = File.join(root, ARTIFACTS_DIR, @uuid, 'job_events')
+
+        loop do
+          files = Dir["#{job_event_dir}/*.json"].map do |file|
+            num = File.basename(file)[/\A\d+/].to_i unless file.include?('partial')
+            [file, num]
+          end
+          files_with_nums = files.select { |(_, num)| num && num >= @current_file_index }.sort_by(&:last)
+          break if files_with_nums.empty?
+
+          @events_files.concat(files_with_nums)
+          @current_file_index = files_with_nums.last.last + 1
+        end
+        @current_file_index - 1
+      end
+
+      def get_next_event
+        file_path = @events_files[@last_file_num][0]
+        @last_file_num += 1
+        json_event = parse_json_event(file_path)
+        ArtifactEvent.new(json_event)
+      end
+
+      private
+
+      def find_uuid(root)
+        f = Dir["#{root}/#{ARTIFACTS_DIR}/*"].first
+        File.basename(f) if f
+      end
+
+      def parse_json_event(file_path)
+        JSON.parse(File.read(file_path))
+      rescue JSON::ParserError => e
+        raise ArgumentError, "Could not parse #{file_path} as JSON: #{e.message}"
+      end
+    end
+
+    class ArtifactEvent
+      attr_reader :json_event, :host, :type, :output, :exit_status
+
+      def initialize(json_event)
+        @json_event = json_event
+        @host = extract_host_name(@json_event)
+        @type = @json_event['event']
+        @output = @json_event['stdout']
+      end
+
+      def set_exit_status
+        case @type
+        when 'runner_on_ok'
+          @exit_status = 0
+        when 'runner_on_unreachable'
+          @exit_status = 1
+        when 'runner_on_failed'
+          @exit_status = 2 if @json_event.dig('event_data', 'ignore_errors').nil?
+        end
+      end
+
+      def parse_failures
+        @failures = {
+          failures: extract_event_data('failures'),
+          unreachable: extract_event_data('dark'),
+          rescued: extract_event_data('rescued')
+        }
+      end
+
+      # true when the host had no real failures or unreachable tasks, only rescued ones
+      def rescued_without_failures?(host)
+        @failures[:failures][host].to_i <= 0 &&
+          @failures[:unreachable][host].to_i <= 0 &&
+          @failures[:rescued][host].to_i > 0
+      end
+
+      private
+
+      def extract_host_name(event)
+        hostname = event.dig('event_data', 'host') || event.dig('event_data', 'remote_addr')
+        hostname.to_s unless hostname.nil? || hostname.empty?
+      end
+
+      def extract_event_data(data_type)
+        @json_event.dig('event_data', data_type) || {}
+      end
+    end
+  end
+end
diff --git a/lib/smart_proxy_ansible/configuration_loader.rb b/lib/smart_proxy_ansible/configuration_loader.rb
index 354b955..b5acc44 100644
--- a/lib/smart_proxy_ansible/configuration_loader.rb
+++ b/lib/smart_proxy_ansible/configuration_loader.rb
@@ -4,6 +4,7 @@ def load_classes
       require 'smart_proxy_dynflow'
       require 'smart_proxy_dynflow/continuous_output'
       require 'smart_proxy_ansible/task_launcher/ansible_runner'
+      require 'smart_proxy_ansible/artifacts_processor'
       require 'smart_proxy_ansible/runner/ansible_runner'
 
       Proxy::Dynflow::TaskLauncherRegistry.register('ansible-runner',
diff --git a/lib/smart_proxy_ansible/runner/ansible_runner.rb b/lib/smart_proxy_ansible/runner/ansible_runner.rb
index bf11172..8e40f99 100644
--- a/lib/smart_proxy_ansible/runner/ansible_runner.rb
+++ b/lib/smart_proxy_ansible/runner/ansible_runner.rb
@@ -27,6 +27,7 @@ def initialize(input, suspended_action:, id: nil)
           @passphrase = action_input['secrets']['key_passphrase']
           @execution_timeout_interval = action_input[:execution_timeout_interval]
           @cleanup_working_dirs = action_input.fetch(:cleanup_working_dirs, true)
+          @artifacts_processor = ArtifactsProcessor.new
         end
 
         def start
@@ -79,90 +80,68 @@ def initialize_command(*command)
         private
 
         def process_artifacts
-          @counter ||= 1
-          @uuid ||= if (f = Dir["#{@root}/artifacts/*"].first)
-                      File.basename(f)
-                    end
-          return unless @uuid
-          job_event_dir = File.join(@root, 'artifacts', @uuid, 'job_events')
-          loop do
-            files = Dir["#{job_event_dir}/*.json"].map do |file|
-              num = File.basename(file)[/\A\d+/].to_i unless file.include?('partial')
-              [file, num]
-            end
-            files_with_nums = files.select { |(_, num)| num && num >= @counter }.sort_by(&:last)
-            break if files_with_nums.empty?
-            logger.debug("[foreman_ansible] - processing event files: #{files_with_nums.map(&:first).inspect}}")
-            files_with_nums.map(&:first).each { |event_file| handle_event_file(event_file) }
-            @counter = files_with_nums.last.last + 1
-          end
-        end
+          total_files = @artifacts_processor.process_artifacts(@root)
 
-        def handle_event_file(event_file)
-          logger.debug("[foreman_ansible] - parsing event file #{event_file}")
-          begin
-            event = JSON.parse(File.read(event_file))
-            if (hostname = hostname_for_event(event))
-              handle_host_event(hostname, event)
+          while @artifacts_processor.last_file_num < total_files
+            event = @artifacts_processor.get_next_event
+
+            if @targets.key?(event.host)
+              handle_host_event(event)
             else
               handle_broadcast_data(event)
             end
-            true
-          rescue JSON::ParserError => e
-            logger.error("[foreman_ansible] - Error parsing runner event at #{event_file}: #{e.class}: #{e.message}")
-            logger.debug(e.backtrace.join("\n"))
           end
         end
 
-        def hostname_for_event(event)
-          hostname = event.dig('event_data', 'host') || event.dig('event_data', 'remote_addr')
-          return nil if hostname.nil? || hostname.empty?
+        def handle_host_event(event)
+          log_event("for host: #{event.host.inspect}", event.json_event)
+          publish_data_for(event.host, "#{event.output}\n", 'stdout') if event.output
+          handle_exit_status(event)
+        end
 
-          unless @targets.key?(hostname)
-            logger.warn("handle_host_event: unknown host #{hostname} for event '#{event['event']}', broadcasting")
-            return nil
+        def handle_exit_status(event)
+          event.set_exit_status
+          return if event.exit_status.nil?
+
+          if event.exit_status == 0
+            publish_exit_status_for(event.host, 0) if @exit_statuses[event.host].nil?
+          else
+            publish_exit_status_for(event.host, event.exit_status)
           end
-          hostname
         end
 
-        def handle_host_event(hostname, event)
-          log_event("for host: #{hostname.inspect}", event)
-          publish_data_for(hostname, event['stdout'] + "\n", 'stdout') if event['stdout']
-          case event['event']
-          when 'runner_on_ok'
-            publish_exit_status_for(hostname, 0) if @exit_statuses[hostname].nil?
-          when 'runner_on_unreachable'
-            publish_exit_status_for(hostname, 1)
-          when 'runner_on_failed'
-            publish_exit_status_for(hostname, 2) if event.dig('event_data', 'ignore_errors').nil?
+        def handle_broadcast_data(event)
+          logger.warn("handle_broadcast_data: unknown host #{event.host} for event '#{event.type}', broadcasting") if event.host
+          log_event("broadcast", event.json_event)
+
+          if event.type == 'playbook_on_stats'
+            process_playbook_stats_event(event)
+          else
+            broadcast_data(event.output + "\n", 'stdout')
           end
+
+          fail_all_other_tasks if event.type == 'error'
         end
 
-        def handle_broadcast_data(event)
-          log_event("broadcast", event)
-          if event['event'] == 'playbook_on_stats'
-            failures = event.dig('event_data', 'failures') || {}
-            unreachable = event.dig('event_data', 'dark') || {}
-            rescued = event.dig('event_data', 'rescued') || {}
-            header, *rows = event['stdout'].strip.lines.map(&:chomp)
-            @outputs.keys.select { |key| key.is_a? String }.each do |host|
-              line = rows.find { |row| row =~ /#{host}/ }
-              publish_data_for(host, [header, line].join("\n"), 'stdout')
-
-              # If the task has been rescued, it won't consider a failure
-              if @exit_statuses[host].to_i != 0 && failures[host].to_i <= 0 && unreachable[host].to_i <= 0 && rescued[host].to_i > 0
-                publish_exit_status_for(host, 0)
-              end
+        def process_playbook_stats_event(event)
+          event.parse_failures
+          header, *rows = event.output.strip.lines.map(&:chomp)
+
+          @outputs.keys.select { |key| key.is_a? String }.each do |host|
+            line = rows.find { |row| row =~ /#{host}/ }
+            publish_data_for(host, [header, line].join("\n"), 'stdout')
+
+            # If the host's failures were all rescued, don't treat the run as failed
+            if @exit_statuses[host].to_i != 0 && event.rescued_without_failures?(host)
+              publish_exit_status_for(host, 0)
            end
-          else
-            broadcast_data(event['stdout'] + "\n", 'stdout')
           end
+        end
 
+        def fail_all_other_tasks
           # If the run ends early due to an error - fail all other tasks
-          if event['event'] == 'error'
-            @outputs.keys.select { |key| key.is_a? String }.each do |host|
-              @exit_statuses[host] = 4 if @exit_statuses[host].to_i == 0
+          @outputs.keys.select { |key| key.is_a? String }.each do |host|
+            @exit_statuses[host] = 4 if @exit_statuses[host].to_i == 0
           end
         end
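
Usage sketch (not part of the diff): the loop below illustrates how AnsibleRunner
is expected to drive the new ArtifactsProcessor on each poll cycle. The `root`
value and the standalone loop are assumptions for illustration only; in the plugin
the caller is AnsibleRunner#process_artifacts and `root` is the runner's private
working directory (@root).

    require 'smart_proxy_ansible/artifacts_processor' # assumes the plugin code is on the load path

    processor = Proxy::Ansible::ArtifactsProcessor.new
    root = '/tmp/ansible-runner-run' # assumed path; the runner passes its @root here

    # process_artifacts scans "#{root}/artifacts/<uuid>/job_events" and returns the
    # highest event-file number seen so far; last_file_num counts events already consumed.
    total_files = processor.process_artifacts(root)
    while processor.last_file_num < total_files
      event = processor.get_next_event # => Proxy::Ansible::ArtifactEvent
      puts "#{event.type} (host: #{event.host.inspect})"
    end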