Module Fut
Future values for asynchronous programming.
Fut
provides support to define and combine futures. A future is an undetermined value that becomes determined at an arbitrary point in the future. The future acts as a place-holder for the value while it is undetermined.
Promises determine the value of futures cooperatively while future queues determine the value of blocking or long running function applications with a set of concurrent workers and act as a mutual exclusion synchronization primitive. Semaphores allow to limit the resource usage of a set of determining futures.
The separate Futu
library exposes Unix
system calls as futures.
Consult the basics, the semantics and examples.
%%VERSION%% — homepage
Futures
type 'a state
=[
|
`Never
|
`Undet
|
`Det of 'a
]
The type for future states.
`Det v
if the future determinedv
.`Never
if the future is known to never determine.`Undet
if the future is undetermined. If the state is different from`Undet
, the future is set.
type 'a set
=[
|
`Never
|
`Det of 'a
]
The type for set future states. See
state
.
val state : 'a t -> 'a state
state f
is the current state off
.Thread-safe. This function can be called by other threads.
val state_set : 'a t -> 'a set
state_set f
is the set state off
.- raises Invalid_argument
if
f
is nots
.Thread-safe. This function can be called by other threads.
val await : ?timeout:float -> 'a t -> 'a state
await timeout f
, is likestate
except iff
is undetermined it waits untilf
is set ortimeout
seconds passed. Iftimeout
is unspecified,await
blocks untilf
is set.- raises Invalid_argument
if
timeout
is negative.
val finally : ('a -> 'b) -> 'a -> 'c t -> 'c t
finally fn v f
isf
butfn v
is called (and its result ignored) wheneverf
is set and immediately iff
is already set at call time.fn
may safely useset_state
f
to determine how it set. Iffn v
raises an exception it is reported to the exception trap specified withRuntime.set_exn_trap
.
Applicative combinators
These combinators have no effects on the determination of their arguments.
val never : unit -> 'a t
never ()
is a future that never determines a value.- [
never ()
]t= `Never
- [
val ret : 'a -> 'a t
ret v
is a future that determines the valuev
immediately.- [
ret v
]t= `Det v
- [
val recover : 'a t -> [ `Det of 'a | `Never ] t
recover f
is a future that determines`Det v
iff
determinedv
or`Never
iff
was set to never determine.- [
recover f
]t= `Det (`Det v)
if [f
]t= `Det v
- [
recover f
]t= `Det (`Never)
if [f
]t= `Never
- [
val bind : 'a t -> ('a -> 'b t) -> 'b t
bind f fn
is the futurefn v
wherev
is the value determined byf
.- [
bind f fn
]t=
[fn v
]t if [f
]t= `Det v
- [
bind f fn
]t= `Never
if [f
]t= `Never
- [
val app : ('a -> 'b) t -> 'a t -> 'b t
app ff fv
is a future that determinesf v
wheref
is determined byff
andv
byfv
.- [
app ff fv
]t= `Det (f v)
if [ff
]t= `Det f
and [fv
]t =`Det v
- [
app ff fv
]t= `Never
if [ff
]t= `Never
or [fv
]t= `Never
- [
val map : ('a -> 'b) -> 'a t -> 'b t
map fn f
is a future that determines the valuefn v
wherev
is the value determined byf
.- [
map fn f
]t= `Det (fn v)
if [f
]t= `Det v
- [
map fn f
]t= `Never
if [f
]t= `Never
.
- [
val fold : ('a -> 'b -> 'a) -> 'a -> 'b t list -> 'a t
fold fn acc [f
1; ...; f
n]
isList.fold_left fn acc [v
1; ...; v
n]
wherev
i is the value determined by the futuref
i.- [
fold fn acc [f
1; ...; f
n]
]t= List.fold_left fn acc [v
1; ...; v
n]
if for alli
[f
i]t= v
i - [
fold fn acc [f
1; ...; f
n]
]t= `Never
if there isi
with [f
i]t= `Never
- [
val sustain : 'a t -> 'a t -> 'a t
sustain f f'
determines likef
if it does and likef'
iff
never determines.- [
sustain f f'
]t= `Det v
if [f
]t= `Det v
- [
sustain f f'
]t=
[f'
]t if [f
]t= `Never
- [
val first : 'a t -> 'a t -> ('a * 'a t) t
first f f'
is a future that determines(v, snd)
wherev
is the value of the first future to determine andsnd
is the other one. If both futures are already determined the left one is taken.- [
first f f'
]t= `Det (v, f')
if [f
]t= `Det v
and for allt' < t
[f'
]t'<> `Det _
- [
first f f'
]t= `Det (v, f)
if [f'
]t= `Det v
and for allt' <= t
[f
]t'<> `Det _
- [
first f f'
]t= `Never
if [f
]t=
[f'
]t= `Never
- [
val firstl : 'a t list -> ('a * 'a t list) t
next fs
is a future that determines(v, fs')
wherev
is the first value to determine infs
andfs'
are the remaining ones. If more than one future is already determined the first left one is taken.- [
firstl [f
1; ...; f
n]
]t= `Det (v, [f
1; ...; f
j-1; f
j+1; ...;f
n])
if [f
j]t= `Det v
and for all t' < t and i < j [f
i]t'<> `Det _
- [
firstl [f
1; ...; f
n]
]t= `Never
if for alli
[f
i]t= `Never
- [
Effectful combinators
These combinators may set their `Undet
ermined arguments to `Never
and abort their determination.
Important. When a future is aborted, it is set to never determine and all the future it may be waiting on (and recursively) are also aborted, except those that are protected by protect
. Aborting a future that is set has no effect since once the future is set it never changes again.
Warning. If the aborted future is an `Undet
ermined future from a future queue, there's no guarantee that the corresponding application will be/has not been executed when it is set to `Never
. In any case the semantics of futures is preserved, it will switch from `Undet
to `Never
and remain so forever but depending on what the function application does, the world may be affected.
val abort : 'a t -> unit
abort f
aborts the futuref
. Iff
was`Undet
ermined at application time, it is set to never determine and all the futures it is waiting on are also aborted.- [
f
]t'= `Never
with t' > ta, if [f
]ta =`Undet
whereta
isabort
's application time.
TODO semantics we need to define a waits(f) that is the set of of futures
f
waits on.- [
val protect : 'a t -> 'a t
protect f
is a future that behaves likef
but is insensitive toabort
. It may of course still never determine because of its definition but its dependents are not able to abort it.f
of course remains abortable.
val pick : 'a t -> 'a t -> 'a t
pick f f'
is a future that determines as the first future that does and sets the other to never determine. If both futures are already determined the left one is taken. If both futures never determine, the future never determines.If [
f
]t= `Det v
then- [
pick f f'
]t= `Det v
- [
f'
]t'= `Never
with t' > t, if [f'
]t =`Undet
.
- [
If [
f'
]t= `Det v
and [f
]t<> `Det _
then- [
pick f f'
]t= `Det v
- [
f
]t'= `Never
with t' > t, if [f
]t =`Undet
.
- [
- If [
f
]t = [f'
]t= `Never
then [pick f f'
]t= `Never
TODO work out the semantics to see if it is equal to
first f f' >>= fun (v, f) -> ignore (abort f); ret v
Promises
val promise : ?abort:(unit -> unit) -> unit -> 'a promise
promise abort ()
is a new promise.abort
is called if the future of the promise is set to never determine viaset
or an effectful combinator. Once the future is setabort
is eventually garbage collected. If you want to call a function whenever the future of the promise is set, usefinally
on the future.Thread-safe. This function can be called from other threads.
val future : 'a promise -> 'a t
future p
is the future set by promisep
.Thread-safe. This function can be called from other threads.
val set : 'a promise -> [ `Det of 'a | `Never ] -> unit
set p s
setsfuture
p
tos
. Does nothing if the future ofp
is already set.Not thread safe, if you need to set the promise from another thread use a
Runtime.action
.
Future queues
type queue
The type for future queues. A future queue determines function applications sequentially in FIFO order. Applications submitted in two different queues are determined concurrently.
module Queue : sig ... end
Future queues.
val apply : ?queue:queue -> ?abort:bool Stdlib.ref -> ('a -> 'b) -> 'a -> 'b t
apply queue abort fn v
is a future that determines the valuefn v
onqueue
(defaults toQueue.concurrent
). If provided, theabort
argument is set totrue
by the runtime system if the future is set to`Never
, e.g. because it wasabort
ed;fn
can consult this reference cell to stop its computation or take appropriate action.fn
can raiseNever
to silently set the future to never determine. Any uncaught exception offn v
also sets the future to never determine however these exceptions are reported to the exception trap specified withRuntime.set_exn_trap
.Warning.
fn
is executed on another thread and as such it must respect Fut's thread safety rule.- The future won't determine if
await
is not called regularly. - If your program is not multi-threaded yet, it will become at that point.
Timers
Timer futures determine values after a specific amount of time has passed.
Warning. These futures don't determine if await
is not called regularly.
val delay : float -> float t
delay d
is a value that determinesdiff = d - d'
ind
seconds whered'
is the actual time the runtime system took to determine the timer. Ifdiff
is0.
the timer was on time, if positive the delay was early and if negative it was late.- [
delay d
]ta+d'= `Det (d-d')
whereta
isdelay
's application time andd'
is determined by the runtime system (but should be as near as possible fromd
).
- [
Semaphores
module Sem : sig ... end
Semaphores for limiting resource usage.
Error handling
type nonrec ('a, 'b) result
= ('a, 'b) Stdlib.result t
The type for future results. A future determining a result values.
Infix operators
Use of these operators may kill a few parentheses.
module Op : sig ... end
Infix operators.
Runtime system
module Runtime : sig ... end
Runtime system configuration and interaction.
Basics
A future is a place-holder for a value that may become determined at an arbitrary point in the future. Futures can be combined together to define new futures that will determine whenever the underlying futures do. For example, below revolt
determines once time_to_revolt
does.
let time_to_revolt = Fut.tick 1.871 in
let revolt =
Fut.bind time_to_revolt
(fun () -> Fut.apply Printf.printf "Revolt!")
let () = ignore (Fut.await revolt)
A future can be in three different states described by the type state
and returned by the (non-blocking) function Fut.state
. It is:
`Undet
, if the future is`Undet
ermined and may still determine a value at some point in the future.`Det v
, if the future`Det
ermined the valuev
.`Never
, if the future will never determine a value.
If the state of a future f
is `Det
or `Never
we say that the future is set.
The module Fut
.Ops defines a few infix operators to make the code more readable. For example the Ops
.(>>=) operator can be used for bind
. This makes it easier to sequence binds:
open Fut.Ops;;
let time_to_revolt d = Fut.tick d in
let revolt =
time_to_revolt 1.871 >>= fun () ->
Fut.apply Printf.printf "Revolt!" >>= fun () ->
time_to_revolt 0.72 >>= fun () ->
Fut.apply Printf.printf "Again!"
let () = ignore (Fut.await revolt)
Semantics
The semantic function []: 'a Fut.t -> time -> 'a Fut.state
gives meaning to a future f
by mapping it to a function [f
] that give its state at any point in time (after its creation). We write [f
]t the evaluation of this semantic function at time t
.
Remember that once a future is set it never changes again. This is captured by the following axiom that holds for any future f
at any time t
:
- If [
f
]t<> `Undet
then for allt' > t
we have [f
]t' = [f
]t
The semantics of combinators are given in terms of [] by describing only the instants at which the resulting future is set. In the remaining undescribed instants the state of the future is implicitly taken to be unset, i.e. `Undet
etermined. The descriptions also implicitly quantify universally over the free variable t
.
Never determined futures
Fut
makes a distinction between a future that is undetermined and may remain so forever and a future that is known to be never determined (e.g. Fut.never
), a future with state `Never
. Futures depending on a future value that `Never
determines also never determine. This means that most combinator whenever they are given a future that never determines return a future that never determines.
One way to get never determined futures is by using the future exclusive choice combinator first
between two futures. The first one that determines makes the other become never determined. Note that for future created with Fut.apply
, the application (and its effects) may (or not) still be executed, but it's value is discarded.
Exceptions
If the computation to determine a future's value raises an exception it is set to `Never
determine and the exception is logged by the runtime system to the handler provided by Runtime.set_exn_trap
. The default handler is backend dependent but usually logs the exception on stderr
.
There is a single exception to this behaviour. If the exception Never
is raised by an application given to Fut.apply
, the corresponding future never determines but the exception is not logged.
Do not rely on the fact that exceptions are automatically trapped by the system, this mechanism is in place so that unexpected exceptions do not kill the runtime system for long running programs. If you deal with a library that is designed around exceptions you should catch and handle them, if only to use Fut.never
if you wish so. For example if looking up a data structure may return Not_found
don't do that:
let find k m = Fut.ret (M.find k m)
This may significantly obfuscate what's happening (and won't be efficient). Explicitely handle the exception:
let find k m = try Fut.ret (M.find k m) with Not_found -> Fut.never ()
In this example, using option types may anway be clearer:
let find k m = Fut.ret (try Some (M.find k m) with Not_found -> None)
Future queues
Future queues determine long-running or blocking function applications with a set of concurrent workers usually implemented as system threads.
A future queue determines the applications submitted to it through the Fut.apply
function sequentially and in FIFO order. This property also makes them a mutual exclusion synchronization primitive. Applications submitted in two different queues are determined concurrently. In the following examples f1
and f2
will be set sequentially, while f3
will do so concurrently to those.
let q1 = Fut.Queue.create ()
let q2 = Fut.Queue.create ()
let f1 = Fut.apply ~queue:q1 Printf.printf "first f1"
let f2 = Fut.apply ~queue:q1 Printf.printf "then f2"
let f3 = Fut.apply ~queue:q2 Printf.printf "f3 at anytime"
Note that the function applications are executed on another threads and as such must respect Fut's thread safety rule.
There is a special queue the concurrent queue which is the default argument of apply
. This queue has no FIFO ordering constraint: anything submitted to it is determined concurrently. It is useful to just apply a long-running function to avoid starving the runtime system:
let long_work = Fut.apply long_running_function ()
It is also possible to provide a reference cell abort
to apply
. This reference cell will be set true
by the runtime system if the future is aborted by the runtime system. This can be used to stop the application. For example:
let long_running_work abort =
while (not !abort || !finished) do ... done
let abort = ref false
let long_work = Fut.apply ~abort long_running_work abort
let _ = Fut.abort long_work (* will set abort to [true] *)
Note that the runtime system never reads abort it just sets it to true
when appropriate.
Promises
Promises are used to expose the functionality of third-party libraries as futures. A call to promise
returns a promise that has an associated future (returned by future
). The promise can be set
, which sets the associated future. See this example to see how promises can be used to expose callback based asynchronous interfaces as futures.
Note that Fut.set
must be called on Fut
's thread. If one needs to set futures from another thread, a Runtime.action
can be used. See the next section.
Fut and system threads
Except for applications determined on future queues, Fut
has a single-threaded cooperative concurency model. This means that at any time there's only one future that is trying to determine its value on Fut
's thread.
Thread-safety rule. The thread-safety rule of Fut
is simple. A single thread must run Fut
's runtime system via a call to await
and no other thread is allowed to use any function of Fut
except, promise
, future
, state
and Runtime.action
.
In particular the thread-safety rule does apply to function applications performed with Fut.app
as they are performed on other threads.
Fut
tries to abstract away all your thread needs via the concept of future queues. Sometimes this is however infeasible for example because a library prevents you from sharing structures across threads and future queues provides you no support to control on which thread an application will happen. In that case you can still create your own system thread and interact with Fut
by using promise
and a Runtime.action
to Fut.set
it once it determined. See this example.
Before any call to apply
or Runtime
.set_thread_count the program is single threaded.
Fut and system processes
In general Unix.fork
does not cope well with threads. The best approach is thus to fork all the processes you need before the first call to await
.
Examples
Backends
TODO explain how to link against a backend.
The universal client
The echo server
Integration with asynchronous functions
Suppose we have a function to perform asynchronous reads on a regular file whose signature is as follows:
type read = [ `Error | `Read of int ]
val Aio.read : Unix.file_descr -> string -> int -> int -> (read -> unit) -> unit
where read fd s j l k
tries to read l
bytes from fd
to store them in s
starting at j
calling k
when it has completed.
Exposing this function call as a future is very simple. Create a promise and set it in the callback k
:
val read : Unix.file_descr -> string -> int -> int -> read Fut.t
let read fd s j l =
let p = Fut.promise () in
let k r = Fut.set p (`Det r) in
Aio.read fd s j l k;
Fut.future p
It is however important to make sure that the thread that calls the continuation k
is Fut
's thread. If this is not the case the callback k
can be replaced by:
let k r = Fut.Runtime.action (fun () -> Fut.set p (`Det r))
Integration with threads
If for some reason you cannot use future queues and need to run a service in its own thread, the code below shows how to interact with Fut
and respect its thread-safety rule.
module Service : sig
type request
type response
val request : request -> response Fut.t
val run : unit -> unit
end = struct
type request
type response
let m = Mutex.create ()
let not_empty = Condition.create ()
let requests = Queue.create ()
let request req = (* executed by [Fut]'s thread. *)
let abort' = ref false in
let abort () = abort' := true in
let promise = Fut.promise ~abort () in
Mutex.lock m;
Queue.add (req, promise, abort') requests;
Condition.signal not_empty;
Mutex.unlock m;
Fut.future promise
let rec serve () = (* executed by the service's thread. *)
Mutex.lock m;
while Queue.is_empty requests do Condition.wait not_empty m done;
let (req, promise, abort) = Queue.pop requests in
Mutex.unlock m;
let resp = do_service req abort in
let set () = Fut.set promise (`Det resp) in
if not !abort then Fut.Runtime.action set;
serve ()
let run () = ignore (Thread.create serve ())
end
Integration with multiprocessing
You may want to use futures to delegate work to other processes.