```ocaml=
(* it's passed time we clean up the code.
Objectives:
- encode most/all of the invariants to be guaranteed by the type system
- add exception-less variants to the interface (bonus point for backwards
compatibility and deprecation mechanism)
- identify other possible improvements
The objectives are kept somewhat vague, the exercise is meant to be
discursive, exploratory, and open-ended.
*)
(* Prerequisites:
- This exercise uses Lwt. But it makes only a very simple use of it (no
scheduling trickery, no [wakeup], etc.
- You can `:%s/Lwt/Async/` if you prefer.
- The module [Lwt_condition] is from the [lwt] package, the relevant
documentation is:
(* Lwt_condition.mli, excerpt *)
type 'a t
(** Condition variable type. The type parameter denotes the type of
value propagated from notifier to waiter. *)
val create : unit -> 'a t
(** [create ()] creates a new condition variable. *)
val wait : 'a t -> 'a Lwt.t
(** [wait mutex condvar] will cause the current thread to block,
awaiting notification for a condition variable, [condvar].
When the awaited condition is notified, the value parameter passed
to [signal] is returned. *)
val signal : 'a t -> 'a -> unit
(** [signal condvar value] notifies that a condition is ready. A
single waiting thread will be awoken and will receive the
notification value which will be returned from [wait]. Note
that condition notification is not "sticky", i.e. if there is
no waiter when [signal] is called, the notification will be
missed and the value discarded. *)
*)
module SelfPropelledBufferedSink : sig
(** A self-propelled-buffered-sink. You can push values to its internal
buffer. Whenever there are values in the buffer, the values are consumed
sequentially (one after the other) by the [consumer]. *)
type 'a t
(** [create size consumer] is a self-propelled-buffered-sink with a set
maximum buffer [size] and a [consumer]. *)
val create : int -> ('a -> unit Lwt.t) -> 'a t
(** [push t v] adds [v] to [t]'s buffer. The returned promise resolves when
there is available space in the buffer. *)
val push : 'a t -> 'a -> unit Lwt.t
(** [push_now_exn t v] adds [v] to [t]'s buffer now.
@raise [Failure] if [t]'s buffer is full. *)
val push_now_exn : 'a t -> 'a -> unit
val push_now : 'a t -> 'a -> unit Or_error.t
(** [consuming t] is [true] if [t]'s consumer is executing (i.e., if it has
returned a promise that hasn't been resolved yet) and false otherwise. *)
val consuming : 'a t -> bool
(** [waiting t] is [not @@ consuming t]. *)
val waiting : 'a t -> bool
end = struct
open Lwt.Syntax (* let* and* let+ and+ *)
(* invariants:
- [buffer_size] is the number of elements in [buffer],
- [0 <= buffer_size]
- [buffer_size <= buffer_size_bound]
- [worker] is a resolved promise only at initialisation-time, otherwise
it is a forever-pending promise that consumes newly added elements
- [running] is [true] when there's an unresolved promise from [consumer]
and [false] when the worker is waiting on more elements.
*)
type 'a t = {
buffer : 'a Queue.t;
buffer_size_bound : int;
consumer : ('a -> unit Lwt.t);
free_space_notifier : unit Lwt_condition.t;
new_element_notifier : unit Lwt_condition.t;
mutable worker : unit Lwt.t;
mutable running : unit Lwt.t;
}
let create buffer_size_bound consumer =
let t = {
buffer_size = 0;
buffer = Queue.create ();
buffer_size_bound;
consumer;
free_space_notifier = Lwt_condition.create ();
new_element_notifier = Lwt_condition.create ();
running = Lwt.return_unit;
} in
let rec work () =
match Lwt.state t.running with
| Fail _ | Return _ -> begin
match Queue.take_opt t.buffer with
| None ->
let* () = Lwt_condition.wait t.new_element_notifier in
work ()
| Some v ->
t.running <- Lwt.catch (fun () -> t.consumer v) (fun _ -> Lwt.return ());
Lwt_condition.signal t.free_space_notifier ();
work ()
end
| Sleep ->
let* () = t.running in
work ()
in
async work;
t
let push t v =
if Queue.length t.buffer = t.buffer_size_bound then begin
(* reached limit, wait for free space *)
let+ () = Lwt_condition.wait t.free_space_notifier in
Queue.push v t.buffer;
Lwt_condition.signal t.new_element_notifier ()
end else begin
Queue.push v t.buffer;
Lwt_condition.signal t.new_element_notifier ();
Lwt.return ()
end
let push_now t v =
if Queue.length t.buffer = t.buffer_size_bound then begin
Or_error.fail "push_now: full buffer"
end else begin
Queue.push v t.buffer;
Lwt_condition.signal t.new_element_notifier ();
Ok ()
end
let push_now_exn t v = Or_error.ok_exn (push_now t v)
let consuming t = t.running
let waiting t = not t.running
end
```