```ocaml=
(* it's past time we clean up the code.

   Objectives:
   - encode most/all of the invariants to be guaranteed by the type system
   - add exception-less variants to the interface (bonus point for backwards
     compatibility and deprecation mechanism)
   - identify other possible improvements

   The objectives are kept somewhat vague, the exercise is meant to be
   discursive, exploratory, and open-ended. *)

(* Prerequisites:
   - This exercise uses Lwt. But it makes only a very simple use of it (no
     scheduling trickery, no [wakeup], etc.).
   - You can `:%s/Lwt/Async/` if you prefer.
   - The module [Lwt_condition] is from the [lwt] package, the relevant
     documentation is:

   (* Lwt_condition.mli, excerpt *)

   type 'a t
   (** Condition variable type. The type parameter denotes the type of value
       propagated from notifier to waiter. *)

   val create : unit -> 'a t
   (** [create ()] creates a new condition variable. *)

   val wait : 'a t -> 'a Lwt.t
   (** [wait mutex condvar] will cause the current thread to block, awaiting
       notification for a condition variable, [condvar]. When the awaited
       condition is notified, the value parameter passed to [signal] is
       returned. *)

   val signal : 'a t -> 'a -> unit
   (** [signal condvar value] notifies that a condition is ready. A single
       waiting thread will be awoken and will receive the notification value
       which will be returned from [wait]. Note that condition notification is
       not "sticky", i.e. if there is no waiter when [signal] is called, the
       notification will be missed and the value discarded. *)
*)

module SelfPropelledBufferedSink : sig
  (** A self-propelled-buffered-sink. You can push values to its internal
      buffer. Whenever there are values in the buffer, the values are consumed
      sequentially (one after the other) by the [consumer]. *)
  type 'a t

  (** [create size consumer] is a self-propelled-buffered-sink with a set
      maximum buffer [size] and a [consumer].
      @raise Invalid_argument if [size < 1]: such a buffer could never accept
      an element. *)
  val create : int -> ('a -> unit Lwt.t) -> 'a t

  (** [push t v] adds [v] to [t]'s buffer, waiting for available space if the
      buffer is full. The returned promise resolves once [v] has been added to
      the buffer. *)
  val push : 'a t -> 'a -> unit Lwt.t

  (** [push_now_exn t v] adds [v] to [t]'s buffer now.
      @raise Failure if [t]'s buffer is full. *)
  val push_now_exn : 'a t -> 'a -> unit

  (** [push_now t v] adds [v] to [t]'s buffer now and returns [Ok ()], or
      returns an error (leaving [t] untouched) if [t]'s buffer is full. *)
  val push_now : 'a t -> 'a -> unit Or_error.t

  (** [consuming t] is [true] if [t]'s consumer is executing (i.e., if it has
      returned a promise that hasn't been resolved yet) and [false]
      otherwise. *)
  val consuming : 'a t -> bool

  (** [waiting t] is [not @@ consuming t]. *)
  val waiting : 'a t -> bool
end = struct
  open Lwt.Syntax (* let* and* let+ and+ *)

  (* invariants:
     - [1 <= buffer_size_bound]
     - [Queue.length buffer <= buffer_size_bound]
     - [running] is a resolved promise at initialisation-time; afterwards it
       is the promise obtained from the latest call to [consumer] (with
       failures caught). It is pending while the consumer is executing and
       resolved while the worker is waiting on more elements. *)
  type 'a t = {
    buffer : 'a Queue.t;
    buffer_size_bound : int;
    consumer : 'a -> unit Lwt.t;
    free_space_notifier : unit Lwt_condition.t;
    new_element_notifier : unit Lwt_condition.t;
    mutable running : unit Lwt.t;
  }

  (* Message shared by the two non-blocking push variants. *)
  let full_buffer_message = "push_now: full buffer"

  (* [is_full t] is [true] when no more elements can be added to [t]. *)
  let is_full t = Queue.length t.buffer >= t.buffer_size_bound

  (* [enqueue t v] adds [v] to the buffer and notifies the worker. The caller
     must have checked [not (is_full t)]. The notification is lost if the
     worker is not currently waiting; that is fine because the worker always
     looks at the buffer again before it waits. *)
  let enqueue t v =
    Queue.push v t.buffer;
    Lwt_condition.signal t.new_element_notifier ()

  let create buffer_size_bound consumer =
    if buffer_size_bound < 1 then
      invalid_arg "SelfPropelledBufferedSink.create: size must be positive";
    let t =
      {
        buffer = Queue.create ();
        buffer_size_bound;
        consumer;
        free_space_notifier = Lwt_condition.create ();
        new_element_notifier = Lwt_condition.create ();
        running = Lwt.return_unit;
      }
    in
    (* The worker: a forever-pending promise which feeds buffered elements to
       the consumer one at a time, in insertion order. *)
    let rec work () =
      match Queue.take_opt t.buffer with
      | None ->
        let* () = Lwt_condition.wait t.new_element_notifier in
        work ()
      | Some v ->
        (* NOTE(review): any failure of the consumer (raised or rejected) is
           deliberately discarded so that one bad element cannot stop the
           worker. Consider reporting it somewhere. *)
        t.running <-
          Lwt.catch (fun () -> t.consumer v) (fun _exn -> Lwt.return_unit);
        (* One slot was freed by [take_opt]: wake one pending [push]. *)
        Lwt_condition.signal t.free_space_notifier ();
        let* () = t.running in
        work ()
    in
    Lwt.async work;
    t

  let rec push t v =
    if is_full t then
      (* Reached the limit: wait for free space, then check again. The check
         is repeated because a concurrent [push_now] may have claimed the
         freed slot between the notification and this promise being
         resumed. *)
      let* () = Lwt_condition.wait t.free_space_notifier in
      push t v
    else begin
      enqueue t v;
      Lwt.return_unit
    end

  let push_now t v =
    if is_full t then Or_error.error_string full_buffer_message
    else begin
      enqueue t v;
      Ok ()
    end

  let push_now_exn t v =
    (* Raise [Failure] directly, as documented in the interface. *)
    if is_full t then failwith full_buffer_message else enqueue t v

  let consuming t = Lwt.is_sleeping t.running

  let waiting t = not (consuming t)
end
```