Robust and Fast Functional IO Toolkit

Raffiot is a small, (almost) dependency-free Python library providing some usual functional tools. It currently provides:

  • an easy-to-use IO monad which is stack-safe, fast, supports asynchronous, concurrent and parallel programming, and has many other features.
  • a Resource data type for easy but reliable resource management.
  • a Result data structure to represent errors.

Demo

For a demo, just type this in a terminal:

curl https://raw.githubusercontent.com/chrilves/raffiot.py/main/demos/raffiot_demo.sh | /bin/sh

This demo runs 4 computations in parallel. It demonstrates how simple concurrent and parallel programming is in Raffiot.

Note that this command will install the raffiot package in your current Python environment.

Documentation

This Guide

This guide will teach you how to use Raffiot through examples. Just use the left panel or the right arrow on this page to jump to the next section.

API

The API is online at https://chrilves.github.io/raffiot.py/api/index.html.

Features

  • pure python: Raffiot is written entirely in Python 3.7+.
  • small: it is just a few small files.
  • (almost) dependency-free: it only depends on typing-extensions (for the @final annotation).
  • crystal clear code.

IO

  • stack safe: you just won't run into stack overflows anymore.
  • fast: you won't notice the overhead.
  • dependency injection made easy: make some context visible from anywhere.
  • simple asynchronous and concurrent programming: full support of synchronous, asynchronous and concurrent programming with the same simple API.
  • railway-oriented programming: clean and simple failure management.
  • distinction between expected and unexpected failures: some failures are part of your program's normal behaviour (errors) while others show that something went terribly wrong (panics). Yes, that's heavily inspired by Rust.

Resource

Python has the with construct, but Resource goes a step further.

  • easy user-defined resource creation: just provide some open and close functions.
  • composability: the resource you want to create depends on another resource? Not a problem, you can compose resources the way you want. It scales.
  • failures handling in resources: Resource has everything IO has, including its wonderful failure management.

Result

Did I mention Railway-Oriented Programming? Result represents the 3 possible results of a computation:

  • Ok(value): the computation successfully computed this value.
  • Errors(errors): the computation failed on some expected failures errors, probably from the business domain.
  • Panic(exceptions, errors): the computation failed on some unexpected failures exceptions. Note that there may be some domain errors too.

First Steps with IO

Raffiot is available as a pip package (and soon conda). For now just type this in a terminal:

$ pip install -U raffiot

This guide will teach you how to use Raffiot by exploring most of its features via the Python interactive shell (also known as Python's REPL):

$ python
Python 3.9.1 (default, Dec 13 2020, 11:55:53) 
[GCC 10.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>

Start by importing Raffiot:

>>> from raffiot import *

Hello World!

Let's start with the classic "Hello World!":

>>> main : IO[None,None,None] = io.defer(print, "Hello World!")

As you can see, nothing is printed yet! The defer function delays the execution of print("Hello World!") until the value main is run.

Very important: a value of type IO, like main, is the description of some computation, very much like the text of a Python script. Nothing is executed until the (value of type) IO is actually run, very much like the code of a Python script is only executed when the script is run.

Inspecting the value main gives you:

>>> main
Defer((<built-in function print>, ('Hello World!',), {}))

Running an IO is very simple! Just call its run method like:

>>> main.run(None)
Hello World!
Ok(success=None)
>>> main.run(None)
Hello World!
Ok(success=None)

As you can see, every call to run printed Hello World! and returned the value Ok(None). Ok means the computation was successful; None is the return value of the computation.

defer : doing something later

The first argument of defer is the function you want to call later. The following arguments are the function's normal arguments. For example, to call datetime.now() later, just create the IO:

>>> from datetime import *
>>> now : IO[None,None,datetime] = io.defer(datetime.now)

Every time you run it, it will call datetime.now() and give you its result:

>>> now.run(None)
Ok(success=datetime.datetime(2021, 2, 19, 18, 38, 42, 572766))
>>> now.run(None)
Ok(success=datetime.datetime(2021, 2, 19, 18, 38, 47, 896153))

In the type IO[R,E,A], A is the type of values returned when the computation is successful. now being of type IO[None,None,datetime], it returns values of type datetime.

Likewise, you can define the print_io function that prints its arguments later:

>>> def print_io(*args, **kwargs) -> IO[None, None, None]:
...     return io.defer(print, *args, **kwargs)

Note that calling print_io("Hello") will not print anything but return an IO. To actually print Hello you need to run the IO:

>>> print_io("Hello", "World", "!")
Defer((<built-in function print>, ('Hello', 'World', '!'), {}))
>>> print_io("Hello", "World", "!").run(None)
Hello World !
Ok(success=None)

This ability to represent any computation as a value is one of the main strengths of an IO. It means you can work with computations like any other values: composing them, storing them in variables, in lists, etc.

then : doing something sequentially.

You will often need to execute some IOs sequentially. The method then composes some values of type IO, running them one by one. The return value is the one of the last IO:

>>> main : IO[None,None,datetime] = print_io("First line").then(
...   print_io("Second line"),
...   print_io("Third line"),
...   now
... )
>>> main.run(None)
First line
Second line
Third line
Ok(success=datetime.datetime(2021, 2, 20, 15, 52, 11, 205845))

You may sometimes prefer the analogous function io.sequence that behaves like the method then.

map : transforming results.

You can transform the return value of an IO using the map method. It is very similar to the map function on lists, but works on IO. Just pass map some function. It will use this function to transform the IO's return value:

>>> date_str : IO[None,None,str] = now.map(lambda d: d.isoformat())
>>> date_str
Map((Defer((<built-in method now of type object at 0x7ff733070bc0>,
 (), {})), <function <lambda> at 0x7ff7338d9280>))
>>> date_str.run(None)
Ok(success='2021-02-19T23:54:46.297503')

flat_map : chaining IOs.

map transforms the return value of an IO. So transforming the return value of date_str with print_io will give you an IO whose return value is also an IO. When you run it, instead of executing the inner IO, it will return it to you:

>>> main : IO[None,None,IO[None,None,None]] = date_str.map(lambda x: print_io(x))
>>> main.run(None)
Ok(success=Defer((<built-in function print>, ('2021-02-20T15:54:38.444494',), {})))

When you want to use the result of some IO in another IO, use flat_map:

>>> main: IO[None,None,None] = date_str.flat_map(lambda x: print_io(x))
>>> main.run(None)
2021-02-20T15:55:13.940717
Ok(success=None)

Here the return value of date_str is given to print_io via x and both IOs are executed, returning the result of print_io(x).

flatten : concatenating an IO of IO.

Instead of using flat_map, you could have used map and then flatten to reduce the IO of IO into a single layer of IO:

>>> main : IO[None,None,None] = date_str.map(lambda x: print_io(x)).flatten()
>>> main.run(None)
2021-02-20T15:58:44.244715
Ok(success=None)

Most of the time, you will use flat_map because it is simpler to use. But now you know where its name comes from: flatten of map.

pure : just a value.

pure is very simple: the result of the computation is the very same argument of pure:

>>> main : IO[None,None,int] = io.pure(5)
>>> main
Pure(5)
>>> main.run(None)
Ok(success=5)

It is very useful when some function or method expects an IO but you want to provide a constant.
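
For example, to run an effect and then return a constant, combine then with io.pure (reusing print_io from above):

>>> main : IO[None,None,int] = print_io("computing...").then(io.pure(42))
>>> main.run(None)
computing...
Ok(success=42)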

defer_io : computing an IO later.

defer_io is very much like defer but for functions returning IO:

>>> main : IO[None,None,IO[None,None,None]] = io.defer(print_io, "Hello", "World", "!")
>>> main.run(None)
Ok(success=Defer((<built-in function print>, ('Hello', 'World', '!'), {})))
>>> main : IO[None,None,None] = io.defer_io(print_io, "Hello", "World", "!")
>>> main.run(None)
Hello World !
Ok(success=None)

Like flat_map is flatten of map, defer_io is flatten of defer. It is useful to defer the call of a function returning an IO.

Use Case: Stack-Safety

Let's see one of the main features of IO: it is stack safe! When you run the following function in Python, even if the argument times is fairly small, the computation will fail miserably by blowing the stack:

>>> def print_date(times: int) -> None:
...   if times > 0:
...     d = datetime.now()
...     print(d.isoformat())
...     print_date(times - 1)
>>> print_date(1000)
<LOTS OF DATES>
2021-02-20T16:20:37.188880
2021-02-20T16:20:37.188883
2021-02-20T16:20:37.188886
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 5, in print_date
  File "<stdin>", line 5, in print_date
  File "<stdin>", line 5, in print_date
  [Previous line repeated 992 more times]
  File "<stdin>", line 4, in print_date
RecursionError: maximum recursion depth exceeded while calling a Python object

On the contrary, the equivalent function using IO will never blow the stack, even for very high values of times:

>>> def print_date_io(times: int) -> IO[None, None, None]:
...   if times > 0:
...     return (
...       now
...       .flat_map(lambda d: print_io(d.isoformat()))
...       .flat_map(lambda _: print_date_io(times - 1))
...     )
...   else:
...     return io.pure(None)
>>> print_date_io(1000000).run(None)
<LOTS OF DATES>
2021-02-20T16:23:22.968454
2021-02-20T16:23:22.968464
Ok(success=None)

With IO, you can use recursion without fear! Just remember to wrap your computations in an IO (using defer, defer_io, map, flat_map and others) to benefit from IO's safety.

Failures

All the computations we have seen until now were successful. We will now see how IO's failure management works. But first, I need to present the Result[E,A] type.

The Result[E,A] type

Result[E,A] represents the result of a computation. A computation can either be successful, returning a value of type A, or have failed on some expected errors of type E, or have failed on some unexpected exceptions. IO and Result make a distinction between expected failures, called errors, and unexpected failures, called panics.

For an operating system, an application crashing is an expected error. A well designed operating system is expected to be prepared for such errors. It has to deal with situations like this nicely and continue running normally. Errors are part of the normal life of your program. An error means that some operation failed but your program is still healthy.

On the contrary, memory corruption inside the kernel is an unexpected failure. The system cannot continue running normally. The failure may have made the computation dangerous and/or the result wrong. The only option is terminating the system as smoothly as possible. Panics should never happen, but sometimes even the most improbable events do occur. When panics happen, consider your computation lost. Terminate it doing as little damage as possible.

Raffiot is designed to report all encountered failures. Failures are never silently ignored but exposed via the Result type. This is why the Result type works with lists of domain failures (errors) and unexpected failures (panics). In addition, Raffiot saves the stack trace of every exception encountered so that you always know where the exception was raised, even with multiprocessing!

For the rest of this section, you will need this import:

>>> from raffiot import *

Ok(success:A) : A Success

When a computation successfully returns some value a of type A, it actually returns the value Ok(a) of type Result[E,A]. The value a can be obtained by Ok(a).success:

>>> from typing import Any
>>> r : Result[Any,int] = Ok(5)
>>> r
Ok(success=5)
>>> r.success
5

Errors(errors:List[E]) : Some Expected Failures

When a computation fails because of an error e of type E, it actually returns a value Errors(errors=[e]) of type Result[E,Any]. The type E can be any type you want. Choose as type E the type that fits your business domain errors best. The list of all errors encountered can be obtained by Errors([e]).errors:

>>> r : Result[int,Any] = Errors([5])
>>> r
Errors(errors=[5])
>>> r.errors
[5]

Note that you must ALWAYS provide a list to Errors! So, to avoid bugs, please use the function result.error(e: E) -> Result[E,Any] when you want to raise a single error or result.errors(e1: E, e2: E, ...) -> Result[E,Any] when you want to raise several errors:

>>> result.error(5)
Errors(errors=[5])
>>> result.errors(5, 2)
Errors(errors=[5, 2])

Traced Exceptions: Never lose a stack trace!

Raffiot lifts every exception encountered during a computation into a TracedException. It is a very simple type:

@dataclass
class TracedException(Exception):
    exception: Exception
    stack_trace: str

It packs an exception with its stack trace. To the best of our knowledge, Python does not store the stack trace with the exception but only keeps the stack trace of the last exception encountered. This is a huge problem because in a parallel or concurrent computation, many exceptions can be raised. If you use multiprocessing, they can even be raised in a different process!

When you have an exception exn, there are two ways to build a TracedException:

  • If you are in the except clause of the try-except block that caught the exception exn, you can call the static method TracedException.in_except_clause(exn) to capture the stack trace of the exception. Note that it only works in the except clause!
  • If you are not in the except clause of the try-except block that caught the exception exn, you can call the static method TracedException.with_stack_trace(exn) to capture the current stack trace.
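
For example, here is a minimal sketch of both ways (actual stack traces are elided as '...'):

>>> try:
...   1/0
... except Exception as exn:
...   traced = TracedException.in_except_clause(exn)
>>> traced.exception
ZeroDivisionError('division by zero')
>>> TracedException.with_stack_trace(Exception("BOOM!"))
TracedException(exception=Exception('BOOM!'), stack_trace='...')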

Panic(exceptions: List[TracedException], errors: List[E]) : Some Unexpected Failures

When a computation fails because of an exception exn, it actually returns a value Panic(exceptions=[TracedException(exception=exn, stack_trace="...")], errors=[]) of type Result[Any,Any]. Note that a Panic stores a list of traced exceptions, not a single one. The original exception exn can be obtained by Panic([TracedException(exn)],[]).exceptions[0].exception and its stack trace by Panic([TracedException(exn)],[]).exceptions[0].stack_trace.

>>> r : Result[Any,Any] = Panic(
...   exceptions=[TracedException.with_stack_trace(Exception("BOOM!"))],
...   errors=[]
... )
>>> r
Panic(
  exceptions=[TracedException(exception=Exception('BOOM!'), stack_trace='...')],
  errors=[]
)
>>> r.exceptions
[TracedException(exception=Exception('BOOM!'), stack_trace='...')]
>>> r.errors
[]
>>> r.exceptions[0].exception
Exception('BOOM!')
>>> print(r.exceptions[0].stack_trace)
  File "<stdin>", line 2, in <module>
  File "/raffiot.py/raffiot/utils.py", line 63, in with_stack_trace
    return TracedException(exception=exn, stack_trace="".join(format_stack()))

When an unexpected failure happens, there may already have been some expected failures. This is why the Panic case has an errors slot. The list of exceptions should never be empty (otherwise it wouldn't be a Panic). Using the Panic constructor is error-prone as you must always provide lists for the exceptions and errors fields. Instead you can use the helper function result.panic:

>>> result.panic(Exception("BOOM"))
Panic(exceptions=[TracedException(exception=Exception('BOOM'), stack_trace='...')], errors=[])
>>> result.panic(Exception("BOOM 1"), Exception("BOOM 2"))
Panic(exceptions=[
    TracedException(exception=Exception('BOOM 1'), stack_trace='...'),
    TracedException(exception=Exception('BOOM 2'), stack_trace='...')
  ],
  errors=[])
>>> result.panic(Exception("BOOM 1"), Exception("BOOM 2"), errors=[5,2])
Panic(exceptions=[
    TracedException(exception=Exception('BOOM 1'), stack_trace='...'),
    TracedException(exception=Exception('BOOM 2'), stack_trace='...')
  ],
  errors=[5, 2]
)

fold : transforming a Result[E,A]

To transform a Result[E,A], use the method fold. It takes as arguments three functions. The first one is called when the result is an Ok. The second is called on Errors. The third is called on Panic. When called, each of these functions receives as argument the value / list of errors / list of traced exceptions and errors (depending on the case) stored in the result:

>>> Ok(5).fold(
...   lambda s: f"success: {s}",
...   lambda e: f"errors: {e}",
...   lambda p,e: f"traced exceptions: {p}, errors: {e}"
... )
'success: 5'
>>> result.errors(7,5).fold(
...   lambda s: f"success: {s}",
...   lambda e: f"errors: {e}",
...   lambda p,e: f"traced exceptions: {p}, errors: {e}"
... )
'errors: [7, 5]'
>>> result.panic(Exception("BOOM 1"), Exception("BOOM 2"), errors=[5,2]).fold(
...   lambda s: f"success: {s}",
...   lambda e: f"errors: {e}",
...   lambda p,e: f"traced exceptions: {p}, errors: {e}"
... )
'traced exceptions: [
    TracedException(exception=Exception(\'BOOM 1\'), stack_trace=\'...\'),
    TracedException(exception=Exception(\'BOOM 2\'), stack_trace=\'...\')
  ],
  errors: [5, 2]'

raise_on_panic : reporting panics as exceptions

Raffiot's functions and methods never raise exceptions but instead return a Panic. When you need a failed computation to raise its exception, call raise_on_panic on the result:

>>> Ok(5).raise_on_panic()
Ok(success=5)
>>> result.error(7).raise_on_panic()
Errors(errors=[7])
>>> result.panic(Exception("BOOM!")).raise_on_panic()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/dev/raffiot.py/raffiot/result.py", line 441, in raise_on_panic
    raise MultipleExceptions.merge(*self.exceptions, errors=self.errors)
raffiot.utils.TracedException: BOOM!
  File "<stdin>", line 1, in <module>
  File "/dev/raffiot.py/raffiot/result.py", line 559, in panic
    exceptions=TracedException.ensure_list_traced(exceptions),
  File "/dev/raffiot.py/raffiot/utils.py", line 76, in ensure_list_traced
    return [cls.ensure_traced(exn) for exn in exceptions]
  File "/dev/raffiot.py/raffiot/utils.py", line 76, in <listcomp>
    return [cls.ensure_traced(exn) for exn in exceptions]
  File "/dev/raffiot.py/raffiot/utils.py", line 70, in ensure_traced
    return cls.with_stack_trace(exception)
  File "/dev/raffiot.py/raffiot/utils.py", line 66, in with_stack_trace
    return TracedException(exception=exn, stack_trace="".join(format_stack()))

raise_on_panic and unsafe_get are the only functions/methods raising exceptions. Never expect other functions to raise exceptions on failures; they will return an Errors or Panic instead:

>>> main : IO[None,None,None] = io.panic(Exception("BOOM!"))
>>> main.run(None)
Panic(exceptions=[
  TracedException(exception=Exception('BOOM!'), stack_trace='...')
  ], errors=[])
>>> main.run(None).raise_on_panic()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/raffiot.py/raffiot/result.py", line 441, in raise_on_panic
    raise MultipleExceptions.merge(*self.exceptions, errors=self.errors)
raffiot.utils.TracedException: BOOM!
  File "<stdin>", line 1, in <module>
  File "/raffiot.py/raffiot/io.py", line 467, in panic
    TracedException.ensure_list_traced(exceptions),
  File "raffiot.py/raffiot/utils.py", line 76, in ensure_list_traced
    return [cls.ensure_traced(exn) for exn in exceptions]
  File "raffiot.py/raffiot/utils.py", line 76, in <listcomp>
    return [cls.ensure_traced(exn) for exn in exceptions]
  File "raffiot.py/raffiot/utils.py", line 70, in ensure_traced
    return cls.with_stack_trace(exception)
  File "/raffiot.py/raffiot/utils.py", line 66, in with_stack_trace
    return TracedException(exception=exn, stack_trace="".join(format_stack()))

Errors: Expected failures in IO

IO has built-in support for errors. Remember that we call errors the expected failures. Errors can be of any type you want. You should usually take as error type the one that fits your business domain errors best. In the type IO[R,E,A], E is the type of errors.

error : this failure was expected, we're still in safe zone!

To raise an error, simply call io.error:

>>> from typing import Any, List

>>> main_str : IO[None,str,Any] = io.error("Oops")
>>> main_str.run(None)
Errors(errors=['Oops'])

>>> main_int : IO[None,int,Any] = io.error(5)
>>> main_int.run(None)
Errors(errors=[5])

>>> main_list : IO[None,List[int],Any] = io.error([1,2,3])
>>> main_list.run(None)
Errors(errors=[[1, 2, 3]])

errors : raising several errors at once.

To raise several errors, simply call io.errors:

>>> from typing import Any, List

>>> main_str : IO[None,str,Any] = io.errors("Oops1", "Oops2")
>>> main_str.run(None)
Errors(errors=['Oops1', 'Oops2'])

>>> main_int : IO[None,int,Any] = io.errors(5,7)
>>> main_int.run(None)
Errors(errors=[5, 7])

But beware: iterable arguments will be treated as collections of errors, not as a single error. So when you want to raise an iterable as a single error, use io.error as above. For example, io.errors treats the list [1,2,3] as three distinct errors 1, 2 and 3:

>>> main_list : IO[None,List[int],Any] = io.errors([1,2,3])
>>> main_list.run(None)
Errors(errors=[1, 2, 3])

catch : reacting to expected failures to continue.

To react to errors, just call the method catch. It is very similar to a try-except block. It takes as argument a function called the error handler. When the computation fails because of some errors le: List[E], the error handler is called with le as argument. The result is then the error handler's result.

>>> def main(i: int) -> IO[None,str,int]:
...   return (
...   io.error(i)
...   .catch(lambda x:
...     io.pure(2*x[0])
...     if x[0] % 2 == 0
...     else io.error("Oops"))
... )
>>> main(5).run(None)
Errors(errors=['Oops'])
>>> main(6).run(None)
Ok(success=12)

If the computation is successful, or if it failed on a panic, then catch has no effect. Note that the error handler can itself raise errors and panics.

map_error : transforming expected failures.

It is often useful to transform an error. For example you may want to add some useful information about the context: what was the request that led to this error, what were the arguments of the operation that failed, etc.

To transform an error, call map_error. It behaves like map, but on errors:

>>> main : IO[None,int,Any] = io.error([1,2]).map_error(lambda l: l[0] + l[1])
>>> main.run(None)
Errors(errors=[3])

If the computation is successful or if it fails on a panic, then map_error has no effect.

Panics: Unexpected Failures in IO

The type of panics is always Python's exception type Exception. Panics can be raised either manually, by calling io.panic, or when run encounters an exception. The method run on IO never raises exceptions! Every exception p raised during the execution of run is caught and transformed into the panic result.panic(p).

ALL exceptions are caught

All the functions and methods that accept functions as arguments run them inside a try-except block to catch every raised exception. All exceptions caught this way are transformed into panics:

>>> main : IO[None,None,float] = io.pure(0).map(lambda x: 1/x)
>>> main.run(None)
Panic(exceptions=[TracedException(
    exception=ZeroDivisionError('division by zero'),
    stack_trace='...')
  ], errors=[])

Remember that panics are unexpected failures, and unexpected failures should never happen. In an ideal world you would never pass functions that may raise exceptions to map, flat_map, defer and the others. In an ideal world, panics would never occur. But we do not live in an ideal world, so map, flat_map and the others cover your back by catching any exception for your own safety.

panic : something went terribly wrong, we're in unsafe zone.

It is sometimes useful to raise panics manually. For example when you encounter a problematic situation you thought was impossible but that still did happen. If your program is not designed to handle this situation, you should raise a panic.

>>> main : IO[None,None,Any] = io.panic(Exception("BOOM!"))
>>> main.run(None)
Panic(exceptions=[TracedException(
    exception=Exception('BOOM!'),
    stack_trace='...')
  ], errors=[])

The function io.panic accepts several exceptions as arguments and even domain errors with the errors keyword argument.
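
For example, mirroring result.panic above:

>>> io.panic(Exception("BOOM 1"), Exception("BOOM 2"), errors=[5]).run(None)
Panic(exceptions=[
    TracedException(exception=Exception('BOOM 1'), stack_trace='...'),
    TracedException(exception=Exception('BOOM 2'), stack_trace='...')
  ],
  errors=[5])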

recover : stopping the computation safely after an unexpected failure.

Even in case of a panic, you have a chance to perform some emergency actions to recover. For example, you may want to restart the computation on panics. To react to panics, just call recover. It is very similar to a try-except block. It takes as argument a function, called the panic handler. If the computation fails because of a panic with exceptions lexn: List[TracedException] and errors lerr: List[E], then the panic handler is called with lexn as its first argument and lerr as its second argument. The result is then the handler's result.

>>> main : IO[None,None,Any] = (
...   io.panic(Exception("BOOM!"))
...   .recover(lambda lexn, lerr: io.pure(
...     f"Recovered from traced exceptions: {lexn} and errors {lerr}")
...   )
... )
>>> main.run(None)
Ok(success='Recovered from traced exceptions: [TracedException('
  'exception=Exception(\'BOOM!\'), stack_trace=\'...\')] and errors []')

map_panic : transforming traced exceptions.

It is often useful to transform a panic. For example you may want to add some useful information about the context: what was the request that led to this error, what were the arguments of the operation that failed, etc.

To transform all the exceptions of a Panic, call map_panic. It behaves like map, but on exceptions contained in Panic:

>>> main : IO[None, None, None] = (
...   io.pure(0)
...   .map(lambda x: 1/x)
...   .map_panic(lambda traced_exception:
...     TracedException(
...       exception=Exception("BOOM"),
...       stack_trace=traced_exception.stack_trace
...     )
...   )
... )
>>> main.run(None)
Panic(exceptions=[TracedException(exception=Exception('BOOM'),
 stack_trace='...')], errors=[])

More tools

Here are some very useful functions. They can all be expressed using the functions and methods seen above, but they deserve being seen in detail:

attempt : failures as values.

The method attempt transforms an IO[R,E,A] into an IO[R,None,Result[E,A]]. If the original computation is successful, the transformed one returns an Ok(Ok(...)). If the original computation fails on some errors, the transformed one returns Ok(Errors(...)). If the original computation fails on a panic, the transformed one returns Ok(Panic(...)):

>>> io_ok : IO[None,None,Result[None,int]] = io.pure(5).attempt()
>>> io_ok.run(None)
Ok(success=Ok(success=5))
>>> io_error : IO[None,None,Result[int,None]] = io.error(7).attempt()
>>> io_error.run(None)
Ok(success=Errors(errors=[7]))
>>> io_panic : IO[None,None,Result[None,None]] = io.panic(Exception("BOOM!")).attempt()
>>> io_panic.run(None)
Ok(success=Panic(exceptions=[TracedException(exception=Exception('BOOM!'),
 stack_trace='...')], errors=[]))

It is hugely useful when you want to perform different actions depending on the result of a computation. Calling attempt and then flat_map is easier than combining flat_map, catch and recover correctly.

from_result : from Result[E,A] to IO[None,E,A]

The function io.from_result does almost the opposite of attempt. It transforms a Result[E,A] into the corresponding IO[None,E,A]:

>>> io_ok : IO[None,None,int] = io.from_result(Ok(5))
>>> io_ok.run(None)
Ok(success=5)
>>> io_error : IO[None,int,None] = io.from_result(result.error(5))
>>> io_error.run(None)
Errors(errors=[5])
>>> io_panic : IO[None,None,None] = io.from_result(result.panic(
...   TracedException(Exception("BOOM!"),"")))
>>> io_panic.run(None)
Panic(exceptions=[TracedException(exception=Exception('BOOM!'),
  stack_trace='')], errors=[])

from_result is often useful after an attempt to restore the state of the computation.
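
For example, this minimal sketch inspects a result with attempt and then restores it with from_result:

>>> main : IO[None,str,Any] = (
...   io.error("Oups")
...   .attempt()
...   .flat_map(lambda r:
...     io.defer(print, f"Result was {r}")
...     .then(io.from_result(r))
...   )
... )
>>> main.run(None)
Result was Errors(errors=['Oups'])
Errors(errors=['Oups'])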

finally_ : doing something unconditionally.

The method finally_ is the finally clause of a try-except-finally. It executes an IO after the current one, discards its result and restores the result of the current one. The IO executed after is actually a function taking as argument the Result[E,A] of the preceding IO. It enables performing different actions depending on the result of the first computation:

>>> io.pure(5).finally_(lambda r: io.defer(print, f"Hello, result is {r}")).run(None)
Hello, result is Ok(success=5)
Ok(success=5)
>>> io.error(7).finally_(lambda r: io.defer(print, f"Hello, result is {r}")).run(None)
Hello, result is Errors(errors=[7])
Errors(errors=[7])
>>> io.panic(Exception("BOOM!")).finally_(lambda r: io.defer(print, f"Hello, result is {r}")).run(None)
Hello, result is Panic(exceptions=[TracedException(exception=Exception('BOOM!'),
  stack_trace='...')], errors=[])
Panic(exceptions=[TracedException(exception=Exception('BOOM!'),
  stack_trace='...')], errors=[])

on_failure : reaction to both errors and panics.

Calling both methods catch and recover is sometimes annoying. When you need to react to any failure, call on_failure. It takes as argument a function called the failure handler. When a computation fails, it calls the failure handler with the failure passed as argument as a Result[E,None].

This Result[E,None] is never Ok because on_failure calls the handler only on failures.

>>> io.pure(5).on_failure(lambda x: io.pure(12)).run(None)
Ok(success=5)
>>> io.error(7).on_failure(lambda x: io.pure(12)).run(None)
Ok(success=12)
>>> io.panic(Exception("BOOM!")).on_failure(lambda x: io.pure(12)).run(None)
Ok(success=12)

Context

The time has come to talk about the remaining type parameter of an IO. In the type IO[R,E,A], R is the type of the context. The context is a value that is always accessible to any IO. You can think of it as a local global variable. I assure you this sentence makes sense!

run : executing an IO in a given context.

We have called the method run many times, and every time we gave it None as argument. The argument you give to run is actually the context value. You can take any value you want as the context. Usually the context is a value you want to be accessible from everywhere.

Global variables are indeed accessible from everywhere, but they are very annoying because they can only have one value at a time. Furthermore, every change made to a global variable by one part of your code will affect all other parts reading this global variable.

On the opposite side, local variables are nice because every part of your code can have its own dedicated local variable. But they are annoying because to make them accessible everywhere, you have to pass them to every function as parameters. This is error-prone and pollutes your code.

Given an IO named main, when you call run with some context r, the value r is accessible from everywhere in main. The context behaves like a global variable inside the running IO. But you can pass every call to run a different context. The context behaves like a local variable between different calls to run.

read : accessing the shared context.

To access the context, just call the function io.read. Its result is the context:

>>> main : IO[int,None,int] = io.read
>>> main.run(5)
Ok(success=5)
>>> main.run(77)
Ok(success=77)

This example is indeed trivial. But imagine that main can be a very big program. You can call io.read anywhere in main to get the context. It saves you the huge burden of passing the context from run to every function until it reaches the location of io.read.

contra_map_read : transforming the shared context.

As I said, the context behaves as a local global variable. With io.read you have seen its global behaviour. The method contra_map_read transforms the context, but only for the IO it is called on. Note that the context is transformed before the IO is executed.

The method contra_map_read is very useful when you need to alter the context. For example, your program may start with None as context, read its configuration to instantiate the services it wants to inject, and then change the context to pass these services everywhere for dependency injection.

>>> main : IO[str,None,int] = io.read.contra_map_read(lambda s: len(s))
>>> main.run("Hello")
Ok(success=5)

defer_read : doing something context-dependent later.

The function io.defer_read is useful when it is easier to implement a computation as a normal function and then transform it into an IO. io.defer_read, like io.defer, takes as first argument the function you want to execute later. But unlike io.defer, this function:

  • has access to the context
  • returns a Result[E,A]

The last point is important because it means the function can raise errors while io.defer can only raise panics.

>>> def f(context:int, i: int) -> Result[str,int]:
...   if context > 0:
...     return result.ok(context + i)
...   else:
...     return result.error("Ooups!")
>>> main : IO[int,str,int] = io.defer_read(f, 5)
>>> main.run(10)
Ok(success=15)
>>> main.run(-1)
Errors(errors=['Ooups!'])

defer_read_io : computing a context-dependent IO later.

The function io.defer_read_io is the IO counterpart of io.defer_read. It is useful when the context-aware function you want to execute later returns an IO.

>>> def f(context:int, i:int) -> IO[None,str,int]:
...   if context > 0:
...     return io.pure(context + i)
...   else:
...     return io.error("Ooups!")
>>> main : IO[int,str,int] = io.defer_read_io(f, 5)
>>> main.run(10)
Ok(success=15)
>>> main.run(-1)
Errors(errors=['Ooups!'])

Use Case: Dependency Injection

As a demonstration of how dependency injection works in Raffiot, create and fill the file dependency_injection.py as below:

from raffiot import *

import sys
from dataclasses import dataclass
from typing import List

@dataclass
class NotFound(Exception):
  url: str

class Service:
  def get(self, path: str) -> IO[None,NotFound,str]:
    pass

class HttpService(Service):
  def __init__(self, host: str, port: int) -> None:
    self.host = host
    self.port = port
  
  def get(self, path: str) -> IO[None,NotFound,str]:
    url = f"http://{self.host}:{self.port}/{path}"

    if path == "index.html":
      response = io.pure(f"HTML Content of url {url}")
    elif path == "index.md":
      response = io.pure(f"Markdown Content of url {url}")
    else:
      response = io.error(NotFound(url))
    return io.defer(print, f"Opening url {url}").then(response)

class LocalFileSystemService(Service):
  def get(self, path: str) -> IO[None,NotFound,str]:
    url = f"/{path}"

    if path == "index.html":
      response = io.pure(f"HTML Content of file {url}")
    elif path == "index.md":
      response = io.pure(f"Markdown Content of file {url}")
    else:
      response = io.error(NotFound(url))
    return io.defer(print, f"Opening file {url}").then(response)


main : IO[Service,NotFound,None] = (
  io.read
  .flat_map(lambda service:
    service.get("index.html")
    .flat_map(lambda x:
      service.get("index.md")
      .flat_map(lambda y: io.defer(print, "Result = ", [x,y]))
    )
  )
)

if len(sys.argv) >= 2 and sys.argv[1] == "http":
  service = HttpService("localhost", 80)
else:
  service = LocalFileSystemService()

main.run(service)

Running the program with the HttpService gives:

$ python dependency_injection.py http
Opening url http://localhost:80/index.html
Opening url http://localhost:80/index.md
Result =  ['HTML Content of url http://localhost:80/index.html', 'Markdown Content of url http://localhost:80/index.md']

While running it with the LocalFileSystemService gives:

$ python dependency_injection.py localfs
Opening file /index.html
Opening file /index.md
Result =  ['HTML Content of file /index.html', 'Markdown Content of file /index.md']

Once again, main is still a very short IO, so it may not be obvious how much the context is a life saver. But imagine main to be one of your real programs. It would be much bigger, and passing the context around as global or local variables would be a much bigger problem.

Combining a list of IOs

Remember that an IO is the description of some computation, like a source code. As such, it can be manipulated like any value, and so stored in lists. This section covers the operations you have on lists of IOs.

zip : from a list of IO to an IO of list

The zip function and method transforms a list of IOs into an IO returning a list. Each value of the resulting list is the value returned by the IO at the same location in the input list.

The whole computation fails on the first failure encountered.

>>> from typing import List
>>> main : IO[None,None,List] = io.zip(io.pure(8), io.pure("Hello"))
>>> main.run(None)
Ok(success=[8, 'Hello'])
>>> main : IO[None,str,List] = io.zip(io.pure(8), io.error("Oups"))
>>> main.run(None)
Errors(errors=['Oups'])

sequence : running IOs in sequence

The function io.sequence is like the method then: it executes a list of IO sequentially, returning the value of the last IO and failing on the first failure encountered:

>>> main : IO[None,None,int] = io.sequence(
...   io.defer(print, "Hello"),
...   io.defer(print, "World"),
...   io.pure(12)  
... )
>>> main.run(None)
Hello
World
Ok(success=12)

traverse : almost like map

The function io.traverse is very much like the function map on lists. Like map on lists, it applies a function to every element of a list. But unlike map on lists, the function it applies returns an IO.

io.traverse is useful when you need to apply a function returning an IO to every element of a list. It returns an IO computing the list of values returned by each call.

Like zip and sequence, it fails on the first failure encountered.

>>> from typing import List
>>> def add_context(i: int) -> IO[int,None,int]:
...   return io.read.flat_map(lambda c:
...     io.defer(print, f"context = {c}, argument = {i}")
...     .then(io.pure(c + i))
...   )
>>> main : IO[int,None,List[int]] = io.traverse(range(10), add_context)
>>> main.run(10)
context = 10, argument = 0
context = 10, argument = 1
context = 10, argument = 2
context = 10, argument = 3
context = 10, argument = 4
context = 10, argument = 5
context = 10, argument = 6
context = 10, argument = 7
context = 10, argument = 8
context = 10, argument = 9
Ok(success=[10, 11, 12, 13, 14, 15, 16, 17, 18, 19])

Asynchronous and Concurrent Programming

For now you know that an IO is very nice for stack safety, dependency injection, failure management, and code-as-data manipulation. There is another big feature IO has: simple asynchronous and concurrent programming.

run : the second and third arguments.

An IO is executed on a pool of threads. Until now we only gave the method run one argument: the context. But run accepts three arguments! The second one is the number of threads in the pool and the third one is how long a thread sleeps when idle.

The number of threads in the pool is fixed, so you should never call a blocking function inside one of the pool's threads. Create a new thread and run the blocking operation inside it using async_.

When a thread has no fibers to run, it calls time.sleep to avoid wasting precious CPU cycles doing nothing. The third parameter of run is the amount of time an idle thread sleeps (a float, in seconds).

Because of Python's infamous Global Interpreter Lock, Python cannot run threads in parallel. So if your code only uses 100% of a single core, this is normal. You know the story: Python is single-threaded.

To use n threads with an idle time of idle_time seconds, just give n and idle_time to run:

>>> io.pure(5).run(None, 50, 0.01)
Ok(success=5)

Asynchronous Programming

A call to some function is called synchronous when the thread making the call actually waits for the call to return a value. This is annoying because the thread could be used to perform useful computations instead of just waiting.

On the contrary, a call is said to be asynchronous when the thread making the call does not wait for the call to finish but runs useful computations in the meantime.

The notorious expression callback Hell kindly expresses how asynchronous programming can be error-prone, hard to write and hard to read.

Asynchronous programming is all about callbacks, but fortunately, programming models were created to hide much of its complexity under a clean and simple interface. The famous Promise of JavaScript is such an interface. The async/await syntax of many languages, including Python, is also such an interface. So is Raffiot's IO. But unlike the async/await syntax, synchronous and asynchronous code can be transparently mixed with IO.

async_ : running something asynchronously

Calling a function f usually looks like this:

>>> def f():
...   print("f is running")
...   return 3
>>> def main():
...   print("f not started yet")
...   result = f()
...   print(f"f finished and returned {result}")
>>> main()
f not started yet
f is running
f finished and returned 3

When the function main calls f, it waits for f to finish. When f finishes, main resumes its computation with the result of f.

Asynchronous functions, like apply_async, do not work this way. Calling an asynchronous function fasync usually looks like this:

>>> import time
>>> from multiprocessing import Pool
>>>
>>> def f():
...  print("f is running")
...  return 3
>>>
>>> with Pool(4) as pool: 
...   def fasync(callback):
...     pool.apply_async(f, callback = callback)
...   
...   def main():
...     print("fasync not started yet")
...   
...     def callback(result):
...       print(f"fasync finished and returned {result}")
...     
...     fasync(callback)
...     print("fasync started")
...    
...   main()
...   time.sleep(0.5)
fasync not started yet
fasync started
f is running
fasync finished and returned 3

As you can see, the function main does not wait for f to finish but continues its execution, printing fasync started. The function main cannot get the result of f directly, so it defines a function, called a callback, to process the result of f when it finishes.

With Raffiot's IO you would write:

>>> import time
>>> from multiprocessing import Pool
>>> from raffiot import *
>>>   
>>> def f():
...   print("f is running")
...   return 3
>>>  
>>> with Pool(4) as pool: 
...   f_io : IO[None,None,int] = (
...     io.async_(
...       lambda r, k:
...         pool.apply_async(f, callback = lambda r: k(Ok(r)))
...     )
...   )
... 
...   main : IO[None,None,None] = io.sequence(
...     io.defer(print, "fasync not started yet"),
...     f_io.flat_map(lambda result:
...       io.defer(print, f"fasync finished and returned {result}")
...     ),
...     io.defer(print, "fasync started")
...   )
... 
...   main.run(None)
fasync not started yet
f is running
fasync finished and returned 3
fasync started
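
Remember the advice from the run section: blocking calls should not run on the pool's threads. A dedicated thread combined with async_ does the trick. Here is a minimal sketch, assuming the blocking operation is a plain time.sleep:

>>> import time
>>> import threading
>>>
>>> def blocking(r, k):
...   def body():
...     time.sleep(1)     # the blocking call, kept off the pool's threads
...     k(Ok("done"))     # hand the result back to the IO as a Result
...   threading.Thread(target=body).start()
>>>
>>> io.async_(blocking).run(None)
Ok(success='done')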

Concurrent Programming

Concurrent programming is about running things "in parallel". Raffiot can run a large number of concurrent computations simply and safely:

parallel : running concurrent tasks

The function io.parallel runs a list of IOs in parallel. Remember that because of Python's Global Interpreter Lock, only one thread executing Python code can be running at any time. But if your code involves a lot of primitives written in C/C++/etc., then you might get lucky and use all of your cores.

parallel returns a list of values called fibers. A fiber represents a task running in parallel/concurrently. Every fiber in the returned list corresponds to the IO at the same location in the argument list. For example, in

io.parallel(ios).flat_map(lambda fibers: ...)

for every index i, fibers[i] is the fiber representing the computation of the IO ios[i] running in parallel/concurrently.

>>> import time
>>> def task(i: int) -> IO[None,None,None]:
...   return io.defer(print, f"Task {i}: Begin").then(
...     io.defer(time.sleep, 1),
...     io.defer(print, f"Task {i}: End")
...   )
>>> main : IO[None,None,None] = (
...   io.parallel([task(i) for i in range(6)])
...   .then(io.defer(print,"Finished")))
>>> main.run(None)
Task 0: Begin
Task 1: Begin
Task 3: Begin
Task 4: Begin
Task 2: Begin
Finished
Task 5: Begin
Task 0: End
Task 1: End
Task 3: End
Task 4: End
Task 2: End
Task 5: End
Ok(success=None)

As you can see, main does not wait for the IOs running in parallel/concurrently before continuing its own execution.

wait : waiting for concurrent tasks to end

Sometimes you want to wait for a parallel/concurrent computation to finish. Remember that parallel/concurrent computations are represented by the fibers returned by io.parallel.

To wait for some fibers to finish, just call io.wait with the list of fibers you want to wait on. The result of wait is the list of all the fibers' results (of type Result[E,A]). For example, in

io.wait(fibers).flat_map(lambda results: ...)

for any index i, results[i] of type Result[E,A] is the result of the computation represented by the fiber fibers[i].

>>> main : IO[None,None,None] = (
...   io.parallel([task(i) for i in range(6)])
...   .flat_map(lambda fibers: io.wait(fibers))
...   .then(io.defer(print,"Finished")))
>>> main.run(None)
Task 0: Begin
Task 1: Begin
Task 3: Begin
Task 5: Begin
Task 4: Begin
Task 2: Begin
Task 0: End
Task 3: End
Task 1: End
Task 5: End
Task 4: End
Task 2: End
Finished
Ok(success=None)

yield_ : letting other task progress

Remember that an IO runs on a pool of threads. When there are more IOs to run than threads to run them on, there is a chance that some IO will not get executed. An IO can explicitly release its thread for a moment to give other tasks a chance to progress.

Call io.yield_ to release the current thread. The IO will take a break and continue its execution later.

>>> main : IO[None,None,None] = io.defer(print, "Hello").then(
...   io.yield_,
...   io.defer(print, "World!") 
... )
>>> main.run(None)
Hello
World!
Ok(success=None)

Controlling Concurrency

Sometimes you want to prevent some fibers from running concurrently. For example, you may want to avoid several fibers modifying variables at the same time, or prevent too many fibers from accessing some resource.

reentrant_lock: only one fiber at a time.

The primitive resource.reentrant_lock ensures that only one fiber can run a portion of code at a time:

resource.reentrant_lock: IO[Any, None, Resource[Any, None, None]]

Let's take an example. The class Shared represents any class defining mutable objects. In our example, calling the set method changes the object's attribute value:

>>> from raffiot import *
>>> from typing import Any
>>>  
>>> class Shared:
...     def __init__(self):
...         self._value = 0
...  
...     def get(self) -> int:
...         return self._value
...  
...     def set(self, i: int) -> None:
...         self._value = i
>>>  
>>> shared_object = Shared()

The increment IO does exactly what its name suggests: it reads the shared object's attribute value using the method get, waits for one second and sets the attribute to value + 1 using the set method:

>>> increment: IO[Any,None,None] = (
...   io.defer(shared_object.get)
...     .flat_map(lambda value:
...       io.sleep(1)
...         .then(io.defer(shared_object.set, value + 1))
...     )
... )
>>> shared_object.get()
0
>>> increment.run(None)
Ok(success=None)
>>> shared_object.get()
1

Running increment several times concurrently is unsafe:

>>> shared_object.get()
1
>>> io.parallel(increment, increment).run(None)
Ok(success=...)
>>> shared_object.get()
2

Although the value was 1 and increment has been called twice, the final value is 2 instead of the expected 3. The reason for the issue is that both instances read the value 1 at the same time, and so both wrote value + 1 == 2 instead of 3. We need to prevent one instance of increment from running if another one is already running. We can do so using reentrant_lock:

>>> shared_object.get()
2
>>> resource.reentrant_lock.flat_map(lambda lock:
...   io.parallel(
...     lock.with_(increment),
...     lock.with_(increment)
...   )
... ).run(None)
Ok(success=[...])
>>> shared_object.get()
4

reentrant_lock gives us a lock which is a Resource. The two instances of increment still run in parallel, but inside a lock.with_(an_io), which prevents them from running at the same time. The first instance to take the lock forces the second one to wait until it releases it.

You will learn more about Resource in the section Resource Management, but for now just remember that you can prevent some fibers from running concurrently by creating a lock with reentrant_lock and using lock.with_ to wrap the portions of code you want to protect from concurrent access. The type Resource is used to ensure that the lock will always be released, even if the computation fails.

Note: every call to reentrant_lock gives back a different lock.

Unlike the Python equivalent threading.Lock, Raffiot's locks do not block threads; they only block fibers.

In addition, these locks are reentrant, which means that the fiber holding the lock can acquire it again without blocking.
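
For example, nested acquisitions by the same fiber do not deadlock (a minimal sketch):

>>> resource.reentrant_lock.flat_map(lambda lock:
...   lock.with_(lock.with_(io.defer(print, "Lock acquired twice!")))
... ).run(None)
Lock acquired twice!
Ok(success=None)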

semaphore: limited resource.

The primitive resource.semaphore is useful to simulate limited resources.

Imagine you have to call an API for which it is forbidden to make more than n concurrent calls; semaphore is the way to go:

resource.semaphore(tokens: int): IO[Any, None, Resource[Any, None, None]]

The parameter tokens is the number of fibers the semaphore will allow to run concurrently:

>>> from raffiot import *
>>> from typing import Any
>>>  
>>> def fiber(sem, i:int) -> IO[Any, None, None]:
...   return sem.with_(io.defer(print, f"Fiber {i} running!"))
>>>  
>>> resource.semaphore(5).flat_map(lambda sem:
...   io.parallel([fiber(sem, i) for i in range(100)])
... ).run(None)

Even though there are 100 fibers running concurrently, there will be at most 5 concurrent calls to print.

Time

Time functions enable you to schedule some computations in the future or to stop a fiber until a point in time is reached.

sleep: making a break

To pause an IO for some time, just call io.sleep with the number of seconds you want the IO paused:

>>> from time import time
>>> now : IO[None, None, None] = io.defer(time).flat_map(lambda t: io.defer(print, t))
>>> main : IO[None, None, None] = now.then(io.sleep(2), now)
>>> main.run(None)
1615136436.7838593
1615136438.785897
Ok(success=None)

Calling io.sleep(0) does nothing. The IO is guaranteed to be paused for at least the time you requested, but it may sleep longer! Especially when threads are busy.

sleep_until: waking up in the future.

To pause an IO until some determined time in the future, call io.sleep_until with the desired epoch:

>>> from time import time
>>> now : IO[None, None, None] = io.defer(time).flat_map(lambda t: io.defer(print, t))
>>> time()
1615136688.9909387
>>> main : IO[None, None, None] = now.then(io.sleep_until(1615136788), now)
>>> main.run(None)
1615136713.6873975
1615136788.0037072
Ok(success=None)

Calling io.sleep_until with an epoch in the past does nothing. The IO is guaranteed to be paused until the epoch you requested is reached but it can sleep longer! Especially when threads are busy.

Resource Management

Resource management is the safe creation and release of resources: files, connection handlers, etc. Resource management ensures all created resources will be released, thus avoiding resource leaks.

Consider the code below accessing a database:

>>> connection = connect_to_database()
>>> connection.execute_sql_query(...)
>>> connection.close()

If the execution of the SQL query raises an exception, the connection is never closed. Having too many unused connections open may prevent other parts of the code from creating new connections, may slow down the database or even crash the application.

Fortunately Python has built-in support for resource management with the with syntax:

>>> with connect_to_database() as connection:
...   connection.execute_sql_query(...)

Sometimes the resource you want to create depends on another resource. For example, the connection configuration of the database could be stored in a configuration file that needs to be opened and closed:

>>> with open_config_file() as file_content:
...   config = read_config(file_content)
...   with connect_to_database(config) as connection:
...     connection.execute_sql_query(...)

Once again Python's with-statement covers this case nicely. But there are two issues with Python's built-in resource management:

  1. it is not trivial to pack two dependent resources into one.
  2. it is not trivial to create your own resources.

This is where Raffiot's resource management comes in. Creating a Resource is as simple as providing a function to create the resource and one to release it. You can also lift any Python "with-enabled" resource to Raffiot's Resource with a single function call.

Resources compose very well too, with the same API as IO. In fact Resource is built upon IO and has almost all of its functionality. Here are the imports for this section:

>>> from raffiot import *
>>> from typing import Tuple, List, Any, Callable

Let's start by defining an IO that generates a random string every time it runs:

>>> import random
>>> import string
>>> rnd_str : IO[None, None, str] = (
...   io.defer(lambda:
...     ''.join(
...       random.choice(string.ascii_uppercase + string.digits)
...      for _ in range(8)
...     )
...   )
... )
>>> rnd_str.run(None)
Ok(success='CCQN80YY')
>>> rnd_str.run(None)
Ok(success='5JEOGVZS')
>>> rnd_str.run(None)
Ok(success='ZNLWSH1B')
>>> rnd_str.run(None)
Ok(success='MENS91RD')

from_open_close_io : Creating a Resource from open/close IOs

The Resource we want to define will create and print a new string every time it is used. Releasing it will simply be printing it. A Resource is essentially two computations:

  • one creating the resource
  • one releasing the resource

Let's start with the IO creating and printing the string:

>>> rs_open : IO[None, None, str] = (
...   rnd_str.flat_map(lambda s: io.sequence(
...     io.defer(print, f"Opening {s}"),
...     io.pure(s)
...   ))
... )

Now the function releasing the string (i.e. printing it):

>>> def rs_close(s: str, cs: ComputationStatus) -> IO[None, None, None]:
...   return io.defer(print, f"Closing {s}")

The first function argument is the created resource. The second one indicates whether the computation was successful. It can be either ComputationStatus.SUCCEEDED or ComputationStatus.FAILED.

From there, creating a Resource is as simple as a single call to resource.from_open_close_io:

>>> rs : Resource[None,None,str] = resource.from_open_close_io(rs_open, rs_close)

That wasn't that hard, was it?

use : using a resource

Now that we have a Resource, we want to use it. To do so, just call the method use. You need to give it a function taking as argument the created resource and returning an IO that uses this resource. The result is an IO:

>>> io_ok : IO[None,None,int] = rs.use(lambda s: io.pure(5))
>>> io_ok.run(None)
Opening B9G0G96J
Closing B9G0G96J
Ok(success=5)

As you can see a random string is created and released. The result is an IO whose result is the result of the inner IO.

>>> io_error : IO[None,None,Any] = rs.use(lambda s: io.error("Oups!"))
>>> io_error.run(None)
Opening R9A1YSJ3
Closing R9A1YSJ3
Errors(errors=['Oups!'])

If the inner IO fails, the string is still released!

>>> io_panic : IO[None,None,None] = rs.use(lambda s: io.panic(Exception("BOOM!")))
>>> io_panic.run(None)
Opening N1H4A63V
Closing N1H4A63V
Panic(exceptions=[TracedException(exception=Exception('BOOM!'),
  stack_trace='...')], errors=[])

If the inner IO panics, the string is still released too!

Note: the with_(an_io) method is a nice alias for use(lambda _: an_io).
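
For example, reusing rs from above (the random string will differ on every run):

>>> rs.with_(io.pure(5)).run(None)
Opening 3KQ8ZVNB
Closing 3KQ8ZVNB
Ok(success=5)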

map, flat_map, defer, async_ and others.

Resource supports almost the same API as IO. It includes map, flat_map, defer, zip, etc. It means, for example, that you can create resources in parallel, or simply create a list of resources (if one fails, all fail), etc.
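
For example, zipping rs with itself gives a Resource that creates both strings and releases both (a minimal sketch; the random strings and the exact output order may differ):

>>> both : Resource[None,None,List] = resource.zip(rs, rs)
>>> both.use(lambda pair: io.defer(print, f"Using {pair}")).run(None)
Opening S5GEJMO7
Opening X2B7QD1K
Using ['S5GEJMO7', 'X2B7QD1K']
Closing X2B7QD1K
Closing S5GEJMO7
Ok(success=None)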

lift_io : from IO to Resource

Actually, any IO[R,E,A] can be lifted into a Resource[R,E,A]. The releasing function is just a no-op. It brings a lot of expressiveness and safety to resource creation:

>>> rs : Resource[None,None,None] = resource.lift_io(io.defer(print, "Hello World!"))
>>> rs.use(lambda none: io.pure(5)).run(None)
Hello World!
Ok(success=5)

from_open_close : Resource from open and close functions

Sometimes it is easier to create a Resource from usual Python functions rather than from the open/close IOs. To do so, just use the function resource.from_open_close:

>>> def rs_open() -> str:
...   s = ''.join(
...     random.choice(string.ascii_uppercase + string.digits)
...     for _ in range(8)
...   )
...   print(f"Opening {s}")
...   return s
>>> def rs_close(s: str, cs: ComputationStatus) -> None:
...   print(f"Closing {s}")
>>> rs : Resource[None,None,str] = resource.from_open_close(rs_open, rs_close)

Once again, the first argument of rs_close is the created resource and the second one indicates whether the computation was successful: ComputationStatus.SUCCEEDED or ComputationStatus.FAILED.
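
Using it behaves exactly like the previous version (your random string will differ):

>>> rs.use(lambda s: io.pure(len(s))).run(None)
Opening 3FJ9K2LQ
Closing 3FJ9K2LQ
Ok(success=8)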

from_with : Resource from with

If the resource you want to create already supports Python's with statement, then you're lucky: a single call to resource.from_with is all it takes:

>>> rs : Resource[None,None,str] = resource.from_with(io.defer(open, "hello.txt", "w"))
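
Using it opens hello.txt when the resource is acquired and closes it afterwards; for example:

>>> rs.use(lambda file: io.defer(file.write, "Hello!")).run(None)
Ok(success=6)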

Creating a Resource directly

A Resource[R,E,A] is essentially an IO[R,E,Tuple[A, Callable[[ComputationStatus], IO[R,Any,Any]]]]. When this IO runs, it returns a pair. The first member of the pair is the created resource of type A. The second member is the release function: it receives a ComputationStatus indicating whether the computation was successful and must return an IO that performs the release of the resource.

Note that any failure encountered when releasing the resource makes the IO fail too (see the sketch after the example below).

>>> create : IO[None,None,Tuple[str, Callable[[ComputationStatus], IO[None,Any,Any]]]] = (
...   rnd_str.flat_map(lambda filename:
...     io.defer(print, f"Opening {filename}").map(lambda _:
...       (filename, lambda computation_status: io.defer(print, f"Closing {filename}"))
...     )
...   )
... )
>>> rs : Resource[None,None,str] = Resource(create)
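
Using this hand-made Resource behaves exactly like the previous ones (your random string will differ):

>>> rs.use(lambda s: io.defer(print, f"Using {s}")).run(None)
Opening K2J9Q4LZ
Using K2J9Q4LZ
Closing K2J9Q4LZ
Ok(success=None)

And as noted above, a failure during release makes the whole computation fail. Here is a minimal sketch, where the helper failing_release is purely illustrative:

>>> def failing_release(cs: ComputationStatus) -> IO[None,None,None]:
...   def close() -> None:
...     raise Exception("release failed")
...   return io.defer(close)
>>> bad : Resource[None,None,str] = Resource(io.pure(("value", failing_release)))
>>> bad.use(lambda s: io.pure(s)).run(None)
Panic(exceptions=[TracedException(exception=Exception('release failed'),
  stack_trace='...')], errors=[])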

Use Case : Temporary File

This is a complete use case: a Resource creating a file with a random name.

>>> from io import TextIOWrapper
>>> create : IO[None,None,Tuple[TextIOWrapper, Callable[[ComputationStatus], IO[None,None,None]]]] = (
...   rnd_str
...   .flat_map(lambda filename:
...     io.defer(print, f"Opening {filename}")
...     .then(io.defer(open, filename, "w"))
...     .map(lambda file:
...       ( file,
...         lambda computation_status: io.defer(print, f"Closing {filename}")
...         .then(io.defer(file.close))
...       )
...     )
...   )
... )
>>> rs : Resource[None,None,TextIOWrapper] = Resource(create)
>>> io_ok : IO[None,None,int] = (
...   rs.use(lambda file:
...     io.defer(file.write, "Hello World!")
...   )
... )
>>> io_ok.run(None)
Opening 6E21M413
Closing 6E21M413
Ok(success=12)
>>> io_error : IO[None,None,None] = (
...   rs.use(lambda file:
...     io.defer(file.write, "Hello World!")
...     .then(io.error("Oups!"))
...   )
... )
>>> io_error.run(None)
Opening R9A1YSJ3
Closing R9A1YSJ3
Errors(errors=['Oups!'])
>>> io_panic : IO[None,None,None] = (
...   rs.use(lambda file:
...     io.defer(file.write, "Hello World!")
...     .then(io.panic(Exception("BOOM!")))
...   )
... )
>>> io_panic.run(None)
Opening 5YZ02058
Closing 5YZ02058
Panic(exceptions=[TracedException(exception=Exception('BOOM!'),
  stack_trace='...')], errors=[])

Variables

IO is heavily expression-based, but Python does not allow defining local variables inside expressions. Raffiot provides two types to work around this limitation: Val and Var. A Val is an immutable variable while a Var is a mutable one.

Immutable Variables

Immutable variables can be created very easily by using the class Val:

>>> from raffiot import *
>>> Val(5)
Val(value=5)

Val has most of the usual operations like map, flat_map, zip, etc. Please have a look at the API for a complete list of methods.
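
For example, assuming map and flat_map on Val behave like their IO counterparts:

>>> Val(5).map(lambda x: x * 2)
Val(value=10)
>>> Val(5).flat_map(lambda x: Val(x + 1))
Val(value=6)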

For example, Python forbids this syntax:

>>> lambda x:
...   y = x + 5
...   print(f"y={y}")

because a lambda's body must be a single expression. To simulate this behaviour, you can use:

>>> lambda x: Val(x+5).map(lambda y: print(f"y={y}"))

We totally agree that the first syntax is far superior, but until Python allows lambdas to declare local variables, this is probably the best we can get.

Mutable Variables

Raffiot's mutable variables are more than plain mutable variables: they are designed to play nicely with concurrency. Every access to a Var is exclusive, which means only one fiber is allowed to read or modify the variable at any given time.

Var.create, Var.create_rs: the only ways to create a mutable variable

Because a Var is not just a value, it can only be created either by the IO Var.create(a:A) of type IO[Any,None,Var[A]] or by the Resource Var.create_rs(a:A) of type Resource[Any,None,Var[A]]:

>>> from typing import Any
>>> main: IO[Any,None,int] = (
...   Var.create(5).flat_map(lambda var1: var1.set(7).then(var1.get()))
... )
>>> main.run(None)
Ok(success=7)
>>> main_rs: Resource[Any,None,int] = (
...   Var.create_rs(5).flat_map(lambda var1: var1.set_rs(7).then(var1.get_rs()))
... )
>>> main_rs.use(io.pure).run(None)
Ok(success=7)

As you can see, you can use the methods get/set to get/set the current value as an IO or get_rs/set_rs to get/set the current value as a Resource.

Alternatively, you can use the methods get_and_set and get_and_set_rs to assign a new value to the variable and get the previous value at the same time.
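
For example, the following sets the variable to 7 while returning its previous value:

>>> main: IO[Any,None,int] = (
...   Var.create(5).flat_map(lambda var1: var1.get_and_set(7))
... )
>>> main.run(None)
Ok(success=5)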

update: modifying the current value

When you need to update the current value of a variable var:Var[A], use the update method. It takes as input a function f: Callable[[A], Tuple[A,B]]. This function receives the current value of the variable and must return a pair (new_value, returned). The new_value becomes the new value of the variable, while returned lets you output some extra information if you need to. The result is an UpdateResult containing the old value, the new value and the returned value:

>>> main: IO[Any,None,Any] = (
...   Var.create(5).flat_map(lambda v: v.update(lambda x: (x+1, 2*x)))
... )
>>> main.run(None)
Ok(success=UpdateResult(old_value=5, new_value=6, returned=10))

The methods update_io and update_rs play the same role when the update function returns an IO or a Resource respectively.
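
As a sketch, assuming update_io accepts a function returning the (new_value, returned) pair wrapped in an IO:

>>> main: IO[Any,None,Any] = (
...   Var.create(5).flat_map(lambda v: v.update_io(lambda x: io.pure((x+1, 2*x))))
... )
>>> main.run(None)
Ok(success=UpdateResult(old_value=5, new_value=6, returned=10))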

traverse: creating a new variable from another one.

The traverse method creates a new variable from an existing one:

>>> main: IO[Any,None,int] = (
...   Var.create(5)
...      .flat_map(lambda var1:
...        var1.traverse(lambda x: io.defer(print, x).then(io.pure(x+1)))
...            .flat_map(lambda var2: var2.get())
...      )
... )
>>> main.run(None)
5
Ok(success=6)

zip: Getting the values of a group of variables

The zip class method regroups the values of a list of variables. All concurrent accesses to the variables are forbidden during the zip to ensure a consistent reading of the variables' values:

>>> main: IO[Any,None,List[int]] = (
...   io.zip(Var.create(0), Var.create(1))
...     .flat_map(lambda vars:
...       Var.zip(vars[0], vars[1])
...     )
... )
>>> main.run(None)
Ok(success=[0, 1])

Note: the zip_with instance method is equivalent.
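
As a sketch, assuming zip_with takes the other variables as arguments:

>>> main: IO[Any,None,List[int]] = (
...   io.zip(Var.create(0), Var.create(1))
...     .flat_map(lambda vars: vars[0].zip_with(vars[1]))
... )
>>> main.run(None)
Ok(success=[0, 1])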

ap: Combining the values of a group of variables

The ap method calls the function stored in a variable with the values of the given variables as arguments. As with zip, all concurrent accesses are forbidden during the operation to ensure a consistent reading of the values:

>>> main: IO[Any,None,int] = (
...   Var.create(lambda x, y: x + y)
...      .flat_map(lambda var_fun:
...         io.zip(Var.create(2), Var.create(3))
...           .flat_map(lambda vars:
...             var_fun.ap(vars[0], vars[1])
...           )
...     )
... )
>>> main.run(None)
Ok(success=5)