:::warning
Announcements
- Assignment 7 due tonight
- Assignment 8 out tomorrow
- Help hours today 3:45-5:15pm, L047
- MIT Programming Languages Review workshop Friday, April 25: http://plr.csail.mit.edu (small extra credit bump)
:::

:::success
Agenda for today
- More parallelism
    - Examples using `spawn`/`join`
    - Examples using `async`/`await`
- Concurrency
    - `mutex` abstraction
:::

# Parallelism using `spawn`/`join`

Recall from last lecture two core OCaml parallelism functions:
- `Domain.spawn` spawns a new domain, which maps 1-to-1 to an OS thread.
- `Domain.join` waits for the provided domain to finish (for its promise to be resolved) before continuing on the current thread. It returns the value the domain computed.

Because each domain is its own OS thread, `spawn`/`join` work directly with OS threads. We'll first follow an example from the [OCaml manual: Parallel Programming Chapter](https://ocaml.org/manual/5.3/parallelism.html) to demonstrate their use: parallelizing a naive fibonacci implementation.

We'll start with a sequential implementation that reads the number `n` from the command line and calculates the `n`th fibonacci number:

```ocaml
let rec fib n = if n < 2 then 1 else fib (n - 1) + fib (n - 2)

let main () =
  let n = try int_of_string Sys.argv.(1) with _ -> 1 in
  let result = fib n in
  (* print_endline expects a string, so convert the int first *)
  print_endline (string_of_int result)

let _ = main ()
```

We can compile this code with:

```
$ ocamlopt -o fib.o fib.ml
```

And we get:

```
$ time ./fib.o 45
1836311903
./fib.o 45 5.18s user 0.02s system 99% cpu 5.208 total
```

This tells us that it took **5.2 seconds** to compute the 45th fibonacci number.

Just for the sake of demonstration, we can see that doing double the amount of work (by calling `fib` twice on the same `n`) roughly doubles the amount of time the code takes to execute:

```ocaml
let rec fib n = if n < 2 then 1 else fib (n - 1) + fib (n - 2)

let main () =
  let n = try int_of_string Sys.argv.(1) with _ -> 1 in
  let result1 = fib n in
  Printf.printf "%d\n%!" result1;
  let result2 = fib n in
  Printf.printf "%d\n%!" result2

let _ = main ()
```

Without any parallelism, this takes about twice as long (**10.8 seconds**):

```
$ time ./fib.o 45
1836311903
1836311903
./fib.o 45 10.31s user 0.03s system 95% cpu 10.843 total
```

Because the two `fib n` computations are independent (neither needs the other's result), we can parallelize them by running each in its own domain (OS thread). To do this, we'll use `Domain.spawn` to start each call (passed as a thunk) and `Domain.join` to wait for both results before printing the `int` values:

```ocaml
(* fib.ml *)
let rec fib n = if n < 2 then 1 else fib (n - 1) + fib (n - 2)

let main () =
  let n = try int_of_string Sys.argv.(1) with _ -> 1 in
  (* Each spawn starts a new domain that runs its thunk *)
  let d1 = Domain.spawn (fun () -> fib n) in
  let d2 = Domain.spawn (fun () -> fib n) in
  (* join blocks until that domain finishes, then returns its result *)
  let r1 = Domain.join d1 in
  let r2 = Domain.join d2 in
  Printf.printf "%d\n%!" r1;
  Printf.printf "%d\n%!" r2

let _ = main ()
```

Now, we get **5.8 seconds**:

```
$ time ./fib.o 45
1836311903
1836311903
./fib.o 45 10.71s user 0.03s system 183% cpu 5.863 total
```

Even though we are still computing `fib` twice!

:::success
Note that this is not *as fast* as the 5.2 seconds for computing just one call to `fib 45`. Why not?
:::

**Answer**: there is some cost to creating new OS threads. We can consider this the overhead of parallelizing the computation. In general, parallel computations always involve some overhead/coordination between threads (at minimum, to spawn new OS threads).

This example shows a clear speedup, but it is a little silly, since we were just doing duplicate work. Let's think about a case where different threads do distinct useful work. Let's say we want to sum 100 million floating point numbers!

# Group exercise

```ocaml
(* Group exercise: implement parallel_sum on an array that uses Domain.spawn
   and Domain.join to calculate the sum of numbers in a large array.
   1. First, assume you just want to split the work between two parallel domains.
   2. Then, if time, update your function to take in `k`, the number of domains.
*)
let parallel_sum arr =
```

## Solution using array indexing

```ocaml
(* Requires the unix library (for Unix.gettimeofday) *)

(* Helper function to measure the time a function takes *)
let time f =
  let start = Unix.gettimeofday () in
  let result = f () in
  let stop = Unix.gettimeofday () in
  Printf.printf "Time: %.4f seconds\n" (stop -. start);
  result

(* Helper to sum a chunk of an array: indices low..high, inclusive *)
let sum_chunk arr low high : float =
  let sum = ref 0.0 in
  for i = low to high do
    sum := !sum +. arr.(i)
  done;
  !sum

(* Parallel version of sum: a spawned domain sums the first half
   while the current domain sums the second half *)
let parallel_sum arr =
  let n = Array.length arr in
  let mid = n / 2 in
  let d1 = Domain.spawn (fun () -> sum_chunk arr 0 (mid - 1)) in
  let sum2 = sum_chunk arr mid (n - 1) in
  let sum1 = Domain.join d1 in
  (* Note: need to join before we can sum *)
  sum1 +. sum2

let () =
  let n = 100_000_000 in
  let big_array = Array.init n (fun i -> float_of_int i /. 10.0) in
  print_endline "Sequential sum:";
  (* Note: fold_left is a bit slower, so showing the direct comparison using sum_chunk *)
  (* let sum1 = time (fun () -> Array.fold_left ( +. ) 0.0 big_array) in *)
  let sum1 = time (fun () -> sum_chunk big_array 0 (n - 1)) in
  Printf.printf "Result: %.2f\n\n" sum1;
  print_endline "Parallel sum:";
  let sum2 = time (fun () -> parallel_sum big_array) in
  Printf.printf "Result: %.2f\n" sum2
```

When we run this, the parallel sum is about twice as fast:

```
$ ./sumfloats-ex.o
Sequential sum:
Time: 0.0878 seconds
Result: 4999999950000.00

Parallel sum:
Time: 0.0452 seconds
Result: 4999999950000.00
```

Woohoo, faster code!

:::info
In general, before trying to parallelize code, it's a good idea to:
1. get a working sequential version, and
2. check whether that sequential version is problematically slow.
:::

# Pools of workers

`spawn`/`join` are a good option when we have a small number of parallel tasks, and are okay with those tasks being 1-to-1 with OS threads.
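
As an illustration of that small-number-of-tasks case, here is one possible answer to part 2 of the group exercise: a sketch of a `k`-domain sum, not an official solution. The name `parallel_sum_k` and the contiguous-chunk splitting are our own choices; `sum_chunk` is repeated from the solution above so the block stands alone, and `k` is sized with `Domain.recommended_domain_count ()` from OCaml 5's standard library.

```ocaml
(* Sum arr.(low) .. arr.(high), inclusive, sequentially *)
let sum_chunk arr low high : float =
  let sum = ref 0.0 in
  for i = low to high do
    sum := !sum +. arr.(i)
  done;
  !sum

(* Hypothetical k-domain version: split arr into k contiguous chunks.
   The current domain sums chunk 0; k - 1 spawned domains sum the rest. *)
let parallel_sum_k k arr =
  let n = Array.length arr in
  (* At least 1 chunk, and never more chunks than elements *)
  let k = max 1 (min k n) in
  (* Chunk i covers indices i*n/k .. (i+1)*n/k - 1 *)
  let bounds i = (i * n / k, ((i + 1) * n / k) - 1) in
  let domains =
    List.init (k - 1) (fun j ->
        let low, high = bounds (j + 1) in
        Domain.spawn (fun () -> sum_chunk arr low high))
  in
  let low0, high0 = bounds 0 in
  let first = sum_chunk arr low0 high0 in
  (* Join every spawned domain before combining the partial sums *)
  List.fold_left (fun acc d -> acc +. Domain.join d) first domains

let () =
  let arr = Array.init 1_000_000 float_of_int in
  let k = Domain.recommended_domain_count () in
  (* Prints 499999500000.0 for any k *)
  Printf.printf "%.1f\n" (parallel_sum_k k arr)
```

Each chunk gets its own domain, so this style only makes sense while `k` stays small, roughly the number of cores; spawning many more domains than cores adds overhead without adding parallelism.
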
However, there are limits to the number of OS threads a program can spawn. If we have a large number of tasks we'd like to parallelize as much as possible, the Domainslib library provides `async`/`await` as a higher-level mechanism for managing a _pool_ of tasks.

Conceptually, we set up a pool and provide the number of domains we'd like to use. `async` creates a task that is added to the pool of tasks to be completed; the library itself handles distributing tasks to the domains. `await` waits for the result of a task (a promise) to be resolved before continuing.

While `async`/`await` are higher-level, in OCaml they require some code to set up the pool, run the tasks in parallel, and clean up the pool, using these functions respectively:

```ocaml
Task.setup_pool
Task.run
Task.teardown_pool
```

We'll demonstrate their use by example (again, from the OCaml manual's parallel programming chapter). We'll go back to the `fib` example. Let's be more strategic about where we invoke parallelism here: we want to do useful parallelism _within_ each call to `fib`, rather than just across calls.

When we look at `fib`'s definition itself, we can see that both `fib (n-1)` and `fib (n-2)` are potentially long-running calls, but their results do not depend on one another. So, we can execute them as separate `async` tasks! We'll then use `await` to get both results before the final `+` in the call.

:::info
Note that if `n` is small for a given call to `fib`, then the call *is already fast* without being parallelized, and the potential speedup is not worth the overhead of creating and coordinating a new task. Thus, we'll just call the normal, sequential `fib` if `n` is less than 10. Deciding what that cutoff should be is a subjective/empirical call.
:::

```ocaml
(* Example: fibonacci, following the example from the OCaml manual's
   parallel programming chapter. Requires the domainslib library. *)
open Domainslib

let rec fib n = if n < 2 then 1 else fib (n - 1) + fib (n - 2)

let rec par_fib pool n =
  (* Below the cutoff, the sequential version is fast enough *)
  if n < 10 then fib n
  else begin
    let first = Task.async pool (fun () -> par_fib pool (n - 1)) in
    let second = Task.async pool (fun () -> par_fib pool (n - 2)) in
    let first_res = Task.await pool first in
    let second_res = Task.await pool second in
    first_res + second_res
  end

let main () =
  let n = try int_of_string Sys.argv.(1) with _ -> 1 in
  (* Take in the total number of domains as an argument *)
  let num_domains = try int_of_string Sys.argv.(2) with _ -> 1 in
  (* Create the pool: ~num_domains counts the *additional* domains to spawn,
     since the current domain also works on tasks inside Task.run *)
  let pool = Task.setup_pool ~num_domains:(num_domains - 1) () in
  (* Run our parallel function using the pool *)
  let result = Task.run pool (fun () -> par_fib pool n) in
  (* Best practice: clean up when done *)
  Task.teardown_pool pool;
  (* Print the result *)
  print_endline (string_of_int result)

let _ = main ()
```

# Concurrency

Recall from last lecture that concurrency is when we want to coordinate access to a shared resource. Let's see an example of using `stdout` (standard out, where printed lines are directed by default) as a shared resource.

In this example, we have two lists of strings that we'd like to print to the terminal. We don't care which list prints first, but each list should be printed intact. To make this interesting, let's simulate some work before printing by calling `Unix.sleepf` first.

:::info
`Unix.sleepf` just pauses the calling thread for the provided number of seconds (given as a `float`).
:::

```ocaml
(* Requires the unix library (for Unix.sleepf) *)

(* Sleep to simulate longer-running work, then print each line *)
let sleep_then_print delay lines =
  Unix.sleepf delay;
  List.iter (fun line -> print_endline line) lines

let list1 =
  [ "Hello"; "world"; "Hello"; "world"; "Hello"; "world"; "Hello"; "world";
    "Hello"; "world"; "Hello"; "world"; "Hello"; "world" ]

let list2 =
  [ "Twinkle, twinkle, little star,"; "How I wonder what you are!";
    "Up above the world so high,"; "Like a diamond in the sky."; "";
    "Twinkle, twinkle, little star,"; "How I wonder what you are!";
    "Up above the world so high,"; "Like a diamond in the sky."; "";
    "Twinkle, twinkle, little star,"; "How I wonder what you are!";
    "Up above the world so high,"; "Like a diamond in the sky."; "";
    "Twinkle, twinkle, little star,"; "How I wonder what you are!";
    "Up above the world so high,"; "Like a diamond in the sky."; "" ]

let main () =
  (* Run these two in parallel spawned domains *)
  let t1 = Domain.spawn (fun () -> sleep_then_print 2. list1) in
  let t2 = Domain.spawn (fun () -> sleep_then_print 2. list2) in
  Domain.join t1;
  Domain.join t2

let _ = main ()
```

When the two `sleep_then_print` calls happen in parallel, some of the time we get our lists of strings interleaved! Like in this snippet, where even the first line was not completely from either list:

```
HelloTwinkle, twinkle, little star,
How I wonder what you are!
Up above the world so high,
Like a diamond in the sky.

Twinkle, twinkle, little star,
How I wonder what you are!
Up above the world so high,
Like a diamond in the sky.

Twinkle, twinkle, little star,
How I wonder what you are!
Up above the world so high,
Like a diamond in the sky.

Twinkle, twinkle, little star,
How I wonder what you are!
world
Hello
world
Hello
Up above the world so high,
Like a diamond in the sky.
world
Hello
world
```

If we tried to solve this problem by just calling `join` on the first domain before spawning the second, like so:

```ocaml
let t1 = Domain.spawn (fun () -> sleep_then_print 2. list1) in
Domain.join t1;
let t2 = Domain.spawn (fun () -> sleep_then_print 2. list2) in
Domain.join t2
```

Then we prevent unintended jumbling of our lists, but we have also essentially gone back to a sequential version of our code: both long-running pieces of work (the two `sleep`s) happen one after the other, even though they do not involve the shared resource (standard out). This version would take the same length of time as a sequential version (or slightly longer, due to overhead).

## Mutex

What we really want here is more targeted _mutual exclusion_: the idea that some parts of our code should be mutually exclusive. Either domain can run that code, but never both at the same time. Parts of the code that do not need mutual exclusion (here, the `sleep`s) can still run in parallel.

This is again a pervasive idea across programming languages: the idea of a **`mutex`, or mutual exclusion primitive**. This is often also called a "lock". We create a mutex value to protect a section of our code (here, the printing). We `lock` the `mutex` when we want to start a critical piece of code that should not overlap, and we `unlock` it when we are done. The programming language library/implementation ensures that only one thread can "have" the mutex at once. OCaml's mutex is implemented in the [`Mutex` module](https://ocaml.org/manual/5.1/api/Mutex.html).

Essentially every programming language that supports concurrency provides some version of a mutex/lock. The core operations of `lock` and `unlock` are also shared abstractions, though many languages include far more complex and expressive versions of each.

In our printing example, we can add a `mutex` to our main function and pass it to each `sleep_then_print` call. Within the call to `sleep_then_print`, we'll `lock` the `mutex` **after** sleeping (since that can happen in parallel), but **before** the section of code that needs to *not* overlap: the printing `iter`.
We then `unlock` the `mutex` at the end of that critical block. The OCaml library ensures that at runtime, whichever domain reaches the `Mutex.lock mutex` line first gets to "have" the mutex and complete its printing, making the other domain wait until the mutex is unlocked before it can enter that section of the code.

```ocaml
(* Requires the unix library (for Unix.sleepf) *)
let sleep_then_print delay lines mutex =
  (* Sleeping does not touch stdout, so it can happen in parallel *)
  Unix.sleepf delay;
  (* Critical section: only one domain at a time may print *)
  Mutex.lock mutex;
  List.iter (fun line -> print_endline line) lines;
  Mutex.unlock mutex

let list1 =
  [ "Hello"; "world"; "Hello"; "world"; "Hello"; "world"; "Hello"; "world";
    "Hello"; "world"; "Hello"; "world"; "Hello"; "world" ]

let list2 =
  [ "Twinkle, twinkle, little star,"; "How I wonder what you are!";
    "Up above the world so high,"; "Like a diamond in the sky."; "";
    "Twinkle, twinkle, little star,"; "How I wonder what you are!";
    "Up above the world so high,"; "Like a diamond in the sky."; "";
    "Twinkle, twinkle, little star,"; "How I wonder what you are!";
    "Up above the world so high,"; "Like a diamond in the sky."; "";
    "Twinkle, twinkle, little star,"; "How I wonder what you are!";
    "Up above the world so high,"; "Like a diamond in the sky."; "" ]

let main () =
  (* Create a mutex to manage stdout *)
  let mutex = Mutex.create () in
  (* Pass it to each sleep_then_print call *)
  let t1 = Domain.spawn (fun () -> sleep_then_print 2. list1 mutex) in
  let t2 = Domain.spawn (fun () -> sleep_then_print 2. list2 mutex) in
  Domain.join t1;
  Domain.join t2

let _ = main ()
```
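
One caveat with explicit `lock`/`unlock`: if the code between them raises an exception, `unlock` never runs, the mutex stays locked, and any other domain that later tries to lock it will block. OCaml 5.1 and later provide `Mutex.protect`, which locks the mutex, runs a function, and unlocks the mutex whether the function returns normally or raises. Below is a minimal sketch of that variant; `print_lines_protected` is a hypothetical name, and it drops the `Unix.sleepf` call so the block needs no extra libraries. It also checks with `Mutex.try_lock` that the mutex is free again after an exception.

```ocaml
(* Hypothetical variant of sleep_then_print without the sleep:
   Mutex.protect (OCaml >= 5.1) locks, runs the function, and always unlocks *)
let print_lines_protected lines mutex =
  Mutex.protect mutex (fun () -> List.iter print_endline lines)

let () =
  let mutex = Mutex.create () in
  let t1 =
    Domain.spawn (fun () -> print_lines_protected [ "Hello"; "world" ] mutex)
  in
  let t2 =
    Domain.spawn (fun () ->
        print_lines_protected
          [ "Twinkle, twinkle, little star,"; "How I wonder what you are!" ]
          mutex)
  in
  Domain.join t1;
  Domain.join t2;
  (* If the critical section raises, the exception still reaches the caller,
     but Mutex.protect has already released the mutex *)
  (try Mutex.protect mutex (fun () -> failwith "printing failed")
   with Failure _ -> ());
  assert (Mutex.try_lock mutex);
  Mutex.unlock mutex
```

With `Mutex.protect`, each list still prints as one uninterrupted block, and there is no separate `unlock` call to forget on an error path.
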