Robust and Fast Functional IO Toolkit
Raffiot is a small, (almost) dependency-free Python library providing common functional programming tools. It currently provides:
- an easy-to-use `IO` monad which is stack-safe and fast, supports asynchronous, concurrent and parallel programming, and has many other features.
- a `Resource` data type for easy but reliable resource management.
- a `Result` data structure to represent errors.
Demo
For a demo, just type this in a terminal:
curl https://raw.githubusercontent.com/chrilves/raffiot.py/main/demos/raffiot_demo.sh | /bin/sh
This demo runs 4 computations in parallel. It demonstrates how simple concurrent and parallel programming are in Raffiot.
Note that this command will install Raffiot in your current Python environment.
Documentation
This Guide
This guide will teach you how to use Raffiot through examples. Just use the left panel or the right arrow on this page to jump to the next section.
API
The API is online at https://chrilves.github.io/raffiot.py/api/index.html.
Features
- pure Python: Raffiot is written entirely in Python 3.7+.
- small: it is just a few small files.
- (almost) dependency-free: it only depends on `typing-extensions` (for the `@final` annotation).
- crystal clear code
IO
- stack safe: you just won't run into stack overflows anymore.
- fast: you won't notice the overhead.
- dependency injection made easy: make some context visible from anywhere.
- simple asynchronous and concurrent programming: full support of synchronous, asynchronous and concurrent programming with the same simple API.
- railway-oriented programming: clean and simple failure management.
- distinction between expected and unexpected failures: some failures are part of your program's normal behaviour (errors) while others show that something went terribly wrong (panics). Yes, that's heavily inspired by Rust.
Resource
Python has the `with` construction, but `Resource` goes a step further.
- easy user-defined resource creation: just provide some open and close functions.
- composability: the resource you want to create depends on another resource? Not a problem, you can compose resources the way you want. It scales.
- failure handling in resources: `Resource` has everything `IO` has, including its wonderful failure management.
Result
Did I mention Railway-Oriented Programming? `Result` represents the 3 possible results of a computation:
- `Ok(value)`: the computation successfully computed this `value`.
- `Errors(errors)`: the computation failed on some expected failures `errors`, probably from the business domain.
- `Panic(exceptions, errors)`: the computation failed on some unexpected failures `exceptions`. Note that there may be some domain errors too.
First Steps with IO
Raffiot is available as a pip package (and soon as a conda package). For now just type this in a terminal:
$ pip install -U raffiot
This guide will teach you how to use Raffiot by exploring most of its features via the Python interactive shell (also known as Python's REPL):
$ python
Python 3.9.1 (default, Dec 13 2020, 11:55:53)
[GCC 10.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>
Start by importing Raffiot:
>>> from raffiot import *
Hello World!
Let's start with the classic "Hello World!":
>>> main : IO[None,None,None] = io.defer(print, "Hello World!")
As you can see, nothing is printed yet! The `defer` function delays the execution of `print("Hello World!")` until the value `main` is run.
Very important: a value of type `IO`, like `main`, is the description of some computation, very much like the text of a Python script. Nothing is executed until the (value of type) `IO` is actually run, very much like the code of a Python script is only executed when the script is run.
Inspecting the value `main` gives you:
>>> main
Defer((<built-in function print>, ('Hello World!',), {}))
Running an `IO` is very simple! Just call its `run` method:
>>> main.run(None)
Hello World!
Ok(success=None)
>>> main.run(None)
Hello World!
Ok(success=None)
As you can see, every call to `run` printed `Hello World!` and returned the value `Ok(None)`. `Ok` means the computation was successful; `None` is the return value of the computation.
`defer`: doing something later
The first argument of `defer` is the function you want to call later. The following arguments are the function's normal arguments. For example, to call `datetime.now()` later, just create the `IO`:
>>> from datetime import *
>>> now : IO[None,None,datetime] = io.defer(datetime.now)
Every time you run it, it will call `datetime.now()` and give you its result:
>>> now.run(None)
Ok(success=datetime.datetime(2021, 2, 19, 18, 38, 42, 572766))
>>> now.run(None)
Ok(success=datetime.datetime(2021, 2, 19, 18, 38, 47, 896153))
In the type `IO[R,E,A]`, `A` is the type of values returned when the computation is successful. `now` being of type `IO[None,None,datetime]`, it returns values of type `datetime`.
Likewise, you can define the `print_io` function that prints its arguments later:
>>> def print_io(*args, **kwargs) -> IO[None, None, None]:
...     return io.defer(print, *args, **kwargs)
Note that calling `print_io("Hello")` will not print anything but return an `IO`. To actually print `Hello` you need to run the `IO`:
>>> print_io("Hello", "World", "!")
Defer((<built-in function print>, ('Hello', 'World', '!'), {}))
>>> print_io("Hello", "World", "!").run(None)
Hello World !
Ok(success=None)
This ability to represent any computation as a value is one of the main strengths of an `IO`. It means you can work with computations like any other values: composing them, storing them in variables, in lists, etc.
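For example, nothing prevents you from building a list of `IO`s and running them in a plain Python loop. This is a minimal sketch using only the `defer` and `run` seen above:

>>> ios = [io.defer(print, f"Message {i}") for i in range(3)]
>>> for computation in ios:
...     computation.run(None)
...
Message 0
Message 1
Message 2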
`then`: doing something sequentially.
You will often need to execute some `IO`s sequentially. The method `then` composes several values of type `IO`, running them one by one. The return value is the one of the last `IO`:
>>> main : IO[None,None,datetime] = print_io("First line").then(
... print_io("Second line"),
... print_io("Third line"),
... now
... )
>>> main.run(None)
First line
Second line
Third line
Ok(success=datetime.datetime(2021, 2, 20, 15, 52, 11, 205845))
You may sometimes prefer the analogous function `io.sequence` that behaves like the method `then`.
`map`: transforming results.
You can transform the return value of an `IO` using the `map` method. It is very similar to the `map` function on lists, but works on `IO`. Just provide `map` with some function: it will use this function to transform the `IO`'s return value:
>>> date_str : IO[None,None,str] = now.map(lambda d: d.isoformat())
>>> date_str
Map((Defer((<built-in method now of type object at 0x7ff733070bc0>,
(), {})), <function <lambda> at 0x7ff7338d9280>))
>>> date_str.run(None)
Ok(success='2021-02-19T23:54:46.297503')
`flat_map`: chaining IOs.
`map` transforms the return value of an `IO`. So transforming the return value of `date_str` with `print_io` will give you an `IO` whose return value is also an `IO`. When you run it, instead of executing the inner `IO`, it will return it to you:
>>> main : IO[None,None,IO[None,None,None]] = date_str.map(lambda x: print_io(x))
>>> main.run(None)
Ok(success=Defer((<built-in function print>, ('2021-02-20T15:54:38.444494',), {})))
When you want to use the result of some `IO` in some other `IO`, use `flat_map`:
>>> main: IO[None,None,None] = date_str.flat_map(lambda x: print_io(x))
>>> main.run(None)
2021-02-20T15:55:13.940717
Ok(success=None)
Here the return value of `date_str` is given to `print_io` via `x` and both `IO`s are executed, returning the result of `print_io(x)`.
`flatten`: concatenating an IO of IO.
Instead of using `flat_map`, you could have used `map` and then `flatten` to reduce the `IO` of `IO` into a single layer of `IO`:
>>> main : IO[None,None,None]= date_str.map(lambda x: print_io(x)).flatten()
>>> main.run(None)
2021-02-20T15:58:44.244715
Ok(success=None)
Most of the time, you will use `flat_map` because it is simpler to use. But now you know where its name comes from: flatten of map.
`pure`: just a value.
`pure` is very simple: the result of the computation is the very argument of `pure`:
>>> main : IO[None,None,int] = io.pure(5)
>>> main
Pure(5)
>>> main.run(None)
Ok(success=5)
It is very useful when some function/method expects an `IO` but you want to provide a constant.
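For example, `then` (seen above) expects `IO` arguments, so a constant final result must be wrapped in `io.pure`:

>>> main : IO[None,None,int] = print_io("Computing...").then(io.pure(42))
>>> main.run(None)
Computing...
Ok(success=42)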
`defer_io`: computing an IO later.
`defer_io` is very much like `defer` but for functions returning an `IO`:
>>> main : IO[None,None,IO[None,None,None]] = io.defer(print_io, "Hello", "World", "!")
>>> main.run(None)
Ok(success=Defer((<built-in function print>, ('Hello', 'World', '!'), {})))
>>> main : IO[None,None,None] = io.defer_io(print_io, "Hello", "World", "!")
>>> main.run(None)
Hello World !
Ok(success=None)
Like `flat_map` is `flatten` of `map`, `defer_io` is `flatten` of `defer`. It is useful to defer the call of a function returning an `IO`.
Use Case: Stack-Safety
Let's see one of the main features of `IO`: it is stack safe! When you run the following function in Python, even if the argument `times` is fairly small, the computation will fail miserably because it blew the stack:
>>> def print_date(times: int) -> None:
...     if times > 0:
...         d = datetime.now()
...         print(d.isoformat())
...         print_date(times - 1)
>>> print_date(1000)
<LOTS OF DATES>
2021-02-20T16:20:37.188880
2021-02-20T16:20:37.188883
2021-02-20T16:20:37.188886
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 5, in print_date
File "<stdin>", line 5, in print_date
File "<stdin>", line 5, in print_date
[Previous line repeated 992 more times]
File "<stdin>", line 4, in print_date
RecursionError: maximum recursion depth exceeded while calling a Python object
2021-02-20T16:20:37.188889
On the contrary, the equivalent function using `IO` will never blow the stack, even for very high values of `times`:
>>> def print_date_io(times: int) -> IO[None, None, None]:
...     if times > 0:
...         return (
...             now
...             .flat_map(lambda d: print_io(d.isoformat()))
...             .flat_map(lambda _: print_date_io(times - 1))
...         )
...     else:
...         return io.pure(None)
>>> print_date_io(1000000).run(None)
<LOTS OF DATES>
2021-02-20T16:23:22.968454
2021-02-20T16:23:22.968464
Ok(success=None)
With `IO`, you can use recursion without fear! Just remember to wrap your computations in an `IO` (using `defer`, `defer_io`, `map`, `flat_map` and others) to benefit from `IO`'s safety.
Failures
All the computations we have seen until now were successful. We will now see how `IO`'s failure management works. But before that, I need to present the `Result[E,A]` type.
The `Result[E,A]` type
`Result[E,A]` represents the result of a computation. A computation can either be successful, returning a value of type `A`, have failed on some expected errors of type `E`, or have failed on some unexpected exceptions.
`IO` and `Result` make a distinction between expected failures, called errors, and unexpected failures, called panics.
For an operating system, an application crashing is an expected error. A well designed operating system is expected to be prepared for such errors: it has to deal with situations like this nicely and continue running normally. Errors are part of the normal life of your program. An error means that some operation failed but your program is still healthy.
On the contrary, memory corruption inside the kernel is an unexpected error. The system cannot continue running normally. The failure may have made the computation dangerous and/or the result wrong. The only option is terminating the system as smoothly as possible. Panics should never happen, but sometimes even the most improbable events do occur. When panics happen, consider your computation lost and terminate it while doing as little damage as possible.
Raffiot is designed to report all encountered failures. Failures are never silently ignored but exposed via the `Result` type. This is why the `Result` type works with lists of domain failures (errors) and unexpected failures (panics). In addition, Raffiot saves the stack trace of every exception encountered so that you always know where the exception was raised, even with multiprocessing!
For the rest of this section, you will need these imports:
>>> from raffiot import *
`Ok(success:A)`: A Success
When a computation successfully returns some value `a` of type `A`, it actually returns the value `Ok(a)` of type `Result[E,A]`. The value `a` can be obtained by `Ok(a).success`:
>>> from typing import Any
>>> r : Result[Any,int] = Ok(5)
>>> r
Ok(success=5)
>>> r.success
5
`Errors(errors:List[E])`: Some Expected Failures
When a computation fails because of an error `e` of type `E`, it actually returns a value `Errors(errors=[e])` of type `Result[E,Any]`. The type `E` can be any type you want. Choose as type `E` the type that fits your business domain errors the best. The list of all errors encountered can be obtained by `Errors([e]).errors`:
>>> r : Result[int,Any] = Errors([5])
>>> r
Errors(errors=[5])
>>> r.errors
[5]
Note that you must ALWAYS provide a list to `Errors`! So, to avoid bugs, please use the function `result.error(e: E) -> Result[E,Any]` when you want to raise a single error, or `result.errors(e1:E, e2:E, ...) -> Result[E,Any]` when you want to raise several errors:
>>> result.error(5)
Errors(errors=[5])
>>> result.errors(5, 2)
Errors(errors=[5, 2])
Traced Exceptions: Never lose a stack trace!
Raffiot lifts every exception encountered during a computation into a `TracedException`. This is a very simple type:
@dataclass
class TracedException(Exception):
    exception: Exception
    stack_trace: str
It packs an exception together with its stack trace. To the best of our knowledge, Python does not store the stack trace with the exception itself but only keeps the stack trace of the last exception encountered. This is a huge problem because, in a parallel or concurrent computation, many exceptions can be raised. If you use multiprocessing, they can even be raised in a different process!
When you have an exception `exn`, there are two ways to build a `TracedException` (both are illustrated in the sketch below):
- If you are in the `except` clause of the `try-except` block that caught the exception `exn`, you can call the static method `TracedException.in_except_clause(exn)` to capture the stack trace of the exception. Note that it only works in the except clause!
- If you are not in the `except` clause of the `try-except` block that caught the exception `exn`, you can call the static method `TracedException.with_stack_trace(exn)` to capture the current stack trace.
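Here is a minimal sketch of both constructors (the captured stack traces are elided):

>>> try:
...     raise Exception("BOOM!")
... except Exception as exn:
...     traced = TracedException.in_except_clause(exn)
...
>>> traced.exception
Exception('BOOM!')
>>> TracedException.with_stack_trace(Exception("Outside except")).exception
Exception('Outside except')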
`Panic(exceptions: List[TracedException], errors: List[E])`: Some Unexpected Failures
When a computation fails because of an exception `exn`, it actually returns a value `Panic(exceptions=[TracedException(exception=exn, stack_trace="...")], errors=[])` of type `Result[Any,Any]`. Note that a `Panic` stores a list of traced exceptions and not a single one. The original exception `exn` can be obtained by `Panic([TracedException(exn)],[]).exceptions[0].exception` and its stack trace by `Panic([TracedException(exn)],[]).exceptions[0].stack_trace`:
>>> r : Result[Any,Any] = Panic(
... exceptions=[TracedException.with_stack_trace(Exception("BOOM!"))],
... errors=[]
... )
>>> r
Panic(
exceptions=[TracedException(exception=Exception('BOOM!'), stack_trace='...')],
errors=[]
)
>>> r.exceptions
[TracedException(exception=Exception('BOOM!'), stack_trace='...')]
>>> r.errors
[]
>>> r.exceptions[0].exception
Exception('BOOM!')
>>> print(r.exceptions[0].stack_trace)
File "<stdin>", line 2, in <module>
File "/raffiot.py/raffiot/utils.py", line 63, in with_stack_trace
return TracedException(exception=exn, stack_trace="".join(format_stack()))
When an unexpected failure happens, there may already have been some expected failures. This is why the `Panic` case has a list-of-errors slot. The list of exceptions should never be empty (otherwise it shouldn't be a `Panic`).
Using the `Panic` constructor is error-prone as you must always provide lists for the `exceptions` and `errors` fields. Instead you can use the helper function `result.panic`:
>>> result.panic(Exception("BOOM"))
Panic(exceptions=[TracedException(exception=Exception('BOOM'), stack_trace='...')], errors=[])
>>> result.panic(Exception("BOOM 1"), Exception("BOOM 2"))
Panic(exceptions=[
TracedException(exception=Exception('BOOM 1'), stack_trace='...'),
TracedException(exception=Exception('BOOM 2'), stack_trace='...')
],
errors=[])
>>> result.panic(Exception("BOOM 1"), Exception("BOOM 2"), errors=[5,2])
Panic(exceptions=[
TracedException(exception=Exception('BOOM 1'), stack_trace='...'),
TracedException(exception=Exception('BOOM 2'), stack_trace='...')
],
errors=[5, 2]
)
`fold`: transforming a `Result[E,A]`
To transform a `Result[E,A]`, use the method `fold`. It takes three functions as arguments. The first one is called when the result is an `Ok`. The second is called when it is some `Errors`. The third is called on `Panic`. When called, each of these functions receives as arguments the value / list of errors / list of traced exceptions and errors (depending on the case) stored in the result:
>>> Ok(5).fold(
...     lambda s: f"success: {s}",
...     lambda e: f"errors: {e}",
...     lambda p,e: f"traced exceptions: {p}, errors: {e}"
... )
'success: 5'
>>> result.errors(7,5).fold(
...     lambda s: f"success: {s}",
...     lambda e: f"errors: {e}",
...     lambda p,e: f"traced exceptions: {p}, errors: {e}"
... )
'errors: [7, 5]'
>>> result.panic(Exception("BOOM 1"), Exception("BOOM 2"), errors=[5,2]).fold(
...     lambda s: f"success: {s}",
...     lambda e: f"errors: {e}",
...     lambda p,e: f"traced exceptions: {p}, errors: {e}"
... )
'traced exceptions: [
    TracedException(exception=Exception(\'BOOM 1\'), stack_trace=\'...\'),
    TracedException(exception=Exception(\'BOOM 2\'), stack_trace=\'...\')
], errors: [5, 2]'
`raise_on_panic`: reporting panics as exceptions
Raffiot's functions and methods never raise exceptions but instead return a `Panic`. When you need a failed computation to raise the exception, call `raise_on_panic` on the result:
>>> Ok(5).raise_on_panic()
Ok(success=5)
>>> result.error(7).raise_on_panic()
Errors(errors=[7])
>>> result.panic(Exception("BOOM!")).raise_on_panic()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/dev/raffiot.py/raffiot/result.py", line 441, in raise_on_panic
raise MultipleExceptions.merge(*self.exceptions, errors=self.errors)
raffiot.utils.TracedException: BOOM!
File "<stdin>", line 1, in <module>
File "/dev/raffiot.py/raffiot/result.py", line 559, in panic
exceptions=TracedException.ensure_list_traced(exceptions),
File "/dev/raffiot.py/raffiot/utils.py", line 76, in ensure_list_traced
return [cls.ensure_traced(exn) for exn in exceptions]
File "/dev/raffiot.py/raffiot/utils.py", line 76, in <listcomp>
return [cls.ensure_traced(exn) for exn in exceptions]
File "/dev/raffiot.py/raffiot/utils.py", line 70, in ensure_traced
return cls.with_stack_trace(exception)
File "/dev/raffiot.py/raffiot/utils.py", line 66, in with_stack_trace
return TracedException(exception=exn, stack_trace="".join(format_stack()))
`raise_on_panic` and `unsafe_get` are the only functions/methods raising exceptions. Never expect other functions to raise exceptions on failures; they will return an `Errors` or `Panic` instead:
>>> main : IO[None,None,None] = io.panic(Exception("BOOM!"))
>>> main.run(None)
Panic(exceptions=[
TracedException(exception=Exception('BOOM!'), stack_trace='...')
], errors=[])
>>> main.run(None).raise_on_panic()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/raffiot.py/raffiot/result.py", line 441, in raise_on_panic
raise MultipleExceptions.merge(*self.exceptions, errors=self.errors)
raffiot.utils.TracedException: BOOM!
File "<stdin>", line 1, in <module>
File "/raffiot.py/raffiot/io.py", line 467, in panic
TracedException.ensure_list_traced(exceptions),
File "raffiot.py/raffiot/utils.py", line 76, in ensure_list_traced
return [cls.ensure_traced(exn) for exn in exceptions]
File "raffiot.py/raffiot/utils.py", line 76, in <listcomp>
return [cls.ensure_traced(exn) for exn in exceptions]
File "raffiot.py/raffiot/utils.py", line 70, in ensure_traced
return cls.with_stack_trace(exception)
File "/raffiot.py/raffiot/utils.py", line 66, in with_stack_trace
return TracedException(exception=exn, stack_trace="".join(format_stack()))
Errors: Expected failures in IO
`IO` has built-in support for errors. Remember that we call errors the expected failures. Errors can be of any type you want. You should usually take as error type the one that fits your business domain errors the best. In the type `IO[R,E,A]`, `E` is the type of errors.
`error`: this failure was expected, we're still in safe zone!
To raise an error, simply call `io.error`:
>>> from typing import Any, List
>>> main_str : IO[None,str,Any] = io.error("Oops")
>>> main_str.run(None)
Errors(errors=['Oops'])
>>> main_int : IO[None,int,Any] = io.error(5)
>>> main_int.run(None)
Errors(errors=[5])
>>> main_list : IO[None,List[int],Any] = io.error([1,2,3])
>>> main_list.run(None)
Errors(errors=[[1, 2, 3]])
`errors`: raising several errors at once.
To raise several errors, simply call `io.errors`:
>>> from typing import Any, List
>>> main_str : IO[None,str,Any] = io.errors("Oops1", "Oops2")
>>> main_str.run(None)
Errors(errors=['Oops1', 'Oops2'])
>>> main_int : IO[None,int,Any] = io.errors(5,7)
>>> main_int.run(None)
Errors(errors=[5, 7])
But beware: iterable arguments will be treated as collections of errors and not as single errors. For example the error `[1,2,3]` will be treated by `io.errors` as three errors: `1`, `2` and `3`. So when you want to raise an iterable as one single error, use `io.error` instead:
>>> main_list : IO[None,List[int],Any] = io.errors([1,2,3])
>>> main_list.run(None)
Errors(errors=[1, 2, 3])
`catch`: reacting to expected failures to continue.
To react to errors, just call the method `catch`. It is very similar to a `try-except` block. It takes as argument a function called the error handler. When the computation fails because of some errors `le: List[E]`, the error handler is called with `le` as argument. The result is then the error handler's result:
>>> def main(i: int) -> IO[None,str,int]:
...     return (
...         io.error(i)
...         .catch(lambda x:
...             io.pure(2*x[0])
...             if x[0] % 2 == 0
...             else io.error("Oops"))
...     )
>>> main(5).run(None)
Errors(errors=['Oops'])
>>> main(6).run(None)
Ok(success=12)
If the computation is successful, or if it failed on a panic, then `catch` has no effect. Note that the error handler can itself raise errors and panics.
`map_error`: transforming expected failures.
It is often useful to transform an error. For example you may want to add some useful information about the context: what was the request that led to this error, what were the arguments of the operation that failed, etc.
To transform an error, call `map_error`. It behaves like `map`, but on errors:
>>> main : IO[None,int,Any] = io.error([1,2]).map_error(lambda l: l[0] + l[1])
>>> main.run(None)
Errors(errors=[3])
If the computation is successful or if it fails on a panic, then `map_error` has no effect.
Panics: Unexpected Failures in IO
The type of panics is always the Python exception type `Exception`. Panics can be raised either manually, by calling `io.panic`, or when `run` encounters an exception.
The method `run` on `IO` never raises exceptions! Every exception `p` raised during the execution of `run` is caught and transformed into the panic `result.panic(p)`.
ALL exceptions are caught
All the functions and methods that accept functions as arguments run them inside a `try-except` block to catch every raised exception. All exceptions caught this way are transformed into panics:
>>> main : IO[None,None,float] = io.pure(0).map(lambda x: 1/x)
>>> main.run(None)
Panic(exceptions=[TracedException(
exception=ZeroDivisionError('division by zero'),
stack_trace='...')
], errors=[])
Remember that panics are unexpected failures, and unexpected failures should never happen. In an ideal world you would never give `map`, `flat_map`, `defer` and the others functions that may raise exceptions, so panics would never occur. But we do not live in an ideal world, so `map`, `flat_map` and the others cover your back by catching any exception for your own safety.
`panic`: something went terribly wrong, we're in unsafe zone.
It is sometimes useful to manually raise panics, for example when you encounter a problematic situation you thought was impossible but that still did happen. If your program is not designed to handle this situation, you should raise a panic:
>>> main : IO[None,None,Any] = io.panic(Exception("BOOM!"))
>>> main.run(None)
Panic(exceptions=[TracedException(
exception=Exception('BOOM!'),
stack_trace='...')
], errors=[])
The function `io.panic` accepts several exceptions as arguments and even domain errors via the `errors` keyword argument.
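For example (this mirrors the `result.panic` examples seen earlier):

>>> io.panic(Exception("BOOM 1"), Exception("BOOM 2"), errors=[5]).run(None)
Panic(exceptions=[
    TracedException(exception=Exception('BOOM 1'), stack_trace='...'),
    TracedException(exception=Exception('BOOM 2'), stack_trace='...')
], errors=[5])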
`recover`: stopping the computation safely after an unexpected failure.
Even in case of a panic, you have a chance to perform some emergency actions to recover. For example, you may want to restart the computation on panics. To react to panics, just call `recover`. It is very similar to a `try-except` block. It takes as argument a function, called the panic handler. If the computation fails because of a panic with exceptions `lexn: List[TracedException]` and errors `lerr: List[E]`, then the panic handler is called with `lexn` as first argument and `lerr` as second argument. The result is then the handler's result:
>>> main : IO[None,None,Any] = (
... io.panic(Exception("BOOM!"))
... .recover(lambda lexn, lerr: io.pure(
... f"Recovered from traced exceptions: {lexn} and errors {lerr}")
... )
... )
>>> main.run(None)
Ok(success="Recovered from traced exceptions: [TracedException(exception=Exception('BOOM!'), stack_trace='...')] and errors []")
`map_panic`: transforming traced exceptions.
It is often useful to transform a panic. For example you may want to add some useful information about the context: what was the request that led to this error, what were the arguments of the operation that failed, etc.
To transform all the exceptions of a `Panic`, call `map_panic`. It behaves like `map`, but on the traced exceptions contained in the `Panic`:
>>> main : IO[None, None, None] = (
... io.pure(0)
... .map(lambda x: 1/x)
... .map_panic(lambda traced_exception:
... TracedException(
... exception=Exception("BOOM"),
... stack_trace=traced_exception.stack_trace
... )
... )
... )
>>> main.run(None)
Panic(exceptions=[TracedException(exception=Exception('BOOM'),
stack_trace='...')], errors=[])
More tools
Here are some very useful functions. They can all be expressed using the functions and methods seen above, but they deserve to be seen in detail.
`attempt`: failures as values.
The method `attempt` transforms an `IO[R,E,A]` into an `IO[R,None,Result[E,A]]`. If the original computation is successful, the transformed one returns an `Ok(Ok(...))`. If the original computation fails on some errors, the transformed one returns `Ok(Errors(...))`. If the original computation fails on a panic, the transformed one returns `Ok(Panic(...))`:
>>> io_ok : IO[None,None,Result[None,int]] = io.pure(5).attempt()
>>> io_ok.run(None)
Ok(success=Ok(success=5))
>>> io_error : IO[None,None,Result[int,None]] = io.error(7).attempt()
>>> io_error.run(None)
Ok(success=Errors(errors=[7]))
>>> io_panic : IO[None,None,Result[None,None]] = io.panic(Exception("BOOM!")).attempt()
>>> io_panic.run(None)
Ok(success=Panic(exceptions=[TracedException(exception=Exception('BOOM!'),
stack_trace='...')], errors=[]))
It is hugely useful when you want to perform different actions depending on the result of a computation. Calling `attempt` and then `flat_map` is easier than combining `flat_map`, `catch` and `recover` correctly.
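For example, here is a small sketch combining `attempt` with `Result`'s `fold` (seen earlier) to branch on all three outcomes:

>>> main : IO[None,None,str] = (
...     io.error("Oops").attempt()
...     .flat_map(lambda r: r.fold(
...         lambda s: io.pure(f"succeeded with {s}"),
...         lambda e: io.pure(f"failed on errors {e}"),
...         lambda p, e: io.pure(f"panicked on {p}")
...     ))
... )
>>> main.run(None)
Ok(success="failed on errors ['Oops']")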
`from_result`: from `Result[E,A]` to `IO[None,E,A]`
The function `io.from_result` does almost the opposite of `attempt`. It transforms a `Result[E,A]` into the corresponding `IO[None,E,A]`:
>>> io_ok : IO[None,None,int] = io.from_result(Ok(5))
>>> io_ok.run(None)
Ok(success=5)
>>> io_error : IO[None,int,None] = io.from_result(result.error(5))
>>> io_error.run(None)
Errors(errors=[5])
>>> io_panic : IO[None,None,None] = io.from_result(result.panic(
... TracedException(Exception("BOOM!"),"")))
>>> io_panic.run(None)
Panic(exceptions=[TracedException(exception=Exception('BOOM!'),
stack_trace='')], errors=[])
`from_result` is often useful after an `attempt` to restore the state of the computation.
`finally_`: doing something unconditionally.
The method `finally_` is the finally clause of a try-except-finally. It executes an `IO` after the current one, discards its result, and restores the result of the current one. The `IO` executed afterwards is actually built by a function taking as argument the `Result[E,A]` of the preceding `IO`. This enables performing different actions depending on the result of the first computation:
>>> io.pure(5).finally_(lambda r: io.defer(print, f"Hello, result is {r}")).run(None)
Hello, result is Ok(success=5)
Ok(success=5)
>>> io.error(7).finally_(lambda r: io.defer(print, f"Hello, result is {r}")).run(None)
Hello, result is Errors(errors=[7])
Errors(errors=[7])
>>> io.panic(Exception("BOOM!")).finally_(lambda r: io.defer(print, f"Hello, result is {r}")).run(None)
Hello, result is Panic(exceptions=[TracedException(exception=Exception('BOOM!'),
stack_trace='...')], errors=[])
Panic(exceptions=[TracedException(exception=Exception('BOOM!'),
stack_trace='...')], errors=[])
`on_failure`: reacting to both errors and panics.
Calling both methods `catch` and `recover` is sometimes annoying. When you need to react to any failure, call `on_failure`. It takes as argument a function called the failure handler. When a computation fails, it calls the failure handler with the failure passed as argument as a `Result[E,None]`. This `Result[E,None]` is never `Ok` because `on_failure` calls the handler only on failures:
>>> io.pure(5).on_failure(lambda x: io.pure(12)).run(None)
Ok(success=5)
>>> io.error(7).on_failure(lambda x: io.pure(12)).run(None)
Ok(success=12)
>>> io.panic(Exception("BOOM!")).on_failure(lambda x: io.pure(12)).run(None)
Ok(success=12)
Context
The time has come to talk about the last type parameter of an `IO` we have not discussed yet. In the type `IO[R,E,A]`, `R` is the type of the context. The context is a value that is always accessible to any `IO`. You can think of it as a "local global variable". I assure you this sentence makes sense!
`run`: executing an `IO` in a given context.
We have called the method `run` many times, and every time we gave it `None` as argument. The argument you give to `run` is actually the context value. You can take any value you want as the context. Usually the context is a value you want to be accessible from everywhere.
Global variables are indeed accessible from everywhere, but they are very annoying because they can only have one value at a time. Furthermore, every change made to a global variable by one part of your code affects all other parts reading it.
On the opposite side, local variables are nice because every part of your code can have its own dedicated local variables. But they are annoying because, to make them accessible everywhere, you have to pass them to every function as parameters. This is error-prone and pollutes your code.
Given an `IO` named `main`, when you call `run` with some context `r`, the value `r` is accessible from everywhere in `main`: the context behaves like a global variable inside the running `IO`. But you can pass a different context to every call to `run`: the context behaves like a local variable between different calls to `run`.
`read`: accessing the shared context.
To access the context, just call the function `io.read`. Its result is the context:
>>> main : IO[int,None,int] = io.read
>>> main.run(5)
Ok(success=5)
>>> main.run(77)
Ok(success=77)
This example is indeed trivial. But imagine that `main` is a very big program. You can call `io.read` anywhere in `main` to get the context. It saves you the huge burden of passing the context from `run` to every function until it reaches the location of `io.read`.
`contra_map_read`: transforming the shared context.
As I said, the context behaves as a "local global variable". With `io.read` you have seen its global behaviour. The method `contra_map_read` transforms the context, but only for the `IO` it is called on. Note that the context is transformed before the `IO` is executed.
The method `contra_map_read` is very useful when you need to alter the context. For example, your program may start with `None` as context, read its configuration to instantiate the services it wants to inject, and then change the context to pass these services everywhere for dependency injection.
>>> main : IO[str,None,int] = io.read.contra_map_read(lambda s: len(s))
>>> main.run("Hello")
Ok(success=5)
`defer_read`: doing something context-dependent later.
The function `io.defer_read` is useful when it is easier to implement a computation as a normal function and then transform it into an `IO`. `io.defer_read`, like `io.defer`, takes as first argument the function you want to execute later. But unlike `io.defer`, this function:
- has access to the context
- returns a `Result[E,A]`
The last point is important because it means the function can raise errors, while `io.defer` can only raise panics:
>>> def f(context: int, i: int) -> Result[str,int]:
...     if context > 0:
...         return result.ok(context + i)
...     else:
...         return result.error("Ooups!")
>>> main : IO[int,None,int] = io.defer_read(f, 5)
>>> main.run(10)
Ok(success=15)
>>> main.run(-1)
Errors(errors=['Ooups!'])
`defer_read_io`: computing a context-dependent IO later.
The function `io.defer_read_io` is the `IO` counterpart of `io.defer_read`. It is useful when the context-aware function you want to execute later returns an `IO`:
>>> def f(context: int, i: int) -> IO[None,str,int]:
...     if context > 0:
...         return io.pure(context + i)
...     else:
...         return io.error("Ooups!")
>>> main : IO[int,None,int] = io.defer_read_io(f, 5)
>>> main.run(10)
Ok(success=15)
>>> main.run(-1)
Errors(errors=['Ooups!'])
Use Case: Dependency Injection
As a demonstration of how dependency injection works in Raffiot, create and fill the file `dependency_injection.py` as below:
from raffiot import *
import sys
from dataclasses import dataclass
from typing import List

@dataclass
class NotFound(Exception):
    url: str

class Service:
    def get(self, path: str) -> IO[None,NotFound,str]:
        pass

class HttpService(Service):
    def __init__(self, host: str, port: int) -> None:
        self.host = host
        self.port = port

    def get(self, path: str) -> IO[None,NotFound,str]:
        url = f"http://{self.host}:{self.port}/{path}"
        if path == "index.html":
            response = io.pure(f"HTML Content of url {url}")
        elif path == "index.md":
            response = io.pure(f"Markdown Content of url {url}")
        else:
            response = io.error(NotFound(url))
        return io.defer(print, f"Opening url {url}").then(response)

class LocalFileSytemService(Service):
    def get(self, path: str) -> IO[None,NotFound,str]:
        url = f"/{path}"
        if path == "index.html":
            response = io.pure(f"HTML Content of file {url}")
        elif path == "index.md":
            response = io.pure(f"Markdown Content of file {url}")
        else:
            response = io.error(NotFound(url))
        return io.defer(print, f"Opening file {url}").then(response)

main : IO[Service,NotFound,List[str]] = (
    io.read
    .flat_map(lambda service:
        service.get("index.html")
        .flat_map(lambda x:
            service.get("index.md")
            .flat_map(lambda y: io.defer(print, "Result = ", [x,y]))
        )
    )
)

if len(sys.argv) >= 2 and sys.argv[1] == "http":
    service = HttpService("localhost", 80)
else:
    service = LocalFileSytemService()

main.run(service)
Running the program with the `HttpService` gives:
$ python dependency_injection.py http
Opening url http://localhost:80/index.html
Opening url http://localhost:80/index.md
Result = ['HTML Content of url http://localhost:80/index.html', 'Markdown Content of url http://localhost:80/index.md']
While running it with the `LocalFileSytemService` gives:
$ python dependency_injection.py localfs
Opening file /index.html
Opening file /index.md
Result = ['HTML Content of file /index.html', 'Markdown Content of file /index.md']
Once again, `main` is still a very short `IO`, so it may not be obvious how much the context is a life saver. But imagine that `main` is one of your real programs: it would be much bigger, and passing the context around as global or local variables would be a much bigger problem.
Combining a list of IOs
Remember that an `IO` is the description of some computation, like source code. As such, it can be manipulated like any value and so stored in lists. This section covers the operations you have on lists of `IO`s.
`zip`: from a list of IO to an IO of list
The `zip` function and method transform a list of `IO`s into an `IO` returning a list. Each value of the resulting list is the value returned by the `IO` at the same location in the input list. The whole computation fails on the first failure encountered:
>>> from typing import List
>>> main : IO[None,None,List] = io.zip(io.pure(8), io.pure("Hello"))
>>> main.run(None)
Ok(success=[8, 'Hello'])
>>> main : IO[None,str,List] = io.zip(io.pure(8), io.error("Oups"))
>>> main.run(None)
Errors(errors=['Oups'])
`sequence`: running IOs in sequence
The function `io.sequence` is like the method `then`: it executes a list of `IO`s sequentially, returning the value of the last `IO` and failing on the first failure encountered:
>>> main : IO[None,None,int] = io.sequence(
... io.defer(print, "Hello"),
... io.defer(print, "World"),
... io.pure(12)
... )
>>> main.run(None)
Hello
World
Ok(success=12)
`traverse`: almost like map
The function `io.traverse` is very much like the function `map` on lists: it applies a function to every element of a list. But unlike `map` on lists, the function it applies returns an `IO`.
`io.traverse` is useful when you need to apply a function returning an `IO` to every element of a list. It returns an `IO` computing the list of values returned by each call. Like `zip` and `sequence`, it fails on the first failure encountered:
>>> from typing import List
>>> def add_context(i: int) -> IO[int,None,int]:
...     return io.read.flat_map(lambda c:
...         io.defer(print, f"context = {c}, argument = {i}")
...         .then(io.pure(c + i))
...     )
>>> main : IO[int,None,List[int]] = io.traverse(range(10), add_context)
>>> main.run(10)
context = 10, argument = 0
context = 10, argument = 1
context = 10, argument = 2
context = 10, argument = 3
context = 10, argument = 4
context = 10, argument = 5
context = 10, argument = 6
context = 10, argument = 7
context = 10, argument = 8
context = 10, argument = 9
Ok(success=[10, 11, 12, 13, 14, 15, 16, 17, 18, 19])
Asynchronous and Concurrent Programming
By now you know that an `IO` is very nice for stack safety, dependency injection, failure management, and code-as-data manipulation. There is another big feature `IO` has: simple asynchronous and concurrent programming.
`run`: the second and third arguments.
An `IO` is executed on a pool of threads. Until now we only gave `run` one argument: the context. But `run` accepts three arguments! The second one is the number of threads in the pool and the third one is how long a thread sleeps when idle.
The number of threads in the pool is fixed, so you should never call a blocking function inside one of the pool's threads. Create a new thread and run the blocking operation inside it using `async_` (see the sketch below).
When one of the threads has no fiber to run, it calls `time.sleep` to avoid wasting precious CPU cycles doing nothing. The third parameter of `run` is the amount of time an idle thread sleeps (a `float` number of seconds).
Because of Python's infamous Global Interpreter Lock, Python cannot run threads in parallel. So if your code only uses 100% of a single core, this is normal. You know the story: Python is single-threaded.
To use `n` threads with an idle time of `idle_time` seconds, just give `n` and `idle_time` to `run`:
>>> io.pure(5).run(None, 50, 0.01)
Ok(success=5)
Asynchronous Programming
A call to some function is said to be synchronous when the thread making the call actually waits for the call to return a value. This is annoying because the thread could be used to perform useful computations instead of just waiting.
On the contrary, a call is said to be asynchronous when the thread making the call does not wait for the call to finish but runs useful computations in the meantime.
The notorious expression "callback hell" kindly expresses how error-prone, hard to write, and hard to read asynchronous programming can be.
Asynchronous programming is all about callbacks but, fortunately, programming models were created to hide much of its complexity under a clean and simple interface. The famous `Promise` of JavaScript is such an interface. The `async/await` syntax of many languages, including Python, is another. So is Raffiot's `IO`. But unlike the async/await syntax, synchronous and asynchronous code can be transparently mixed with `IO`.
`async_`: running something asynchronously
Calling a function `f` usually looks like this:
>>> def f():
...     print("f is running")
...     return 3
>>> def main():
...     print("f not started yet")
...     result = f()
...     print(f"f finished and returned {result}")
>>> main()
f not started yet
f is running
f finished and returned 3
When the function `main` calls `f`, it waits for `f` to finish. When `f` finishes, `main` resumes its computation with the result of `f`.
Asynchronous functions, like `apply_async`, do not work this way. Calling an asynchronous function `fasync` usually looks like this:
>>> import time
>>> from multiprocessing import Pool
>>>
>>> def f():
...     print("f is running")
...     return 3
>>>
>>> with Pool(4) as pool:
...     def fasync(callback):
...         pool.apply_async(f, callback=callback)
...
...     def main():
...         print("fasync not started yet")
...
...         def callback(result):
...             print(f"fasync finished and returned {result}")
...
...         fasync(callback)
...         print("fasync started")
...
...     main()
...     time.sleep(0.5)
fasync not started yet
fasync started
f is running
fasync finished and returned 3
As you can see, the function `main` does not wait for `f` to finish but continues its execution, printing `fasync started`. The function `main` cannot get the result of `f` directly, so it defines a function, called a callback, to process the result when it is available.
With Raffiot's `IO` you would write:
>>> import time
>>> from multiprocessing import Pool
>>> from raffiot import *
>>>
>>> def f():
...     print("f is running")
...     return 3
>>>
>>> with Pool(4) as pool:
...     f_io : IO[None,None,int] = (
...         io.async_(
...             lambda r, k:
...                 pool.apply_async(f, callback=lambda r: k(Ok(r)))
...         )
...     )
...
...     main : IO[None,None,None] = io.sequence(
...         io.defer(print, "fasync not started yet"),
...         f_io.flat_map(lambda result:
...             io.defer(print, f"fasync finished and returned {result}")
...         ),
...         io.defer(print, "fasync started")
...     )
...
...     main.run(None)
fasync not started yet
f is running
fasync finished and returned 3
fasync started
Concurrent Programming
Concurrent programming is about running things "in parallel". Raffiot can run a large number of concurrent computations simply and safely:
`parallel`: running concurrent tasks
The function `io.parallel` runs a list of `IO`s in parallel. Remember that, because of Python's Global Interpreter Lock, only one thread executing Python code can be running at any time. But if your code involves a lot of primitives written in C/C++/etc., then you might get lucky and use all of your cores.
`parallel` returns a list of values called fibers. A fiber represents a task running in parallel/concurrently. Every fiber in the returned list corresponds to the `IO` at the same location in the argument list. For example, in `io.parallel(ios).flat_map(lambda fibers: ...)`, for every index `i`, `fibers[i]` is the fiber representing the computation of the `IO` `ios[i]` running in parallel/concurrently:
>>> import time
>>> def task(i: int) -> IO[None,None,None]:
...     return io.defer(print, f"Task {i}: Begin").then(
...         io.defer(time.sleep, 1),
...         io.defer(print, f"Task {i}: End")
...     )
>>> main : IO[None,None,None] = (
... io.parallel([task(i) for i in range(6)])
... .then(io.defer(print,"Finished")))
>>> main.run(None)
Task 0: Begin
Task 1: Begin
Task 3: Begin
Task 4: Begin
Task 2: Begin
Finished
Task 5: Begin
Task 0: End
Task 1: End
Task 3: End
Task 4: End
Task 2: End
Task 5: End
Ok(success=None)
As you can see, `main` does not wait for the `IO`s running in parallel/concurrently but continues its execution.
`wait`: waiting for concurrent tasks to end
Sometimes you want to wait for a parallel/concurrent computation to finish. Remember that parallel/concurrent computations are represented by the fibers returned by `io.parallel`. To wait for some fibers to finish, just call `io.wait` with the list of fibers you want to wait on. The result of `wait` is the list of all the fibers' results (of type `Result[E,A]`). For example, in `io.wait(fibers).flat_map(lambda results: ...)`, for any index `i`, `results[i]` of type `Result[E,A]` is the result of the computation represented by the fiber `fibers[i]`:
>>> main : IO[None,None,None] = (
... io.parallel([task(i) for i in range(6)])
... .flat_map(lambda fibers: io.wait(fibers))
... .then(io.defer(print,"Finished")))
>>> main.run(None)
Task 0: Begin
Task 1: Begin
Task 3: Begin
Task 5: Begin
Task 4: Begin
Task 2: Begin
Task 0: End
Task 3: End
Task 1: End
Task 5: End
Task 4: End
Task 2: End
Finished
Ok(success=None)
`yield_`: letting other tasks progress
Remember that an `IO` runs on a pool of threads. When there are more `IO`s to run than threads to run them on, there is a chance that some `IO`s will not get executed. An `IO` can explicitly release its thread for a moment to give other tasks a chance to progress.
Call `io.yield_` to release the current thread: the `IO` will take a break and continue its execution later:
>>> main : IO[None,None,None] = io.defer(print, "Hello").then(
... io.yield_,
... io.defer(print, "World!")
... )
>>> main.run(None)
Hello
World!
Ok(success=None)
Controlling Concurrency
Sometimes you want to prevent some fibers from running concurrently. For example, you may want to avoid several fibers modifying a variable at the same time, or prevent too many fibers from accessing some resource.
`reentrant_lock`: only one fiber at a time.
The primitive `resource.reentrant_lock` ensures that only one fiber can run a portion of code at a time:
resource.reentrant_lock: IO[Any, None, Resource[Any, None, None]]
Let's take an example. The class `Shared` represents any class defining mutable objects. In our example, calling the `set` method changes the object's attribute `value`:
>>> from raffiot import *
>>> from typing import Any
>>>
>>> class Shared:
...     def __init__(self):
...         self._value = 0
...
...     def get(self) -> int:
...         return self._value
...
...     def set(self, i: int) -> None:
...         self._value = i
>>>
>>> shared_object = Shared()
The `increment` IO does exactly what its name suggests: it reads the shared object's attribute `value` using the method `get`, waits for one second, and sets the attribute to `value + 1` using the `set` method:
>>> increment: IO[Any,None,None] = (
...     io.defer(shared_object.get)
...     .flat_map(lambda value:
...         io.sleep(1)
...         .then(io.defer(shared_object.set, value + 1))
...     )
... )
>>> shared_object.get()
0
>>> increment.run(None)
Ok(success=None)
>>> shared_object.get()
1
Running `increment` several times concurrently is unsafe:
>>> shared_object.get()
1
>>> io.parallel(increment, increment).run(None)
Ok(success=...)
>>> shared_object.get()
2
Although the value was 1 and `increment` ran twice, the final value is 2 instead of the expected 3. The reason for the issue is that both instances read the value 1 at the same time, and so both wrote `value + 1 == 2` instead of 3. We need to prevent one instance of `increment` from running while another one is already running. We can do so using `reentrant_lock`:
>>> shared_object.get()
2
>>> resource.reentrant_lock.flat_map(lambda lock:
...     io.parallel(
...         lock.with_(increment),
...         lock.with_(increment)
...     )
... ).run(None)
Ok(success=[...])
>>> shared_object.get()
4
`reentrant_lock` gives us a `lock`, which is a `Resource`. The two instances of `increment` still run in parallel, but inside a `lock.with_(an_io)`, which prevents them from running at the same time. The first instance to take the lock forces the second one to wait until the lock is released.
You will learn more about `Resource` in the section Resource Management, but for now just remember that you can prevent some fibers from running concurrently by creating a lock with `reentrant_lock` and using `lock.with_` to wrap the portions of code you want to protect from concurrent access. The type `Resource` is used to ensure that the lock will always be released, even if the computation fails.
Note: every call to `reentrant_lock` gives back a different lock. Unlike the Python equivalent `threading.Lock`, Raffiot's locks do not block threads, they only block fibers. In addition, these locks are reentrant, which means that the fiber holding the lock can acquire it again without blocking, as the sketch below shows.
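A minimal sketch of reentrancy (nesting `lock.with_` within itself would deadlock with a non-reentrant lock):

>>> resource.reentrant_lock.flat_map(lambda lock:
...     lock.with_(lock.with_(io.defer(print, "Nested acquisition works!")))
... ).run(None)
Nested acquisition works!
Ok(success=None)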
`semaphore`: limited resources.
The primitive `resource.semaphore` is useful to simulate limited resources. Imagine you have to call an API for which it is forbidden to make more than `n` concurrent calls; `semaphore` is the way to go:
resource.semaphore(tokens: int): IO[Any, None, Resource[Any, None, None]]
The parameter `tokens` is the number of fibers the semaphore will allow to run concurrently:
>>> from raffiot import *
>>> from typing import Any
>>>
>>> def fiber(sem, i: int) -> IO[Any, None, None]:
...     return sem.with_(io.defer(print, f"Fiber {i} running!"))
>>>
>>> resource.semaphore(5).flat_map(lambda sem:
... io.parallel([fiber(sem, i) for i in range(100)])
... ).run(None)
Even though there are 100 fibers running concurrently, there will be only 5 concurrent calls to `print`.
Time
Time functions enable you to schedule a computation in the future or to pause a fiber until some point in time is reached.
`sleep`: making a break
To pause an `IO` for some time, just call `io.sleep` with the number of seconds you want the `IO` paused:
>>> from time import time
>>> now : IO[None, None, None] = io.defer(time).flat_map(lambda t: io.defer(print, t))
>>> main : IO[None, None, None] = now.then(io.sleep(2), now)
>>> main.run(None)
1615136436.7838593
1615136438.785897
Ok(success=None)
Calling `io.sleep(0)` does nothing. The `IO` is guaranteed to be paused for at least the time you requested, but it may sleep longer, especially when threads are busy!
`sleep_until`: waking up in the future.
To pause an `IO` until some point in the future, call `io.sleep_until` with the desired epoch:
>>> from time import time
>>> now : IO[None, None, None] = io.defer(time).flat_map(lambda t: io.defer(print, t))
>>> time()
1615136688.9909387
>>> main : IO[None, None, None] = now.then(io.sleep_until(1615136788), now)
>>> main.run(None)
1615136713.6873975
1615136788.0037072
Ok(success=None)
Calling `io.sleep_until` with an epoch in the past does nothing. The `IO` is guaranteed to be paused until the epoch you requested is reached, but it may sleep longer, especially when threads are busy!
Resource Management
Resource management is the safe creation and release of resources: files, connection handlers, etc. Resource management ensures that all created resources will be released, thus avoiding resource leaks.
Consider the code below accessing a database:
>>> connection = connect_to_database()
>>> connection.execute_sql_query(...)
>>> connection.close()
If the execution of the SQL query raises an exception, the connection is never closed. Having too many unused connections open may prevent other parts of the code from creating new connections, may slow down the database, or may even crash the application.
Fortunately Python has built-in support for resource management with the `with` syntax:
>>> with connect_to_database() as connection:
...     connection.execute_sql_query(...)
Sometimes the resource you want to create depends on another resource. For example the connection configuration to the database could be stored in a configuration file that need to be opened and closed:
>>> with open_config_file() as file_content:
... config = read_config(file_content)
... with connect_to_database(config) as connection:
...         connection.execute_sql_query(...)
Once again Python's `with`-statement covers this case nicely. But there are two issues with Python's built-in resource management:
- it is not trivial to pack two dependent resources into one.
- it is not trivial to create your own resources.
This is where Raffiot's resource management comes in. Creating a `Resource` is as simple as providing a function to create the resource and one to release it. You can also lift any of Python's `with`-enabled resources to a Raffiot `Resource` with a single function call.
`Resource`s compose very well too, with the same API as `IO`. In fact `Resource` is built upon `IO` and has almost all of its functionalities. Here are the imports for this section:
>>> from raffiot import *
>>> from typing import Tuple, List, Any, Callable
Let's start by defining an `IO` that generates a random string every time it runs:
>>> import random
>>> import string
>>> rnd_str : IO[None, None, str] = (
...     io.defer(lambda:
...         ''.join(
...             random.choice(string.ascii_uppercase + string.digits)
...             for _ in range(8)
...         )
...     )
... )
>>> rnd_str.run(None)
Ok(success='CCQN80YY')
>>> rnd_str.run(None)
Ok(success='5JEOGVZS')
>>> rnd_str.run(None)
Ok(success='ZNLWSH1B')
>>> rnd_str.run(None)
Ok(success='MENS91RD')
`from_open_close_io`: Creating a `Resource` from open/close `IO`s
The `Resource` we want to define will create and print a new string every time it is used. Releasing it will simply print it.
A `Resource` is essentially two computations:
- one creating the resource
- one releasing the resource
Let's start with the `IO` creating and printing the string:
>>> rs_open : IO[None, None, str] = (
...     rnd_str.flat_map(lambda s: io.sequence(
...         io.defer(print, f"Opening {s}"),
...         io.pure(s)
...     ))
... )
Now the function releasing the string (i.e. printing it):
>>> def rs_close(s: str, cs: ComputationStatus) -> IO[None, None, None]:
...     return io.defer(print, f"Closing {s}")
The first argument of the release function is the created resource. The second one indicates whether the computation was successful: it can be either `ComputationStatus.SUCCEEDED` or `ComputationStatus.FAILED`.
From there, creating a `Resource` is as simple as a single call to `resource.from_open_close_io`:
>>> rs : Resource[None,None,str] = resource.from_open_close_io(rs_open, rs_close)
That wasn't too hard, was it?
`use`: using a resource
Now that we have a `Resource`, we want to use it. To do so, just call the method `use`. You need to give it a function taking as argument the created resource and returning an `IO` that uses this resource. The result is an `IO`:
>>> io_ok : IO[None,None,int] = rs.use(lambda s: io.pure(5))
>>> io_ok.run(None)
Opening B9G0G96J
Closing B9G0G96J
Ok(success=5)
As you can see, a random string is created and released. The result is an `IO` whose result is the result of the inner `IO`.
>>> io_error : IO[None,None,Any] = rs.use(lambda s: io.error("Oups!"))
>>> io_error.run(None)
Opening R9A1YSJ3
Closing R9A1YSJ3
Errors(errors=['Oups!'])
If the inner `IO` fails, the string is still released!
>>> io_panic : IO[None,None,None] = rs.use(lambda s: io.panic(Exception("BOOM!")))
>>> io_panic.run(None)
Opening N1H4A63V
Closing N1H4A63V
Panic(exceptions=[TracedException(exception=Exception('BOOM!'),
stack_trace='...')], errors=[])
If the inner `IO` panics, the string is still released too!
Note: the `with_(an_io)` method is a nice alias for `use(lambda _: an_io)`.
`map`, `flat_map`, `defer`, `async_` and others.
`Resource` supports almost the same API as `IO`, including `map`, `flat_map`, `defer`, `zip`, etc. It means, for example, that you can create resources in parallel, or simply create a list of resources (if one fails, all fail), etc., as the sketch below illustrates.
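For instance, here is a minimal sketch assuming `resource.zip` pairs resources the same way `io.zip` pairs `IO`s (the random strings, and possibly the exact open/close ordering, will differ on your machine):

>>> both : Resource[None,None,List[str]] = resource.zip(rs, rs)
>>> both.use(lambda pair: io.defer(print, f"Using {pair}")).run(None)
Opening 3QD8BNYA
Opening 0HV2R9KK
Using ['3QD8BNYA', '0HV2R9KK']
Closing 3QD8BNYA
Closing 0HV2R9KK
Ok(success=None)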
`lift_io`: from `IO` to `Resource`
Actually, any `IO[R,E,A]` can be lifted into a `Resource[R,E,A]` whose releasing function is just a no-op. It brings a lot of expressiveness and safety to resource creation:
>>> rs : Resource[None,None,None] = resource.lift_io(io.defer(print, "Hello World!"))
>>> rs.use(lambda none: io.pure(5)).run(None)
Hello World!
Ok(success=5)
`from_open_close`: Resource from open and close functions
Sometimes it is easier to create a `Resource` from usual Python functions rather than from the open/close `IO`s. To do so, just use the function `resource.from_open_close`:
>>> def rs_open() -> str:
...     s = ''.join(
...         random.choice(string.ascii_uppercase + string.digits)
...         for _ in range(8)
...     )
...     print(f"Opening {s}")
...     return s
>>> def rs_close(s: str, cs: ComputationStatus) -> None:
...     print(f"Closing {s}")
>>> rs : Resource[None,None,str] = resource.from_open_close(rs_open, rs_close)
Once again, the first argument of `rs_close` is the created resource and the second one indicates whether the computation was successful: `ComputationStatus.SUCCEEDED` or `ComputationStatus.FAILED`.
`from_with`: Resource from `with`
If the resource you want to create already supports Python's `with`-statement, then you're lucky: you just have to make a single call to `resource.from_with`:
>>> rs : Resource[None,None,str] = resource.from_with(io.defer(open, "hello.txt", "w"))
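The resulting `Resource` is used like any other. For example (a sketch, assuming the current directory is writable; `write` returns the number of characters written):

>>> rs.use(lambda file: io.defer(file.write, "Hello!")).run(None)
Ok(success=6)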
Creating a Resource directly
A `Resource[R,E,A]` is essentially an `IO[R,E,Tuple[A, Callable[[ComputationStatus], IO[R,E,Any]]]]`. When the `IO` runs, it returns a pair `Tuple[A, Callable[[ComputationStatus], IO[R,Any,Any]]]`. The first member of the pair is the created resource of type `A`. The second member of the pair is the release function. Its argument is a `ComputationStatus` indicating whether the computation was successful. It must return an `IO` that performs the release of the resource. Note that any failure encountered when releasing the resource makes the `IO` fail too:
>>> create : IO[None,None,Tuple[str, Callable[[ComputationStatus], IO[None,Any,Any]]]] = (
...     rnd_str.flat_map(lambda filename:
...         io.defer(print, f"Opening {filename}").map(lambda _:
...             (filename, lambda computationStatus: io.defer(print, f"Closing {filename}"))
...         )
...     )
... )
>>> rs : Resource[None,None,str] = Resource(create)
Use Case: Temporary File
This is a complete use case of a `Resource` creating a random file:
>>> from io import TextIOWrapper
>>> create : IO[None,None,Tuple[TextIOWrapper, Callable[[ComputationStatus], IO[None,None,None]]]] = (
...     rnd_str
...     .flat_map(lambda filename:
...         io.defer(print, f"Opening {filename}")
...         .then(io.defer(open, filename, "w"))
...         .map(lambda file:
...             ( file,
...               lambda computationStatus: io.defer(print, f"Closing {filename}")
...                   .then(io.defer(file.close))
...             )
...         )
...     )
... )
>>> rs : Resource[None,None,TextIOWrapper] = Resource(create)
>>> io_ok : IO[None,None,int] = (
...     rs.use(lambda file:
...         io.defer(file.write, "Hello World!")
...     )
... )
>>> io_ok.run(None)
Opening 6E21M413
Closing 6E21M413
Ok(success=12)
>>> io_error : IO[None,None,None] = (
...     rs.use(lambda file:
...         io.defer(file.write, "Hello World!")
...         .then(io.error("Oups!"))
...     )
... )
>>> io_error.run(None)
Opening R9A1YSJ3
Closing R9A1YSJ3
Errors(errors=['Oups!'])
>>> io_panic : IO[None,None,None] = (
...     rs.use(lambda file:
...         io.defer(file.write, "Hello World!")
...         .then(io.panic(Exception("BOOM!")))
...     )
... )
>>> io_panic.run(None)
Opening 5YZ02058
Closing 5YZ02058
Panic(exceptions=[TracedException(exception=Exception('BOOM!'),
stack_trace='...')], errors=[])
Variables
`IO` is heavily expression-based, but Python does not allow defining local variables in expressions. Raffiot provides two types to work around this limitation: `Val` and `Var`. A `Val` is an immutable variable while a `Var` is a mutable one.
Immutable Variables
Immutable variables can be created very easily by using the class `Val`:
>>> from raffiot import *
>>> Val(5)
Val(value=5)
`Val` has most of the usual operations like `map`, `flat_map`, `zip`, etc. Please have a look at the API for a complete list of methods.
For example, Python forbids this syntax:
>>> lambda x:
...     y = x + 5
...     print(f"y={y}")
because a lambda may only contain one single expression. To simulate this behaviour, you can use:
>>> lambda x: Val(x+5).map(lambda y: print(f"y={y}"))
We totally agree that the first syntax is far superior but, until Python allows lambdas to declare local variables, this is probably the best we can get.
Mutable Variables
Raffiot's mutable variables are not just mutable variables: they are designed to play nicely with concurrency. Every access to a `Var` is exclusive, which means only one fiber is allowed to read or modify the variable at any time.
`Var.create`, `Var.create_rs`: the only ways to create a mutable variable
Because a `Var` is not just a value, it can only be created either by the `IO` `Var.create(a:A)` of type `IO[Any,None,Var[A]]` or by the `Resource` `Var.create_rs(a:A)` of type `Resource[Any,None,Var[A]]`:
>>> from typing import Any
>>> main: IO[Any,None,None] = (
... Var.create(5).flat_map(lambda var1: var1.set(7).then(var1.get()))
... )
>>> main.run(None)
Ok(success=7)
>>> main_rs: Resource[Any,None,None] = (
... Var.create_rs(5).flat_map(lambda var1: var1.set_rs(7).then(var1.get_rs()))
... )
>>> main_rs.use(io.pure).run(None)
Ok(success=7)
As you can see, you can use the methods `get`/`set` to get/set the current value as an `IO`, or `get_rs`/`set_rs` to get/set the current value as a `Resource`.
Alternatively, you can use the methods `get_and_set` and `get_and_set_rs` to assign a new value to the variable and get the previous value at the same time, as sketched below.
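A minimal sketch of `get_and_set` (per the description above, the previous value is returned):

>>> main: IO[Any,None,int] = (
...     Var.create(5).flat_map(lambda v: v.get_and_set(10))
... )
>>> main.run(None)
Ok(success=5)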
`update`: modifying the current value
When you need to update the current value of a variable `var: Var[A]`, use the `update` method. It takes as input a function `f: Callable[[A], Tuple[A,B]]`. This function receives the current value of the variable and must return a pair `(new_value, returned)`. The `new_value` becomes the new value of the variable; the value `returned` lets you output some additional information if you want to:
>>> main: IO[Any,None,None] = (
... Var.create(5).flat_map(lambda v: v.update(lambda x: (x+1, 2*x)))
... )
>>> main.run(None)
Ok(success=UpdateResult(old_value=5, new_value=6, returned=10))
The functions `update_io` and `update_rs` are the counterparts of `update` for update functions returning an `IO` or a `Resource` respectively (see the sketch below).
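A minimal sketch of `update_io`, assuming its update function returns the same `(new_value, returned)` pair wrapped in an `IO`:

>>> main: IO[Any,None,Any] = (
...     Var.create(5).flat_map(lambda v:
...         v.update_io(lambda x: io.pure((x+1, 2*x)))
...     )
... )
>>> main.run(None)
Ok(success=UpdateResult(old_value=5, new_value=6, returned=10))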
`traverse`: creating a new variable from another one.
The `traverse` method creates a new variable from an existing one:
>>> main: IO[Any,None,int] = (
...     Var.create(5)
...     .flat_map(lambda var1:
...         var1.traverse(lambda x:
...             io.defer(print, x).then(io.pure(x+1))
...         )
...         .flat_map(lambda var2: var2.get())
...     )
... )
>>> main.run(None)
5
Ok(success=6)
`zip`: Getting the values of a group of variables
The `zip` class method regroups the values of a list of variables. All concurrent access to the variables is forbidden during the zip to ensure a consistent reading of the variables' values:
>>> main: IO[Any,None,None] = (
... io.zip(Var.create(0), Var.create(1))
... .flat_map(lambda vars:
... Var.zip(vars[0], vars[1])
... )
... )
>>> main.run(None)
Ok(success=[0, 1])
Note: the `zip_with` instance method is equivalent.
`ap`: Combining the values of a group of variables
The `ap` method calls the function stored in a variable with the values of the argument variables. As with `zip`, all concurrent access to the variables is forbidden during the call to ensure a consistent reading of the variables' values:
>>> main: IO[Any,None,None] = (
... Var.create(lambda x, y: x + y)
... .flat_map(lambda var_fun:
... io.zip(Var.create(2), Var.create(3))
... .flat_map(lambda vars:
... var_fun.ap(vars[0], vars[1])
... )
... )
... )
>>> main.run(None)
Ok(success=5)