Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Fixes #36341 - Refactor ansible runner artifacts processing #84

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions lib/smart_proxy_ansible/artifacts_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
require 'json'

module Proxy
module Ansible
# Helper for Artifacts Processor
# Processes ansible-runner artifact event files
# (artifacts/<uuid>/job_events/*.json). Each call to #process_artifacts
# incrementally picks up event files that appeared since the previous call;
# #get_next_event then hands the accumulated events out one at a time.
class ArtifactsProcessor
  ARTIFACTS_DIR = 'artifacts'.freeze

  # Index (into the accumulated event-file list) of the next file that
  # #get_next_event will return.
  attr_reader :last_file_num

  def initialize
    @current_file_index = 1
    @last_file_num = 0
    # Accumulates [path, number] pairs across multiple #process_artifacts
    # calls. It must NOT be reset per call: @last_file_num indexes into this
    # list across polling cycles, so resetting it (as the original code did at
    # the top of #process_artifacts) makes #get_next_event dereference nil on
    # the second cycle.
    @events_files = []
  end

  # Scans +root+ for new event files and appends them, ordered by their
  # numeric prefix, to the internal list.
  #
  # Returns the cumulative number of event files seen so far, or nil when no
  # artifacts directory exists yet.
  def process_artifacts(root)
    @uuid ||= find_uuid(root)
    return unless @uuid

    job_event_dir = File.join(root, ARTIFACTS_DIR, @uuid, 'job_events')

    loop do
      # Completed event files are named "<number>-<id>.json"; files still
      # being written contain "partial" in their name and are skipped.
      files = Dir["#{job_event_dir}/*.json"].map do |file|
        num = File.basename(file)[/\A\d+/].to_i unless file.include?('partial')
        [file, num]
      end
      files_with_nums = files.select { |(_, num)| num && num >= @current_file_index }.sort_by(&:last)
      break if files_with_nums.empty?

      @events_files.concat(files_with_nums)
      @current_file_index = files_with_nums.last.last + 1
    end
    @current_file_index - 1
  end

  # Returns the next unprocessed event as an ArtifactEvent and advances the
  # cursor. Callers must check #last_file_num against the total returned by
  # #process_artifacts before calling.
  def get_next_event
    file_path = @events_files[@last_file_num][0]
    @last_file_num += 1
    json_event = parse_json_event(file_path)
    ArtifactEvent.new(json_event)
  end

  private

  # ansible-runner writes all artifacts of a run below a single per-run UUID
  # directory; the first (only) entry identifies the run.
  def find_uuid(root)
    f = Dir["#{root}/#{ARTIFACTS_DIR}/*"].first
    File.basename(f) if f
  end

  # Parses one event file; re-raises parse failures as ArgumentError with the
  # offending path and the underlying parser message (the original swallowed
  # the captured exception detail).
  def parse_json_event(file_path)
    JSON.parse(File.read(file_path))
  rescue JSON::ParserError => e
    raise ArgumentError, "ERROR: Could not parse value as JSON. Please check the value is a valid JSON #{file_path}: #{e.message}"
  end
end

# View over a single parsed ansible-runner job event. Exposes the fields the
# runner cares about (originating host, event type, captured stdout) plus
# helpers that derive an exit status and per-host failure counters from the
# event payload.
class ArtifactEvent
  attr_reader :json_event, :host, :type, :output, :exit_status

  # json_event: Hash parsed from one job_events/*.json file.
  def initialize(json_event)
    @json_event = json_event
    @host = extract_host_name(@json_event)
    @type = @json_event['event']
    @output = @json_event['stdout']
  end

  # Derives @exit_status from the event type:
  #   runner_on_ok          -> 0
  #   runner_on_unreachable -> 1
  #   runner_on_failed      -> 2, unless the failure was ignored via
  #                            ignore_errors (then @exit_status stays nil)
  # Any other event type leaves @exit_status nil.
  def set_exit_status
    case @type
    when 'runner_on_ok'
      @exit_status = 0
    when 'runner_on_unreachable'
      @exit_status = 1
    when 'runner_on_failed'
      @exit_status = 2 if @json_event.dig('event_data', 'ignore_errors').nil?
    end
  end

  # Extracts per-host failure counters from the event payload (meaningful for
  # 'playbook_on_stats' events; 'dark' is ansible's term for unreachable).
  # Caches the result for #has_failures_for_host and returns it.
  def parse_failures
    @failures = {
      failures: extract_event_data('failures'),
      unreachable: extract_event_data('dark'),
      rescued: extract_event_data('rescued')
    }
  end

  # NOTE(review): despite its name, this returns true when +host+ had NO hard
  # failures and NO unreachable results but WAS rescued — i.e. the run should
  # not count as failed for that host. Consider renaming to something like
  # rescued_without_failures? (kept as-is: callers depend on this name).
  def has_failures_for_host(host)
    parse_failures if @failures.nil? # robust when the caller skipped #parse_failures
    @failures[:failures][host].to_i <= 0 &&
      @failures[:unreachable][host].to_i <= 0 &&
      @failures[:rescued][host].to_i > 0
  end

  private

  # Host name comes from event_data.host or event_data.remote_addr; returns
  # nil when neither is present (broadcast/global events).
  def extract_host_name(event)
    hostname = event.dig('event_data', 'host') || event.dig('event_data', 'remote_addr')
    hostname.to_s unless hostname.nil? || hostname.empty?
  end

  # Missing counters default to an empty hash so lookups safely yield nil.
  def extract_event_data(data_type)
    @json_event.dig('event_data', data_type) || {}
  end
end
end
end
1 change: 1 addition & 0 deletions lib/smart_proxy_ansible/configuration_loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ def load_classes
require 'smart_proxy_dynflow'
require 'smart_proxy_dynflow/continuous_output'
require 'smart_proxy_ansible/task_launcher/ansible_runner'
require 'smart_proxy_ansible/artifacts_processor'
require 'smart_proxy_ansible/runner/ansible_runner'

Proxy::Dynflow::TaskLauncherRegistry.register('ansible-runner',
Expand Down
111 changes: 45 additions & 66 deletions lib/smart_proxy_ansible/runner/ansible_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def initialize(input, suspended_action:, id: nil)
@passphrase = action_input['secrets']['key_passphrase']
@execution_timeout_interval = action_input[:execution_timeout_interval]
@cleanup_working_dirs = action_input.fetch(:cleanup_working_dirs, true)
@artifacts_processor = ArtifactsProcessor.new
end

def start
Expand Down Expand Up @@ -79,90 +80,68 @@ def initialize_command(*command)
private

def process_artifacts
@counter ||= 1
@uuid ||= if (f = Dir["#{@root}/artifacts/*"].first)
File.basename(f)
end
return unless @uuid
job_event_dir = File.join(@root, 'artifacts', @uuid, 'job_events')
loop do
files = Dir["#{job_event_dir}/*.json"].map do |file|
num = File.basename(file)[/\A\d+/].to_i unless file.include?('partial')
[file, num]
end
files_with_nums = files.select { |(_, num)| num && num >= @counter }.sort_by(&:last)
break if files_with_nums.empty?
logger.debug("[foreman_ansible] - processing event files: #{files_with_nums.map(&:first).inspect}}")
files_with_nums.map(&:first).each { |event_file| handle_event_file(event_file) }
@counter = files_with_nums.last.last + 1
end
end
total_files = @artifacts_processor.process_artifacts(@root)

def handle_event_file(event_file)
logger.debug("[foreman_ansible] - parsing event file #{event_file}")
begin
event = JSON.parse(File.read(event_file))
if (hostname = hostname_for_event(event))
handle_host_event(hostname, event)
while @artifacts_processor.last_file_num < total_files
event = @artifacts_processor.get_next_event

if @targets.key?(event.host)
handle_host_event(event)
else
handle_broadcast_data(event)
end
true
rescue JSON::ParserError => e
logger.error("[foreman_ansible] - Error parsing runner event at #{event_file}: #{e.class}: #{e.message}")
logger.debug(e.backtrace.join("\n"))
end
end

def hostname_for_event(event)
hostname = event.dig('event_data', 'host') || event.dig('event_data', 'remote_addr')
return nil if hostname.nil? || hostname.empty?
def handle_host_event(event)
log_event("for host: #{event.host.inspect}", event.json_event)
publish_data_for(event.host, "#{event.output}\n", 'stdout') if event.output
handle_exit_status(event)
end

unless @targets.key?(hostname)
logger.warn("handle_host_event: unknown host #{hostname} for event '#{event['event']}', broadcasting")
return nil
def handle_exit_status(event)
event.set_exit_status
return if event.exit_status.nil?

if event.exit_status == 0
publish_exit_status_for(event.host, 0) if @exit_statuses[event.host].nil?
else
publish_exit_status_for(event.host, event.exit_status)
end
hostname
end

def handle_host_event(hostname, event)
log_event("for host: #{hostname.inspect}", event)
publish_data_for(hostname, event['stdout'] + "\n", 'stdout') if event['stdout']
case event['event']
when 'runner_on_ok'
publish_exit_status_for(hostname, 0) if @exit_statuses[hostname].nil?
when 'runner_on_unreachable'
publish_exit_status_for(hostname, 1)
when 'runner_on_failed'
publish_exit_status_for(hostname, 2) if event.dig('event_data', 'ignore_errors').nil?
def handle_broadcast_data(event)
logger.warn("handle_host_event: unknown host #{event.host} for event '#{event.type}', broadcasting")
log_event("broadcast", event.json_event)

if event.type == 'playbook_on_stats'
process_playbook_stats_event(event)
else
broadcast_data(event.output + "\n", 'stdout')
end

fail_all_other_tasks if event.type == 'error'
end

def handle_broadcast_data(event)
log_event("broadcast", event)
if event['event'] == 'playbook_on_stats'
failures = event.dig('event_data', 'failures') || {}
unreachable = event.dig('event_data', 'dark') || {}
rescued = event.dig('event_data', 'rescued') || {}
header, *rows = event['stdout'].strip.lines.map(&:chomp)
@outputs.keys.select { |key| key.is_a? String }.each do |host|
line = rows.find { |row| row =~ /#{host}/ }
publish_data_for(host, [header, line].join("\n"), 'stdout')

# If the task has been rescued, it won't consider a failure
if @exit_statuses[host].to_i != 0 && failures[host].to_i <= 0 && unreachable[host].to_i <= 0 && rescued[host].to_i > 0
publish_exit_status_for(host, 0)
end
def process_playbook_stats_event(event)
event.parse_failures
header, *rows = event.output.strip.lines.map(&:chomp)

@outputs.keys.select { |key| key.is_a? String }.each do |host|
line = rows.find { |row| row =~ /#{host}/ }
publish_data_for(host, [header, line].join("\n"), 'stdout')

# If the task has been rescued, it won't consider a failure
if @exit_statuses[host].to_i != 0 && event.has_failures_for_host(host)
publish_exit_status_for(host, 0)
end
else
broadcast_data(event['stdout'] + "\n", 'stdout')
end
end

def fail_all_other_tasks
# If the run ends early due to an error - fail all other tasks
if event['event'] == 'error'
@outputs.keys.select { |key| key.is_a? String }.each do |host|
@exit_statuses[host] = 4 if @exit_statuses[host].to_i == 0
end
@outputs.keys.select { |key| key.is_a? String }.each do |host|
@exit_statuses[host] = 4 if @exit_statuses[host].to_i == 0
end
end

Expand Down