From 69e0eed2e078bf35bc4e0287783bedd083b3d78a Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Mon, 20 May 2024 11:29:56 -0400 Subject: [PATCH] Implement Map state --- lib/floe.rb | 1 + lib/floe/workflow/item_processor.rb | 27 +++++++ lib/floe/workflow/states/map.rb | 63 ++++++++++++++- spec/workflow/item_processor_spec.rb | 25 ++++++ spec/workflow/states/map_spec.rb | 114 +++++++++++++++++++++++++++ 5 files changed, 228 insertions(+), 2 deletions(-) create mode 100644 lib/floe/workflow/item_processor.rb create mode 100644 spec/workflow/item_processor_spec.rb create mode 100644 spec/workflow/states/map_spec.rb diff --git a/lib/floe.rb b/lib/floe.rb index 7b4368c17..84ce0e46a 100644 --- a/lib/floe.rb +++ b/lib/floe.rb @@ -15,6 +15,7 @@ require_relative "floe/workflow/choice_rule/and" require_relative "floe/workflow/choice_rule/data" require_relative "floe/workflow/context" +require_relative "floe/workflow/item_processor" require_relative "floe/workflow/path" require_relative "floe/workflow/payload_template" require_relative "floe/workflow/reference_path" diff --git a/lib/floe/workflow/item_processor.rb b/lib/floe/workflow/item_processor.rb new file mode 100644 index 000000000..0e45db99d --- /dev/null +++ b/lib/floe/workflow/item_processor.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module Floe + class Workflow + class ItemProcessor + attr_reader :processor_config, :payload, :states, :start_at + + def initialize(payload, name = nil) + @payload = payload + @name = name + + raise Floe::InvalidWorkflowError, "Missing field \"States\" for state [#{name}]" if payload["States"].nil? + raise Floe::InvalidWorkflowError, "Missing field \"StartAt\" for state [#{name}]" if payload["StartAt"].nil? + raise Floe::InvalidWorkflowError, "\"StartAt\" not in the \"States\" field for state [#{name}]" unless payload["States"].key?(payload["StartAt"]) + + @processor_config = payload.fetch("ProcessorConfig", "INLINE") + @states = payload["States"].to_a.map { |state_name, state| State.build!(self, state_name, state) } + @states_by_name = @states.to_h { |state| [state.name, state] } + end + + def value(_context, input = {}) + # TODO: Run the states to get the output + input + end + end + end +end diff --git a/lib/floe/workflow/states/map.rb b/lib/floe/workflow/states/map.rb index 15c1bdfae..245a18063 100644 --- a/lib/floe/workflow/states/map.rb +++ b/lib/floe/workflow/states/map.rb @@ -1,12 +1,71 @@ # frozen_string_literal: true +require_relative "input_output_mixin" +require_relative "non_terminal_mixin" + module Floe class Workflow module States class Map < Floe::Workflow::State - def initialize(*) + include InputOutputMixin + include NonTerminalMixin + + attr_reader :end, :next, :parameters, :input_path, :output_path, :result_path, + :result_selector, :retry, :catch, :item_processor, :items_path, + :item_reader, :item_selector, :item_batcher, :result_writer, + :max_concurrency, :tolerated_failure_percentage, :tolerated_failure_count + + def initialize(workflow, name, payload) + super + + raise Floe::InvalidWorkflowError, "Missing \"InputProcessor\" field in state [#{name}]" if payload["ItemProcessor"].nil? + + @next = payload["Next"] + @end = !!payload["End"] + @parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"] + @input_path = Path.new(payload.fetch("InputPath", "$")) + @output_path = Path.new(payload.fetch("OutputPath", "$")) + @result_path = ReferencePath.new(payload.fetch("ResultPath", "$")) + @result_selector = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"] + @retry = payload["Retry"].to_a.map { |retrier| Retrier.new(retrier) } + @catch = payload["Catch"].to_a.map { |catcher| Catcher.new(catcher) } + @item_processor = ItemProcessor.new(payload["ItemProcessor"], name) + @items_path = ReferencePath.new(payload.fetch("ItemsPath", "$")) + @item_reader = payload["ItemReader"] + @item_selector = payload["ItemSelector"] + @item_batcher = payload["ItemBatcher"] + @result_writer = payload["ResultWriter"] + @max_concurrency = payload["MaxConcurrency"]&.to_i + @tolerated_failure_percentage = payload["ToleratedFailurePercentage"] + @tolerated_failure_count = payload["ToleratedFailureCount"] + + validate_state! + end + + def process_input(input) + input = super(input) + items_path.value(context, input) + end + + def finish + input = process_input(context.input) + result = item_processor.value(context, input) + context.output = process_output(context.input, result) super - raise NotImplementedError + end + + def end? + @end + end + + def running? + false + end + + private + + def validate_state! + validate_state_next! end end end diff --git a/spec/workflow/item_processor_spec.rb b/spec/workflow/item_processor_spec.rb new file mode 100644 index 000000000..0c55d5c5e --- /dev/null +++ b/spec/workflow/item_processor_spec.rb @@ -0,0 +1,25 @@ +RSpec.describe Floe::Workflow::ItemProcessor do + it "raises an exception for missing States field" do + payload = {"StartAt" => "Missing"} + expect { described_class.new(payload, "Map") } + .to raise_error(Floe::InvalidWorkflowError, "Missing field \"States\" for state [Map]") + end + + it "raises an exception for missing StartAt field" do + payload = {"States" => {}} + expect { described_class.new(payload, "Map") } + .to raise_error(Floe::InvalidWorkflowError, "Missing field \"StartAt\" for state [Map]") + end + + it "raises an exception if StartAt isn't in States" do + payload = {"StartAt" => "First", "States" => {"Second" => {"Type" => "Succeed"}}} + expect { described_class.new(payload, "Map") } + .to raise_error(Floe::InvalidWorkflowError, "\"StartAt\" not in the \"States\" field for state [Map]") + end + + it "raises an exception if a Next state isn't in States" do + payload = {"StartAt" => "First", "States" => {"First" => {"Type" => "Pass", "Next" => "Last"}}} + expect { described_class.new(payload, "Map") } + .to raise_error(Floe::InvalidWorkflowError, "\"Next\" [Last] not in \"States\" for state [First]") + end +end diff --git a/spec/workflow/states/map_spec.rb b/spec/workflow/states/map_spec.rb new file mode 100644 index 000000000..e2afad29b --- /dev/null +++ b/spec/workflow/states/map_spec.rb @@ -0,0 +1,114 @@ +RSpec.describe Floe::Workflow::States::Map do + let(:input) { {} } + let(:ctx) { Floe::Workflow::Context.new(:input => input) } + let(:state) { workflow.current_state } + let(:input) do + { + "ship-date" => "2016-03-14T01:59:00Z", + "detail" => { + "delivery-partner" => "UQS", + "shipped" => [ + {"prod" => "R31", "dest-code" => 9511, "quantity" => 1344}, + {"prod" => "S39", "dest-code" => 9511, "quantity" => 40}, + {"prod" => "R31", "dest-code" => 9833, "quantity" => 12}, + {"prod" => "R40", "dest-code" => 9860, "quantity" => 887}, + {"prod" => "R40", "dest-code" => 9511, "quantity" => 1220} + ] + } + } + end + let(:workflow) do + payload = { + "Validate-All" => { + "Type" => "Map", + "InputPath" => "$.detail", + "ItemsPath" => "$.shipped", + "MaxConcurrency" => 0, + "ItemProcessor" => { + "StartAt" => "Validate", + "States" => { + "Validate" => { + "Type" => "Pass", + "OutputPath" => "$.Payload", + "End" => true + } + } + }, + "ResultPath" => "$.detail.result", + "End" => true, + } + } + make_workflow(ctx, payload) + end + + describe "#initialize" do + it "raises an InvalidWorkflowError with a missing ItemProcessor" do + payload = { + "Validate-All" => { + "Type" => "Map", + "End" => true + } + } + + expect { make_workflow(ctx, payload) } + .to raise_error(Floe::InvalidWorkflowError, "Missing \"InputProcessor\" field in state [Validate-All]") + end + + it "raises an InvalidWorkflowError with a missing Next and End" do + payload = { + "Validate-All" => { + "Type" => "Map", + "ItemProcessor" => { + "StartAt" => "Validate", + "States" => {"Validate" => {"Type" => "Succeed"}} + } + } + } + + expect { make_workflow(ctx, payload) } + .to raise_error(Floe::InvalidWorkflowError, "Missing \"Next\" field in state [Validate-All]") + end + + it "raises an InvalidWorkflowError if a state in ItemProcessor attempts to transition to a state in the outer workflow" do + payload = { + "StartAt" => "MapState", + "States" => { + "MapState" => { + "Type" => "Map", + "Next" => "PassState", + "ItemProcessor" => { + "StartAt" => "Validate", + "States" => { + "Validate" => { + "Type" => "Pass", + "Next" => "PassState" + } + } + } + }, + "PassState" => { + "Type" => "Pass", + "Next" => "SucceedState" + }, + "SucceedState" => { + "Type" => "Succeed" + } + } + } + + expect { Floe::Workflow.new(payload, ctx) } + .to raise_error(Floe::InvalidWorkflowError, "\"Next\" [PassState] not in \"States\" for state [Validate]") + end + end + + it "#end?" do + expect(state.end?).to be true + end + + describe "#run_nonblock!" do + it "has no next" do + state.run_nonblock! + expect(ctx.next_state).to eq(nil) + end + end +end