Skip to content

Commit

Permalink
Implement Map state
Browse files Browse the repository at this point in the history
  • Loading branch information
agrare committed May 20, 2024
1 parent 10e59de commit 69e0eed
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 2 deletions.
1 change: 1 addition & 0 deletions lib/floe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
require_relative "floe/workflow/choice_rule/and"
require_relative "floe/workflow/choice_rule/data"
require_relative "floe/workflow/context"
require_relative "floe/workflow/item_processor"
require_relative "floe/workflow/path"
require_relative "floe/workflow/payload_template"
require_relative "floe/workflow/reference_path"
Expand Down
27 changes: 27 additions & 0 deletions lib/floe/workflow/item_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# frozen_string_literal: true

module Floe
class Workflow
class ItemProcessor
attr_reader :processor_config, :payload, :states, :start_at

def initialize(payload, name = nil)
@payload = payload
@name = name

raise Floe::InvalidWorkflowError, "Missing field \"States\" for state [#{name}]" if payload["States"].nil?
raise Floe::InvalidWorkflowError, "Missing field \"StartAt\" for state [#{name}]" if payload["StartAt"].nil?
raise Floe::InvalidWorkflowError, "\"StartAt\" not in the \"States\" field for state [#{name}]" unless payload["States"].key?(payload["StartAt"])

@processor_config = payload.fetch("ProcessorConfig", "INLINE")
@states = payload["States"].to_a.map { |state_name, state| State.build!(self, state_name, state) }
@states_by_name = @states.to_h { |state| [state.name, state] }
end

def value(_context, input = {})
# TODO: Run the states to get the output
input
end
end
end
end
63 changes: 61 additions & 2 deletions lib/floe/workflow/states/map.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,71 @@
# frozen_string_literal: true

require_relative "input_output_mixin"
require_relative "non_terminal_mixin"

module Floe
class Workflow
module States
class Map < Floe::Workflow::State
def initialize(*)
include InputOutputMixin
include NonTerminalMixin

attr_reader :end, :next, :parameters, :input_path, :output_path, :result_path,
:result_selector, :retry, :catch, :item_processor, :items_path,
:item_reader, :item_selector, :item_batcher, :result_writer,
:max_concurrency, :tolerated_failure_percentage, :tolerated_failure_count

def initialize(workflow, name, payload)
super

raise Floe::InvalidWorkflowError, "Missing \"InputProcessor\" field in state [#{name}]" if payload["ItemProcessor"].nil?

@next = payload["Next"]
@end = !!payload["End"]
@parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"]
@input_path = Path.new(payload.fetch("InputPath", "$"))
@output_path = Path.new(payload.fetch("OutputPath", "$"))
@result_path = ReferencePath.new(payload.fetch("ResultPath", "$"))
@result_selector = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"]
@retry = payload["Retry"].to_a.map { |retrier| Retrier.new(retrier) }
@catch = payload["Catch"].to_a.map { |catcher| Catcher.new(catcher) }
@item_processor = ItemProcessor.new(payload["ItemProcessor"], name)
@items_path = ReferencePath.new(payload.fetch("ItemsPath", "$"))
@item_reader = payload["ItemReader"]
@item_selector = payload["ItemSelector"]
@item_batcher = payload["ItemBatcher"]
@result_writer = payload["ResultWriter"]
@max_concurrency = payload["MaxConcurrency"]&.to_i
@tolerated_failure_percentage = payload["ToleratedFailurePercentage"]
@tolerated_failure_count = payload["ToleratedFailureCount"]

validate_state!
end

def process_input(input)
input = super(input)
items_path.value(context, input)
end

def finish
input = process_input(context.input)
result = item_processor.value(context, input)
context.output = process_output(context.input, result)
super
raise NotImplementedError
end

def end?
@end
end

def running?
false
end

private

def validate_state!
validate_state_next!
end
end
end
Expand Down
25 changes: 25 additions & 0 deletions spec/workflow/item_processor_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
RSpec.describe Floe::Workflow::ItemProcessor do
it "raises an exception for missing States field" do
payload = {"StartAt" => "Missing"}
expect { described_class.new(payload, "Map") }
.to raise_error(Floe::InvalidWorkflowError, "Missing field \"States\" for state [Map]")
end

it "raises an exception for missing StartAt field" do
payload = {"States" => {}}
expect { described_class.new(payload, "Map") }
.to raise_error(Floe::InvalidWorkflowError, "Missing field \"StartAt\" for state [Map]")
end

it "raises an exception if StartAt isn't in States" do
payload = {"StartAt" => "First", "States" => {"Second" => {"Type" => "Succeed"}}}
expect { described_class.new(payload, "Map") }
.to raise_error(Floe::InvalidWorkflowError, "\"StartAt\" not in the \"States\" field for state [Map]")
end

it "raises an exception if a Next state isn't in States" do
payload = {"StartAt" => "First", "States" => {"First" => {"Type" => "Pass", "Next" => "Last"}}}
expect { described_class.new(payload, "Map") }
.to raise_error(Floe::InvalidWorkflowError, "\"Next\" [Last] not in \"States\" for state [First]")
end
end
114 changes: 114 additions & 0 deletions spec/workflow/states/map_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
RSpec.describe Floe::Workflow::States::Map do
let(:input) { {} }
let(:ctx) { Floe::Workflow::Context.new(:input => input) }
let(:state) { workflow.current_state }
let(:input) do
{
"ship-date" => "2016-03-14T01:59:00Z",
"detail" => {
"delivery-partner" => "UQS",
"shipped" => [
{"prod" => "R31", "dest-code" => 9511, "quantity" => 1344},
{"prod" => "S39", "dest-code" => 9511, "quantity" => 40},
{"prod" => "R31", "dest-code" => 9833, "quantity" => 12},
{"prod" => "R40", "dest-code" => 9860, "quantity" => 887},
{"prod" => "R40", "dest-code" => 9511, "quantity" => 1220}
]
}
}
end
let(:workflow) do
payload = {
"Validate-All" => {
"Type" => "Map",
"InputPath" => "$.detail",
"ItemsPath" => "$.shipped",
"MaxConcurrency" => 0,
"ItemProcessor" => {
"StartAt" => "Validate",
"States" => {
"Validate" => {
"Type" => "Pass",
"OutputPath" => "$.Payload",
"End" => true
}
}
},
"ResultPath" => "$.detail.result",
"End" => true,
}
}
make_workflow(ctx, payload)
end

describe "#initialize" do
it "raises an InvalidWorkflowError with a missing ItemProcessor" do
payload = {
"Validate-All" => {
"Type" => "Map",
"End" => true
}
}

expect { make_workflow(ctx, payload) }
.to raise_error(Floe::InvalidWorkflowError, "Missing \"InputProcessor\" field in state [Validate-All]")
end

it "raises an InvalidWorkflowError with a missing Next and End" do
payload = {
"Validate-All" => {
"Type" => "Map",
"ItemProcessor" => {
"StartAt" => "Validate",
"States" => {"Validate" => {"Type" => "Succeed"}}
}
}
}

expect { make_workflow(ctx, payload) }
.to raise_error(Floe::InvalidWorkflowError, "Missing \"Next\" field in state [Validate-All]")
end

it "raises an InvalidWorkflowError if a state in ItemProcessor attempts to transition to a state in the outer workflow" do
payload = {
"StartAt" => "MapState",
"States" => {
"MapState" => {
"Type" => "Map",
"Next" => "PassState",
"ItemProcessor" => {
"StartAt" => "Validate",
"States" => {
"Validate" => {
"Type" => "Pass",
"Next" => "PassState"
}
}
}
},
"PassState" => {
"Type" => "Pass",
"Next" => "SucceedState"
},
"SucceedState" => {
"Type" => "Succeed"
}
}
}

expect { Floe::Workflow.new(payload, ctx) }
.to raise_error(Floe::InvalidWorkflowError, "\"Next\" [PassState] not in \"States\" for state [Validate]")
end
end

it "#end?" do
expect(state.end?).to be true
end

describe "#run_nonblock!" do
it "has no next" do
state.run_nonblock!
expect(ctx.next_state).to eq(nil)
end
end
end

0 comments on commit 69e0eed

Please sign in to comment.