diff --git a/lib/floe.rb b/lib/floe.rb index c4d11f34..ea03e5ab 100644 --- a/lib/floe.rb +++ b/lib/floe.rb @@ -34,6 +34,7 @@ require_relative "floe/workflow/states/non_terminal_mixin" require_relative "floe/workflow/states/parallel" require_relative "floe/workflow/states/pass" +require_relative "floe/workflow/states/retry_catch_mixin" require_relative "floe/workflow/states/succeed" require_relative "floe/workflow/states/task" require_relative "floe/workflow/states/wait" diff --git a/lib/floe/workflow/states/map.rb b/lib/floe/workflow/states/map.rb index 41e6a021..182c817b 100644 --- a/lib/floe/workflow/states/map.rb +++ b/lib/floe/workflow/states/map.rb @@ -2,6 +2,7 @@ require_relative "input_output_mixin" require_relative "non_terminal_mixin" +require_relative "retry_catch_mixin" module Floe class Workflow @@ -9,6 +10,7 @@ module States class Map < Floe::Workflow::State include InputOutputMixin include NonTerminalMixin + include RetryCatchMixin attr_reader :end, :next, :parameters, :input_path, :output_path, :result_path, :result_selector, :retry, :catch, :item_processor, :items_path, @@ -36,8 +38,8 @@ def initialize(workflow, name, payload) @item_batcher = payload["ItemBatcher"] @result_writer = payload["ResultWriter"] @max_concurrency = payload["MaxConcurrency"]&.to_i - @tolerated_failure_percentage = payload["ToleratedFailurePercentage"] - @tolerated_failure_count = payload["ToleratedFailureCount"] + @tolerated_failure_percentage = payload["ToleratedFailurePercentage"]&.to_i + @tolerated_failure_count = payload["ToleratedFailureCount"]&.to_i validate_state!(workflow) end @@ -58,8 +60,14 @@ def start(context) end def finish(context) - result = context.state["ItemProcessorContext"].map { |ctx| Context.new(ctx).output } - context.output = process_output(context, result) + if success?(context) + result = each_item_processor(context).map(&:output) + context.output = process_output(context, result) + else + error = parse_error(context) + retry_state!(context, error) || catch_error!(context, error) || fail_workflow!(context, error) + end + super end @@ -77,15 +85,15 @@ def end? end def ready?(context) - !context.state_started? || context.state["ItemProcessorContext"].any? { |ctx| item_processor.step_nonblock_ready?(Context.new(ctx)) } + !context.state_started? || each_item_processor(context).any? { |ctx| item_processor.step_nonblock_ready?(ctx) } end def wait_until(context) - context.state["ItemProcessorContext"].filter_map { |ctx| item_processor.wait_until(Context.new(ctx)) }.min + each_item_processor(context).filter_map { |ctx| item_processor.wait_until(ctx) }.min end def waiting?(context) - context.state["ItemProcessorContext"].any? { |ctx| item_processor.waiting?(Context.new(ctx)) } + each_item_processor(context).any? { |ctx| item_processor.waiting?(ctx) } end def running?(context) @@ -93,11 +101,30 @@ def running?(context) end def ended?(context) - context.state["ItemProcessorContext"].all? { |ctx| Context.new(ctx).ended? } + each_item_processor(context).all?(&:ended?) + end + + def success?(context) + contexts = each_item_processor(context) + num_failed = contexts.count(&:failed?) + total = contexts.count + + return true if num_failed.zero? || total.zero? + return true if tolerated_failure_percentage && tolerated_failure_percentage == 100 + # Some have failed, check the tolerated_failure thresholds to see if + # we should fail the whole state. 
+          return true if tolerated_failure_count && num_failed < tolerated_failure_count
+          return true if tolerated_failure_percentage && (100 * num_failed / total.to_f) < tolerated_failure_percentage
+
+          false
         end
 
         private
 
+        def each_item_processor(context)
+          context.state["ItemProcessorContext"].map { |ctx| Context.new(ctx) }
+        end
+
         def step_nonblock!(context)
           item_processor_context = Context.new(context.state["ItemProcessorContext"][context.state["Iteration"]])
           item_processor.run_nonblock(item_processor_context) if item_processor.step_nonblock_ready?(item_processor_context)
@@ -109,6 +136,10 @@ def step_nonblock!(context)
           end
         end
 
+        def parse_error(context)
+          each_item_processor(context).detect(&:failed?)&.output
+        end
+
         def validate_state!(workflow)
           validate_state_next!(workflow)
         end
diff --git a/lib/floe/workflow/states/retry_catch_mixin.rb b/lib/floe/workflow/states/retry_catch_mixin.rb
new file mode 100644
index 00000000..f19ac075
--- /dev/null
+++ b/lib/floe/workflow/states/retry_catch_mixin.rb
@@ -0,0 +1,57 @@
+# frozen_string_literal: true
+
+module Floe
+  class Workflow
+    module States
+      module RetryCatchMixin
+        def find_retrier(error)
+          self.retry.detect { |r| r.match_error?(error) }
+        end
+
+        def find_catcher(error)
+          self.catch.detect { |c| c.match_error?(error) }
+        end
+
+        def retry_state!(context, error)
+          retrier = find_retrier(error["Error"]) if error
+          return if retrier.nil?
+
+          # If a different retrier is hit reset the context
+          if !context["State"].key?("RetryCount") || context["State"]["Retrier"] != retrier.error_equals
+            context["State"]["RetryCount"] = 0
+            context["State"]["Retrier"] = retrier.error_equals
+          end
+
+          context["State"]["RetryCount"] += 1
+
+          return if context["State"]["RetryCount"] > retrier.max_attempts
+
+          wait_until!(context, :seconds => retrier.sleep_duration(context["State"]["RetryCount"]))
+          context.next_state = context.state_name
+          context.output = error
+          logger.info("Running state: [#{long_name}] with input [#{context.json_input}] got error[#{context.json_output}]...Retry - delay: #{wait_until(context)}")
+          true
+        end
+
+        def catch_error!(context, error)
+          catcher = find_catcher(error["Error"]) if error
+          return if catcher.nil?
+ + context.next_state = catcher.next + context.output = catcher.result_path.set(context.input, error) + logger.info("Running state: [#{long_name}] with input [#{context.json_input}]...CatchError - next state: [#{context.next_state}] output: [#{context.json_output}]") + + true + end + + def fail_workflow!(context, error) + # next_state is nil, and will be set to nil again in super + # keeping in here for completeness + context.next_state = nil + context.output = error + logger.error("Running state: [#{long_name}] with input [#{context.json_input}]...Complete workflow - output: [#{context.json_output}]") + end + end + end + end +end diff --git a/lib/floe/workflow/states/task.rb b/lib/floe/workflow/states/task.rb index 719f513e..500669a6 100644 --- a/lib/floe/workflow/states/task.rb +++ b/lib/floe/workflow/states/task.rb @@ -1,11 +1,16 @@ # frozen_string_literal: true +require_relative "input_output_mixin" +require_relative "non_terminal_mixin" +require_relative "retry_catch_mixin" + module Floe class Workflow module States class Task < Floe::Workflow::State include InputOutputMixin include NonTerminalMixin + include RetryCatchMixin attr_reader :credentials, :end, :heartbeat_seconds, :next, :parameters, :result_selector, :resource, :timeout_seconds, :retry, :catch, @@ -82,54 +87,6 @@ def success?(context) runner.success?(context.state["RunnerContext"]) end - def find_retrier(error) - self.retry.detect { |r| r.match_error?(error) } - end - - def find_catcher(error) - self.catch.detect { |c| c.match_error?(error) } - end - - def retry_state!(context, error) - retrier = find_retrier(error["Error"]) if error - return if retrier.nil? - - # If a different retrier is hit reset the context - if !context["State"].key?("RetryCount") || context["State"]["Retrier"] != retrier.error_equals - context["State"]["RetryCount"] = 0 - context["State"]["Retrier"] = retrier.error_equals - end - - context["State"]["RetryCount"] += 1 - - return if context["State"]["RetryCount"] > retrier.max_attempts - - wait_until!(context, :seconds => retrier.sleep_duration(context["State"]["RetryCount"])) - context.next_state = context.state_name - context.output = error - logger.info("Running state: [#{long_name}] with input [#{context.json_input}] got error[#{context.json_output}]...Retry - delay: #{wait_until(context)}") - true - end - - def catch_error!(context, error) - catcher = find_catcher(error["Error"]) if error - return if catcher.nil? - - context.next_state = catcher.next - context.output = catcher.result_path.set(context.input, error) - logger.info("Running state: [#{long_name}] with input [#{context.json_input}]...CatchError - next state: [#{context.next_state}] output: [#{context.json_output}]") - - true - end - - def fail_workflow!(context, error) - # next_state is nil, and will be set to nil again in super - # keeping in here for completeness - context.next_state = nil - context.output = error - logger.error("Running state: [#{long_name}] with input [#{context.json_input}]...Complete workflow - output: [#{context.json_output}]") - end - def parse_error(output) return if output.nil? 
         return output if output.kind_of?(Hash)
diff --git a/spec/workflow/states/map_spec.rb b/spec/workflow/states/map_spec.rb
index 8b6b1812..a6c2ad73 100644
--- a/spec/workflow/states/map_spec.rb
+++ b/spec/workflow/states/map_spec.rb
@@ -17,6 +17,9 @@
       }
     }
   end
+
+  let(:tolerated_failure_count) { nil }
+  let(:tolerated_failure_percentage) { nil }
   let(:workflow) do
     payload = {
       "Validate-All" => {
@@ -38,6 +41,10 @@
         "End" => true,
       }
     }
+
+    payload["Validate-All"]["ToleratedFailureCount"] = tolerated_failure_count if tolerated_failure_count
+    payload["Validate-All"]["ToleratedFailurePercentage"] = tolerated_failure_percentage if tolerated_failure_percentage
+
     make_workflow(ctx, payload)
   end
 
@@ -145,4 +152,117 @@
       end
     end
   end
+
+  describe "#running?" do
+    before { state.start(ctx) }
+
+    context "with all iterations ended" do
+      before { ctx.state["ItemProcessorContext"].each { |ctx| ctx["Execution"]["EndTime"] = Time.now.utc } }
+
+      it "returns false" do
+        expect(state.running?(ctx)).to be_falsey
+      end
+    end
+
+    context "with some iterations not ended" do
+      before { ctx.state["ItemProcessorContext"][0]["Execution"]["EndTime"] = Time.now.utc }
+
+      it "returns true" do
+        expect(state.running?(ctx)).to be_truthy
+      end
+    end
+  end
+
+  describe "#ended?" do
+    before { state.start(ctx) }
+
+    context "with all iterations ended" do
+      before { ctx.state["ItemProcessorContext"].each { |ctx| ctx["Execution"]["EndTime"] = Time.now.utc } }
+
+      it "returns true" do
+        expect(state.ended?(ctx)).to be_truthy
+      end
+    end
+
+    context "with some iterations not ended" do
+      before { ctx.state["ItemProcessorContext"][0]["Execution"]["EndTime"] = Time.now.utc }
+
+      it "returns false" do
+        expect(state.ended?(ctx)).to be_falsey
+      end
+    end
+  end
+
+  describe "#success?" do
+    before { state.start(ctx) }
+
+    context "with no failed iterations" do
+      it "returns true" do
+        expect(state.success?(ctx)).to be_truthy
+      end
+    end
+
+    context "with no iterations" do
+      let(:input) { {"detail" => {"shipped" => []}} }
+
+      it "returns true" do
+        expect(state.success?(ctx)).to be_truthy
+      end
+    end
+
+    context "with all iterations failed" do
+      before { ctx.state["ItemProcessorContext"].each { |ctx| ctx["State"] = {"Output" => {"Error" => "FAILED!"}} } }
+
+      it "returns false" do
+        expect(state.success?(ctx)).to be_falsey
+      end
+    end
+
+    context "with mixed successful and failed iterations" do
+      before do
+        ctx.state["ItemProcessorContext"][0]["State"] = {"Output" => {"Error" => "FAILED!"}}
+        ctx.state["ItemProcessorContext"][2]["State"] = {"Output" => {"Error" => "FAILED!"}}
+      end
+
+      it "returns false" do
+        expect(state.success?(ctx)).to be_falsey
+      end
+
+      context "with ToleratedFailureCount" do
+        context "greater than the number of failures" do
+          let(:tolerated_failure_count) { 3 }
+
+          it "returns true" do
+            expect(state.success?(ctx)).to be_truthy
+          end
+        end
+
+        context "less than the number of failures" do
+          let(:tolerated_failure_count) { 1 }
+
+          it "returns false" do
+            expect(state.success?(ctx)).to be_falsey
+          end
+        end
+      end
+
+      context "with ToleratedFailurePercentage" do
+        context "greater than the percentage of failures" do
+          let(:tolerated_failure_percentage) { 50 }
+
+          it "returns true" do
+            expect(state.success?(ctx)).to be_truthy
+          end
+        end
+
+        context "less than the percentage of failures" do
+          let(:tolerated_failure_percentage) { 10 }
+
+          it "returns false" do
+            expect(state.success?(ctx)).to be_falsey
+          end
+        end
+      end
+    end
+  end
 end
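
For reviewers, the block below is a minimal, standalone sketch (not part of the patch) of the failure-tolerance rule that the new `Map#success?` applies once some iterations have failed. The `tolerated?` helper name and the sample counts are illustrative assumptions; the threshold logic itself mirrors the code added above.

```ruby
# Illustrative sketch only -- extracted from the Map#success? logic in this patch.
# `tolerated?` is a made-up helper name; the keyword arguments mirror the state's
# ToleratedFailureCount / ToleratedFailurePercentage payload fields.
def tolerated?(num_failed, total, tolerated_failure_count: nil, tolerated_failure_percentage: nil)
  return true if num_failed.zero? || total.zero?
  return true if tolerated_failure_percentage && tolerated_failure_percentage == 100
  return true if tolerated_failure_count && num_failed < tolerated_failure_count
  return true if tolerated_failure_percentage && (100 * num_failed / total.to_f) < tolerated_failure_percentage

  false
end

# Example: 2 failed iterations out of 5
tolerated?(2, 5)                                   # => false (no tolerance configured)
tolerated?(2, 5, tolerated_failure_count: 3)       # => true  (2 < 3)
tolerated?(2, 5, tolerated_failure_percentage: 50) # => true  (40.0% < 50%)
tolerated?(2, 5, tolerated_failure_percentage: 10) # => false (40.0% >= 10%)
```

Note that `total.to_f` keeps the percentage comparison in floating point, so integer division cannot hide a partial failure rate.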