Skip to content

Commit

Permalink
Fixes #36341 - Refactor AnsibleRunner and Introduce ArtifactsProcessor
Browse files Browse the repository at this point in the history
This commit enhances the AnsibleRunner to improve functionality,
and introduces the ArtifactsProcessor for better artifact handling.
  • Loading branch information
nofaralfasi committed Sep 27, 2023
1 parent 6cd18e6 commit d9102e9
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 66 deletions.
105 changes: 105 additions & 0 deletions lib/smart_proxy_ansible/artifacts_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
require 'json'

module Proxy
module Ansible
# Helper for Artifacts Processor
class ArtifactsProcessor
ARTIFACTS_DIR = 'artifacts'.freeze

attr_reader :last_file_num

def initialize
@current_file_index = 1
@last_file_num = 0
end

def process_artifacts(root)
@events_files = []
@uuid ||= find_uuid(root)
return unless @uuid

job_event_dir = File.join(root, ARTIFACTS_DIR, @uuid, 'job_events')

loop do
files = Dir["#{job_event_dir}/*.json"].map do |file|
num = File.basename(file)[/\A\d+/].to_i unless file.include?('partial')
[file, num]
end
files_with_nums = files.select { |(_, num)| num && num >= @current_file_index }.sort_by(&:last)
break if files_with_nums.empty?

@events_files.concat(files_with_nums)
@current_file_index = files_with_nums.last.last + 1
end
@current_file_index - 1
end

def get_next_event
file_path = @events_files[@last_file_num][0]
@last_file_num += 1
json_event = parse_json_event(file_path)
ArtifactEvent.new(json_event)
end

private

def find_uuid(root)
f = Dir["#{root}/#{ARTIFACTS_DIR}/*"].first
File.basename(f) if f
end

def parse_json_event(file_path)
JSON.parse(File.read(file_path))
rescue JSON::ParserError => e
raise ArgumentError, "ERROR: Could not parse value as JSON. Please check the value is a valid JSON #{file_path}."
end
end

class ArtifactEvent
attr_reader :json_event, :host, :type, :output, :exit_status

def initialize(json_event)
@json_event = json_event
@host = extract_host_name(@json_event)
@type = @json_event['event']
@output = @json_event['stdout']
end

def set_exit_status
case @type
when 'runner_on_ok'
@exit_status = 0
when 'runner_on_unreachable'
@exit_status = 1
when 'runner_on_failed'
@exit_status = 2 if @json_event.dig('event_data', 'ignore_errors').nil?
end
end

def parse_failures
@failures = {
failures: extract_event_data('failures'),
unreachable: extract_event_data('dark'),
rescued: extract_event_data('rescued')
}
end

def has_failures_for_host(host)
@failures[:failures][host].to_i <= 0 &&
@failures[:unreachable][host].to_i <= 0 &&
@failures[:rescued][host].to_i > 0
end

private

def extract_host_name(event)
hostname = event.dig('event_data', 'host') || event.dig('event_data', 'remote_addr')
hostname.to_s unless hostname.nil? || hostname.empty?
end

def extract_event_data(data_type)
@json_event.dig('event_data', data_type) || {}
end
end
end
end
1 change: 1 addition & 0 deletions lib/smart_proxy_ansible/configuration_loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ def load_classes
require 'smart_proxy_dynflow'
require 'smart_proxy_dynflow/continuous_output'
require 'smart_proxy_ansible/task_launcher/ansible_runner'
require 'smart_proxy_ansible/artifacts_processor'
require 'smart_proxy_ansible/runner/ansible_runner'

Proxy::Dynflow::TaskLauncherRegistry.register('ansible-runner',
Expand Down
111 changes: 45 additions & 66 deletions lib/smart_proxy_ansible/runner/ansible_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def initialize(input, suspended_action:, id: nil)
@passphrase = action_input['secrets']['key_passphrase']
@execution_timeout_interval = action_input[:execution_timeout_interval]
@cleanup_working_dirs = action_input.fetch(:cleanup_working_dirs, true)
@artifacts_processor = ArtifactsProcessor.new
end

def start
Expand Down Expand Up @@ -79,90 +80,68 @@ def initialize_command(*command)
private

def process_artifacts
@counter ||= 1
@uuid ||= if (f = Dir["#{@root}/artifacts/*"].first)
File.basename(f)
end
return unless @uuid
job_event_dir = File.join(@root, 'artifacts', @uuid, 'job_events')
loop do
files = Dir["#{job_event_dir}/*.json"].map do |file|
num = File.basename(file)[/\A\d+/].to_i unless file.include?('partial')
[file, num]
end
files_with_nums = files.select { |(_, num)| num && num >= @counter }.sort_by(&:last)
break if files_with_nums.empty?
logger.debug("[foreman_ansible] - processing event files: #{files_with_nums.map(&:first).inspect}}")
files_with_nums.map(&:first).each { |event_file| handle_event_file(event_file) }
@counter = files_with_nums.last.last + 1
end
end
total_files = @artifacts_processor.process_artifacts(@root)

def handle_event_file(event_file)
logger.debug("[foreman_ansible] - parsing event file #{event_file}")
begin
event = JSON.parse(File.read(event_file))
if (hostname = hostname_for_event(event))
handle_host_event(hostname, event)
while @artifacts_processor.last_file_num < total_files
event = @artifacts_processor.get_next_event

if @targets.key?(event.host)
handle_host_event(event)
else
handle_broadcast_data(event)
end
true
rescue JSON::ParserError => e
logger.error("[foreman_ansible] - Error parsing runner event at #{event_file}: #{e.class}: #{e.message}")
logger.debug(e.backtrace.join("\n"))
end
end

def hostname_for_event(event)
hostname = event.dig('event_data', 'host') || event.dig('event_data', 'remote_addr')
return nil if hostname.nil? || hostname.empty?
def handle_host_event(event)
log_event("for host: #{event.host.inspect}", event.json_event)
publish_data_for(event.host, "#{event.output}\n", 'stdout') if event.output
handle_exit_status(event)
end

unless @targets.key?(hostname)
logger.warn("handle_host_event: unknown host #{hostname} for event '#{event['event']}', broadcasting")
return nil
def handle_exit_status(event)
event.set_exit_status
return if event.exit_status.nil?

if event.exit_status == 0
publish_exit_status_for(event.host, 0) if @exit_statuses[event.host].nil?
else
publish_exit_status_for(event.host, event.exit_status)
end
hostname
end

def handle_host_event(hostname, event)
log_event("for host: #{hostname.inspect}", event)
publish_data_for(hostname, event['stdout'] + "\n", 'stdout') if event['stdout']
case event['event']
when 'runner_on_ok'
publish_exit_status_for(hostname, 0) if @exit_statuses[hostname].nil?
when 'runner_on_unreachable'
publish_exit_status_for(hostname, 1)
when 'runner_on_failed'
publish_exit_status_for(hostname, 2) if event.dig('event_data', 'ignore_errors').nil?
def handle_broadcast_data(event)
logger.warn("handle_host_event: unknown host #{event.host} for event '#{event.type}', broadcasting")
log_event("broadcast", event.json_event)

if event.type == 'playbook_on_stats'
process_playbook_stats_event(event)
else
broadcast_data(event.output + "\n", 'stdout')
end

fail_all_other_tasks if event.type == 'error'
end

def handle_broadcast_data(event)
log_event("broadcast", event)
if event['event'] == 'playbook_on_stats'
failures = event.dig('event_data', 'failures') || {}
unreachable = event.dig('event_data', 'dark') || {}
rescued = event.dig('event_data', 'rescued') || {}
header, *rows = event['stdout'].strip.lines.map(&:chomp)
@outputs.keys.select { |key| key.is_a? String }.each do |host|
line = rows.find { |row| row =~ /#{host}/ }
publish_data_for(host, [header, line].join("\n"), 'stdout')

# If the task has been rescued, it won't consider a failure
if @exit_statuses[host].to_i != 0 && failures[host].to_i <= 0 && unreachable[host].to_i <= 0 && rescued[host].to_i > 0
publish_exit_status_for(host, 0)
end
def process_playbook_stats_event(event)
event.parse_failures
header, *rows = event.output.strip.lines.map(&:chomp)

@outputs.keys.select { |key| key.is_a? String }.each do |host|
line = rows.find { |row| row =~ /#{host}/ }
publish_data_for(host, [header, line].join("\n"), 'stdout')

# If the task has been rescued, it won't consider a failure
if @exit_statuses[host].to_i != 0 && event.has_failures_for_host(host)
publish_exit_status_for(host, 0)
end
else
broadcast_data(event['stdout'] + "\n", 'stdout')
end
end

def fail_all_other_tasks
# If the run ends early due to an error - fail all other tasks
if event['event'] == 'error'
@outputs.keys.select { |key| key.is_a? String }.each do |host|
@exit_statuses[host] = 4 if @exit_statuses[host].to_i == 0
end
@outputs.keys.select { |key| key.is_a? String }.each do |host|
@exit_statuses[host] = 4 if @exit_statuses[host].to_i == 0
end
end

Expand Down

0 comments on commit d9102e9

Please sign in to comment.