Module raffiot.var

Local Variables to work around Python annoying limitations about lambdas.

Python forbids local variables and

Expand source code
"""
Local Variables to work around Python annoying limitations about lambdas.

Python forbids local variables and
"""

from __future__ import annotations

from collections import abc
from dataclasses import dataclass
from typing import Generic, TypeVar, Callable, List, Any, Tuple

from typing_extensions import final

from raffiot import io, resource
from raffiot.io import IO
from raffiot.resource import Resource

__all__ = [
    "UpdateResult",
    "Var",
]


A = TypeVar("A")
B = TypeVar("B")


@final
@dataclass
class UpdateResult(Generic[A, B]):
    """
    The result of the `update` methods of `Var`.
    """

    __slots__ = "old_value", "new_value", "returned"

    """
    The result of an Update
    """

    old_value: A
    """
    The value before the Update
    """

    new_value: A
    """
    The value after the update
    """

    returned: B
    """
    The result produced by the update
    """


@final
class Var(Generic[A]):
    """
    A mutable variable.
    **All** concurrent access are protected using a Reentrant Lock.

    **IMPORTANT:** /!\\ **NEVER CREATE VARIABLES BY THE CONSTRUCTOT !!!!** /!\\

    Use `Var.create instead.`
    """

    __slots__ = "_lock", "_value"

    def __init__(self, lock: Resource[Any, None, None], value: A):
        self._lock = lock
        self._value = value

    def lock(self) -> Resource[Any, None, None]:
        """
        The Reentrant Lock that guarantees exclusive access to this variable.
        """
        return self._lock

    @classmethod
    def create(cls, a: A) -> IO[Any, None, Var[A]]:
        """
        Create a new variable whose value is `a`.

        :param a:
        :return:
        """
        return resource.reentrant_lock.map(lambda lock: Var(lock, a))

    @classmethod
    def create_rs(cls, a: A) -> Resource[Any, None, Var[A]]:
        """
        Create a new variable whose value is `a`.

        :param a:
        :return:
        """
        return resource.lift_io(resource.reentrant_lock.map(lambda lock: Var(lock, a)))

    #############
    #   GETTER  #
    #############

    def get(self) -> IO[None, None, A]:
        """
        Get the current value of this variable.
        :return:
        """
        return self._lock.with_(io.defer(lambda: self._value))

    def get_rs(self) -> Resource[None, None, A]:
        """
        Get the current value of this variable.
        :return:
        """
        return self._lock.then(resource.defer(lambda: self._value))

    #############
    #   SETTER  #
    #############

    def set(self, v: A) -> IO[None, None, None]:
        """
        Assign a new value to this variable.

        :param v:
        :return:
        """

        def h(w) -> None:
            self._value = w

        return self._lock.with_(io.defer(h, v))

    def set_rs(self, v: A) -> Resource[None, None, None]:
        """
        Assign a new value to this variable.

        :param v:
        :return:
        """

        def h(w) -> None:
            self._value = w

        return self._lock.then(resource.defer(h, v))

    #####################
    #   GETTER + SETTER #
    #####################

    def get_and_set(self, v: A) -> IO[Any, None, A]:
        """
        Assign a new value to this variable. The previous value is returned.

        :param v:
        :return:
        """

        def h():
            old_value = self._value
            self._value = v
            return old_value

        return self._lock.with_(io.defer(h))

    def get_and_set_rs(self, v: A) -> Resource[None, None, None]:
        """
        Assign a new value to this variable. The previous value is returned.

        :param v:
        :return:
        """

        def h():
            old_value = self._value
            self._value = v
            return old_value

        return self._lock.then(resource.defer(h))

    ###############
    #    UPDATE   #
    ###############

    def update(self, f: Callable[[A], Tuple[A, B]]) -> IO[Any, Any, UpdateResult[A, B]]:
        """
        Update the value contained in this variable.

        :param f:
        :return:
        """

        def h() -> UpdateResult[A, B]:
            old_value = self._value
            new_value, ret = f(self._value)
            self._value = new_value
            return UpdateResult(old_value, new_value, ret)

        return self._lock.with_(io.defer(h))

    def update_io(
        self, f: Callable[[A], IO[Any, None, Tuple[A, B]]]
    ) -> IO[Any, None, UpdateResult[A, B]]:
        """
        Update the value contained in this variable.

        :param f:
        :return:
        """

        def h() -> IO[Any, None, UpdateResult[A, B]]:
            old_value = self._value

            def g(x: Tuple[A, B]) -> UpdateResult[A, B]:
                self._value = x[0]
                return UpdateResult(old_value, self._value, x[1])

            return f(old_value).map(g)

        return self._lock.with_(io.defer_io(h))

    def update_rs(
        self, f: Callable[[A], Resource[Any, None, Tuple[A, B]]]
    ) -> Resource[Any, None, UpdateResult[A, B]]:
        """
        Update the value contained in this variable.

        :param f:
        :return:
        """

        def h() -> Resource[Any, None, UpdateResult[A, B]]:
            old_value = self._value

            def g(x: Tuple[A, B]) -> UpdateResult[A, B]:
                self._value = x[0]
                return UpdateResult(old_value, self._value, x[1])

            return f(old_value).map(g)

        return self._lock.then(resource.defer_resource(h))

    ######################
    #  Creating New Vars #
    ######################

    def traverse(self, f: Callable[[A], IO[Any, None, B]]) -> IO[Any, None, Var[B]]:
        """
        Create a new variable by transforming the current value of this variable.

        :param f:
        :return:
        """

        def h():
            return f(self._value).flat_map(Var.create)

        return self._lock.with_(io.defer_io(h))

    @classmethod
    def zip(cls, *vars: Var[A]) -> IO[Any, None, List[A]]:
        """ "
        Group these variables current values into a list.
        """

        if len(vars) == 1 and isinstance(vars[0], abc.Iterable):
            args = vars[0]
        else:
            args = vars
        return resource.zip([x._lock for x in args]).with_(
            io.defer(lambda: [x._value for x in args])
        )

    def zip_with(self, *vars: Var[A]) -> IO[Any, None, List[A]]:
        """
        Group this variable current value with `vars` variable current values.

        :param vals: other variables to combine with self.
        :return:
        """
        return Var.zip(self, *vars)

    def ap(self, *arg: Var[A]) -> IO[Any, None, B]:
        """
        Apply the function contained in this variable to `args` variables.

        :param arg:
        :return:
        """
        return self.zip_with(*arg).map(lambda l: l[0](*l[1:]))  # type: ignore

Classes

class UpdateResult (old_value: A, new_value: A, returned: B)

The result of the update methods of Var.

Expand source code
@final
@dataclass
class UpdateResult(Generic[A, B]):
    """
    The result of the `update` methods of `Var`.
    """

    __slots__ = "old_value", "new_value", "returned"

    """
    The result of an Update
    """

    old_value: A
    """
    The value before the Update
    """

    new_value: A
    """
    The value after the update
    """

    returned: B
    """
    The result produced by the update
    """

Ancestors

  • typing.Generic

Instance variables

var new_value : ~A

The value after the update

var old_value : ~A

The value before the Update

var returned : ~B

The result produced by the update

class Var (lock: Resource[Any, None, None], value: A)

A mutable variable. All concurrent access are protected using a Reentrant Lock.

IMPORTANT: /!\ NEVER CREATE VARIABLES BY THE CONSTRUCTOT !!!! /!\

Use Var.create() instead.

Expand source code
@final
class Var(Generic[A]):
    """
    A mutable variable.
    **All** concurrent access are protected using a Reentrant Lock.

    **IMPORTANT:** /!\\ **NEVER CREATE VARIABLES BY THE CONSTRUCTOT !!!!** /!\\

    Use `Var.create instead.`
    """

    __slots__ = "_lock", "_value"

    def __init__(self, lock: Resource[Any, None, None], value: A):
        self._lock = lock
        self._value = value

    def lock(self) -> Resource[Any, None, None]:
        """
        The Reentrant Lock that guarantees exclusive access to this variable.
        """
        return self._lock

    @classmethod
    def create(cls, a: A) -> IO[Any, None, Var[A]]:
        """
        Create a new variable whose value is `a`.

        :param a:
        :return:
        """
        return resource.reentrant_lock.map(lambda lock: Var(lock, a))

    @classmethod
    def create_rs(cls, a: A) -> Resource[Any, None, Var[A]]:
        """
        Create a new variable whose value is `a`.

        :param a:
        :return:
        """
        return resource.lift_io(resource.reentrant_lock.map(lambda lock: Var(lock, a)))

    #############
    #   GETTER  #
    #############

    def get(self) -> IO[None, None, A]:
        """
        Get the current value of this variable.
        :return:
        """
        return self._lock.with_(io.defer(lambda: self._value))

    def get_rs(self) -> Resource[None, None, A]:
        """
        Get the current value of this variable.
        :return:
        """
        return self._lock.then(resource.defer(lambda: self._value))

    #############
    #   SETTER  #
    #############

    def set(self, v: A) -> IO[None, None, None]:
        """
        Assign a new value to this variable.

        :param v:
        :return:
        """

        def h(w) -> None:
            self._value = w

        return self._lock.with_(io.defer(h, v))

    def set_rs(self, v: A) -> Resource[None, None, None]:
        """
        Assign a new value to this variable.

        :param v:
        :return:
        """

        def h(w) -> None:
            self._value = w

        return self._lock.then(resource.defer(h, v))

    #####################
    #   GETTER + SETTER #
    #####################

    def get_and_set(self, v: A) -> IO[Any, None, A]:
        """
        Assign a new value to this variable. The previous value is returned.

        :param v:
        :return:
        """

        def h():
            old_value = self._value
            self._value = v
            return old_value

        return self._lock.with_(io.defer(h))

    def get_and_set_rs(self, v: A) -> Resource[None, None, None]:
        """
        Assign a new value to this variable. The previous value is returned.

        :param v:
        :return:
        """

        def h():
            old_value = self._value
            self._value = v
            return old_value

        return self._lock.then(resource.defer(h))

    ###############
    #    UPDATE   #
    ###############

    def update(self, f: Callable[[A], Tuple[A, B]]) -> IO[Any, Any, UpdateResult[A, B]]:
        """
        Update the value contained in this variable.

        :param f:
        :return:
        """

        def h() -> UpdateResult[A, B]:
            old_value = self._value
            new_value, ret = f(self._value)
            self._value = new_value
            return UpdateResult(old_value, new_value, ret)

        return self._lock.with_(io.defer(h))

    def update_io(
        self, f: Callable[[A], IO[Any, None, Tuple[A, B]]]
    ) -> IO[Any, None, UpdateResult[A, B]]:
        """
        Update the value contained in this variable.

        :param f:
        :return:
        """

        def h() -> IO[Any, None, UpdateResult[A, B]]:
            old_value = self._value

            def g(x: Tuple[A, B]) -> UpdateResult[A, B]:
                self._value = x[0]
                return UpdateResult(old_value, self._value, x[1])

            return f(old_value).map(g)

        return self._lock.with_(io.defer_io(h))

    def update_rs(
        self, f: Callable[[A], Resource[Any, None, Tuple[A, B]]]
    ) -> Resource[Any, None, UpdateResult[A, B]]:
        """
        Update the value contained in this variable.

        :param f:
        :return:
        """

        def h() -> Resource[Any, None, UpdateResult[A, B]]:
            old_value = self._value

            def g(x: Tuple[A, B]) -> UpdateResult[A, B]:
                self._value = x[0]
                return UpdateResult(old_value, self._value, x[1])

            return f(old_value).map(g)

        return self._lock.then(resource.defer_resource(h))

    ######################
    #  Creating New Vars #
    ######################

    def traverse(self, f: Callable[[A], IO[Any, None, B]]) -> IO[Any, None, Var[B]]:
        """
        Create a new variable by transforming the current value of this variable.

        :param f:
        :return:
        """

        def h():
            return f(self._value).flat_map(Var.create)

        return self._lock.with_(io.defer_io(h))

    @classmethod
    def zip(cls, *vars: Var[A]) -> IO[Any, None, List[A]]:
        """ "
        Group these variables current values into a list.
        """

        if len(vars) == 1 and isinstance(vars[0], abc.Iterable):
            args = vars[0]
        else:
            args = vars
        return resource.zip([x._lock for x in args]).with_(
            io.defer(lambda: [x._value for x in args])
        )

    def zip_with(self, *vars: Var[A]) -> IO[Any, None, List[A]]:
        """
        Group this variable current value with `vars` variable current values.

        :param vals: other variables to combine with self.
        :return:
        """
        return Var.zip(self, *vars)

    def ap(self, *arg: Var[A]) -> IO[Any, None, B]:
        """
        Apply the function contained in this variable to `args` variables.

        :param arg:
        :return:
        """
        return self.zip_with(*arg).map(lambda l: l[0](*l[1:]))  # type: ignore

Ancestors

  • typing.Generic

Static methods

def create(a: A) ‑> IO[typing.Any, NoneType, Var[~A]]

Create a new variable whose value is a.

:param a: :return:

Expand source code
@classmethod
def create(cls, a: A) -> IO[Any, None, Var[A]]:
    """
    Create a new variable whose value is `a`.

    :param a:
    :return:
    """
    return resource.reentrant_lock.map(lambda lock: Var(lock, a))
def create_rs(a: A) ‑> Resource[typing.Any, NoneType, Var[~A]]

Create a new variable whose value is a.

:param a: :return:

Expand source code
@classmethod
def create_rs(cls, a: A) -> Resource[Any, None, Var[A]]:
    """
    Create a new variable whose value is `a`.

    :param a:
    :return:
    """
    return resource.lift_io(resource.reentrant_lock.map(lambda lock: Var(lock, a)))
def zip(*vars: Var[A]) ‑> IO[typing.Any, NoneType, typing.List[~A]]

" Group these variables current values into a list.

Expand source code
@classmethod
def zip(cls, *vars: Var[A]) -> IO[Any, None, List[A]]:
    """ "
    Group these variables current values into a list.
    """

    if len(vars) == 1 and isinstance(vars[0], abc.Iterable):
        args = vars[0]
    else:
        args = vars
    return resource.zip([x._lock for x in args]).with_(
        io.defer(lambda: [x._value for x in args])
    )

Methods

def ap(self, *arg: Var[A]) ‑> IO[typing.Any, NoneType, ~B]

Apply the function contained in this variable to args variables.

:param arg: :return:

Expand source code
def ap(self, *arg: Var[A]) -> IO[Any, None, B]:
    """
    Apply the function contained in this variable to `args` variables.

    :param arg:
    :return:
    """
    return self.zip_with(*arg).map(lambda l: l[0](*l[1:]))  # type: ignore
def get(self) ‑> IO[NoneType, NoneType, ~A]

Get the current value of this variable. :return:

Expand source code
def get(self) -> IO[None, None, A]:
    """
    Get the current value of this variable.
    :return:
    """
    return self._lock.with_(io.defer(lambda: self._value))
def get_and_set(self, v: A) ‑> IO[typing.Any, NoneType, ~A]

Assign a new value to this variable. The previous value is returned.

:param v: :return:

Expand source code
def get_and_set(self, v: A) -> IO[Any, None, A]:
    """
    Assign a new value to this variable. The previous value is returned.

    :param v:
    :return:
    """

    def h():
        old_value = self._value
        self._value = v
        return old_value

    return self._lock.with_(io.defer(h))
def get_and_set_rs(self, v: A) ‑> Resource[NoneType, NoneType, NoneType]

Assign a new value to this variable. The previous value is returned.

:param v: :return:

Expand source code
def get_and_set_rs(self, v: A) -> Resource[None, None, None]:
    """
    Assign a new value to this variable. The previous value is returned.

    :param v:
    :return:
    """

    def h():
        old_value = self._value
        self._value = v
        return old_value

    return self._lock.then(resource.defer(h))
def get_rs(self) ‑> Resource[NoneType, NoneType, ~A]

Get the current value of this variable. :return:

Expand source code
def get_rs(self) -> Resource[None, None, A]:
    """
    Get the current value of this variable.
    :return:
    """
    return self._lock.then(resource.defer(lambda: self._value))
def lock(self) ‑> Resource[typing.Any, NoneType, NoneType]

The Reentrant Lock that guarantees exclusive access to this variable.

Expand source code
def lock(self) -> Resource[Any, None, None]:
    """
    The Reentrant Lock that guarantees exclusive access to this variable.
    """
    return self._lock
def set(self, v: A) ‑> IO[NoneType, NoneType, NoneType]

Assign a new value to this variable.

:param v: :return:

Expand source code
def set(self, v: A) -> IO[None, None, None]:
    """
    Assign a new value to this variable.

    :param v:
    :return:
    """

    def h(w) -> None:
        self._value = w

    return self._lock.with_(io.defer(h, v))
def set_rs(self, v: A) ‑> Resource[NoneType, NoneType, NoneType]

Assign a new value to this variable.

:param v: :return:

Expand source code
def set_rs(self, v: A) -> Resource[None, None, None]:
    """
    Assign a new value to this variable.

    :param v:
    :return:
    """

    def h(w) -> None:
        self._value = w

    return self._lock.then(resource.defer(h, v))
def traverse(self, f: Callable[[A], IO[Any, None, B]]) ‑> IO[typing.Any, NoneType, Var[~B]]

Create a new variable by transforming the current value of this variable.

:param f: :return:

Expand source code
def traverse(self, f: Callable[[A], IO[Any, None, B]]) -> IO[Any, None, Var[B]]:
    """
    Create a new variable by transforming the current value of this variable.

    :param f:
    :return:
    """

    def h():
        return f(self._value).flat_map(Var.create)

    return self._lock.with_(io.defer_io(h))
def update(self, f: Callable[[A], Tuple[A, B]]) ‑> IO[typing.Any, typing.Any, UpdateResult[~A, ~B]]

Update the value contained in this variable.

:param f: :return:

Expand source code
def update(self, f: Callable[[A], Tuple[A, B]]) -> IO[Any, Any, UpdateResult[A, B]]:
    """
    Update the value contained in this variable.

    :param f:
    :return:
    """

    def h() -> UpdateResult[A, B]:
        old_value = self._value
        new_value, ret = f(self._value)
        self._value = new_value
        return UpdateResult(old_value, new_value, ret)

    return self._lock.with_(io.defer(h))
def update_io(self, f: Callable[[A], IO[Any, None, Tuple[A, B]]]) ‑> IO[typing.Any, NoneType, UpdateResult[~A, ~B]]

Update the value contained in this variable.

:param f: :return:

Expand source code
def update_io(
    self, f: Callable[[A], IO[Any, None, Tuple[A, B]]]
) -> IO[Any, None, UpdateResult[A, B]]:
    """
    Update the value contained in this variable.

    :param f:
    :return:
    """

    def h() -> IO[Any, None, UpdateResult[A, B]]:
        old_value = self._value

        def g(x: Tuple[A, B]) -> UpdateResult[A, B]:
            self._value = x[0]
            return UpdateResult(old_value, self._value, x[1])

        return f(old_value).map(g)

    return self._lock.with_(io.defer_io(h))
def update_rs(self, f: Callable[[A], Resource[Any, None, Tuple[A, B]]]) ‑> Resource[typing.Any, NoneType, UpdateResult[~A, ~B]]

Update the value contained in this variable.

:param f: :return:

Expand source code
def update_rs(
    self, f: Callable[[A], Resource[Any, None, Tuple[A, B]]]
) -> Resource[Any, None, UpdateResult[A, B]]:
    """
    Update the value contained in this variable.

    :param f:
    :return:
    """

    def h() -> Resource[Any, None, UpdateResult[A, B]]:
        old_value = self._value

        def g(x: Tuple[A, B]) -> UpdateResult[A, B]:
            self._value = x[0]
            return UpdateResult(old_value, self._value, x[1])

        return f(old_value).map(g)

    return self._lock.then(resource.defer_resource(h))
def zip_with(self, *vars: Var[A]) ‑> IO[typing.Any, NoneType, typing.List[~A]]

Group this variable current value with vars variable current values.

:param vals: other variables to combine with self. :return:

Expand source code
def zip_with(self, *vars: Var[A]) -> IO[Any, None, List[A]]:
    """
    Group this variable current value with `vars` variable current values.

    :param vals: other variables to combine with self.
    :return:
    """
    return Var.zip(self, *vars)