# Parallel and Concurrent Parallel Programming Exam 2020 - Course Title: Parallel and Concurrent Parallel Programming - Course code: KSPRCPP2KU - Student: Diego Joshua Martínez Pineda - ITU email: diem@itu.dk I hereby declare that I have answered these exam questions myself without any outside help. - Diego Joshua Martínez Pineda 12.dec.2020 # 1 - `Measurer` ## 1.1 Measurement results With the supplied method and class: ``` java public static void main(String[] args){ try { System.out.println(measureThreadCreateAndStart()); } catch(Exception e){ System.out.println("Something failed"); } } ``` Results: `MeasurementResult{count=100000 mean=81,601 sdev=158}` ## 1.2 Warm up In order for the JIT compiler to gather enough statistics that may help, I also modified a bit the `Measurer` class. The idea was that during the warmup phase, instead of just continue looping without starting measurements, to do the same work they should do. And only when `endMeasurement` is called, their measurements should be ignored. This was accomplished by adding a `ignore` parameter to `startMeasurement` as follows: In `Measurer`: ```java public void startMeasurement(boolean _ignore){ ignore = _ignore; start = System.nanoTime(); } public void endMeasurement(){ long time = System.nanoTime()-start; if(ignore) return; registerMeasurement(time); } ``` In `measureThreadCreateAndStart`: ```java for (int i = 0; i<N; i++){ boolean ignore = i < warmup; m.startMeasurement(ignore); Thread t = new Thread(() -> m.endMeasurement()); t.start(); t.join(); } ``` ### Analysis of the mean behaviour ![](https://i.imgur.com/7xT1bvH.png) The behaviour of the mean seems rather erratic, it doesn't seem to be affected by the warmup parameters. This makes sense, since the work being measured is the start time of a thread, and this is erratic. It would make little difference if the code is compiled, since it depends on external factors, such as the OS scheduling for it. Nevertheless I answer at which values a lower mean is achieved. Around 75,000 and 96,000. This doesn't imply anything, since when increasing the warmup, the sample size is reduced and therefore the statistical validity of this values is also reduced. ### Analysis of the standard deviation behaviour ![](https://i.imgur.com/QoQQM7E.png) On the other hand, we can see the impact of the warmup on the standard deviation. There's a spike slightly before the 10,000. This is in agreement with the JVM default performance option `-XX:CompileThreshold=10000`[1]. Such a spike can be explained by the thread time being hijacked by the JIT compiler. Afterwards, and until 50,000 the standard deviation remains low. After 50,000 an erratic increasing behaviour appears, this is explained as in the mean case. The statistical validity of this estimators is reduced when the sample size is reduced. ## 1.3 Thread safety of `measureThreadCreateAndStart` To argue that we don't end up in any **race condition**, we first observe that at any point in time during our execution, there is one of this two threads running: the main thread, or the local `t`. The reason of this, is that right after calling `t.start`, `t.join` is called. This sets the calling thread (the main thread) in the `Waiting` state of the JVM's scheduler, and upon completion of `t`, it comes back to the `Runnable` state. In simpler words, the main thread halts execution until `t` is on a `Terminated` state. Since this is done in every iteration of the loop, we're sure that this order is preserved. At any point in time there's only one thread in `Runnable` state. To argue that the changes done to `m` in the main thread are **visible** to `t` and viceversa, we refer to the Java Memory Model (JMM) [2] [3]. First I identify the places where memory actions take place in the program: when `m.startMeasurement` or `m.endMeasurement` are called, they mutate the internal state of `m` and `endMeasurement` reads its internal state as well. Such actions obey the partial order `happens-before` that the JMM enforces, in particular the following rules must hold: - **A**: Program order rule - **B**: Thread start rule - **C**: Thread termination rule By **B**, we are ensured that `t.start` `happens-before` everything in the body of `t`. By **A**, `m.startMeasurement` `happens-before` `t.start`. Therefore by transitivity of the partial order `m.startMeasurement` `happens-before` everything in the body of `t`, in particular, it's call to `m.endMeasurement`. By **C**, mutations/reads in `m.endMeasurement` `happens-before` `t.join`, and as before by **A**, to subsequent iterations of the loop. To conclude, mutations on `m.startMeasurement` are visible to the reads of `m.endMeasurement` since it was shown that `m.startMeasurement` `happens-before` `m.endMeasurement` in every iteration. And mutations in `m.endMeasurement` are visible to subsequent calls to `m.startMeasurement` as shown in the previous paragraph. ## 1.4 Thread-safe `Measurer` The changed methods were: `total`, `getResults` and `registerMeasurement`. To ensure visibility of the changes, the fields `count`, `sum` and `ssq` could be made `volatile`. However to also avoid race conditions I decided to lock with the `this` object by prepending the `synchronized` keyword to such methods. Such methods are the ones that read or write the internal state, and therefore prone to visibility issues or race conditions. This are the least required changes for this (using the locks approach). ```java synchronized public Result getResults(){ //... } synchronized public long total(){ //... } synchronized private void registerMeasurement(long measurement){ //... } ``` Additionally a parameter `warmUpRounds` was included on the constructor and it's corresponding logic in `registerMeasurement`: ```java synchronized private void registerMeasurement(long measurement){ if(warmUpRuns>0){ warmUpRuns--; return; } // ... } ``` The complete implementation is on the class `MeasurerThreadSafe` of the file `MeasurerGreen.java` attached. ## 1.5 Is opening files parallelizable? ### procinfo The measurements were executed with a processor with the following specifications: - Model name: Intel(R) Core(TM) i3-7100U CPU @ 2.40GHz - Cpu cores: 2 - Cache size: 3072 kB - Threads per core: 2 ### Measurements Measurements where repeated 100,000 times with a 10,500 warmup, with the `MeasurerThreadSafe` from 1.4. Each measurement consisted on the main thread creating as much threads as corresponding to the case. After every one is ready (ensured with a `CyclicBarrier`), the file opening is executed. To measure when everyone is finished, the same `CyclicBarrier` is reused. ``` java static class MeasuredOpenReader implements Runnable{ final int file; public MeasuredReader(int _file){ file = _file; } @Override public void run(){ try { barrier.await(); FileReader in = new FileReader("1.5-files/0" + file + ".txt"); BufferedReader br = new BufferedReader(in); barrier.await(); // registers measurement when every thread is done opening its file in.close(); } catch (Exception e) { System.out.println("Something failed"); } } } static void measureOpenFiles(final int nFiles){ for(int i=1; i<=100_000;i++){ barrier = new CyclicBarrier(nFiles+1); for (int file = 1; file <= nFiles; file++) { new Thread(new MeasuredOpenReader(file)).start(); } // Starting threads at the same time try{barrier.await();}catch(Exception e){System.out.println("Something failed");} long start = m.startMeasurement(); // Waiting for all to be done try {barrier.await();}catch (Exception e){System.out.println("Something failed");} m.endMeasurement(start); } } public static void main(String[] args){ for(int i=1; i<=N_CASES; i++){ // New measurements for each case m = new MeasurerThreadSafe(10500); measureOpenFiles(i); System.out.println(i+" threads/files: "+m.getResults()); } } ``` Note that this measurements could've been executed without the `MeasurerThreadSafe` since only the main thread uses it. If we were measuring the time that it takes opening each individual file, on each thread, sharing such `m`, then it would be necessary. This approach was also implemented and can be found in the class `FilesParallelizable` of `FilesParallelizable.java` as a comment. The following is a plot to show the behaviour. The data can be found in the file `1.5.data`. ![](https://i.imgur.com/IBAsjKV.png) ### Analysis of the measurements The plot show its increasing behaviour, however to have a clearer perspective on how it increases, I'll list and plot the factors by which they increase below. I represent $|X_k|$ as the time measurement for $k$ threads: - $|X_2| = 1.48|X|$ - $|X_3| = 1.57|X_2|$ - $|X_4| = 1.79|X_3|$ - $|X_5| = 1.57|X_4|$ - $|X_6| = 1.20|X_5|$ - $|X_7| = 1.20|X_6|$ - $|X_8| = 1.21|X_7|$ ![](https://i.imgur.com/df0IvcG.png) The task of opening different files in many threads is barely parallelizable from 1 to 2 threads, however for subsequent increments under the number of CPU threads, we can see that the task is **not** parallelizable, the factors are really close to 2. After the number of CPU threads is reached we can see a sudden drop, this is to be expected, since there should be little difference after the parallelizable limit is reached. ### Serializable fraction and Amdahl's Law By solving Amdahl's inequality for the serializable fraction. We have the following: $F \leq \frac{N-S}{S(N-1)}$ For my machine $N=4$, by the measurements $S=(54389.2/4)/12997.3=1.05$. Since we were able to open 4 files in 54389.2 ns, and 1 in 12997.3 ns. This implies that the serializable fraction for file opening obeys: $F \leq 0.93$ In agreement with the law, the serializable fraction shows that this tasks is highly serializable, and that little speedups can be gained, which is also what the results showed. ## 1.6 Is reading files parallelizable? ### procinfo The measurements were executed with a processor with the following specifications: - Model name: Intel(R) Core(TM) i3-7100U CPU @ 2.40GHz - Cpu cores: 2 - Cache size: 3072 kB - Threads per core: 2 ### Measurements This was implemented in the same fashion as 1.5. The measurements where repeated 30,000 times with a warmup of 10,500. The main difference with 1.5, is that after the files are opened, they're read, and afterwards the measurement finishes. To avoid overflow, `ms` were used instead in the `MeasurerThreadSafe`. You can see this in the method `measureReadFiles` of the class `FilesParallelizable` on the accompanying file `FilesParallelizable.java`. ![](https://i.imgur.com/dQTYnlX.png) In the same fashion as in 1.5, the plots of speed factors is shown. ![](https://i.imgur.com/HDkpzzD.png) In contrast to opening the files, reading the files was shown to be much more parallelizable, the factors were close to 1, and in some cases even below 1. The mean time of having 4 threads (as many as CPU threads) open and read the whole files is 24.9 ms, in contrast to a single thread/file taking 16.6 ms. This represents a speed up of $(24.9/4)/16.6=2.6$ Amdahl's inequality implies that the serializable fraction for this task obeys $F \leq 0.18$, in agreement to the observation, this task has a low serializable fraction. ## 1.7 Is Thread creation *serialized*? In a similar fashion, the following method was used to measure thread creation, also on `FilesParallelizable.java`. ```java static void measureThreadCreation(int nThreads){ for(int i = 1; i <= 100_000; i++){ barrier = new CyclicBarrier(nThreads + 1); for(int t=0; t<nThreads; t++){ new Thread(()->{ try{ barrier.await(); // every one starts at the same time Thread x = new Thread(() -> { System.out.println("No one will call me :("); }); barrier.await(); // send finish signal } catch (Exception e) { System.out.println("Something failed"); } }).start(); } try{ barrier.await(); // everyone starts at the same time long start = m.startMeasurement(); barrier.await(); // Everyone finished } catch(Exception e){ System.out.println("Something failed") } } } ``` The same analysis as before shows the following results: ![](https://i.imgur.com/lPG7ZiQ.png) ![](https://i.imgur.com/Kpj3zT5.png) The measurements show a weird behaviour for this measurement. Only from 1 to 2 threads, the task was shown to be not parallelizable, but, form then on, it was highly serializable. We could take the speed up between the 2 and 4, to be 1.22. Giving an Amdahl's serializable factor of 0.76. Or for even a larger serializable value, by taking into account the first value, the speedup would be 1.07, giving a serializable factor of 0.91. This shows that the rumor appears to be true. However, to be completely sure I wouldnt be so confident by this, since the measured values may be to small. Perhaps a look into the implementation of such a process and careful analysis would be in place. # 2 - Message passing, buffers, and Kotlin sequences ## 2.1 Message passing ```java int limit= 10; final LinkedBlockingQueue<Integer> buffer= new LinkedBlockingQueue<Integer>(); Thread t1 = new Thread(() -> { try{ for (int j = 0; j < limit; j++) buffer.put(j); } catch(Exception e){} }); Thread t2 = new Thread(() -> { try{ while(true){ buffer.take(); System.out.printf("*"); } } catch(Exception e){} }); t2.start(); t1.start(); ``` The receiver thread is `t2`, it takes values from the `buffer`, this method blocks until a value beomes available. So every time `buffer.put` is called on `t1`, a value becomes available for `t2` and a `*` is printed. ## 2.2 Is final required? Yes, java doesn't allow non-final variables on the scope of the lambda to be used. This retriction is to avoid concurrency issues, and mutating parameters of a method outside of it's scope. ## 2.3 Recursion on Kotlin's sieve of Eratosthenes ```kotlin val morePrimes = rest.filter {i->i%first != 0}.onlyPrimes() ``` In that step, a new sequence is produced. However, this doesn't mean that the sequence is computed, since sequences compute their values on demand; it only adds operations to the chain of computations for it. Since this is then yielded with `yieldAll`, the values of the main sequence won't have any multiples of the first value (prime), by applying a subsequent recursive call on `onlyPrimes` it'll resemble the procedure of the sieve of eratosthenes. ## 2.4 Java's sieve of Eratosthenes The procedure for computing the primes resembles the Kotlin one: ``` java public class Eratosthenes { static int limit; static int BOUND = 10; public static void filterPrimes(BoundedBuffer<Integer> filtered){ new Thread(()->{ int prime = filtered.take(); if(prime > limit){ return; } else{ System.out.println(prime); } BoundedBuffer<Integer> evenMoreFiltered = new BoundedBuffer<Integer>(BOUND); filterPrimes(evenMoreFiltered); while(true){ int candidate = filtered.take(); if(candidate%prime!=0){ evenMoreFiltered.insert(candidate); } } }).start(); } public static void main(String[] args){ limit = 100; m = new Measurer(); BoundedBuffer<Integer> naturals = new BoundedBuffer<Integer>(BOUND); filterPrimes(naturals); for(int i=2; i<=limit+10; i++){ naturals.insert(i); } } } ``` The `BoundedBuffer` was implemented with two `Semaphore`'s. The full details of implementation can be seen in the class `BoundedBuffer` of the file `Eratothenes.java` ## 2.5 Measuring time taken To add measurements, a static attribute `m` was added. A single execution was measured. `startMeasurement` is called after creating the `naturals` buffer, `endMeasurement` is called when the limit is detected, prior to returning. Refer to the attached `Eratosthenes.java`. The result of measuring a single pass of primes under 100, without printing, and with a buffer bounded at 10, was 21 235 665 ns. ## 2.6 Limits on number of threads My system doesn't seem to impose a limit on the number of threads the JVM can ask. I was able to increase the limit to 9000, taking 1 136 593 521 ns (over 1s). The code for the time measurements is the same as in 2.5, changing parameters. ## 2.7 Timing by changing the buffer bound - Bound 20: 1 129 066 088 ns - Bound 100: 472 854 004 ns We see that the speedup is significant (more than 2x). This is to be expected, since a lower bound on the Buffer, implies that it will be filled most of the time. The consumer of such values, don't consume values as fast as they're produced, leading to many filled buffers and a lot of waiting time. By increasing the bound, less time is spent on waiting for the buffer to have space. The code for the time measurements is the same as in 2.5, changing parameters. ## 2.8 Using a `UnboundedBuffer` Very similar to the `BoundedBuffer` implementation, removing one of the semaphores. A `LinkedList` is used to represent the items on the buffer, which mutations are guarded by `synchronized`. When getting a value, we'll wait until a permit becomes available with `acquireUninterruptibly`. When inserting a value, we'll add a permit by using `release`. ```java class UnboundedBuffer<T> implements IBuffer<T> { private LinkedList<T> queue = new LinkedList<T>(); private Semaphore releasable; // used to block releases public UnboundedBuffer() { releasable = new Semaphore(0, true); } public void insert(T elem) { add(elem); releasable.release(); } public T take() { releasable.acquireUninterruptibly(); return poll(); } private synchronized void add(T elem) { queue.add(elem); } private synchronized T poll() { return queue.poll(); } } ``` ## 2.9 Measuring `Eratosthenes` with `UnboundedBuffer` According to the discussion on 2.7, placing no bound should give us speedups. This is confirmed by the measurements. Computing prime value until 9000 results in 213 099 696 ns. (5.3x speedup) See the code in the class `UnboundedEratosthenes` in the file `Eratosthenes.java`. Is very similar to the bounded implementation. ## 2.10 Measuring kotlin's Eratosthenes ```kotlin fun main(){ val start = System.nanoTime(); (2..1000).asSequence().onlyPrimes().forEach(::println) val time = System.nanoTime()-start println("Time taken(ns): ${time}") } ``` The output of this was 75 563 871 ns, this value shouldn't be compared with previous values, since it was executed in Kotlin's playground server. ## 2.11 Measuring kotlin's Eratosthenes without printing To ensure that the values are computed, and it's not only a `sequence` of filters, we compute the count and print it afterwards. ```kotlin fun main(){ val start = System.nanoTime(); val np = (2..1000).asSequence().onlyPrimes().count() val time = System.nanoTime()-start println("Time taken(ns): ${time}") println(np) } fun Sequence<Int>.onlyPrimes(): Sequence<Int> = sequence { if(any()){ val first: Int = first() val rest: Sequence<Int> = drop(1).toList().asSequence() yield(first) val morePrimes = rest.filter {i->i%first != 0}.onlyPrimes() yieldAll(morePrimes) } } ``` The output of this is 74 303 409 ns. We see that there's little difference, IO contention reduction benefits several parallel threads, in the case of the `sequence` no parallelization takes place. Moreover we don't have control on the assigned threads of the execution environment (Kotlin's playground server). # 3 - Streams and tweets All the code for section 3 is on file `StreamTweets.java`. ## 3.1 Parallel streams ### How the program has concurrency? Operations on streams are executed until their traversal (terminal operation). Until then, everything is just the definition of the pipeline that the computation should follow. All streams in the JDK are serial, unless explicitly asked, like in our case with the `.parallel` intermediate operation. This creates a parallel stream which ensures that all aggregate operations, and other intermediate operations that support this, are indeed parallel. Under the hood, such tasks are assigned to an optimized `ForkJoinPool`. When executing this example, the intended outcome occurs, we can see that parallelism occurs, since the order in which the values are processed in not guaranteed and is non-deterministic, however, the same values are always produced. This is further proof that our code is running in parallel. ### How many cores can it potentially utilize? Parallel streams implement their operation using a `ForkJoinPool` as its `ExecutorService`. As such, it implements work stealing strategies to maximize the CPU usage. By default the `ForkJoinPool` is initialized with the number of available hyperthreaded processors as parallelism level, i.e. the number of used cores. In the case of my system, that's 4. ### What is the role of `flatMap` and `gen10From` In our example a parallel stream with values 0, 100, ..., 500 is created. For each of this values, a stream is produced with `gen10From`. The `flatMap` for streams combine all this 6 streams into a single one. Such combined stream is the filtered to show only multiples of 3. ## 3.2 Counting tweets in 10 files ```java public static void countAllTweets(){ long nTweets = gen10From(0).parallel().map(i -> nTweetsInFile(i)).sum(); System.out.println("nTweets: " + nTweets); } public static int nTweetsInFile(int file){ return (int)streamTweets(file).count(); } ``` Indeed 14639 lines where retrieved. ## 3.3 Counting Delta tweets in parallel ```java public static void countDeltaTweets(){ long nDeltaTweets = gen10From(0) .parallel() .map(i -> nDeltaTweetsInFile(i)) .sum(); System.out.println("nDeltaTweets: "+nDeltaTweets); } public static int nDeltaTweetsInFile(int file) { return (int) streamTweets(file) .filter(tweet -> tweet.airline.equals("Delta")) .count(); } ``` Number of Delta tweets: 2222 ## 3.4 Counting negative tweets with "great" ```java public static void countNegativeGreatTweets() { long nTweets = gen10From(0) .parallel() .map(i -> nNegativeGreatTweetsInFile(i)) .sum(); System.out.println("nTweets: " + nTweets); } public static int nNegativeGreatTweetsInFile(int file) { return (int) streamTweets(file) .filter(tweet -> tweet.airline_sentiment.equals("negative")) .filter(tweet -> tweet.text.toLowerCase().contains("great")) .count(); } ``` Number of such tweets: 86 ## 3.5 Timing 3.4 I used the `Measurer` class from before, modified for milliseconds and with warm-up. `Measurer` was called 30,000 times with a warm-up of 10,500. ```java Measurer m = new Measurer(10_500); for (int i = 0; i < 30_000; i++){ m.startMeasurement(); nTweets = gen10From(0) .parallel() .map(j -> nNegativeGreatTweetsInFile(j)) .sum(); m.endMeasurement(); } System.out.println("Parallel timing: "+m.getResults()); m = new Measurer(10_500); for (int i = 0; i < 30_000; i++) { m.startMeasurement(); nTweetsSeq = gen10From(0) .map(j -> nNegativeGreatTweetsInFile(j)) .sum(); m.endMeasurement(); } System.out.println("Sequential timing: " + m.getResults()); ``` The parallel version was twice as fast. The sequential version took 28ms, and the parallel 15ms. ## 3.6 Unique tweeters ```java long count = Stream.of(0,1,2,3,4,5,6,7,8,9) .parallel() .flatMap(n-> streamTweets(n) .parallel() .map(t -> t.name)) .distinct() .count(); ``` Unique tweeters: 7701 ## 3.7 Grouped "great" tweets ```java public static void greatGroupedTweets(){ Stream.of(0,1,2,3,4,5,6,7,8,9) .parallel() .flatMap(n-> streamTweets(n) .parallel() .filter(t -> t.text.toLowerCase().contains("great")) ) .collect(Collectors.groupingByConcurrent(t->t.airline_sentiment)) .forEach((k, v)->System.out.println(k+": "+v.size())); } ``` Result of grouping: - negative: 86 - neutral: 19 - positive: 231 Handling opening and reading the files is done concurrently, we saw on 1.6 that this task is parallelizable, so performance is gained. Afterwards, for each file, a stream is created, which is also parallel, that filters the stream to tweets containing "great". After this `collect` is called, with the collector `groupingByConcurrent`. This collector is concurrent, although unordered, for our purposes, ordered is not required. This implies performance gains, since grouping can be called concurrently. The only sequential part is the intrinsically sequential IO for printing. ## 3.8 Unique tweeters for each airline I implemented the reading of file parallel by using `parallel` when reading. This was merged into a single stream via `flatMap`. After that, a `ConcurrentMap` was obtained by grouping concurrently by airline. From such `ConcurrentMap` I created a parallel stream of it's entries, to collect them again but maping the lists to the correponding distinct count. It's important to note that the behaviour of `distinct` when dealing with unordered streams as our case (being a parallel unordered stream), is designed to work concurrently without guarantees of order (which we don't require). When `count`ing the value, by being a terminal operation that requires the stream to be finished, it acts as a barrier for the threads of the pool, after which the result is computed. ```java Stream.of(0,1,2,3,4,5,6,7,8,9) .parallel() .flatMap(n->streamTweets(n)) .collect(Collectors.groupingByConcurrent(t->t.airline)) .entrySet() .stream() .parallel() .collect(Collectors.toConcurrentMap(e -> e.getKey(), e -> e.getValue() .stream() .parallel() .map(t -> t.name) .distinct() .count() )) .forEach((k, v) -> System.out.println(k + ": " + v)); ``` For completeness this were the results: ``` Southwest: 1622 US Airways: 1473 American: 1305 Delta: 1136 United: 1989 Virgin America: 394 ``` To even increase more the parallelization of this task, a common `ConcurrentHashMap`, used for collecting the individual results with a custom `Collector` of each file could be implemented. I couldn't manage to get this approach to work, however, its attempt is also as a comment in `StreamTweets.java`. ## 3.9 Top `negativereason` per airline ```java= public static void topNegativeReasons(){ class ReasonCount implements Comparable<ReasonCount>{ // Tuples are not a thing in Java 8 public String reason; public Integer count; ReasonCount(String r, Integer c){ reason = r; count = c; } public int compareTo(ReasonCount o){ return count - o.count; } } final Object printLock = new Object(); Stream.of(0,1,2,3,4,5,6,7,8,9) .parallel() .flatMap(n->streamTweets(n)) .collect(Collectors.groupingByConcurrent(t->t.airline)) .entrySet() .stream() .parallel() .collect(Collectors.toConcurrentMap( e -> e.getKey(), e -> { List<ReasonCount> l = e.getValue() .stream() .parallel() .collect(Collectors.groupingByConcurrent(t->t.negativereason)) .entrySet() .stream() .parallel() .map(f -> new ReasonCount(f.getKey(), f.getValue().size())) .filter( r -> !r.reason.equals("")) .collect(Collectors.toList()); l.sort(Comparator.reverseOrder()); return l.subList(0,3) .stream() .collect(Collectors.toMap(r -> r.reason, r -> r.count )); })) .forEach((airline, h)->{ synchronized(printLock){ System.out.println("Airline " + airline); h.forEach((reason, c) -> System.out .println(" - "+reason + ": " + c)); } }); } ``` I followed a very similar approach to 3.8. In contrast to that exercise, in this case, a sequential step couldn't be avoided, which is the one that required the value counts to be sorted. Also since the collection is done on a `ConcurrentMap` the order of the elements supplied `forEach` is not guaranteed. In order to show the results consistently y locked on an auxiliar `printLock`. Therefore this is sequential as well. For completeness the output was: ``` Airline Southwest - Can't Tell: 159 - Cancelled Flight: 162 - Customer Service Issue: 391 Airline US Airways - Can't Tell: 246 - Late Flight: 453 - Customer Service Issue: 811 Airline American - Late Flight: 249 - Cancelled Flight: 246 - Customer Service Issue: 768 Airline Delta - Can't Tell: 186 - Late Flight: 269 - Customer Service Issue: 199 Airline United - Can't Tell: 379 - Late Flight: 525 - Customer Service Issue: 681 Airline Virgin America - Can't Tell: 22 - Flight Booking Problems: 28 - Customer Service Issue: 60 ``` # 4 - Testing a hashmap ## 4.1 `OpenAddressingHashMapSync` The following methods were `synchronized` on the `this`object: - `findKeyOrEmpty`: This method reads the `entries` array, therefore `synchronized` is needed to avoid visibility issues. - `containsKey`: As before, `entries` is read. - `get`: As before, `entries` is read. - `put`: `entries` is read, and also mutated. Also the `filled` counter is updated. Therefore `synchronized` is needed for both avoiding visibility issues and race conditions when updating. - `remove`: As in `put`, both reads and mutations are made to shared state. - `size`: Required to avoid visibility issues since `filled` and `removed` are read. - `forEach`: The array `entries` is read for its iteration. Required for visibility. - `reHash`: For avoiding visibility and race condition issues. - `toString`: To avoid visibility issues. - `putIfAbsent`: Initially I thought it was superfluous because it only called `synchronized` methods. However, the reason for it requiring lock acquisition is subtle, the condition on `containsKey` and the consequence don't necesarily happen within the same thread, and things can change between them. Since the logic of what should happen depends on the same values, this should also happen within the same thread. Moreover the test of 4.2 fails sometimes without it. That is every method in the original implementation. I couldn't avoid locking on any of those for the reasons stated above. ## 4.2 Testing `OpenAddressingHashMapSync` A strategy as in Chapter 12 of [2] was followed. It was able to correctly identify synchronization issues in 4.1. And succeeded when a thread-safe implementation was given. ```java class ThreadSafeTest extends Tests{ private static final int CPU_CORES = 4; private static final int N_THREADS = CPU_CORES*2; private static final int N_OPS = 100_000; private static final int PER_THREAD = N_OPS/N_THREADS; private static final int N_KEYS = 20; private static final int RANGE = 100; // values on the operation between 0-RANGE private static final CyclicBarrier barrier = new CyclicBarrier(N_THREADS+1); private static final AtomicInteger checksum = new AtomicInteger(0); static void testMapThreadSafety(final OurMap<Integer, Integer> map){ System.out.printf("%nThread-safety test: %s%n", map.getClass()); for(int i=0; i<N_THREADS; i++){ new Thread(()->{ try{ barrier.await(); // To start the threads at the same time } catch(Exception e){} GoetzRandom rng = new GoetzRandom(); for(int j = 0; j < PER_THREAD; j++){ int operation = ((rng.next() % 3)+3)%3; int key = ((rng.next() % N_KEYS)+N_KEYS)%N_KEYS; if(operation == 0){ //put int value = rng.next() % RANGE; Integer prev = map.put(key, value); int previous = prev == null?0:prev.intValue(); checksum.getAndAdd(value-previous); } else if(operation == 1){ //putIfAbsent int value = rng.next() % RANGE; Integer prev = map.putIfAbsent(key, value); if(prev == null){ // Means the value was added checksum.getAndAdd(value); } } else{ //remove Integer prev = map.remove(key); if(prev != null){ checksum.getAndAdd(-prev); } } } try{ barrier.await(); } catch(Exception e){} }).start(); } try{ barrier.await(); // To start the threads at the same time barrier.await(); // To wait for everyone to finish } catch(Exception e){} final AtomicInteger realTotalSum = new AtomicInteger(0); map.forEach(new Consumer<Integer, Integer>(){ @Override public void accept(Integer k, Integer v){ realTotalSum.getAndAdd(v); } }); assertTrue(realTotalSum.get() == checksum.get()); System.out.println("Thread-safety: SUCCESS"); } } ``` ## 4.3 Implementing `OpenAddressingHashMapStriped` See the full implementation on the class `OpenAddressingHashMapStriped` of the file `OpenAddressingHashMap.java`. The idea was as in the implementation of the bucket hash map of week 12. The number of locks is a divisor of the size of the array to ensure thread-safety as discussed in that week. For that reason, the initial size of the entries array is defaulted on 60, a number with several divisors to explore when tuning. An important thing to note, is that even when locking only on the stripe that was needed for each of the key operations, the same issues that required `putIfAbsent` of `OpenAddressingHashMapSync` may arise. And that could happen to any operation that computes first the corresponding lock for the required key (`put`, `get`, `remove`). I couldn't get around this issue on the dependency of this block without synchronizing in another shared object. Namely the `this` object by prepending the `synchronized` keyword to this methods. This has a negative performance impact, since lock contention was what this approach attempted to reduce. Moreover the test on 4.2 either failed or deadlocked in other case. Timing measurements with different number of stripes: - 2: 6.8 s - 5: 7.2 s - 10: 7.2 s - 15: 7.2 s - 30: 7.1 s The measurements didn't reveal a relation with the number of stripes, must likely because of the unavoided lock contention described above. ## 4.4 Measuring `OpenAddressingHashMapStriped` against `OpenAddressingHashMapSync` The measurements for `OpenAddressingHashMapStriped` are established on 4.3. Measuring the test run for `OpenAddressingHashMapSync` revealed $4.9 s$. The outperformance of the `Sync` version is most likely due to the contention issues not being avoided as explained in 4.3, and even more, we're adding locks on top on the `Striped` version. # References 1 - [Java VM Options](https://www.oracle.com/java/technologies/javase/vmoptions-jsp.html) 2 - Java Concurrency in Practice: Goetz, Brian. 3 - [JLS - Memory Model](https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.4)