module Nproc:Process poolssig
..end
Master and workers communicate by message-passing. The implementation relies on fork, pipes, Marshal and Lwt.
Error handling:
option
type
and None
indicates that an exception was caught.log_error
and log_info
can be redefined to take advantage of a particular logging system.string_of_exn
may be called in both master and workers.type
t
type
worker_info = private {
|
worker_id : |
(* | Worker identifier ranging between 0 and (number of workers - 1). | *) |
|
worker_loop : |
(* | Function that starts the worker's infinite loop. | *) |
exception Start_worker of worker_info
init
function passed as an option to Nproc.create
.
In this case it is the user's responsibility to catch the exception
and to start the worker loop.
The purpose of this exception is to allow the user to clear
the call stack in the child processes, allowing
the garbage collector to free up heap-allocated memory that
would otherwise be wasted.
val create : ?init:(worker_info -> unit) -> int -> t * unit Lwt.t
create nproc
returns (ppool, lwt)
where
ppool
is a pool of nproc
processes and lwt
is a lightweight thread
that finishes when the pool is closed.
init
: initialization function called at the beginning of
of each worker process. By default it does nothing.
Specifying a custom init
function allows to perform
some initial cleanup of resources
inherited from the parent (master),
such as closing files or connections. It may also
raise the Nproc.Start_worker
exception as a means
of clearing the call stack inherited from the parent,
enabling the garbage collection of some useless data.
If this Start_worker
mechanism is used,
the worker_loop
function from the Nproc.worker_info
record needs to be called explicitly after catching
the exception.val close : t -> unit Lwt.t
val terminate : t -> unit
val submit : t -> f:('a -> 'b) -> 'a -> 'b option Lwt.t
submit ppool ~f x
passes f
and x
to one of the worker processes,
which computes f x
and passes the result back to the master process,
i.e. to the calling process running the Lwt event loop.
The current implementation uses the Marshal module to serialize
and deserialize f
, its input and its output.
val iter_stream : ?granularity:int ->
?init:(worker_info -> unit) ->
nproc:int -> f:('a -> 'b) -> g:('b option -> unit) -> 'a Stream.t -> unit
nproc
worker processes running in parallel.
iter_stream
runs the Lwt event loop internally. It is intended
for programs that do not use Lwt otherwise.
Function f
runs in the worker processes. It is applied to elements
of the stream that it receives from the master process.
Function g
is applied to the result of f
in the master process.
The current implementation uses the Marshal module to serialize
and deserialize f
, its inputs (stream elements) and its outputs.
f
is serialized as many times as there are elements in the stream.
If f
relies on a large immutable data structure, we recommend
using the env
option of Full.iter_stream
.
granularity
: allows to improve the performance of short-lived
tasks by grouping multiple tasks internally into
a single task.
This reduces the overhead of the underlying
message-passing system but makes the tasks
sequential within each group.
The default granularity
is 1.init
: see Nproc.create
.val log_error : (string -> unit) Pervasives.ref
stderr
channel
and flushes its buffer.val log_info : (string -> unit) Pervasives.ref
stderr
channel
and flushes its buffer.val string_of_exn : (exn -> string) Pervasives.ref
Printexc.to_string
.
Users might want to change it into a function that prints
a stack backtrace, e.g.
Nproc.string_of_exn := (fun e -> Printexc.get_backtrace () ^ Printexc.to_string e)
module Full:sig
..end