From 8b0590d5a04036624090edb2c8717e3bc26f42ba Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Fri, 27 Oct 2023 14:31:04 -0400 Subject: [PATCH] Validate that the workflow payload is correct --- lib/floe.rb | 1 + lib/floe/workflow.rb | 4 +++ lib/floe/workflow/path.rb | 3 ++ lib/floe/workflow/state.rb | 4 +++ lib/floe/workflow/states/choice.rb | 18 ++++++++++ .../workflow/states/non_terminal_mixin.rb | 14 ++++++++ lib/floe/workflow/states/pass.rb | 10 ++++++ lib/floe/workflow/states/task.rb | 8 +++++ lib/floe/workflow/states/wait.rb | 8 +++++ spec/workflow/path_spec.rb | 9 +++++ spec/workflow/states/choice_spec.rb | 20 +++++++++++ spec/workflow/states/task_spec.rb | 26 +++++++------- spec/workflow/states/wait_spec.rb | 8 ++--- spec/workflow_spec.rb | 35 +++++++++++++++++-- 14 files changed, 149 insertions(+), 19 deletions(-) create mode 100644 lib/floe/workflow/states/non_terminal_mixin.rb diff --git a/lib/floe.rb b/lib/floe.rb index b691b9ac..17bd1d43 100644 --- a/lib/floe.rb +++ b/lib/floe.rb @@ -26,6 +26,7 @@ require_relative "floe/workflow/states/choice" require_relative "floe/workflow/states/fail" require_relative "floe/workflow/states/map" +require_relative "floe/workflow/states/non_terminal_mixin" require_relative "floe/workflow/states/parallel" require_relative "floe/workflow/states/pass" require_relative "floe/workflow/states/succeed" diff --git a/lib/floe/workflow.rb b/lib/floe/workflow.rb index 5f288f31..5f9093da 100644 --- a/lib/floe/workflow.rb +++ b/lib/floe/workflow.rb @@ -38,6 +38,10 @@ def initialize(payload, context = nil, credentials = {}) credentials = JSON.parse(credentials) if credentials.kind_of?(String) context = Context.new(context) unless context.kind_of?(Context) + raise Floe::InvalidWorkflowError, "Missing field \"States\"" if payload["States"].nil? + raise Floe::InvalidWorkflowError, "Missing field \"StartAt\"" if payload["StartAt"].nil? + raise Floe::InvalidWorkflowError, "\"StartAt\" not in the \"States\" field" unless payload["States"].key?(payload["StartAt"]) + @payload = payload @context = context @credentials = credentials diff --git a/lib/floe/workflow/path.rb b/lib/floe/workflow/path.rb index 4234da76..2268ff0f 100644 --- a/lib/floe/workflow/path.rb +++ b/lib/floe/workflow/path.rb @@ -11,6 +11,9 @@ def value(payload, context, input = {}) def initialize(payload) @payload = payload + + raise Floe::InvalidWorkflowError, "Path [#{payload}] must be a string" if payload.nil? || !payload.kind_of?(String) + raise Floe::InvalidWorkflowError, "Path [#{payload}] must start with \"$\"" if payload[0] != "$" end def value(context, input = {}) diff --git a/lib/floe/workflow/state.rb b/lib/floe/workflow/state.rb index 12a2b697..05f4bd31 100644 --- a/lib/floe/workflow/state.rb +++ b/lib/floe/workflow/state.rb @@ -8,6 +8,7 @@ class State class << self def build!(workflow, name, payload) state_type = payload["Type"] + raise Floe::InvalidWorkflowError, "Missing \"Type\" field in state [#{name}]" if payload["Type"].nil? begin klass = Floe::Workflow::States.const_get(state_type) @@ -27,6 +28,9 @@ def initialize(workflow, name, payload) @payload = payload @type = payload["Type"] @comment = payload["Comment"] + + raise Floe::InvalidWorkflowError, "Missing \"Type\" field in state [#{name}]" if payload["Type"].nil? + raise Floe::InvalidWorkflowError, "State name [#{name}] must be less than or equal to 80 characters" if name.length > 80 end def run!(_input = nil) diff --git a/lib/floe/workflow/states/choice.rb b/lib/floe/workflow/states/choice.rb index d4aa1cf6..ed4afadc 100644 --- a/lib/floe/workflow/states/choice.rb +++ b/lib/floe/workflow/states/choice.rb @@ -9,6 +9,8 @@ class Choice < Floe::Workflow::State def initialize(workflow, name, payload) super + validate_state! + @choices = payload["Choices"].map { |choice| ChoiceRule.build(choice) } @default = payload["Default"] @@ -33,6 +35,22 @@ def running? def end? false end + + private + + def validate_state! + validate_state_choices! + validate_state_default! + end + + def validate_state_choices! + raise Floe::InvalidWorkflowError, "Choice state must have \"Choices\"" unless payload.key?("Choices") + raise Floe::InvalidWorkflowError, "\"Choices\" must be a non-empty array" unless payload["Choices"].kind_of?(Array) && !payload["Choices"].empty? + end + + def validate_state_default! + raise Floe::InvalidWorkflowError, "\"Default\" not in \"States\"" unless workflow.payload["States"].include?(payload["Default"]) + end end end end diff --git a/lib/floe/workflow/states/non_terminal_mixin.rb b/lib/floe/workflow/states/non_terminal_mixin.rb new file mode 100644 index 00000000..b0ba756e --- /dev/null +++ b/lib/floe/workflow/states/non_terminal_mixin.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +module Floe + class Workflow + module States + module NonTerminalMixin + def validate_state_next! + raise Floe::InvalidWorkflowError, "Missing \"Next\" field in state [#{name}]" if @next.nil? && !@end + raise Floe::InvalidWorkflowError, "\"Next\" [#{@next}] not in \"States\" for state [#{name}]" if @next && !workflow.payload["States"].key?(@next) + end + end + end + end +end diff --git a/lib/floe/workflow/states/pass.rb b/lib/floe/workflow/states/pass.rb index 6594ea4b..0a006260 100644 --- a/lib/floe/workflow/states/pass.rb +++ b/lib/floe/workflow/states/pass.rb @@ -4,6 +4,8 @@ module Floe class Workflow module States class Pass < Floe::Workflow::State + include NonTerminalMixin + attr_reader :end, :next, :result, :parameters, :input_path, :output_path, :result_path def initialize(workflow, name, payload) @@ -17,6 +19,8 @@ def initialize(workflow, name, payload) @input_path = Path.new(payload.fetch("InputPath", "$")) @output_path = Path.new(payload.fetch("OutputPath", "$")) @result_path = ReferencePath.new(payload.fetch("ResultPath", "$")) + + validate_state! end def start(input) @@ -36,6 +40,12 @@ def running? def end? @end end + + private + + def validate_state! + validate_state_next! + end end end end diff --git a/lib/floe/workflow/states/task.rb b/lib/floe/workflow/states/task.rb index 124201c8..1fc3a27f 100644 --- a/lib/floe/workflow/states/task.rb +++ b/lib/floe/workflow/states/task.rb @@ -4,6 +4,8 @@ module Floe class Workflow module States class Task < Floe::Workflow::State + include NonTerminalMixin + attr_reader :credentials, :end, :heartbeat_seconds, :next, :parameters, :result_selector, :resource, :timeout_seconds, :retry, :catch, :input_path, :output_path, :result_path @@ -25,6 +27,8 @@ def initialize(workflow, name, payload) @parameters = PayloadTemplate.new(payload["Parameters"]) if payload["Parameters"] @result_selector = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"] @credentials = PayloadTemplate.new(payload["Credentials"]) if payload["Credentials"] + + validate_state! end def start(input) @@ -72,6 +76,10 @@ def end? attr_reader :runner + def validate_state! + validate_state_next! + end + def success? runner.success?(context.state["RunnerContext"]) end diff --git a/lib/floe/workflow/states/wait.rb b/lib/floe/workflow/states/wait.rb index 38dfaa86..1cccb394 100644 --- a/lib/floe/workflow/states/wait.rb +++ b/lib/floe/workflow/states/wait.rb @@ -6,6 +6,8 @@ module Floe class Workflow module States class Wait < Floe::Workflow::State + include NonTerminalMixin + attr_reader :end, :next, :seconds, :input_path, :output_path def initialize(workflow, name, payload) @@ -20,6 +22,8 @@ def initialize(workflow, name, payload) @input_path = Path.new(payload.fetch("InputPath", "$")) @output_path = Path.new(payload.fetch("OutputPath", "$")) + + validate_state! end def start(input) @@ -41,6 +45,10 @@ def end? private + def validate_state! + validate_state_next! + end + def please_hold(input) wait( :seconds => @seconds_path ? @seconds_path.value(context, input).to_i : @seconds, diff --git a/spec/workflow/path_spec.rb b/spec/workflow/path_spec.rb index 0ca09255..d2c52d8d 100644 --- a/spec/workflow/path_spec.rb +++ b/spec/workflow/path_spec.rb @@ -1,4 +1,13 @@ RSpec.describe Floe::Workflow::Path do + it "raises an exception if the payload isn't a string" do + expect { described_class.new(nil) }.to raise_error(Floe::InvalidWorkflowError, "Path [] must be a string") + expect { described_class.new(0) }.to raise_error(Floe::InvalidWorkflowError, "Path [0] must be a string") + end + + it "raises an exception if the first character isn't a $" do + expect { described_class.new("foo") }.to raise_error(Floe::InvalidWorkflowError, "Path [foo] must start with \"$\"") + end + describe "#value" do context "referencing the global context" do it "with a missing value" do diff --git a/spec/workflow/states/choice_spec.rb b/spec/workflow/states/choice_spec.rb index 6286a6cb..2b51658a 100644 --- a/spec/workflow/states/choice_spec.rb +++ b/spec/workflow/states/choice_spec.rb @@ -28,6 +28,26 @@ ) end + it "raises an exception if Choices is missing" do + payload = {"StartAt" => "Choice", "States" => {"Choice" => {"Type" => "Choice", "Default" => "DefaultState"}, "DefaultState" => {"type" => "Succeed"}}} + expect { Floe::Workflow.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "Choice state must have \"Choices\"") + end + + it "raises an exception if Choices is not an array" do + payload = {"StartAt" => "Choice", "States" => {"Choice" => {"Type" => "Choice", "Choices" => {}, "Default" => "DefaultState"}, "DefaultState" => {"type" => "Succeed"}}} + expect { Floe::Workflow.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "\"Choices\" must be a non-empty array") + end + + it "raises an exception if Choices is an empty array" do + payload = {"StartAt" => "Choice", "States" => {"Choice" => {"Type" => "Choice", "Choices" => [], "Default" => "DefaultState"}, "DefaultState" => {"type" => "Succeed"}}} + expect { Floe::Workflow.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "\"Choices\" must be a non-empty array") + end + + it "raises an exception if Default isn't a valid state" do + payload = {"StartAt" => "Choice", "States" => {"Choice" => {"Type" => "Choice", "Choices" => [{"Variable" => "$.foo", "NumericEquals" => 1, "Next" => "FirstMatchState"}], "Default" => "MissingState"}}} + expect { Floe::Workflow.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "\"Default\" not in \"States\"") + end + it "#end?" do expect(state.end?).to eq(false) end diff --git a/spec/workflow/states/task_spec.rb b/spec/workflow/states/task_spec.rb index df6dfbb0..b47aec9e 100644 --- a/spec/workflow/states/task_spec.rb +++ b/spec/workflow/states/task_spec.rb @@ -13,7 +13,7 @@ describe "Input" do context "with no InputPath" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "End" => true}}) } it "passes the whole context to the resource" do expect_run_async({"foo" => {"bar" => "baz"}, "bar" => {"baz" => "foo"}}, :output => "hello, world!") @@ -23,7 +23,7 @@ end context "with an InputPath" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "InputPath" => "$.foo"}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "InputPath" => "$.foo", "End" => true}}) } it "filters the context passed to the resource" do expect_run_async({"bar" => "baz"}, :output => nil) @@ -33,7 +33,7 @@ end context "with Parameters" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Parameters" => {"var1.$" => "$.foo.bar"}}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Parameters" => {"var1.$" => "$.foo.bar"}, "End" => true}}) } it "passes the interpolated parameters to the resource" do expect_run_async({"var1" => "baz"}, :output => nil) @@ -44,7 +44,7 @@ end describe "Output" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "End" => true}}) } it "uses the last line as output if it is JSON" do expect_run_async({"foo" => {"bar" => "baz"}, "bar" => {"baz" => "foo"}}, :output => "ABCD\nHELLO\n{\"response\":[\"192.168.1.2\"]}") @@ -73,7 +73,7 @@ end context "ResultSelector" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultSelector" => {"ip_addrs.$" => "$.response"}}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultSelector" => {"ip_addrs.$" => "$.response"}, "End" => true}}) } it "filters the results" do expect_run_async({"foo" => {"bar" => "baz"}, "bar" => {"baz" => "foo"}}, :output => "ABCD\nHELLO\n{\"response\":[\"192.168.1.2\"],\"exit_code\":0}") @@ -85,7 +85,7 @@ end context "ResultPath" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultPath" => "$.ip_addrs"}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultPath" => "$.ip_addrs", "End" => true}}) } it "inserts the response into the input" do expect_run_async(input, :output => "[\"192.168.1.2\"]") @@ -101,7 +101,7 @@ end context "OutputPath" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultPath" => "$.data.ip_addrs", "OutputPath" => output_path}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "ResultPath" => "$.data.ip_addrs", "OutputPath" => output_path, "End" => true}}) } context "with the default '$'" do let(:output_path) { "$" } @@ -134,7 +134,7 @@ end describe "Retry" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Retry" => retriers, "TimeoutSeconds" => 2}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Retry" => retriers, "TimeoutSeconds" => 2, "End" => true}}) } context "with specific errors" do let(:retriers) { [{"ErrorEquals" => ["States.Timeout"], "MaxAttempts" => 1}] } @@ -150,7 +150,7 @@ end context "with multiple retriers" do - let(:retriers) { [{"ErrorEquals" => ["States.Timeout"], "MaxAttempts" => 1}, {"ErrorEquals" => ["Exception"]}] } + let(:retriers) { [{"ErrorEquals" => ["States.Timeout"], "MaxAttempts" => 1}, {"ErrorEquals" => ["Exception"], "End" => true}] } it "resets the retrier if a different exception is raised" do expect_run_async(input, :error => "States.Timeout") @@ -209,7 +209,7 @@ end context "with a Catch" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Retry" => [{"ErrorEquals" => ["States.Timeout"]}], "Catch" => [{"ErrorEquals" => ["States.ALL"], "Next" => "FailState"}]}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Retry" => [{"ErrorEquals" => ["States.Timeout"]}], "Catch" => [{"ErrorEquals" => ["States.ALL"], "Next" => "FailState"}], "End" => true}}) } it "retry preceeds catch" do expect_run_async(input, :error => "States.Timeout") @@ -233,7 +233,7 @@ describe "Catch" do context "with specific errors" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Catch" => [{"ErrorEquals" => ["States.Timeout"], "Next" => "FirstState"}]}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Catch" => [{"ErrorEquals" => ["States.Timeout"], "Next" => "FirstState"}], "End" => true}}) } it "catches the exception" do expect_run_async(input, :output => "States.Timeout", :success => false) @@ -255,7 +255,7 @@ end context "with a State.ALL catcher" do - let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Catch" => [{"ErrorEquals" => ["States.Timeout"], "Next" => "FirstState"}, {"ErrorEquals" => ["States.ALL"], "Next" => "FailState"}]}}) } + let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Catch" => [{"ErrorEquals" => ["States.Timeout"], "Next" => "FirstState"}, {"ErrorEquals" => ["States.ALL"], "Next" => "FailState"}], "End" => true}}) } it "catches a more specific exception" do expect_run_async(input, :output => "States.Timeout", :success => false) @@ -278,7 +278,7 @@ describe "#end?" do it "with a normal state" do - workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => resource, "Next" => "ChoiceState"}}) + workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => resource, "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) state = workflow.current_state expect(state.end?).to be false end diff --git a/spec/workflow/states/wait_spec.rb b/spec/workflow/states/wait_spec.rb index bf10a5c9..8def03a0 100644 --- a/spec/workflow/states/wait_spec.rb +++ b/spec/workflow/states/wait_spec.rb @@ -20,7 +20,7 @@ describe "#running?" do context "with seconds" do - let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "Seconds" => 1, "Next" => "SuccessState"}}) } + let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "Seconds" => 1, "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) } it "is running before finished" do state.start(ctx.input) expect(state.running?).to be_truthy @@ -36,7 +36,7 @@ context "with secondsPath" do let(:input) { {"expire" => "1"} } - let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "SecondsPath" => "$.expire", "Next" => "SuccessState"}}) } + let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "SecondsPath" => "$.expire", "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) } it "is running? before finished" do state.start(ctx.input) expect(state.running?).to be_truthy @@ -52,7 +52,7 @@ context "with timestamp" do let(:expiry) { Time.now.utc + 1 } - let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "Timestamp" => expiry.iso8601, "Next" => "SuccessState"}}) } + let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "Timestamp" => expiry.iso8601, "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) } it "is running? before finished" do state.start(ctx.input) expect(state.running?).to be_truthy @@ -69,7 +69,7 @@ context "with timestamp" do let(:expiry) { Time.now.utc + 1 } let(:input) { {"expire" => expiry.iso8601} } - let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "TimestampPath" => "$.expire", "Next" => "SuccessState"}}) } + let(:workflow) { make_workflow(ctx, {"WaitState" => {"Type" => "Wait", "TimestampPath" => "$.expire", "Next" => "SuccessState"}, "SuccessState" => {"Type" => "Succeed"}}) } it "is running? before finished" do state.start(ctx.input) expect(state.running?).to be_truthy diff --git a/spec/workflow_spec.rb b/spec/workflow_spec.rb index 3051b7fd..4dbeddb3 100644 --- a/spec/workflow_spec.rb +++ b/spec/workflow_spec.rb @@ -22,6 +22,37 @@ expect(ctx.state_name).to eq("FirstState") expect(ctx.input).to eq(input) end + + it "raises an exception for missing States" do + payload = {"Comment" => "Test", "StartAt" => "Nothing"} + + expect { described_class.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "Missing field \"States\"") + end + + it "raises an exception for missing StartAt" do + payload = {"Comment" => "Test", "States" => {}} + + expect { described_class.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "Missing field \"StartAt\"") + end + + it "raises an exception for StartAt not in States" do + payload = {"Comment" => "Test", "StartAt" => "Foo", "States" => {"Bar" => {"Type" => "Succeed"}}} + + expect { described_class.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "\"StartAt\" not in the \"States\" field") + end + + it "raises an exception for a State missing a Type field" do + payload = {"Comment" => "Test", "StartAt" => "First", "States" => {"First" => {}}} + + expect { described_class.new(payload) }.to raise_error(Floe::InvalidWorkflowError, "Missing \"Type\" field in state [First]") + end + + it "raises an exception for an invalid State name" do + state_name = Array.new(81).map { "a" }.join + payload = {"Comment" => "Test", "StartAt" => state_name, "States" => {state_name => {"Type" => "Succeed"}}} + + expect { described_class.new(payload) }.to raise_error(Floe::InvalidWorkflowError, /must be less than or equal to 80 characters/) + end end describe "#run!" do @@ -226,7 +257,7 @@ context "with a state that is running" do it "returns Try again" do ctx.state["EnteredTime"] = Time.now.utc.iso8601 - workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => "docker://agrare/hello-world:latest"}}) + workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => "docker://agrare/hello-world:latest", "End" => true}}) expect(workflow.current_state).to receive(:running?).and_return(true) expect(workflow.step_nonblock_wait(:timeout => 0)).to eq(Errno::EAGAIN) end @@ -253,7 +284,7 @@ context "with a state that is running" do it "returns false" do ctx.state["EnteredTime"] = Time.now.utc.iso8601 - workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => "docker://agrare/hello-world:latest"}}) + workflow = make_workflow(ctx, {"FirstState" => {"Type" => "Task", "Resource" => "docker://agrare/hello-world:latest", "End" => true}}) expect(workflow.current_state).to receive(:running?).and_return(true) expect(workflow.step_nonblock_ready?).to be_falsey end