Merge pull request #282 from agrare/map_state_add_tolerated_failure
Map state add tolerated failure
kbrock authored Oct 8, 2024
2 parents 56737fb + f516eaf commit 8299cc4
Showing 5 changed files with 222 additions and 56 deletions.
1 change: 1 addition & 0 deletions lib/floe.rb
@@ -34,6 +34,7 @@
require_relative "floe/workflow/states/non_terminal_mixin"
require_relative "floe/workflow/states/parallel"
require_relative "floe/workflow/states/pass"
require_relative "floe/workflow/states/retry_catch_mixin"
require_relative "floe/workflow/states/succeed"
require_relative "floe/workflow/states/task"
require_relative "floe/workflow/states/wait"
47 changes: 39 additions & 8 deletions lib/floe/workflow/states/map.rb
@@ -2,13 +2,15 @@

require_relative "input_output_mixin"
require_relative "non_terminal_mixin"
require_relative "retry_catch_mixin"

module Floe
class Workflow
module States
class Map < Floe::Workflow::State
include InputOutputMixin
include NonTerminalMixin
include RetryCatchMixin

attr_reader :end, :next, :parameters, :input_path, :output_path, :result_path,
:result_selector, :retry, :catch, :item_processor, :items_path,
@@ -36,8 +38,8 @@ def initialize(workflow, name, payload)
@item_batcher = payload["ItemBatcher"]
@result_writer = payload["ResultWriter"]
@max_concurrency = payload["MaxConcurrency"]&.to_i
@tolerated_failure_percentage = payload["ToleratedFailurePercentage"]
@tolerated_failure_count = payload["ToleratedFailureCount"]
@tolerated_failure_percentage = payload["ToleratedFailurePercentage"]&.to_i
@tolerated_failure_count = payload["ToleratedFailureCount"]&.to_i

validate_state!(workflow)
end
@@ -58,8 +60,14 @@ def start(context)
end

def finish(context)
result = context.state["ItemProcessorContext"].map { |ctx| Context.new(ctx).output }
context.output = process_output(context, result)
if success?(context)
result = each_item_processor(context).map(&:output)
context.output = process_output(context, result)
else
error = parse_error(context)
retry_state!(context, error) || catch_error!(context, error) || fail_workflow!(context, error)
end

super
end

@@ -77,27 +85,46 @@ def end?
end

def ready?(context)
!context.state_started? || context.state["ItemProcessorContext"].any? { |ctx| item_processor.step_nonblock_ready?(Context.new(ctx)) }
!context.state_started? || each_item_processor(context).any? { |ctx| item_processor.step_nonblock_ready?(ctx) }
end

def wait_until(context)
context.state["ItemProcessorContext"].filter_map { |ctx| item_processor.wait_until(Context.new(ctx)) }.min
each_item_processor(context).filter_map { |ctx| item_processor.wait_until(ctx) }.min
end

def waiting?(context)
context.state["ItemProcessorContext"].any? { |ctx| item_processor.waiting?(Context.new(ctx)) }
each_item_processor(context).any? { |ctx| item_processor.waiting?(ctx) }
end

def running?(context)
!ended?(context)
end

def ended?(context)
context.state["ItemProcessorContext"].all? { |ctx| Context.new(ctx).ended? }
each_item_processor(context).all?(&:ended?)
end

def success?(context)
contexts = each_item_processor(context)
num_failed = contexts.count(&:failed?)
total = contexts.count

return true if num_failed.zero? || total.zero?
return true if tolerated_failure_percentage && tolerated_failure_percentage == 100
# Some have failed, check the tolerated_failure thresholds to see if
# we should fail the whole state.
return true if tolerated_failure_count && num_failed < tolerated_failure_count
return true if tolerated_failure_percentage && (100 * num_failed / total.to_f) < tolerated_failure_percentage

false
end

private

def each_item_processor(context)
context.state["ItemProcessorContext"].map { |ctx| Context.new(ctx) }
end

def step_nonblock!(context)
item_processor_context = Context.new(context.state["ItemProcessorContext"][context.state["Iteration"]])
item_processor.run_nonblock(item_processor_context) if item_processor.step_nonblock_ready?(item_processor_context)
@@ -109,6 +136,10 @@ def step_nonblock!(context)
end
end

def parse_error(context)
each_item_processor(context).detect(&:failed?)&.output&.dig("Error")
end

def validate_state!(workflow)
validate_state_next!(workflow)
end
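
The new success? check above fails the whole Map state only once the failed-iteration count and percentage exceed the configured tolerances. A standalone sketch of that arithmetic (hypothetical helper, not part of Floe's API), assuming the same semantics as the code above:

def tolerated_failures?(num_failed, total, count: nil, percentage: nil)
  # No failures (or nothing to run) is always a success.
  return true if num_failed.zero? || total.zero?
  # A 100% tolerance can never be exceeded.
  return true if percentage == 100
  # Otherwise the state stays successful while below either configured threshold.
  return true if count && num_failed < count
  return true if percentage && (100.0 * num_failed / total) < percentage

  false
end

tolerated_failures?(2, 6, :count => 3)       # => true  (2 failures < 3 tolerated)
tolerated_failures?(2, 6, :percentage => 10) # => false (33.3% failed, only 10% tolerated)
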
57 changes: 57 additions & 0 deletions lib/floe/workflow/states/retry_catch_mixin.rb
@@ -0,0 +1,57 @@
# frozen_string_literal: true

module Floe
class Workflow
module States
module RetryCatchMixin
def find_retrier(error)
self.retry.detect { |r| r.match_error?(error) }
end

def find_catcher(error)
self.catch.detect { |c| c.match_error?(error) }
end

def retry_state!(context, error)
retrier = find_retrier(error["Error"]) if error
return if retrier.nil?

# If a different retrier is hit reset the context
if !context["State"].key?("RetryCount") || context["State"]["Retrier"] != retrier.error_equals
context["State"]["RetryCount"] = 0
context["State"]["Retrier"] = retrier.error_equals
end

context["State"]["RetryCount"] += 1

return if context["State"]["RetryCount"] > retrier.max_attempts

wait_until!(context, :seconds => retrier.sleep_duration(context["State"]["RetryCount"]))
context.next_state = context.state_name
context.output = error
logger.info("Running state: [#{long_name}] with input [#{context.json_input}] got error[#{context.json_output}]...Retry - delay: #{wait_until(context)}")
true
end

def catch_error!(context, error)
catcher = find_catcher(error["Error"]) if error
return if catcher.nil?

context.next_state = catcher.next
context.output = catcher.result_path.set(context.input, error)
logger.info("Running state: [#{long_name}] with input [#{context.json_input}]...CatchError - next state: [#{context.next_state}] output: [#{context.json_output}]")

true
end

def fail_workflow!(context, error)
# next_state is nil, and will be set to nil again in super
# keeping in here for completeness
context.next_state = nil
context.output = error
logger.error("Running state: [#{long_name}] with input [#{context.json_input}]...Complete workflow - output: [#{context.json_output}]")
end
end
end
end
end
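
Both Task and Map now share this error-handling cascade: try a matching Retrier first, then a Catcher, and fail the workflow only when neither matches. A minimal sketch of how a state is expected to consume the mixin (the class and finish body below are illustrative, not Floe's exact implementation):

class MyFailableState < Floe::Workflow::State
  include RetryCatchMixin

  def finish(context)
    unless success?(context)
      error = parse_error(context)
      # Cascade: retry if a Retrier matches, else route to a Catcher, else fail the workflow.
      retry_state!(context, error) || catch_error!(context, error) || fail_workflow!(context, error)
    end
    super
  end
end
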
53 changes: 5 additions & 48 deletions lib/floe/workflow/states/task.rb
@@ -1,11 +1,16 @@
# frozen_string_literal: true

require_relative "input_output_mixin"
require_relative "non_terminal_mixin"
require_relative "retry_catch_mixin"

module Floe
class Workflow
module States
class Task < Floe::Workflow::State
include InputOutputMixin
include NonTerminalMixin
include RetryCatchMixin

attr_reader :credentials, :end, :heartbeat_seconds, :next, :parameters,
:result_selector, :resource, :timeout_seconds, :retry, :catch,
@@ -82,54 +87,6 @@ def success?(context)
runner.success?(context.state["RunnerContext"])
end

def find_retrier(error)
self.retry.detect { |r| r.match_error?(error) }
end

def find_catcher(error)
self.catch.detect { |c| c.match_error?(error) }
end

def retry_state!(context, error)
retrier = find_retrier(error["Error"]) if error
return if retrier.nil?

# If a different retrier is hit reset the context
if !context["State"].key?("RetryCount") || context["State"]["Retrier"] != retrier.error_equals
context["State"]["RetryCount"] = 0
context["State"]["Retrier"] = retrier.error_equals
end

context["State"]["RetryCount"] += 1

return if context["State"]["RetryCount"] > retrier.max_attempts

wait_until!(context, :seconds => retrier.sleep_duration(context["State"]["RetryCount"]))
context.next_state = context.state_name
context.output = error
logger.info("Running state: [#{long_name}] with input [#{context.json_input}] got error[#{context.json_output}]...Retry - delay: #{wait_until(context)}")
true
end

def catch_error!(context, error)
catcher = find_catcher(error["Error"]) if error
return if catcher.nil?

context.next_state = catcher.next
context.output = catcher.result_path.set(context.input, error)
logger.info("Running state: [#{long_name}] with input [#{context.json_input}]...CatchError - next state: [#{context.next_state}] output: [#{context.json_output}]")

true
end

def fail_workflow!(context, error)
# next_state is nil, and will be set to nil again in super
# keeping in here for completeness
context.next_state = nil
context.output = error
logger.error("Running state: [#{long_name}] with input [#{context.json_input}]...Complete workflow - output: [#{context.json_output}]")
end

def parse_error(output)
return if output.nil?
return output if output.kind_of?(Hash)
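
The retriers and catchers that find_retrier/find_catcher search come from the state's Retry and Catch payload entries. An illustrative Task payload (Ruby hash, as the specs build them; the resource and state names here are hypothetical):

payload = {
  "Type"     => "Task",
  "Resource" => "docker://example/validator:latest",
  "Retry"    => [
    # Retried up to twice with exponential backoff when the error matches.
    {"ErrorEquals" => ["States.Timeout"], "MaxAttempts" => 2, "IntervalSeconds" => 1, "BackoffRate" => 2.0}
  ],
  "Catch"    => [
    # Any remaining error routes to a handler state, preserving the error under $.error.
    {"ErrorEquals" => ["States.ALL"], "Next" => "HandleFailure", "ResultPath" => "$.error"}
  ],
  "Next"     => "NextState"
}
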
120 changes: 120 additions & 0 deletions spec/workflow/states/map_spec.rb
@@ -17,6 +17,9 @@
}
}
end

let(:tolerated_failure_count) { nil }
let(:tolerated_failure_percentage) { nil }
let(:workflow) do
payload = {
"Validate-All" => {
@@ -38,6 +41,10 @@
"End" => true,
}
}

payload["Validate-All"]["ToleratedFailureCount"] = tolerated_failure_count if tolerated_failure_count
payload["Validate-All"]["ToleratedFailurePercentage"] = tolerated_failure_percentage if tolerated_failure_percentage

make_workflow(ctx, payload)
end

@@ -145,4 +152,117 @@
end
end
end

describe "#running?" do
before { state.start(ctx) }

context "with all iterations ended" do
before { ctx.state["ItemProcessorContext"].each { |ctx| ctx["Execution"]["EndTime"] = Time.now.utc } }

it "returns false" do
expect(state.running?(ctx)).to be_falsey
end
end

context "with some iterations not ended" do
before { ctx.state["ItemProcessorContext"][0]["Execution"]["EndTime"] = Time.now.utc }

it "returns true" do
expect(state.running?(ctx)).to be_truthy
end
end
end

describe "#ended?" do
before { state.start(ctx) }

context "with all iterations ended" do
before { ctx.state["ItemProcessorContext"].each { |ctx| ctx["Execution"]["EndTime"] = Time.now.utc } }

it "returns true" do
expect(state.ended?(ctx)).to be_truthy
end
end

context "with some iterations not ended" do
before { ctx.state["ItemProcessorContext"][0]["Execution"]["EndTime"] = Time.now.utc }

it "returns false" do
expect(state.ended?(ctx)).to be_falsey
end
end
end

describe "#success?" do
before { state.start(ctx) }

context "with no failed iterations" do
it "returns true" do
expect(state.success?(ctx)).to be_truthy
end
end

context "with no iterations" do
let(:input) { {"detail" => {"shipped" => []}} }

it "returns true" do
expect(state.success?(ctx)).to be_truthy
end
end

context "with all iterations failed" do
before { ctx.state["ItemProcessorContext"].each { |ctx| ctx["State"] = {"Output" => {"Error" => "FAILED!"}}} }

it "returns false" do
expect(state.success?(ctx)).to be_falsey
end
end

context "with mixed successful and failed iterations" do
before do
ctx.state["ItemProcessorContext"][0]["State"] = {"Output" => {"Error" => "FAILED!"}}
ctx.state["ItemProcessorContext"][2]["State"] = {"Output" => {"Error" => "FAILED!"}}
end

it "returns true" do
expect(state.success?(ctx)).to be_falsey
end

context "with ToleratedFailureCount" do
context "greater than the number of failures" do
let(:tolerated_failure_count) { 3 }

it "returns false" do
expect(state.success?(ctx)).to be_truthy
end
end

context "less than the number of failures" do
let(:tolerated_failure_count) { 1 }

it "returns true" do
expect(state.success?(ctx)).to be_falsey
end
end
end

context "with ToleratedFailurePercentage" do
context "greater than the number of failures" do
let(:tolerated_failure_percentage) { 50 }

it "returns false" do
expect(state.success?(ctx)).to be_truthy
end
end

context "less than the number of failures" do
let(:tolerated_failure_percentage) { 10 }

it "returns true" do
expect(state.success?(ctx)).to be_falsey
end
end
end
end
end
end
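
For reference, the spec above toggles the new fields on the Map state's payload; in a workflow definition they sit directly on the Map state alongside ItemsPath and ItemProcessor. An illustrative payload (Ruby hash; the item processor and paths are hypothetical):

payload = {
  "Validate-All" => {
    "Type"                       => "Map",
    "ItemsPath"                  => "$.detail.shipped",
    "MaxConcurrency"             => 1,
    "ToleratedFailureCount"      => 3,
    "ToleratedFailurePercentage" => 50,
    "ItemProcessor"              => {
      "StartAt" => "Validate",
      "States"  => {"Validate" => {"Type" => "Pass", "End" => true}}
    },
    "End"                        => true
  }
}
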
