Asynchronous and Concurrent Programming

By now you know that an IO is very nice for stack safety, dependency injection, failure management, and code-as-data manipulations. But IO has another big feature: simple asynchronous and concurrent programming.

run : the second and third arguments

An IO is executed on a pool of threads. Until now, we only gave io.run one argument: the context. But io.run accepts three arguments! The second one is the number of threads in the pool and the third one is how long a thread sleeps when idle.

The number of threads in the pool is fixed, so you should never call a blocking function inside one of the pool's threads. Instead, create a new thread and run the blocking operation inside it using async_.
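
For example, here is a minimal sketch of that pattern. It assumes the io.async_ signature presented later in this section (its function receives the context and a callback k expecting a Result); blocking_read is a made-up stand-in for any blocking function:

import threading
import time
from raffiot import *

def blocking_read() -> str:
    time.sleep(1)   # stands for any blocking call (socket read, etc.)
    return "data"

# Run the blocking call on a fresh thread so that none of the pool's
# threads is blocked, then hand the result to the callback k,
# wrapped in Ok since the callback expects a Result.
read_io: IO[None, None, str] = io.async_(
    lambda r, k: threading.Thread(
        target=lambda: k(Ok(blocking_read()))
    ).start()
)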

When a thread has no fibers to run, it calls time.sleep to avoid wasting precious CPU cycles doing nothing. The third parameter of run is the amount of time an idle thread sleeps (a float, in seconds).

Because of Python's infamous Global Interpreter Lock, Python cannot run threads in parallel. So if your code only uses 100% of a single core, this is normal. You know the story: Python is single-threaded.

To use n threads with an idle time of idle_time seconds, just give n and idle_time to io.run:

>>> io.pure(5).run(None, 50, 0.01)
Ok(success=5)

Asynchronous Programming

A call to a function is said to be synchronous when the thread making the call actually waits for the call to return a value. This is annoying because the thread could be used to perform useful computations instead of just waiting.

On the contrary, a call is said to be asynchronous when the thread making the call does not wait for the call to finish but runs useful computations in the meantime.

The notorious expression "callback hell" aptly expresses how asynchronous programming can be error-prone, hard to write, and hard to read.

Asynchronous programming is all about callbacks but, fortunately, programming models were created to hide much of this complexity under a clean and simple interface. The famous Promise of JavaScript is such an interface. The async/await syntax of many languages, including Python, is another. So is Raffiot's IO. But unlike the async/await syntax, IO lets you mix synchronous and asynchronous code transparently.

async_ : running something asynchronously

Calling a function f usually looks like this:

>>> def f():
...   print("f is running")
...   return 3
>>> def main():
...   print("f not started yet")
...   result = f()
...   print(f"f finished and returned {result}")
>>> main()
f not started yet
f is running
f finished and returned 3

When the function main calls f, it waits for f to finish. When f finishes, main resumes its computation with the result of f.

Asynchronous functions, like apply_async, do not work this way. Calling an asynchronous function fasync usually looks like this:

>>> import time
>>> from multiprocessing import Pool
>>>
>>> def f():
...  print("f is running")
...  return 3
>>>
>>> with Pool(4) as pool: 
...   def fasync(callback):
...     pool.apply_async(f, callback = callback)
...   
...   def main():
...     print("fasync not started yet")
...   
...     def callback(result):
...       print(f"fasync finished and returned {result}")
...     
...     fasync(callback)
...     print("fasync started")
...    
...   main()
...   time.sleep(0.5)
fasync not started yet
fasync started
f is running
fasync finished and returned 3

As you can see, the function main does not wait for f to finish but continues its execution, printing fasync started. The function main cannot get the result of f directly, so it defines a function, called a callback, to process the result when it is available.

With Raffiot's IO you would write:

>>> import time
>>> from multiprocessing import Pool
>>> from raffiot import *
>>>   
>>> def f():
...   print("f is running")
...   return 3
>>>  
>>> with Pool(4) as pool: 
...   f_io : IO[None,None,int] = (
...     io.async_(
...       lambda r, k:
...         pool.apply_async(f, callback = lambda r: k(Ok(r)))
...     )
...   )
... 
...   main : IO[None,None,None] = io.sequence(
...     io.defer(print, "fasync not started yet"),
...     f_io.flat_map(lambda result:
...       io.defer(print, f"fasync finished and returned {result}")
...     ),
...     io.defer(print, "fasync started")
...   )
... 
...   main.run(None)
fasync not started yet
f is running
fasync finished and returned 3
fasync started

Concurrent Programming

Concurrent programming is about running things "in parallel". Raffiot can run a large number of concurrent computations simply and safely:

parallel : running concurrent tasks

The function io.parallel runs a list of IOs in parallel. Remember that, because of Python's Global Interpreter Lock, only one thread executing Python code can run at any time. But if your code spends a lot of time in primitives written in C/C++/etc., then you might get lucky and use all of your cores.

parallel returns a list of values called fibers. A fiber represents a task running in parallel/concurrently. Every fiber in the returned list corresponds to the IO at the same location in the argument list. For example, in

io.parallel(ios).flat_map(lambda fibers: ...)

for every index i, fibers[i] is the fiber representing the computation of the IO ios[i] running in parallel/concurrently.

>>> import time
>>> def task(i: int) -> IO[None,None,None]:
...   return io.defer(print, f"Task {i}: Begin").then(
...     io.defer(time.sleep, 1),
...     io.defer(print, f"Task {i}: End")
...   )
>>> main : IO[None,None,None] = (
...   io.parallel([task(i) for i in range(6)])
...   .then(io.defer(print,"Finished")))
>>> main.run(None)
Task 0: Begin
Task 1: Begin
Task 3: Begin
Task 4: Begin
Task 2: Begin
Finished
Task 5: Begin
Task 0: End
Task 1: End
Task 3: End
Task 4: End
Task 2: End
Task 5: End
Ok(success=None)

As you can see, main does not wait for the IOs running in parallel/concurrently before continuing its execution.

wait : waiting for concurrent tasks to end

Sometimes you want to wait for a parallel/concurrent computation to finish. Remember that parallel/concurrent computations are represented by the fibers returned by io.parallel.

To wait for some fibers to finish, just call io.wait with the list of fibers you want to wait on. The result of wait is the list of all the fibers' results (each of type Result[E,A]). For example, in

io.wait(fibers).flat_map(lambda results: ...)

for any index i, results[i], of type Result[E,A], is the result of the computation represented by the fiber fibers[i].

>>> main : IO[None,None,None] = (
...   io.parallel([task(i) for i in range(6)])
...   .flat_map(lambda fibers: io.wait(fibers))
...   .then(io.defer(print,"Finished")))
>>> main.run(None)
Task 0: Begin
Task 1: Begin
Task 3: Begin
Task 5: Begin
Task 4: Begin
Task 2: Begin
Task 0: End
Task 3: End
Task 1: End
Task 5: End
Task 4: End
Task 2: End
Finished
Ok(success=None)
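
As a sketch of putting those results to use (assuming every task succeeds, so each result is an Ok whose success field holds the value, as in the Ok(success=...) outputs above), here is how you could sum the values of several parallel computations:

sum_io: IO[None, None, int] = (
    io.parallel([io.pure(i) for i in range(6)])
      .flat_map(lambda fibers: io.wait(fibers))
      .flat_map(lambda results:
          # every results[i] should be Ok(success=i) here
          io.pure(sum(r.success for r in results))
      )
)

sum_io.run(None)   # should be Ok(success=15)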

yield_ : letting other tasks progress

Remember that an IO runs on a pool of threads. When there are more IOs to run than there are threads to run them on, there is a chance that some IOs will not get executed. An IO can explicitly release its thread for a moment to give other tasks a chance to progress.

Call io.yield_ to release the current thread. The IO will take a break and continue its execution later.

>>> main : IO[None,None,None] = io.defer(print, "Hello").then(
...   io.yield_,
...   io.defer(print, "World!") 
... )
>>> main.run(None)
Hello
World!
Ok(success=None)
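
To see the effect more clearly, here is a sketch (the exact interleaving depends on the scheduler): with a single-thread pool, a task that yields between its steps lets another task make progress in the meantime.

def chatty(name: str) -> IO[None, None, None]:
    return io.sequence(
        io.defer(print, f"{name}: step 1"),
        io.yield_,                           # release the thread...
        io.defer(print, f"{name}: step 2"),  # ...and resume later
    )

# Even with a pool of a single thread, the two tasks can interleave
# because each one yields between its steps.
io.parallel([chatty("A"), chatty("B")]) \
  .flat_map(lambda fibers: io.wait(fibers)) \
  .run(None, 1)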

Controlling Concurrency

Sometimes you want to prevent some fibers from running concurrently. For example, you may want to prevent several fibers from modifying a variable at the same time, or to keep too many fibers from accessing some resource at once.

reentrant_lock: only one fiber at a time.

The primitive resource.reentrant_lock ensures that only one fiber can run a portion of code at a time:

resource.reentrant_lock: IO[Any, None, Resource[Any, None, None]]

Let's take an example. The class Shared represents any class defining mutable objects. In our example, calling the set method changes the object's attribute value:

>>> from raffiot import *
>>> from typing import Any
>>>  
>>> class Shared:
...     def __init__(self):
...         self._value = 0
...  
...     def get(self) -> int:
...         return self._value
...  
...     def set(self, i: int) -> None:
...         self._value = i
>>>  
>>> shared_object = Shared()

The increment IO does exactly what its name suggests: it reads the shared object's attribute value using the method get, waits for one second, and sets the attribute to value + 1 using the set method:

>>> increment: IO[Any,None,None] = (
...   io.defer(shared_object.get)
...     .flat_map(lambda value:
...       io.sleep(1)
...         .then(io.defer(shared_object.set, value + 1))
...     )
... )
>>> shared_object.get()
0
>>> increment.run(None)
Ok(success=None)
>>> shared_object.get()
1

Running increment several times concurrently is unsafe:

>>> shared_object.get()
1
>>> io.parallel(increment, increment).run(None)
Ok(success=...)
>>> shared_object.get()
2

Although the value was 1 and increment has been run twice, the final value is 2 instead of the expected 3. The reason for the issue is that both instances read the value 1 at the same time, and so both wrote value + 1 == 2 instead of 3. We need to prevent one instance of increment from running while another one is already running. We can do so using reentrant_lock:

>>> shared_object.get()
2
>>> resource.reentrant_lock.flat_map(lambda lock:
...   io.parallel(
...     lock.with_(increment),
...     lock.with_(increment)
...   )
... ).run(None)
Ok(success=[...])
>>> shared_object.get()
4

reentrant_lock gives us a lock, which is a Resource. The two instances of increment still run in parallel, but inside a lock.with_(an_io), which prevents them from running at the same time. The first instance to take the lock forces the second one to wait until the lock is released.

You will learn more about Resource in the section Resource Management, but for now just remember that you can prevent some fibers from running concurrently by creating a lock with reentrant_lock and using lock.with_ to wrap the portions of code you want to protect from concurrent access. The type Resource is used to ensure that the lock will always be released, even if the computation fails.

Note: every call to reentrant_lock gives back a different lock.
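
A sketch to illustrate this point, reusing the increment defined above: two distinct locks provide no mutual exclusion between each other, so the race is back.

resource.reentrant_lock.flat_map(lambda lock1:
    resource.reentrant_lock.flat_map(lambda lock2:
        io.parallel(
            lock1.with_(increment),  # guarded by lock1...
            lock2.with_(increment)   # ...but this one by lock2:
        )                            # the two may still overlap!
    )
).run(None)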

Unlike their Python equivalent threading.Lock, Raffiot's locks do not block threads; they only block fibers.

In addition, these locks are reentrant, which means that the fiber holding the lock can acquire it again without blocking.
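
A sketch of reentrancy in action: the inner with_ acquires the same lock a second time from within the same fiber, and does not deadlock.

resource.reentrant_lock.flat_map(lambda lock:
    lock.with_(                  # first acquisition
        lock.with_(              # second acquisition by the same fiber:
            io.defer(print, "no deadlock")  # reentrant, so this runs
        )
    )
).run(None)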

semaphore: limited resource.

The primitive resource.semaphore is useful to simulate limited resources.

Imagine you have to call an API for which it is forbidden to make more than n concurrent calls; semaphore is the way to go:

resource.semaphore(tokens: int): IO[Any, None, Resource[Any, None, None]]

The parameter tokens is the number of fibers the semaphore will allow to run concurrently:

>>> from raffiot import *
>>> from typing import Any
>>>  
>>> def fiber(sem, i:int) -> IO[Any, None, None]:
...   return sem.with_(io.defer(print, f"Fiber {i} running!"))
>>>  
>>> resource.semaphore(5).flat_map(lambda sem:
...   io.parallel([fiber(sem, i) for i in range(100)])
... ).run(None)

Even though there are 100 fibers running concurrently, there will be only 5 concurrent calls to print.

Time

Time functions enable you to schedule some computations in the future or to pause a fiber until a point in time is reached.

sleep: taking a break

To pause an IO for some time, just call io.sleep with the number of seconds you want the IO paused:

>>> from time import time
>>> now : IO[None, None, None] = io.defer(time).flat_map(lambda t: io.defer(print, t))
>>> main : IO[None, None, None] = now.then(io.sleep(2), now)
>>> main.run(None)
1615136436.7838593
1615136438.785897
Ok(success=None)

Calling io.sleep(0) does nothing. The IO is guaranteed to be paused for at least the time you requested, but it may sleep longer, especially when threads are busy!

sleep_until: waking up in the future.

To pause an IO until some determined time in the future, call io.sleep_until with the desired epoch:

>>> from time import time
>>> now : IO[None, None, None] = io.defer(time).flat_map(lambda t: io.defer(print, t))
>>> time()
1615136688.9909387
>>> main : IO[None, None, None] = now.then(io.sleep_until(1615136788), now)
>>> main.run(None)
1615136713.6873975
1615136788.0037072
Ok(success=None)

Calling io.sleep_until with an epoch in the past does nothing. The IO is guaranteed to be paused until the epoch you requested is reached, but it may sleep longer, especially when threads are busy!
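
For example, to pause for roughly five seconds you can compute the target epoch from the current time, as in this sketch:

from time import time

# Sleep until 5 seconds from now: read the current epoch with time(),
# then sleep until that epoch plus 5 seconds.
io.defer(time).flat_map(lambda t: io.sleep_until(t + 5)).run(None)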