Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions lib/mars/gate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,44 @@

module MARS
class Gate < Runnable
def initialize(name = "Gate", condition:, branches:, **kwargs)
class << self
def condition(&block)
@condition_block = block
end

attr_reader :condition_block

def branch(key, runnable)
branches_map[key] = runnable
end

def branches_map
@branches_map ||= {}
end
end

def initialize(name = "Gate", condition: nil, branches: nil, **kwargs)
super(name: name, **kwargs)

@condition = condition
@branches = branches
@condition = condition || self.class.condition_block
@branches = branches || self.class.branches_map
end

def run(input)
result = condition.call(input)
branch = branches[result]

return input unless branch

branches[result] || input
Halt.new(resolve_branch(branch).run(input))
end

private

attr_reader :condition, :branches

def resolve_branch(branch)
branch.is_a?(Class) ? branch.new : branch
end
end
end
11 changes: 11 additions & 0 deletions lib/mars/halt.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# frozen_string_literal: true

module MARS
class Halt
attr_reader :result

def initialize(result)
@result = result
end
end
end
26 changes: 16 additions & 10 deletions lib/mars/workflows/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,22 @@ def initialize(name, steps:, aggregator: nil, **kwargs)

def run(input)
errors = []
results = Async do |workflow|
tasks = @steps.map do |step|
results = execute_steps(input, errors)

raise AggregateError, errors if errors.any?

has_halt = results.any?(Halt)
result = aggregator.run(results)
has_halt ? Halt.new(result) : result
end

private

attr_reader :steps, :aggregator

def execute_steps(input, errors)
Async do |workflow|
tasks = steps.map do |step|
workflow.async do
step.run(input)
rescue StandardError => e
Expand All @@ -23,15 +37,7 @@ def run(input)

tasks.map(&:wait)
end.result

raise AggregateError, errors if errors.any?

aggregator.run(results)
end

private

attr_reader :steps, :aggregator
end
end
end
8 changes: 3 additions & 5 deletions lib/mars/workflows/sequential.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@ def initialize(name, steps:, **kwargs)

def run(input)
@steps.each do |step|
result = step.run(input)
input = step.run(input)

if result.is_a?(Runnable)
input = result.run(input)
if input.is_a?(Halt)
input = input.result
break
else
input = result
end
end

Expand Down
6 changes: 3 additions & 3 deletions spec/mars/aggregator_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

RSpec.describe MARS::Aggregator do
describe "#run" do
context "when called without a block" do
context "when called without an operation" do
let(:aggregator) { described_class.new }

it "returns the input as is" do
Expand All @@ -11,10 +11,10 @@
end
end

context "when initialized with a block operation" do
context "when initialized with an operation" do
let(:aggregator) { described_class.new("Aggregator", operation: lambda(&:join)) }

it "executes the block and returns its value" do
it "executes the operation and returns its value" do
result = aggregator.run(%w[a b c])
expect(result).to eq("abc")
end
Expand Down
136 changes: 89 additions & 47 deletions spec/mars/gate_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,82 +2,124 @@

RSpec.describe MARS::Gate do
describe "#run" do
let(:gate) { described_class.new("TestGate", condition: condition, branches: branches) }
context "with constructor-based configuration" do
let(:short_step) do
Class.new(MARS::Runnable) do
def run(input)
"short: #{input}"
end
end.new
end

context "with simple boolean condition" do
let(:condition) { ->(input) { input > 5 } }
let(:false_branch) { instance_spy(MARS::Runnable) }
let(:branches) { { false => false_branch } }
let(:long_step) do
Class.new(MARS::Runnable) do
def run(input)
"long: #{input}"
end
end.new
end

it "returns the input when no branch matches" do
result = gate.run(10)
expect(result).to eq(10)
let(:gate) do
described_class.new(
"LengthGate",
condition: ->(input) { input.length > 5 ? :long : :short },
branches: { short: short_step, long: long_step }
)
end

it "returns the false branch when condition is false" do
result = gate.run(3)
it "returns a Halt wrapping the branch result" do
result = gate.run("hi")
expect(result).to be_a(MARS::Halt)
expect(result.result).to eq("short: hi")
end

expect(result).to eq(false_branch)
it "executes the other branch for different input" do
result = gate.run("longstring")
expect(result).to be_a(MARS::Halt)
expect(result.result).to eq("long: longstring")
end

it "does not run the false branch when condition is false" do
gate.run(3)
it "returns input when no branch matches" do
gate = described_class.new(
"NoMatch",
condition: ->(_input) { :unknown },
branches: { short: short_step }
)

expect(false_branch).not_to have_received(:run)
expect(gate.run("hello")).to eq("hello")
end
end

context "with string-based condition" do
let(:condition) { ->(input) { input.length > 5 ? "long" : "short" } }
let(:long_branch) { instance_spy(MARS::Runnable) }
let(:short_branch) { instance_spy(MARS::Runnable) }
let(:branches) { { "long" => long_branch, "short" => short_branch } }

it "routes to long branch for long strings" do
result = gate.run("longstring")
context "with class-level DSL" do
let(:short_step_class) do
Class.new(MARS::Runnable) do
def run(input)
"quick: #{input}"
end
end
end

expect(result).to eq(long_branch)
let(:long_step_class) do
Class.new(MARS::Runnable) do
def run(input)
"deep: #{input}"
end
end
end

it "routes to short branch for short strings" do
result = gate.run("hi")
it "uses condition and branch DSL" do
short_cls = short_step_class
long_cls = long_step_class

gate_class = Class.new(described_class) do
condition { |input| input.length < 5 ? :short : :long }
branch :short, short_cls
branch :long, long_cls
end

expect(result).to eq(short_branch)
gate = gate_class.new("DSLGate")
expect(gate.run("hi").result).to eq("quick: hi")
expect(gate.run("longstring").result).to eq("deep: longstring")
end
end

context "with complex condition logic" do
let(:condition) do
lambda do |input|
case input
when 0..10 then "low"
when 11..50 then "medium"
else "high"
end
end
let(:low_step) do
Class.new(MARS::Runnable) { def run(input) = "low:#{input}" }.new
end

let(:low_branch) { instance_spy(MARS::Runnable) }
let(:medium_branch) { instance_spy(MARS::Runnable) }
let(:high_branch) { instance_spy(MARS::Runnable) }
let(:branches) { { "low" => low_branch, "medium" => medium_branch, "high" => high_branch } }
let(:medium_step) do
Class.new(MARS::Runnable) { def run(input) = "med:#{input}" }.new
end

it "routes to low branch" do
result = gate.run(5)
let(:high_step) do
Class.new(MARS::Runnable) { def run(input) = "high:#{input}" }.new
end

expect(result).to eq(low_branch)
let(:gate) do
described_class.new(
"SeverityGate",
condition: lambda { |input|
case input
when 0..10 then :low
when 11..50 then :medium
else :high
end
},
branches: { low: low_step, medium: medium_step, high: high_step }
)
end

it "routes to medium branch" do
result = gate.run(25)
it "routes to low branch" do
expect(gate.run(5).result).to eq("low:5")
end

expect(result).to eq(medium_branch)
it "routes to medium branch" do
expect(gate.run(25).result).to eq("med:25")
end

it "routes to high branch" do
result = gate.run(100)

expect(result).to eq(high_branch)
expect(gate.run(100).result).to eq("high:100")
end
end
end
Expand Down
21 changes: 21 additions & 0 deletions spec/mars/workflows/parallel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,27 @@ def run(_input)
expect(workflow.run(42)).to eq([])
end

it "propagates Halt to parent workflow when a step halts" do
gate = MARS::Gate.new(
"AlwaysBranch",
condition: ->(_input) { :branch },
branches: {
branch: Class.new(MARS::Runnable) do
def run(input)
"branched:#{input}"
end
end.new
}
)
add_five = add_step_class.new(5)

workflow = described_class.new("halt_workflow", steps: [gate, add_five])

result = workflow.run(10)
# Aggregator runs on all results, but output is wrapped in Halt
expect(result).to be_a(MARS::Halt)
end

it "propagates errors from steps" do
add_step = add_step_class.new(5)
error_step = error_step_class.new("Step failed", "error_step_one")
Expand Down
21 changes: 21 additions & 0 deletions spec/mars/workflows/sequential_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,27 @@ def run(_input)
expect(workflow.run(42)).to eq(42)
end

it "halts when a step returns a Halt" do
add_five = add_step_class.new(5)
gate = MARS::Gate.new(
"AlwaysBranch",
condition: ->(_input) { :branch },
branches: {
branch: Class.new(MARS::Runnable) do
def run(input)
"branched:#{input}"
end
end.new
}
)
multiply_three = multiply_step_class.new(3)

workflow = described_class.new("halt_workflow", steps: [add_five, gate, multiply_three])

# 10 + 5 = 15, gate branches -> "branched:15", multiply_three is never reached
expect(workflow.run(10)).to eq("branched:15")
end

it "propagates errors from steps" do
add_step = add_step_class.new(5)
error_step = error_step_class.new("Step failed")
Expand Down