Module Fut

module Fut: sig .. end
Future values for asynchronous programming.

Fut provides support to define and combine futures. A future is an undetermined value that becomes determined at an arbitrary point in the future. The future acts as a place-holder for the value while it is undetermined.

Promises determine the value of futures cooperatively. Future queues determine the value of blocking or long-running function applications with a set of concurrent workers and also act as a mutual exclusion synchronization primitive. Semaphores limit the resource usage of a set of determining futures.

The separate Futu library exposes Unix system calls as futures.

Consult the basics, the semantics and examples.

Release 0.0.0-72-g91e7845 - Daniel Bünzli <daniel.buenzl i@erratique.ch>



Futures


type 'a t 
The type for a future that determines a value of type 'a.
type 'a state = [ `Det of 'a | `Never | `Undet ] 
The type for future states. `Det v if the future determined v. `Never if the future is known to never determine. `Undet if the future is undetermined. If the state is different from `Undet, the future is set.
type 'a set = [ `Det of 'a | `Never ] 
The type for set future states. See Fut.state.
val state : 'a t -> 'a state
state f is the current state of f.

Thread-safe. This function can be called by other threads.

val state_set : 'a t -> 'a set
state_set f is the set state of f.
Raises Invalid_argument if f is not set.

Thread-safe. This function can be called by other threads.
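
As a rough illustration of how the two state types relate, the sketch below narrows a state to a set state. The types are local copies of Fut.state and Fut.set so that the block compiles without Fut, and set_of_state is a hypothetical helper that is not part of the library.

```ocaml
(* Local copies of the state types, mirroring Fut.state and Fut.set. *)
type 'a state = [ `Det of 'a | `Never | `Undet ]
type 'a set = [ `Det of 'a | `Never ]

(* Hypothetical helper: narrow a state to a set state. Like state_set on a
   future, it raises Invalid_argument when the state is `Undet. *)
let set_of_state (s : 'a state) : 'a set =
  match s with
  | `Det v -> `Det v
  | `Never -> `Never
  | `Undet -> invalid_arg "state is not set"
```

A caller that cannot rule out `Undet should match on the full state instead of narrowing it.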

val await : ?timeout:float -> 'a t -> 'a state
await timeout f is like Fut.state, except that if f is undetermined it waits until f is set or timeout seconds have passed. If timeout is unspecified, await blocks until f is set.
Raises Invalid_argument if timeout is negative.
val sync : 'a t -> 'a set
sync f is like Fut.await but blocks until f is set.
val finally : ('a -> 'b) -> 'a -> 'c t -> 'c t
finally fn v f is f but fn v is called (and its result ignored) whenever f is set, and immediately if f is already set at call time. fn may safely use Fut.state_set f to determine how it was set. If fn v raises an exception it is reported to the exception trap specified with Fut.Runtime.set_exn_trap.

Applicative combinators

These combinators have no effects on the determination of their arguments.

val never : unit -> 'a t
never () is a future that never determines a value.
val ret : 'a -> 'a t
ret v is a future that determines the value v immediately.
val recover : 'a t -> [ `Det of 'a | `Never ] t
recover f is a future that determines `Det v if f determined v or `Never if f was set to never determine.
val bind : 'a t -> ('a -> 'b t) -> 'b t
bind f fn is the future fn v where v is the value determined by f.
val app : ('a -> 'b) t -> 'a t -> 'b t
app ff fv is a future that determines f v where f is determined by ff and v by fv.
val map : ('a -> 'b) -> 'a t -> 'b t
map fn f is a future that determines the value fn v where v is the value determined by f.
val ignore : 'a t -> unit t
ignore f is Fut.map (fun _ -> ()) f.
val fold : ('a -> 'b -> 'a) -> 'a -> 'b t list -> 'a t
fold fn acc [f1; ...; fn] is List.fold_left fn acc [v1; ...; vn] where vi is the value determined by the future fi.
val sustain : 'a t -> 'a t -> 'a t
sustain f f' determines like f if it does and like f' if f never determines.
val first : 'a t -> 'a t -> ('a * 'a t) t
first f f' is a future that determines (v, snd) where v is the value of the first future to determine and snd is the other one. If both futures are already determined the left one is taken.
val firstl : 'a t list -> ('a * 'a t list) t
firstl fs is a future that determines (v, fs') where v is the first value to determine in fs and fs' are the remaining ones. If more than one future is already determined the leftmost one is taken.

Effectful combinators

These combinators may set their `Undet arguments to `Never and abort their determination.

Important. When a future is aborted, it is set to never determine and all the futures it may be waiting on are (recursively) also aborted, except those that are protected by Fut.protect. Aborting a future that is set has no effect since once the future is set it never changes again.

Warning. If the aborted future is an `Undet future from a future queue, there is no guarantee about whether the corresponding application has been or will be executed when the future is set to `Never. In any case the semantics of futures is preserved: it switches from `Undet to `Never and remains so forever. However, depending on what the function application does, the world may be affected.

val abort : 'a t -> unit
abort f aborts the future f. If f was `Undet at application time, it is set to never determine and all the futures it is waiting on are also aborted. TODO semantics: we need to define a waits(f) that is the set of futures f waits on.
val protect : 'a t -> 'a t
protect f is a future that behaves like f but is insensitive to Fut.abort. It may of course still never determine because of its definition but its dependents are not able to abort it. f of course remains abortable.
val pick : 'a t -> 'a t -> 'a t
pick f f' is a future that determines as the first future that does and sets the other to never determine. If both futures are already determined the left one is taken. If both futures never determine, the future never determines.

TODO work out the semantics to see if it is equal to

  first f f' >>= fun (v, f) -> ignore (abort f); ret v

val pickl : 'a t list -> 'a t
pickl fs is List.fold_left Fut.pick (Fut.never ()) fs.

Promises


type 'a promise 
The type for promises. A promise can Fut.set the Fut.future it is associated to.
val promise : ?abort:(unit -> unit) -> unit -> 'a promise
promise abort () is a new promise. abort is called if the future of the promise is set to never determine via Fut.set or an effectful combinator. Once the future is set abort is eventually garbage collected. If you want to call a function whenever the future of the promise is set, use Fut.finally on the future.

Thread-safe. This function can be called from other threads.

val future : 'a promise -> 'a t
future p is the future set by promise p.

Thread-safe. This function can be called from other threads.

val set : 'a promise -> [ `Det of 'a | `Never ] -> unit
set p s sets Fut.future p to s. Does nothing if the future of p is already set.

Not thread-safe. If you need to set the promise from another thread, use a Fut.Runtime.action.


Future queues


type queue 
The type for future queues. A future queue determines function applications sequentially in FIFO order. Applications submitted in two different queues are determined concurrently.
module Queue: sig .. end
Future queues.
val apply : ?queue:queue ->
?abort:bool Pervasives.ref -> ('a -> 'b) -> 'a -> 'b t
apply queue abort fn v is a future that determines the value fn v on queue (defaults to Fut.Queue.concurrent). If provided, the abort argument is set to true by the runtime system if the future is set to `Never, e.g. because it was aborted with Fut.abort; fn can consult this reference cell to stop its computation or take appropriate action.

fn can raise Fut.Never to silently set the future to never determine. Any uncaught exception of fn v also sets the future to never determine; however, these exceptions are reported to the exception trap specified with Fut.Runtime.set_exn_trap.

Warning.


exception Never
Never can be raised by a function application in a queue to set its corresponding future to never determine.

Timers

Timer futures determine values after a specific amount of time has passed.

Warning. These futures don't determine if Fut.await is not called regularly.

val delay : float -> float t
delay d is a future that determines diff = d - d' in d seconds, where d' is the actual time the runtime system took to determine the timer. If diff is 0. the timer was on time, if positive the timer was early and if negative it was late.
val tick : float -> unit t
tick d is Fut.ignore (Fut.delay d).
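
The sign convention of the value determined by Fut.delay can be checked with plain arithmetic. The sketch below does not use Fut; timer_diff and describe are hypothetical names introduced only for illustration.

```ocaml
(* diff = d - d', where d is the requested delay and d' the time the timer
   actually took, both in seconds. *)
let timer_diff ~requested ~actual = requested -. actual

(* Classify a diff following the convention documented for delay. *)
let describe diff =
  if diff = 0. then `On_time
  else if diff > 0. then `Early
  else `Late
```

For instance a timer requested for 1.0s that took 1.25s yields a diff of -0.25 and is classified as late.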

Semaphores


module Sem: sig .. end
Semaphores for limiting resource usage.

Error handling and status futures


type ('a, 'b) result = [ `Error of 'b | `Ok of 'a ] 
The type for results with errors.
type ('a, 'b) status = ('a, 'b) result t 
The type for statuses. A future determining a result.
val sbind : ('a, 'c) status -> ('a -> ('b, 'c) status) -> ('b, 'c) status
val ok : 'a -> ('a, 'b) status
val error : 'b -> ('a, 'b) status
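
The status functions above are not described further. As a hedged sketch of the intent suggested by their types, the block below binds on plain results rather than on futures, assuming that `Error short-circuits the computation. rbind, parse and safe_div are hypothetical names, and the result type is a local copy of Fut.result.

```ocaml
(* Local copy of Fut.result. *)
type ('a, 'b) result = [ `Error of 'b | `Ok of 'a ]

(* Hypothetical result-level analogue of sbind: continue on `Ok, stop at
   the first `Error. *)
let rbind (r : ('a, 'c) result) (fn : 'a -> ('b, 'c) result)
  : ('b, 'c) result =
  match r with
  | `Ok v -> fn v
  | `Error e -> `Error e

let parse s =
  match int_of_string_opt s with
  | Some i -> `Ok i
  | None -> `Error ("not an int: " ^ s)

let safe_div a b =
  if b = 0 then `Error "division by zero" else `Ok (a / b)

(* Divide two integers given as strings, stopping at the first error. *)
let div_strings a b =
  rbind (parse a) (fun x -> rbind (parse b) (safe_div x))
```

Under the same assumption, sbind and (>>&) would sequence status futures in the same way once each future has determined its result.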

Infix operators

Use of these operators may kill a few parentheses.

val (>>=) : 'a t -> ('a -> 'b t) -> 'b t
f >>= fn is bind f fn.
val (>>|) : 'a t -> ('a -> 'b) -> 'b t
f >>| fn is map fn f.
val (>>&) : ('a, 'c) status -> ('a -> ('b, 'c) status) -> ('b, 'c) status
f >>& fn is sbind f fn.
module Op: sig .. end
Infix operators.

Runtime system


module Runtime: sig .. end
Runtime system configuration and interaction.

Basics

A future is a place-holder for a value that may become determined at an arbitrary point in the future. Futures can be combined together to define new futures that will determine whenever the underlying futures do. For example, below revolt determines once time_to_revolt does.

let time_to_revolt = Fut.tick 1.871
let revolt = 
  Fut.bind time_to_revolt 
    (fun () -> Fut.apply Printf.printf "Revolt!")

let () = ignore (Fut.await revolt)   
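A future can be in three different states described by the type Fut.state and returned by the (non-blocking) function Fut.state. It is:

`Undet, if the future is undetermined.
`Det v, if the future determined the value v.
`Never, if the future is known to never determine.
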
If the state of a future f is `Det or `Never we say that the future is set.

The module Fut.Op defines a few infix operators to make the code more readable. For example the Op.(>>=) operator can be used for Fut.bind. This makes it easier to sequence binds:

open Fut.Op;;

let time_to_revolt d = Fut.tick d
let revolt =
  time_to_revolt 1.871              >>= fun () ->
  Fut.apply Printf.printf "Revolt!" >>= fun () ->
  time_to_revolt 0.72               >>= fun () ->
  Fut.apply Printf.printf "Again!"

let () = ignore (Fut.await revolt)

Semantics

The semantic function []: 'a Fut.t -> time -> 'a Fut.state gives meaning to a future f by mapping it to a function [f] that gives its state at any point in time (after its creation). We write [f]t for the evaluation of this semantic function at time t.

Remember that once a future is set it never changes again. This is captured by the following axiom that holds for any future f at any time t:

If [f]t is not `Undet, then [f]t' = [f]t for all t' > t.

The semantics of combinators are given in terms of [] by describing only the instants at which the resulting future is set. In the remaining undescribed instants the state of the future is implicitly taken to be unset, i.e. `Undet. The descriptions also implicitly quantify universally over the free variable t.
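
The rule that a set future never changes again can be modelled with a small mutable cell. This is only a sketch of the rule and not Fut's implementation; cell, make and set are hypothetical names, and the types are local copies of Fut.set and Fut.state.

```ocaml
type 'a set = [ `Det of 'a | `Never ]
type 'a state = [ 'a set | `Undet ]

(* A toy stand-in for a future: a cell holding the current state. *)
type 'a cell = { mutable state : 'a state }

let make () : 'a cell = { state = `Undet }

(* Setting only has an effect while the cell is `Undet. Once the cell is
   set, later calls leave it unchanged. *)
let set (c : 'a cell) (s : 'a set) : unit =
  match c.state with
  | `Undet -> c.state <- (s : 'a set :> 'a state)
  | `Det _ | `Never -> ()
```

With this model, setting a cell to `Det 1 and then to `Never leaves it at `Det 1, which matches the documented behaviour of Fut.set on an already set future.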

Never determined futures

Fut makes a distinction between a future that is undetermined and may remain so forever and a future that is known to be never determined (e.g. Fut.never), a future with state `Never. Futures depending on a future value that `Never determines also never determine. This means that most combinators, whenever they are given a future that never determines, return a future that never determines.

One way to get never determined futures is by using the exclusive choice combinator Fut.pick between two futures. The first one that determines makes the other become never determined. Note that for futures created with Fut.apply, the application (and its effects) may or may not still be executed, but its value is discarded.
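
The propagation of `Never through combinators can be pictured on states taken at a single instant. The sketch below is a simplification that ignores time and scheduling; map_state and bind_state are hypothetical helpers on a local copy of Fut.state, not functions of Fut.

```ocaml
(* Local copy of Fut.state. *)
type 'a state = [ `Det of 'a | `Never | `Undet ]

(* State of [map fn f] at an instant, given the state of f. *)
let map_state (fn : 'a -> 'b) (s : 'a state) : 'b state =
  match s with
  | `Det v -> `Det (fn v)
  | `Never -> `Never
  | `Undet -> `Undet

(* State of [bind f fn] at an instant, where fn yields the state of the
   future it returns. *)
let bind_state (s : 'a state) (fn : 'a -> 'b state) : 'b state =
  match s with
  | `Det v -> fn v
  | `Never -> `Never
  | `Undet -> `Undet
```

In both cases a `Never input gives a `Never output without fn being called.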

Exceptions

If the computation to determine a future's value raises an exception, the future is set to `Never and the exception is logged by the runtime system to the handler provided by Fut.Runtime.set_exn_trap. The default handler is backend dependent but usually logs the exception on stderr.

There is a single exception to this behaviour. If the exception Fut.Never is raised by an application given to Fut.apply, the corresponding future never determines but the exception is not logged.

Do not rely on the fact that exceptions are automatically trapped by the system. This mechanism is in place so that unexpected exceptions do not kill the runtime system for long-running programs. If you deal with a library that is designed around exceptions you should catch and handle them, if only to use Fut.never if you wish so. For example if looking up a data structure may raise Not_found, don't do this:

let find k m = Fut.ret (M.find k m)
This may significantly obfuscate what's happening (and won't be efficient). Explicitly handle the exception:
let find k m = try Fut.ret (M.find k m) with Not_found -> Fut.never ()
In this example, using option types may anyway be clearer:
let find k m = Fut.ret (try Some (M.find k m) with Not_found -> None)
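
The option-based lookup can be tried on its own. In the sketch below Fut.ret is left out so that the block runs without Fut, and M is assumed to be a map over string keys; both choices are made only for illustration.

```ocaml
(* M is assumed to be a string map here. *)
module M = Map.Make (String)

(* Turn the Not_found exception into an option at the call site. *)
let find k m = try Some (M.find k m) with Not_found -> None

let m = M.add "a" 1 M.empty
let hit = find "a" m
let miss = find "b" m
```

Recent versions of the standard library also provide M.find_opt, which returns the option directly.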

Future queues

Future queues determine long-running or blocking function applications with a set of concurrent workers usually implemented as system threads.

A future queue determines the applications submitted to it through the Fut.apply function sequentially and in FIFO order. This property also makes them a mutual exclusion synchronization primitive. Applications submitted in two different queues are determined concurrently. In the following example f1 and f2 will be set sequentially, while f3 will be set concurrently with them.

let q1 = Fut.Queue.create ()
let q2 = Fut.Queue.create ()
let f1 = Fut.apply ~queue:q1 Printf.printf "first f1"
let f2 = Fut.apply ~queue:q1 Printf.printf "then f2" 
let f3 = Fut.apply ~queue:q2 Printf.printf "f3 at anytime"
Note that the function applications are executed on other threads and as such must respect Fut's thread-safety rule.
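
The FIFO guarantee of a single queue can be pictured with a toy, single-threaded model built on the standard library's Queue module. This is not how Fut implements future queues (which use concurrent workers); make_queue, submit and run_all are hypothetical names.

```ocaml
(* Applications are stored as thunks and run one at a time, in the order
   in which they were submitted. *)
let make_queue () : (unit -> unit) Queue.t = Queue.create ()
let submit q fn v = Queue.add (fun () -> fn v) q
let run_all q = while not (Queue.is_empty q) do (Queue.pop q) () done

(* Record the order in which the applications actually run. *)
let log : string list ref = ref []
let record s = log := s :: !log

let q1 = make_queue ()
let () =
  submit q1 record "first f1";
  submit q1 record "then f2";
  run_all q1

let order = List.rev !log
```

Since only one application of a given queue runs at a time, two applications submitted to the same queue never overlap, which is what makes a queue usable for mutual exclusion.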

There is a special queue, the concurrent queue, which is the default queue argument of Fut.apply. This queue has no FIFO ordering constraint: anything submitted to it is determined concurrently. It is useful to just apply a long-running function to avoid starving the runtime system:

let long_work = Fut.apply long_running_function ()
It is also possible to provide a reference cell abort to Fut.apply. This reference cell will be set to true by the runtime system if the future is aborted. This can be used to stop the application. For example:
let long_running_work abort =
  (* finished is assumed to be a bool ref updated by the elided loop body. *)
  while not (!abort || !finished) do ... done

let abort = ref false
let long_work = Fut.apply ~abort long_running_work abort
let _ = Fut.abort long_work (* will set abort to [true] *)
Note that the runtime system never reads abort; it only sets it to true when appropriate.
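
The cooperative check on the abort cell can be exercised without Fut. In the sketch below the runtime system is simulated by a callback that sets the flag; the steps and on_step parameters are hypothetical and exist only to make the loop observable.

```ocaml
(* Run at most [steps] iterations and stop early once [abort] is true.
   Returns the number of iterations performed. *)
let long_running_work ~abort ~steps ~on_step =
  let i = ref 0 in
  while not !abort && !i < steps do
    on_step !i;
    incr i
  done;
  !i

let abort = ref false

(* Simulate an abort arriving during the third iteration. *)
let performed =
  long_running_work ~abort ~steps:10
    ~on_step:(fun i -> if i = 2 then abort := true)
```

The loop finishes the iteration during which the flag is set and stops before the next one, so the work function decides at which points it is safe to stop.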

Promises

Promises are used to expose the functionality of third-party libraries as futures. A call to Fut.promise returns a promise that has an associated future (returned by Fut.future). The promise can be set with Fut.set, which sets the associated future. See the example in the section Integration with asynchronous functions to see how promises can be used to expose callback-based asynchronous interfaces as futures.

Note that Fut.set must be called on Fut's thread. If one needs to set futures from another thread, a Fut.Runtime.action can be used. See the next section.

Fut and system threads

Except for applications determined on future queues, Fut has a single-threaded cooperative concurrency model. This means that at any time there's only one future that is trying to determine its value on Fut's thread.

Thread-safety rule. The thread-safety rule of Fut is simple. A single thread must run Fut's runtime system via a call to Fut.await, and no other thread is allowed to use any function of Fut except Fut.promise, Fut.future, Fut.state and Fut.Runtime.action.

In particular the thread-safety rule does apply to function applications performed with Fut.apply as they are performed on other threads.

Fut tries to abstract away all your thread needs via the concept of future queues. Sometimes this is however infeasible, for example because a library prevents you from sharing structures across threads and future queues provide no support to control on which thread an application will happen. In that case you can still create your own system thread and interact with Fut by using Fut.promise and a Fut.Runtime.action to Fut.set it once it has determined. See the example in the section Integration with threads.

Before any call to Fut.apply or Runtime.set_thread_count the program is single threaded.

Fut and system processes

In general Unix.fork does not cope well with threads. The best approach is thus to fork all the processes you need before the first call to Fut.await.

Examples

Backends

TODO explain how to link against a backend.

The universal client

The echo server

Integration with asynchronous functions

Suppose we have a function to perform asynchronous reads on a regular file whose signature is as follows:

type read = [ `Error | `Read of int ]
val Aio.read : Unix.file_descr -> string -> int -> int -> (read -> unit) -> unit
where read fd s j l k tries to read l bytes from fd to store them in s starting at j calling k when it has completed.

Exposing this function call as a future is very simple. Create a promise and set it in the callback k:

val read : Unix.file_descr -> string -> int -> int -> read Fut.t
let read fd s j l = 
  let p = Fut.promise () in 
  let k r = Fut.set p (`Det r) in 
  Aio.read fd s j l k;
  Fut.future p
It is however important to make sure that the thread that calls the continuation k is Fut's thread. If this is not the case the callback k can be replaced by:
let k r = Fut.Runtime.action (fun () -> Fut.set p (`Det r))

Integration with threads

If for some reason you cannot use future queues and need to run a service in its own thread, the code below shows how to interact with Fut and respect its thread-safety rule.

module Service : sig
  type request 
  type response 
  val request : request -> response Fut.t
  val run : unit -> unit
end = struct
  type request 
  type response

  let m = Mutex.create ()
  let not_empty = Condition.create () 
  let requests = Queue.create ()

  let request req = (* executed by [Fut]'s thread. *)
    let abort' = ref false in 
    let abort () = abort' := true in
    let promise = Fut.promise ~abort () in
    Mutex.lock m; 
    Queue.add (req, promise, abort') requests;
    Condition.signal not_empty;
    Mutex.unlock m;
    Fut.future promise
      
  let rec serve () = (* executed by the service's thread. *)
    Mutex.lock m;
    while Queue.is_empty requests do Condition.wait not_empty m done;
    let (req, promise, abort) = Queue.pop requests in
    Mutex.unlock m; 
    let resp = do_service req abort in 
    let set () = Fut.set promise (`Det resp) in
    if not !abort then Fut.Runtime.action set;
    serve ()
    
  let run () = ignore (Thread.create serve ())
end

Integration with multiprocessing

You may want to use futures to delegate work to other processes.

Integration with signals