Processes and pipes in OOP

This article is partially based on John Hughes' (Arrows)[https://www.haskell.org/arrows/index.html] and the idea of unix processes on the OOP land.

The idea of Arrows is a way to compose functions in such a way that it look like a circuit. This is great, because we can encode a pipeline process just like we would write a diagram.

(Add arrow's diagram picture)

Also, we know how to compose processes on the terminal, and they are pretty simple.

Each process has 4 components:

  • arguments
  • stdin
  • stdout
  • stderr

We can imagine a pipeline that may looks like this:

echo "1" | add --value 4 - | sub --value 1
// 4

Looking at add, it has an argument -s 2, it use stdin with the -syntax, might use stderr and the result is piped to sub through stdout, and received as stdin on sub.

Let's start with a simple function composition...

(we are going to use compose = flip (.) for the entire article)

def compose(*fs)
  ->(arg) { fs.reduce(arg) do |acc, f| f.call(acc); end }
end

compose(->(x) { x + 1 }, ->(y) { y + 2 }).call(2) == 5

This will give use the simpliest composition we know...

Some processes might need some configuration so it can use later on when we run the computation. We can use a closure for this.

adder = ->(y) { ->(x) { x + y } }

compose(adder.call(2), ->(y) { y + 2 }).call(2) == 6

Now that we know how to configure our function, next step is improve it using a class.

class Adder
  def self.make(x); self.new(2).method(:calc); end

  attr_accessor :value

  def initialize(i); @value = i; end

  def calc(x); @value + x; end
end

compose(Adder.make(2), Adder.make(1)).call(2).value == 5

Given a bounded methods to the compose function it still works!!

The return type can be anything on this simple pipeline, but it doesn't help when we need to deal with error and exceptions.

We are going to add 3 classes to simulate the unix pipeline:

  • Result (stdin - stdout)

Can be used to initialize the pipeline and hold the result of the computation.

  • Failure (stdin - stdout)

Indicates that we failed, but no exception was thrown.

  • Except (stderr)

We failed miserable...

class BaseValue
  attr_accessor :value
  def initialize(value); @value = value; end
end

class Result < BaseValue; end
class Failure < BaseValue; end
class Except < BaseValue; end

class Adder
  def self.make(x); self.new(2).method(:calc); end
  def initialize(i); @i = i; end

  def calc(x)
	Result.new(@i + x.value)
  end
end

class Double
  def self.make; self.new.method(:calc); end
  def initialize; end

  def calc(x)
	Result.new(2 * x.value)
  end
end

compose(Adder.make(2), Double.make, Result.new(2)).value == 8

This will help us to propagate the value across the entire pipeline.

We can use than to run the continuation when we match the correct result type.

class Result < BaseValue
  def on_success(f); f.call(value); end
  def on_failure(f); self; end
  def on_exception(f); self; end
end

class Failure < BaseValue
  def on_success(f); self; end
  def on_failure(f); f.call(value); end
  def on_exception(f); self; end
end

class Except < BaseValue
  def on_success(f); self; end
  def on_failure(f); self; end
  def on_exception(f); f.call(value); end
end

Result.new(2).on_success(-> (x) { Result.new(x + 1) }).value == 3
Failure.new(2).on_success(-> {}).value == 2

The only thing missing is the pipe.

We can write a class that can glue everything togheter.

class Pipe
  attr_accessor :work
  def initialize(initial_work); @work = initial_work; end

  def id(*x); x; end

  def pipe(work, on_failure = nil, on_exception = nil)
	pw = @work
	handle_failure = (on_failure or self.id)
	handle_exception = (on_exception or self.id)

	@work = ->(*input) {
	  pw.call(
		*input
	  ).on_success(
		work
	  ).on_failure(
		handle_failure
	  ).on_exception(
		handle_exception
	  )
	}

	self
  end

  def run(*input)
	@work.call(*input)
  end
end

Here is a simple usage of this lib (non-sense)...

class CreateUser
  def self.make(service)
    self.new(service).method(:execute)
  end

  def initialize(service)
    @service = service
  end

  def execute(data)
    Result.new(@service.create_user(data))
  rescue => e
    Except.new(e)
  end
end

class SendWelcomeEmail
  def self.make(mailerService)
    self.new(mailerService).method(:execute)
  end

  def initialize(service)
    @service = service
  end

  def execute(user)
    mailer = @service.prepare_email(user)
    if mailer.send
      Result.new(user)
    else
      Failure.new({ error: 'failed_to_send_email', 'user': user })
  rescue => e
    Except.new(e)
  end
end

class EnqueueEmail
  ...
  def execute(data) do
    if @mailer_job.enqueue_mail(data)
      Result.new(data.user)
    else
      Failure.new({ error: 'failed_to_enqueue_email', user: data.user })
  end
  ... 
end

Pipe.new(CreateUser.make(UserService.new))
	.pipe(SendWelcomeEmail.make(MailerService.new))
	.pipe(->(user) { user }, ReenqueueEmail.make())
	.run(user_input)