From 101276e33c66a5cf2844eb1eeab943a8d23f5842 Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Fri, 27 Oct 2023 14:31:04 -0400 Subject: [PATCH] Validate that the workflow payload is correct --- lib/floe/workflow.rb | 4 ++++ lib/floe/workflow/path.rb | 3 +++ lib/floe/workflow/state.rb | 4 ++++ lib/floe/workflow/states/pass.rb | 3 +++ lib/floe/workflow/states/task.rb | 3 +++ lib/floe/workflow/states/wait.rb | 3 +++ spec/workflow/path_spec.rb | 9 ++++++++ spec/workflow/states/task_spec.rb | 26 +++++++++++------------ spec/workflow/states/wait_spec.rb | 8 +++---- spec/workflow_spec.rb | 35 +++++++++++++++++++++++++++++-- 10 files changed, 79 insertions(+), 19 deletions(-) diff --git a/lib/floe/workflow.rb b/lib/floe/workflow.rb index 5f288f312..874312b5a 100644 --- a/lib/floe/workflow.rb +++ b/lib/floe/workflow.rb @@ -38,6 +38,10 @@ def initialize(payload, context = nil, credentials = {}) credentials = JSON.parse(credentials) if credentials.kind_of?(String) context = Context.new(context) unless context.kind_of?(Context) + raise Floe::InvalidWorkflowError, "Missing field \"States\"" if payload["States"].nil? + raise Floe::InvalidWorkflowError, "Missing field \"StartAt\"" if payload["StartAt"].nil? + raise Floe::InvalidWorkflowError, "\"StartAt\" not in the \"States\" field" unless payload["States"].keys.include?(payload["StartAt"]) + @payload = payload @context = context @credentials = credentials diff --git a/lib/floe/workflow/path.rb b/lib/floe/workflow/path.rb index 4234da760..2268ff0fa 100644 --- a/lib/floe/workflow/path.rb +++ b/lib/floe/workflow/path.rb @@ -11,6 +11,9 @@ def value(payload, context, input = {}) def initialize(payload) @payload = payload + + raise Floe::InvalidWorkflowError, "Path [#{payload}] must be a string" if payload.nil? || !payload.kind_of?(String) + raise Floe::InvalidWorkflowError, "Path [#{payload}] must start with \"$\"" if payload[0] != "$" end def value(context, input = {}) diff --git a/lib/floe/workflow/state.rb b/lib/floe/workflow/state.rb index 12a2b6978..05f4bd31f 100644 --- a/lib/floe/workflow/state.rb +++ b/lib/floe/workflow/state.rb @@ -8,6 +8,7 @@ class State class << self def build!(workflow, name, payload) state_type = payload["Type"] + raise Floe::InvalidWorkflowError, "Missing \"Type\" field in state [#{name}]" if payload["Type"].nil? begin klass = Floe::Workflow::States.const_get(state_type) @@ -27,6 +28,9 @@ def initialize(workflow, name, payload) @payload = payload @type = payload["Type"] @comment = payload["Comment"] + + raise Floe::InvalidWorkflowError, "Missing \"Type\" field in state [#{name}]" if payload["Type"].nil? + raise Floe::InvalidWorkflowError, "State name [#{name}] must be less than or equal to 80 characters" if name.length > 80 end def run!(_input = nil) diff --git a/lib/floe/workflow/states/pass.rb b/lib/floe/workflow/states/pass.rb index 6594ea4b4..598af7c94 100644 --- a/lib/floe/workflow/states/pass.rb +++ b/lib/floe/workflow/states/pass.rb @@ -13,6 +13,9 @@ def initialize(workflow, name, payload) @end = !!payload["End"] @result = payload["Result"] + raise Floe::InvalidWorkflowError, "Missing \"Next\" field in state [#{name}]" if @next.nil? && !@end + raise Floe::InvalidWorkflowError, "\"Next\" [#{@next}] not in \"States\ for state [#{name}]" if @next && !workflow.payload["States"].keys.include?(@next) + @parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"] @input_path = Path.new(payload.fetch("InputPath", "$")) @output_path = Path.new(payload.fetch("OutputPath", "$")) diff --git a/lib/floe/workflow/states/task.rb b/lib/floe/workflow/states/task.rb index 124201c88..f6c0bb305 100644 --- a/lib/floe/workflow/states/task.rb +++ b/lib/floe/workflow/states/task.rb @@ -25,6 +25,9 @@ def initialize(workflow, name, payload) @parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"] @result_selector = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"] @credentials = PayloadTemplate.new(payload["Credentials"]) if payload["Credentials"] + + raise Floe::InvalidWorkflowError, "Missing \"Next\" field in state [#{name}]" if @next.nil? && !@end + raise Floe::InvalidWorkflowError, "\"Next\" [#{@next}] not in \"States\ for state [#{name}]" if @next && !workflow.payload["States"].keys.include?(@next) end def start(input) diff --git a/lib/floe/workflow/states/wait.rb b/lib/floe/workflow/states/wait.rb index 38dfaa864..55793eacc 100644 --- a/lib/floe/workflow/states/wait.rb +++ b/lib/floe/workflow/states/wait.rb @@ -20,6 +20,9 @@ def initialize(workflow, name, payload) @input_path = Path.new(payload.fetch("InputPath", "$")) @output_path = Path.new(payload.fetch("OutputPath", "$")) + + raise Floe::InvalidWorkflowError, "Missing \"Next\" field in state [#{name}]" if @next.nil? && !@end + raise Floe::InvalidWorkflowError, "\"Next\" [#{@next}] not in \"States\ for state [#{name}]" if @next && !workflow.payload["States"].keys.include?(@next) end def start(input) diff --git a/spec/workflow/path_spec.rb b/spec/workflow/path_spec.rb index 0ca092556..fd35e37f1 100644 --- a/spec/workflow/path_spec.rb +++ b/spec/workflow/path_spec.rb @@ -1,5 +1,14 @@ RSpec.describe Floe::Workflow::Path do describe "#value" do + it "raises an exception if the payload isn't a string" do + expect { described_class.new(nil) }.to raise_error(Floe::InvalidWorkflowError, "Path [] must be a string") + expect { described_class.new(0) }.to raise_error(Floe::InvalidWorkflowError, "Path [0] must be a string") + end + + it "raises an exception if the first character isn't a $" do + expect { described_class.new("foo") }.to raise_error(Floe::InvalidWorkflowError, "Path [foo] must start with \"$\"") + end + context "referencing the global context" do it "with a missing value" do expect(described_class.new("$$.foo").value({})).to be_nil diff --git a/spec/workflow/states/task_spec.rb b/spec/workflow/states/task_spec.rb index f2f9af540..506b14ec2 100644 --- a/spec/workflow/states/task_spec.rb +++ b/spec/workflow/states/task_spec.rb @@ -21,7 +21,7 @@ describe "Input" do context "with no InputPath" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "End" => true}}) } it "passes the whole context to the resource" do expect(mock_runner) @@ -34,7 +34,7 @@ end context "with an InputPath" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "InputPath" => "$.foo"}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "InputPath" => "$.foo", "End" => true}}) } it "filters the context passed to the resource" do expect(mock_runner) @@ -47,7 +47,7 @@ end context "with Parameters" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Parameters" => {"var1.$" => "$.foo.bar"}}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Parameters" => {"var1.$" => "$.foo.bar"}, "End" => true}}) } it "passes the interpolated parameters to the resource" do expect(mock_runner) @@ -61,7 +61,7 @@ end describe "Output" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "End" => true}}) } it "uses the last line as output if it is JSON" do expect(mock_runner) @@ -101,7 +101,7 @@ end context "ResultSelector" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultSelector" => {"ip_addrs.$" => "$.response"}}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultSelector" => {"ip_addrs.$" => "$.response"}, "End" => true}}) } it "filters the results" do expect(mock_runner) @@ -116,7 +116,7 @@ end context "ResultPath" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultPath" => "$.ip_addrs"}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultPath" => "$.ip_addrs", "End" => true}}) } it "inserts the response into the input" do expect(mock_runner) @@ -135,7 +135,7 @@ end context "OutputPath" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultPath" => "$.data.ip_addrs", "OutputPath" => output_path}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultPath" => "$.data.ip_addrs", "OutputPath" => output_path, "End" => true}}) } context "with the default '$'" do let(:output_path) { "$" } @@ -174,7 +174,7 @@ end describe "Retry" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Retry" => retriers, "TimeoutSeconds" => 2}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Retry" => retriers, "TimeoutSeconds" => 2, "End" => true}}) } context "with specific errors" do let(:retriers) { [{"ErrorEquals" => ["States.Timeout"], "MaxAttempts" => 1}] } @@ -195,7 +195,7 @@ end context "with multiple retriers" do - let(:retriers) { [{"ErrorEquals" => ["States.Timeout"], "MaxAttempts" => 1}, {"ErrorEquals" => ["Exception"]}] } + let(:retriers) { [{"ErrorEquals" => ["States.Timeout"], "MaxAttempts" => 1}, {"ErrorEquals" => ["Exception"], "End" => true}] } it "resets the retrier if a different exception is raised" do expect(mock_runner).to receive(:running?).and_return(false) @@ -266,7 +266,7 @@ end context "with a Catch" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Retry" => [{"ErrorEquals" => ["States.Timeout"]}], "Catch" => [{"ErrorEquals" => ["States.ALL"], "Next" => "FailState"}]}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Retry" => [{"ErrorEquals" => ["States.Timeout"]}], "Catch" => [{"ErrorEquals" => ["States.ALL"], "Next" => "FailState"}], "End" => true}}) } let(:success) { false } it "retry preceeds catch" do @@ -299,7 +299,7 @@ let(:success) { false } context "with specific errors" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Catch" => [{"ErrorEquals" => ["States.Timeout"], "Next" => "FirstState"}]}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Catch" => [{"ErrorEquals" => ["States.Timeout"], "Next" => "FirstState"}], "End" => true}}) } it "catches the exception" do expect(mock_runner).to receive("output").and_return("States.Timeout") @@ -327,7 +327,7 @@ end context "with a State.ALL catcher" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Catch" => [{"ErrorEquals" => ["States.Timeout"], "Next" => "FirstState"}, {"ErrorEquals" => ["States.ALL"], "Next" => "FailState"}]}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Catch" => [{"ErrorEquals" => ["States.Timeout"], "Next" => "FirstState"}, {"ErrorEquals" => ["States.ALL"], "Next" => "FailState"}], "End" => true}}) } it "catches a more specific exception" do expect(mock_runner).to receive("output").and_return("States.Timeout") @@ -356,7 +356,7 @@ describe "#end?" do it "with a normal state" do - workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => resource, "Next" => "ChoiceState"}}) + workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => resource, "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) state = workflow.current_state expect(state.end?).to be false end diff --git a/spec/workflow/states/wait_spec.rb b/spec/workflow/states/wait_spec.rb index bf10a5c98..8def03a0f 100644 --- a/spec/workflow/states/wait_spec.rb +++ b/spec/workflow/states/wait_spec.rb @@ -20,7 +20,7 @@ describe "#running?" do context "with seconds" do - let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "Seconds" => 1, "Next" => "SuccessState"}}) } + let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "Seconds" => 1, "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) } it "is running before finished" do state.start(ctx.input) expect(state.running?).to be_truthy @@ -36,7 +36,7 @@ context "with secondsPath" do let(:input) { {"expire" => "1"} } - let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "SecondsPath" => "$.expire", "Next" => "SuccessState"}}) } + let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "SecondsPath" => "$.expire", "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) } it "is running? before finished" do state.start(ctx.input) expect(state.running?).to be_truthy @@ -52,7 +52,7 @@ context "with timestamp" do let(:expiry) { Time.now.utc + 1 } - let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "Timestamp" => expiry.iso8601, "Next" => "SuccessState"}}) } + let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "Timestamp" => expiry.iso8601, "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) } it "is running? before finished" do state.start(ctx.input) expect(state.running?).to be_truthy @@ -69,7 +69,7 @@ context "with timestamp" do let(:expiry) { Time.now.utc + 1 } let(:input) { {"expire" => expiry.iso8601} } - let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "TimestampPath" => "$.expire", "Next" => "SuccessState"}}) } + let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "TimestampPath" => "$.expire", "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) } it "is running? before finished" do state.start(ctx.input) expect(state.running?).to be_truthy diff --git a/spec/workflow_spec.rb b/spec/workflow_spec.rb index 3051b7fdc..7460604c0 100644 --- a/spec/workflow_spec.rb +++ b/spec/workflow_spec.rb @@ -22,6 +22,37 @@ expect(ctx.state_name).to eq("FirstState") expect(ctx.input).to eq(input) end + + it "raises an exception for missing States" do + payload = {"Comment" => "Test", "StartAt" => "Nothing"} + + expect { described_class.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "Missing field \"States\"") + end + + it "raises an exception for missing StartAt" do + payload = {"Comment" => "Test", "States" => {}} + + expect { described_class.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "Missing field \"StartAt\"") + end + + it "raises an exception for StartAt not in States" do + payload = {"Comment" => "Test", "StartAt" => "Foo", "States" => {"Bar" => {"Type" => "Succeed"}}} + + expect { described_class.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "\"StartAt\" not in the \"States\" field") + end + + it "raises an exception for a State missing a Type field" do + payload = {"Comment" => "Test", "StartAt" => "First", "States" => {"First" => {}}} + + expect { described_class.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "Missing \"Type\" field in state [First]") + end + + it "raises an exception for an invalid State name" do + state_name = 81.times.map { "a" }.join + payload = {"Comment" => "Test", "StartAt" => state_name, "States" => {state_name => {"Type" => "Succeed"}}} + + expect { described_class.new(payload) }.to raise_error(Floe::InvalidWorkflowError, /must be less than or equal to 80 characters/) + end end describe "#run!" do @@ -226,7 +257,7 @@ context "with a state that is running" do it "returns Try again" do ctx.state["EnteredTime"] = Time.now.utc.iso8601 - workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => "docker://agrare/hello-world:latest"}}) + workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => "docker://agrare/hello-world:latest", "End" => true}}) expect(workflow.current_state).to receive(:running?).and_return(true) expect(workflow.step_nonblock_wait(:timeout => 0)).to eq(Errno::EAGAIN) end @@ -253,7 +284,7 @@ context "with a state that is running" do it "returns false" do ctx.state["EnteredTime"] = Time.now.utc.iso8601 - workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => "docker://agrare/hello-world:latest"}}) + workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => "docker://agrare/hello-world:latest", "End" => true}}) expect(workflow.current_state).to receive(:running?).and_return(true) expect(workflow.step_nonblock_ready?).to be_falsey end