Skip to content

Commit

Permalink
Validate that the workflow payload is correct
Browse files Browse the repository at this point in the history
  • Loading branch information
agrare committed Oct 30, 2023
1 parent 76851e3 commit 101276e
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 19 deletions.
4 changes: 4 additions & 0 deletions lib/floe/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ def initialize(payload, context = nil, credentials = {})
credentials = JSON.parse(credentials) if credentials.kind_of?(String)
context = Context.new(context) unless context.kind_of?(Context)

raise Floe::InvalidWorkflowError, "Missing field \"States\"" if payload["States"].nil?
raise Floe::InvalidWorkflowError, "Missing field \"StartAt\"" if payload["StartAt"].nil?
raise Floe::InvalidWorkflowError, "\"StartAt\" not in the \"States\" field" unless payload["States"].keys.include?(payload["StartAt"])

@payload = payload
@context = context
@credentials = credentials
Expand Down
3 changes: 3 additions & 0 deletions lib/floe/workflow/path.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ def value(payload, context, input = {})

def initialize(payload)
@payload = payload

raise Floe::InvalidWorkflowError, "Path [#{payload}] must be a string" if payload.nil? || !payload.kind_of?(String)
raise Floe::InvalidWorkflowError, "Path [#{payload}] must start with \"$\"" if payload[0] != "$"
end

def value(context, input = {})
Expand Down
4 changes: 4 additions & 0 deletions lib/floe/workflow/state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class State
class << self
def build!(workflow, name, payload)
state_type = payload["Type"]
raise Floe::InvalidWorkflowError, "Missing \"Type\" field in state [#{name}]" if payload["Type"].nil?

begin
klass = Floe::Workflow::States.const_get(state_type)
Expand All @@ -27,6 +28,9 @@ def initialize(workflow, name, payload)
@payload = payload
@type = payload["Type"]
@comment = payload["Comment"]

raise Floe::InvalidWorkflowError, "Missing \"Type\" field in state [#{name}]" if payload["Type"].nil?
raise Floe::InvalidWorkflowError, "State name [#{name}] must be less than or equal to 80 characters" if name.length > 80
end

def run!(_input = nil)
Expand Down
3 changes: 3 additions & 0 deletions lib/floe/workflow/states/pass.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ def initialize(workflow, name, payload)
@end = !!payload["End"]
@result = payload["Result"]

raise Floe::InvalidWorkflowError, "Missing \"Next\" field in state [#{name}]" if @next.nil? && !@end
raise Floe::InvalidWorkflowError, "\"Next\" [#{@next}] not in \"States\ for state [#{name}]" if @next && !workflow.payload["States"].keys.include?(@next)

@parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"]
@input_path = Path.new(payload.fetch("InputPath", "$"))
@output_path = Path.new(payload.fetch("OutputPath", "$"))
Expand Down
3 changes: 3 additions & 0 deletions lib/floe/workflow/states/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ def initialize(workflow, name, payload)
@parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"]
@result_selector = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"]
@credentials = PayloadTemplate.new(payload["Credentials"]) if payload["Credentials"]

raise Floe::InvalidWorkflowError, "Missing \"Next\" field in state [#{name}]" if @next.nil? && !@end
raise Floe::InvalidWorkflowError, "\"Next\" [#{@next}] not in \"States\ for state [#{name}]" if @next && !workflow.payload["States"].keys.include?(@next)
end

def start(input)
Expand Down
3 changes: 3 additions & 0 deletions lib/floe/workflow/states/wait.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ def initialize(workflow, name, payload)

@input_path = Path.new(payload.fetch("InputPath", "$"))
@output_path = Path.new(payload.fetch("OutputPath", "$"))

raise Floe::InvalidWorkflowError, "Missing \"Next\" field in state [#{name}]" if @next.nil? && !@end
raise Floe::InvalidWorkflowError, "\"Next\" [#{@next}] not in \"States\ for state [#{name}]" if @next && !workflow.payload["States"].keys.include?(@next)
end

def start(input)
Expand Down
9 changes: 9 additions & 0 deletions spec/workflow/path_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
RSpec.describe Floe::Workflow::Path do
describe "#value" do
it "raises an exception if the payload isn't a string" do
expect { described_class.new(nil) }.to raise_error(Floe::InvalidWorkflowError, "Path [] must be a string")
expect { described_class.new(0) }.to raise_error(Floe::InvalidWorkflowError, "Path [0] must be a string")
end

it "raises an exception if the first character isn't a $" do
expect { described_class.new("foo") }.to raise_error(Floe::InvalidWorkflowError, "Path [foo] must start with \"$\"")
end

context "referencing the global context" do
it "with a missing value" do
expect(described_class.new("$$.foo").value({})).to be_nil
Expand Down
26 changes: 13 additions & 13 deletions spec/workflow/states/task_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

describe "Input" do
context "with no InputPath" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "End" => true}}) }

it "passes the whole context to the resource" do
expect(mock_runner)
Expand All @@ -34,7 +34,7 @@
end

context "with an InputPath" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "InputPath" => "$.foo"}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "InputPath" => "$.foo", "End" => true}}) }

it "filters the context passed to the resource" do
expect(mock_runner)
Expand All @@ -47,7 +47,7 @@
end

context "with Parameters" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Parameters" => {"var1.$" => "$.foo.bar"}}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Parameters" => {"var1.$" => "$.foo.bar"}, "End" => true}}) }

it "passes the interpolated parameters to the resource" do
expect(mock_runner)
Expand All @@ -61,7 +61,7 @@
end

describe "Output" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "End" => true}}) }

it "uses the last line as output if it is JSON" do
expect(mock_runner)
Expand Down Expand Up @@ -101,7 +101,7 @@
end

context "ResultSelector" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultSelector" => {"ip_addrs.$" => "$.response"}}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultSelector" => {"ip_addrs.$" => "$.response"}, "End" => true}}) }

it "filters the results" do
expect(mock_runner)
Expand All @@ -116,7 +116,7 @@
end

context "ResultPath" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultPath" => "$.ip_addrs"}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultPath" => "$.ip_addrs", "End" => true}}) }

it "inserts the response into the input" do
expect(mock_runner)
Expand All @@ -135,7 +135,7 @@
end

context "OutputPath" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultPath" => "$.data.ip_addrs", "OutputPath" => output_path}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultPath" => "$.data.ip_addrs", "OutputPath" => output_path, "End" => true}}) }

context "with the default '$'" do
let(:output_path) { "$" }
Expand Down Expand Up @@ -174,7 +174,7 @@
end

describe "Retry" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Retry" => retriers, "TimeoutSeconds" => 2}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Retry" => retriers, "TimeoutSeconds" => 2, "End" => true}}) }

context "with specific errors" do
let(:retriers) { [{"ErrorEquals" => ["States.Timeout"], "MaxAttempts" => 1}] }
Expand All @@ -195,7 +195,7 @@
end

context "with multiple retriers" do
let(:retriers) { [{"ErrorEquals" => ["States.Timeout"], "MaxAttempts" => 1}, {"ErrorEquals" => ["Exception"]}] }
let(:retriers) { [{"ErrorEquals" => ["States.Timeout"], "MaxAttempts" => 1}, {"ErrorEquals" => ["Exception"], "End" => true}] }

it "resets the retrier if a different exception is raised" do
expect(mock_runner).to receive(:running?).and_return(false)
Expand Down Expand Up @@ -266,7 +266,7 @@
end

context "with a Catch" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Retry" => [{"ErrorEquals" => ["States.Timeout"]}], "Catch" => [{"ErrorEquals" => ["States.ALL"], "Next" => "FailState"}]}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Retry" => [{"ErrorEquals" => ["States.Timeout"]}], "Catch" => [{"ErrorEquals" => ["States.ALL"], "Next" => "FailState"}], "End" => true}}) }
let(:success) { false }

it "retry preceeds catch" do
Expand Down Expand Up @@ -299,7 +299,7 @@
let(:success) { false }

context "with specific errors" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Catch" => [{"ErrorEquals" => ["States.Timeout"], "Next" => "FirstState"}]}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Catch" => [{"ErrorEquals" => ["States.Timeout"], "Next" => "FirstState"}], "End" => true}}) }

it "catches the exception" do
expect(mock_runner).to receive("output").and_return("States.Timeout")
Expand Down Expand Up @@ -327,7 +327,7 @@
end

context "with a State.ALL catcher" do
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Catch" => [{"ErrorEquals" => ["States.Timeout"], "Next" => "FirstState"}, {"ErrorEquals" => ["States.ALL"], "Next" => "FailState"}]}}) }
let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Catch" => [{"ErrorEquals" => ["States.Timeout"], "Next" => "FirstState"}, {"ErrorEquals" => ["States.ALL"], "Next" => "FailState"}], "End" => true}}) }

it "catches a more specific exception" do
expect(mock_runner).to receive("output").and_return("States.Timeout")
Expand Down Expand Up @@ -356,7 +356,7 @@

describe "#end?" do
it "with a normal state" do
workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => resource, "Next" => "ChoiceState"}})
workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => resource, "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}})
state = workflow.current_state
expect(state.end?).to be false
end
Expand Down
8 changes: 4 additions & 4 deletions spec/workflow/states/wait_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

describe "#running?" do
context "with seconds" do
let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "Seconds" => 1, "Next" => "SuccessState"}}) }
let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "Seconds" => 1, "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) }
it "is running before finished" do
state.start(ctx.input)
expect(state.running?).to be_truthy
Expand All @@ -36,7 +36,7 @@

context "with secondsPath" do
let(:input) { {"expire" => "1"} }
let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "SecondsPath" => "$.expire", "Next" => "SuccessState"}}) }
let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "SecondsPath" => "$.expire", "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) }
it "is running? before finished" do
state.start(ctx.input)
expect(state.running?).to be_truthy
Expand All @@ -52,7 +52,7 @@

context "with timestamp" do
let(:expiry) { Time.now.utc + 1 }
let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "Timestamp" => expiry.iso8601, "Next" => "SuccessState"}}) }
let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "Timestamp" => expiry.iso8601, "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) }
it "is running? before finished" do
state.start(ctx.input)
expect(state.running?).to be_truthy
Expand All @@ -69,7 +69,7 @@
context "with timestamp" do
let(:expiry) { Time.now.utc + 1 }
let(:input) { {"expire" => expiry.iso8601} }
let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "TimestampPath" => "$.expire", "Next" => "SuccessState"}}) }
let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "TimestampPath" => "$.expire", "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) }
it "is running? before finished" do
state.start(ctx.input)
expect(state.running?).to be_truthy
Expand Down
35 changes: 33 additions & 2 deletions spec/workflow_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,37 @@
expect(ctx.state_name).to eq("FirstState")
expect(ctx.input).to eq(input)
end

it "raises an exception for missing States" do
payload = {"Comment" => "Test", "StartAt" => "Nothing"}

expect { described_class.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "Missing field \"States\"")
end

it "raises an exception for missing StartAt" do
payload = {"Comment" => "Test", "States" => {}}

expect { described_class.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "Missing field \"StartAt\"")
end

it "raises an exception for StartAt not in States" do
payload = {"Comment" => "Test", "StartAt" => "Foo", "States" => {"Bar" => {"Type" => "Succeed"}}}

expect { described_class.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "\"StartAt\" not in the \"States\" field")
end

it "raises an exception for a State missing a Type field" do
payload = {"Comment" => "Test", "StartAt" => "First", "States" => {"First" => {}}}

expect { described_class.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "Missing \"Type\" field in state [First]")
end

it "raises an exception for an invalid State name" do
state_name = 81.times.map { "a" }.join
payload = {"Comment" => "Test", "StartAt" => state_name, "States" => {state_name => {"Type" => "Succeed"}}}

expect { described_class.new(payload) }.to raise_error(Floe::InvalidWorkflowError, /must be less than or equal to 80 characters/)
end
end

describe "#run!" do
Expand Down Expand Up @@ -226,7 +257,7 @@
context "with a state that is running" do
it "returns Try again" do
ctx.state["EnteredTime"] = Time.now.utc.iso8601
workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => "docker://agrare/hello-world:latest"}})
workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => "docker://agrare/hello-world:latest", "End" => true}})
expect(workflow.current_state).to receive(:running?).and_return(true)
expect(workflow.step_nonblock_wait(:timeout => 0)).to eq(Errno::EAGAIN)
end
Expand All @@ -253,7 +284,7 @@
context "with a state that is running" do
it "returns false" do
ctx.state["EnteredTime"] = Time.now.utc.iso8601
workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => "docker://agrare/hello-world:latest"}})
workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => "docker://agrare/hello-world:latest", "End" => true}})
expect(workflow.current_state).to receive(:running?).and_return(true)
expect(workflow.step_nonblock_ready?).to be_falsey
end
Expand Down

0 comments on commit 101276e

Please sign in to comment.