--- title: "[Java8] 병렬처리" date: 2020-05-23 tags: [Java] --- *Java In Action 스터디 모임에서 학습한 내용* ### 들어가기 전에 + Java7 이전에는 데이터를 병렬 처리 하려면 좀 귀찮았다. 일단 데이터를 쪼개야 하고, 각각의 쓰레드에 할당해야하고, 각 쓰레드들이 작업을 다 마치길 기다린 다음 나눠진 결과들을 다 합쳐야 했다. Java7 에서는 fork/join 이라는 프레임워크를 통해 이 작업들을 일관적으로, 에러가 덜 발생하는 방향으로 작업할 수 있도록 했다. + Streams 인터페이스는 연속 데이터 스트림을 병행하여 처리할 수 있도록 바꾸는 역할을 한다. 이 내부 동작을 파악하는 것이 중요한데, 이걸 모르고 사용하게 되면 잘못 사용함으로써 생기는 bad results들을 맞이하게 될 것이다. + 특히, 각각의 데이터 덩어리들을 병렬 처리하는 것 학습하기 전에 병렬 스트림이 어떻게 덩어리 덩어리로 나눠지는지 Spliterator 를 활용할 것이다. ### 7.1 병렬 스트림 개요 + 이 전에 학습한 내용을 보면 parallelStream 메서드를 시용해서 컬렉션을 병렬 스트림으로 만들 수 있었다. parallel stream 은 컬렉션의 원소들을 여러 덩어리로 나누고, 그걸 각 쓰레드로 돌리는 것이다. 컴퓨터의 멀티 코어 프로세서들이 모두 공평하게/동일하게 바쁜 상태가 되도록 하는 것이다. (쉬는 녀석 없이.) + 예를 들어, n 이라는 숫자를 전달받아서 0부터 n까지의 합을 구하는 메서드를 생각해보자. ```java public long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1) //1. 무한 스트림 생성하는 부분 .limit(n) //2. 숫자 n 까지로 제한하는 부분 .reduce(0L, Long::sum); //3. reduce 로 숫자들을 더하는 부분 } // 혹은 전통적인 자바 코드라면 아래와 같다. public long iterativeSum(long n) { long result = 0; for (long i = 1L; i <= n; i++) { result += i; } return result; } ``` + 여기서 드는 추가 궁금증이 있다. n 이 아주 큰 숫자라면? 결과는 어떻게 동기화할까? 쓰레드는 몇 개 필요할까? ##### 7.1.1 연속 스트림을 병렬 스트림으로 바꿔보자 ```java public long parallelSum(long n) { return Stream.iterate(1L, i -> i + 1) .limit(n) .parallel() //아까의 코드에서 이 줄이 추가되었다. .reduce(0L, Long::sum); } ``` + 위의 코드는 아래의 그림에서 보여주는 작업을 한다. + 내부적으로는 boolean 플래그가 parallel 작업을 하겠다는 신호탄으로 사용된다. + 반대로 parallel 작업을 sequential 로 바꾸는 것도 가능하다. + 추가설명필요!! + 아무튼 위와 같이 숫자들을 더하는 작업을 하는 데 세 가지 방법이 있다. interative style, sequential reduction, parallel reduction. 어떤게 더 빠른지 계속 비교해보자. ##### 참고) 병렬 스트림에 사용된 스레드 풀 조절하기 + parallel 메서드를 보면 병렬 스트림에 사용된 스레드들이 어디서 오는지, 어떻게 프로세스를 커스터마이징하는지 궁금할 수 있다. + parallel 스트림은 내부적으로 기본 ForkJoinPool을 사용하는데, 이건 가지고 있는 프로세서 숫자만큼의 쓰레드를 가지고 있다. (Runtime.getRuntime().available-Processors() 를 돌려보면 그 값을 알 수 있다.) + 하지만 아래와 같은 코드를 이용하면 pool 사이즈 조정도 가능하다. ```java System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12"); ``` + 전역 설정이기 때문에 작동시키고 있는 코드의 모든 병렬 스트림에 영향을 미칠 것이다. 하지만 하나의 특정한 병렬 스트림의 값을 변경하는 것은 불가능하다. 그러므로 꼭 이걸 변경해야할 이유가 없다면 굳이 바꾸지 않기를 추천한다. #### 7.1.2 스트림 퍼포먼스 측정하기 + 소프트웨어 개발을 하면서 퍼포먼스를 정확하게 확인하고 싶으면 "측정하고/측정하고/측정해야 한다." + 그래서 우리는 Java Microbenchmark Harness (JMH) 이라는 라이브러리를 이용할 것이다. 이것은 annotation 베이스이고, 자바 프로그램의 안정적인 microbenchmark다. (벤치마크 : 컴퓨터 프로그램, 프로그램 세트 또는 기타 작업을 실행하여 오브젝트의 상대적 성능을 평가하는 활동). 사실, JVM 위에서 알맞고 의미있는 벤치마크 를 개발하는 것은 쉽지 않은 일이다. 왜냐면 bytecode를 최적화하기 위해 HotSpot 이 요구하는 warm-up 시간이나, 가비지 컬렉터로 인해 발생하는 오버헤드와 같은 요소들 등 고려해야할 것들이 엄청 많기 때문이다. + 만약 Maven과 같은 툴을 쓴다면 JMH를 사용하기 위해, (Maven 빌드 프로세스를 정의하는) pom.xml 파일에 몇가지 dependency 만 추가하면 된다. 아래의 xml 을 보자 ```java <dependency> <groupId>org.openjdk.jmh</groupId> //JMH 코어를 실행하는 라이브러리 <artifactId>jmh-core</artifactId> <version>1.17.4</version> </dependency> <dependency> <groupId>org.openjdk.jmh</groupId> //JAR(Java Archive)를 생성하는 데 도움을 주는 annotation processor를 가지고 있다. <artifactId>jmh-generator-annprocess</artifactId> <version>1.17.4</version> </dependency> ``` + 더불어, Maven 설정에 아래와 같은 코드를 추가하면 쉽게 벤치마크를 실행할 수 있다. ```java <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <phase>package</phase> <goals><goal>shade</goal></goals> <configuration> <finalName>benchmarks</finalName> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>org.openjdk.jmh.Main</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> ``` + 저렇게 설정함으로써 sequentialSum 이라는 메서드를 벤치마크할 수 있다. 코드를 보자. ```java @BenchmarkMode(Mode.AverageTime) //벤치마킹한 메서드에 걸리는 평균 시간을 측정한다는 의미 @OutputTimeUnit(TimeUnit.MILLISECONDS) //milliseconds 단위로 시간을 출력한다. @Fork(2, jvmArgs={"-Xms4G", "-Xmx4G"}) //4Gb의 힙 저장 공간을 사용하여, 벤치마크를 두배 올린다. public class ParallelStreamBenchmark { private static final long N= 10_000_000L; @Benchmark //sequentialSum 메서드를 벤치마크하겠다는 표시 public long sequentialSum() { return Stream.iterate(1L, i -> i + 1).limit(N) .reduce( 0L, Long::sum); } @TearDown(Level.Invocation) // 각각의 반복을 수행한 뒤, 가비지 컬렉터를 돌린다. public void tearDown() { System.gc(); } } ``` + 이 클래스를 돌리면 이전에 설정한 Maven 플러그인이 benchmarks.jar 이름을 가진 JAR 파일을 생성하는데, 아래와 같이 사용할 수 있다. ```java java -jar ./target/benchmarks.jar ParallelStreamBenchmark ``` + 그 이하 과정은 생략. 결과만 보면 sequentialSum은 121.843ms/op(응답시간 milliseconds per operation), parallelSum은 604.059ms/op, interativeSum은 3.278ms/op 가 걸린다. parallelSum이 sequentialSum보다 다섯배나 걸리다니. 쿼드 코어의 장점을 활용하지도 못했다. 하지만 여기에는 두 가지 이슈가 있다. + 첫째, iterate 는 boxed object (객체형) 를 만들기 때문에, 추가되기 전에 숫자로 unbox 되어야 할 필요가 있다. 둘째, iterate는 parallel을 실행하기 위해 독립적인 덩어리로 나누는 게 쉽지 않다. reduce 를 실행시킬 때 전체 숫자 리스트를 효율적으로 덩어리 덩어리로 나누는 것이 불가능하기 때문이다. + 둘째, iterate 와 같이 병행처리에 적합하지 않은 작업에 병행 처리를 하게 되면 전체적으로 프로그램의 퍼포먼스를 떨어뜨릴 수 있다. + 즉, 그림 7-1처럼 reduction 과정이 흘러가지 않았다. reduction 과정이 시작될 때 전체 숫자 리스트는 병렬 처리를 위한 chunking 을 하기에 부적절하다. ##### 그러면 더 특화된 함수를 사용해보자 + 어떻게 효과적으로 병렬처리를 수행하고, 멀티코어 프로세서를 잘 활용할 수 있을까? + LongStream.rangeClosed 와 iterate 를 비교해 장점을 살펴보자 : + 1) LongStream.rangeClosed은 primitive type 을 사용하기 때문에 boxing, unboxing 오버헤드가 줄어든다. 2) LongStream.rangeClosed는 숫자 범위를 생성하기 때문에 독립 덩어리로 나누는 것이 쉽다. (예. 1-20까지의 숫자를 1-5, 6-10, 11-15, 16-20으로 나누기) ```java @Benchmark public long rangedSum() { return LongStream.rangeClosed(1, N) .reduce(0L, Long::sum); } ``` + 위와 같은 함수를 추가하여 unboxing관련 오버헤드가 sequential stream과 관계가 있는지 알 수 있다. + 결과를 보면 5.315ms/op 가 걸린다. 불필요한 boxing, unboxing 을 하지 않음으로써 시간이 이렇게 절약되었다. 병렬 처리를 무조건 하는 것보다 적절한 자료 구조를 선택하는 것이 더 중요하다는 것을 알 수 있다. 똑같이 병렬 스트림에 적용해보자. ```java @Benchmark public long parallelRangedSum() { return LongStream.rangeClosed(1, N) .parallel() .reduce(0L, Long::sum); } ``` + 시간은 2.677ms/op 가 걸린다. 앞서 7.1에서 설명한 것처럼 작동하는 것이다. 기존의 iterative 보다 20% 더 빠르다. + 다만, 병렬 프로세스는 스트림을 반복적으로 partitioning(나누기)해야 하고, 서로 다른 쓰레드에 각각의 substream의 reduce작동을 할당하고, 각각의 결과를 취합해야한다. 하지만 멀티 코어들끼리 데이터를 주고받는 것은 생각보다 소비적(시간이 많이 걸리)이므로, 다른 한 코어에서 병렬처리가 되어야 하는 작업이 코어에서 코어로 데이터를 주고받는 것보다 시간이 더 걸린다. ##### 7.1.3 병렬 스트림 적절하게 사용하기 + 병렬 스트림을 잘못 사용하면서 에러를 만드는 주요 원인 중의 하나는 공유하고 있는 state 를 mutate 하는 알고리즘을 사용하는 것이다. + 명령형 프로그래밍 imperative programming 에 익숙한 개발자들은 아래와 같은 코드에 익숙할 것이다. ```java public long sideEffectSum(long n) { // Accumulator 라는 클래스를 초기화하고, 하나씩 숫자를 돌려서 더하는 코드. Accumulator accumulator = new Accumulator(); // accumulator를 초기화하고 LongStream.rangeClosed(1, n).forEach(accumulator::add); // 원소에 하나씩 접근해서, 더한다. (계속 total에 접근함.) return accumulator.total; // } public class Accumulator { public long total = 0; public void add(long value) { total += value; } } // 위와 같은 코드를 나중에 동기화 작업으로 고치려고 하면, 병행처리를 하지 못하게 될 것이다. // parallel 로 고치면 아래와 같다. public long sideEffectParallelSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add); return accumulator.total; } // 아래 코드를 이용해 각각의 결과를 출력해보면 System.out.println("SideEffect parallel sum done in: " + measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) + " msecs" ); // 이렇게 출력될 것이다. Result: 5959989000692 Result: 7425264100768 Result: 6827235020033 Result: 7192970417739 Result: 6714157975331 Result: 7497810541907 Result: 6435348440385 Result: 6999349840672 Result: 7435914379978 Result: 7715125932481 SideEffect parallel sum done in: 49 msecs ``` + 지금 퍼포먼스 시간은 중요하지 않다. 50000005000000.이 맞는 값인데, 보면 전부 그 값이 나오지 않고 있다. 멀티 쓰레드가 동시에 accumulator 에 접근하고 total += value 를 수행하고 있기 때문이다. 즉, 공유하고 있는 object 를 계속 mutating 하는 forEach 문에서 이 문제가 발생하고 있는 것이다. ##### 7.1.4 효과적으로 병렬 스트림을 사용하게 하는 몇 가지 팁들 + 만약 의심이 들면, 측정해봐라. 항상 연속 스트림을 병렬 스트림으로 바꾸는 것은 현명하지 못하다. 미리 연속 스트림과 병렬 스트림 각각의 버전으로 측정해본 다음 퍼포먼스가 더 좋은 것을 선택해라 + boxing 에 유의하라. 자동 boxing, unboxing 은 퍼포먼스에 중대한 영향력을 미친다. Java 8은 이런 작업들을 피하기 위해 원시 스트림을 가지고 있다.(예를 들어, IntStream, LongStream, DoubleStream 등). 그러므로 가능하다면 이것들을 사용해라. + 몇몇 동작들은 병렬 스트림보다 연결 스트림이 더 맞다. limit나 findFirst와 같이 원소들의 순서가 중요한 작업들은 병렬 처리로 하기에는 너무 소모적이다. 예를 들어 findAny보다 findFirst가 더 잘 작동할 것이다. unordered라는 메서드를 실행함으로써 정렬된 스트림을 비정렬된 스트림으로 바꿀 수 있다. 예를 들어 만약 스트림의 N 개의 원소가 필요한데 첫번째 N개의 원소는 필요없다면, 비정렬된 병렬 스트림에 limit을 사용함으로써 더 효과적으로 작업할 수 있을 것이다. + 스트림으로 수행되는 동작들의 파이프라인에 사용되는 비용적인 측면들을 고려해라. 예를 들면 몇 개의(P개) 원소들인지, 스트림 파이프라인에 그 중 하나의 원소를 처리하는 비용은 어느 정도인지(Q). P*Q를 곱하면 그 총 비용이 나온다. Q가 높을수록 병행처리에 적합하다고 볼 수 있겠다. + 적은 양의 데이터만 처리해야 한다면, 병행 처리를 꼭 사용할 필요가 없다. + 스트림을 구성하는 자료구조가 얼마나 잘 분해되는지 고려하라. 예를 들어, ArrayList는 LinkedList보다 더 잘 나눠진다. 왜냐하면 ArrayList는 전체를 훑어볼 필요없이 나눠지는데 비해, LinkedList는 전체를 훑어야 때문이다. 또한, range 팩토리 메서드로 만들어진 원소 스트림은 더 빨리 분해된다. 마지막으로, 7.3 에서 배우겠지만 개발자 자체의 Spliterator 를 사용해서 decomposition 분해 프로세스의 전체를 컨트롤할 수 있다. + 마지막으로, 병렬 스트림으로 인해 병렬 처리를 작동시키는 인프라는 fork/join 프레임워크라는 것을 중요하게 생각해야 한다. *[Table 7.1] 참조* ### 7.2 form/join 프레임워크 + 이 프레임워크는 반복적으로 task 를 작게 쪼개고, 그 subtask 들의 결과를 합치는 작업을 한다. ExecutorService 인터페이스의 실행이라고 할 수 있는데, ForkJoinPool 이라는 쓰레드 풀에서 워커 쓰레드로 task 들을 분배한다. ##### 7.2.1 RecursiveTask의 작동 + 이 풀에 작업을 넣기 위해서는 RecursiveTask<R> 이라는 서브 클래스를 만들어야 한다. R 은 병행 작업으로 생성되는 결과의 타입, 혹은 작업의 결과가 없을 때 RecursiveAction의 타입을 말한다. RecursiveTask 를 정의하기 위해서는 compute 라는 하나의 추상 메서드만 실행하면 된다. ```java protected abstract R compute(); ``` + 이 메서드는 여러 서브 작업으로 나누는 로직과 더이상 나눠질 수 없을 만큼의 작은 서브 태스크들로 나눌 수 없을 때 하나의 서브태스크 결과를 생산하는 알고리즘을 가지고 있다. ```java // pseudo code 로 나타내보면 if(작업들이 충분히 작거나 더이상 나눠지지 않는다면){ 작업들을 순서대로 수행한다 } else { 1. 작업을 두 개의 서브 태스크로 나눈다. 2. 반복해서 이 메서드를 부르고 각각의 서브 태스트들을 나눈다. 3. 모든 서브태스크들이 끝나길 기다린다. 4. 각 서브태스크들의 결과를 합친다. } ``` + 더 작은 태스크 단위로 나눌 수 있느냐에 특정한 기준은 없다. 하지만 어느정도 통용되는 규칙들은 7.2.2 에서 더 살펴볼 것이다. *표 7.3 참조* + fork/join 프레임워크를 사용해서 병행처리로 합을 구해보자 ```java // 코드 7.2 public class ForkJoinSumCalculator extends java.util.concurrent.RecursiveTask<Long> { // fork/join 프레임워크와 사용할 수 있는 task를 만들기 위해 RecursiveTask 로 extend 했다. private final long[] numbers; // 이 배열의 숫자들은 더해질 것이다. private final int start; // subtask들이 가지고 있는 subarray의 첫번째/마지막 위치 private final int end; public static final long THRESHOLD = 10_000; // subtask를 나누는 기준이 되는 threshold 값을 지정. public ForkJoinSumCalculator(long[] numbers) { // 주요 업무(main task)의 subtask 들을 생성하기 위해 필요한 private 생성자 this(numbers, 0, numbers.length); } private ForkJoinSumCalculator(long[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } @Override // RecursiveTask의 추상 메서드를 override 한다. protected Long compute() { int length = end - start; // 이 task 를 수행한 뒤 모두 더한 subarray 의 크기 if (length <= THRESHOLD) { return computeSequentially(); // 만약 사이즈가 threshoold 기준보다 작거나 같으면, 결과를 순차적으로 계산한다. } ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2); // 배열을 반으로 나누고, 첫번째 반 의 합을 구하는 subtask를 생성한다. leftTask.fork(); ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end); // 두번째 반의 합을 구하는 subtask를 생성한다. Long rightResult = rightTask.compute(); // 이 두번째 subtask를 동시에 실행한다. 사실 이 작업으로 split 을 더 허용하게 된다. Long leftResult = leftTask.join(); // 첫번째 subtask 의 결과를 읽는다. 만약 다 끝나지 않았다면 기다린다. return leftResult + rightResult; // subtask 두 개의 결과를 합한다. } // threshold보다 적은 사이즈들을 구하는 간단한 순차 알고리즘이다. private long computeSequentially() { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } } ``` + 첫번째 n개 자연수들의 합을 병렬로 구하는 메서드가 이제 깔끔해졌다. 아래의 ForkJoinSumCalculator 생성자에 원하는 숫자의 배열을 파라미터로 주면 된다. ```java public static long forkJoinSum(long n) { long[] numbers = LongStream.rangeClosed(1, n).toArray(); ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers); return new ForkJoinPool().invoke(task); } // 보면, LongStream 을 사용해서 첫번째 n 개의 자연수를 포함한 배열을 생성했다. 그리고나서 ForkJoinTask 를 생성했는데 (RecursiveTask의 슈퍼클래스이기도 하다.), 금방 위에서 본 코드 7.2 에서 본 ForkJoinSumCalculator의 public 생성자에 이 배열을 넘겨주었다. // 그리고 ForkJoinPool 을 새로 만들어서 그것의 invoke 메서드에 해당 task 를 넘겨주었다. 마지막 메서드에서 반환한 값은 ForkJoinPool 내부에서 ForkJoinSumCalculator에 의해 정의된 task 가 실행되었을 때 얻어진 값이다. ``` + 실제 프로그래밍을 할 때, 하나 이상의 ForkJoinPool 을 쓰는건 타당하지 않다. 그래서 한 번 instance 를 만들고 static 정적 공간에 놔둔뒤 싱글톤으로 사용해야 한다. 그래야 소프트웨어의 어느 파트에서든 편하게 재사용할 수 있다. ##### ForkJoinSumCalculator 돌리기 + ForkJoinPool 로 ForkJoinSumCalculator task를 보내면, 이 task는 pool 의 쓰레드가 실행하게 되며, 이 task 의 compute 메서드를 부르게 된다. compute 메서드는 task 가 순차적으로 실행되기에 딱 적당하게 작은 크기인지를 체크한다. 그렇지 않다면 더해야 할 숫자들의 배열을 둘로 나눠서 ForkJoinSumCalculator 두 개를 각각 할당한다. + 이와 같은 과정은 반복되며, 더 이상 나눌 수 없거나 한 상태가 될 때까지, 원래의 task 를 둘로 나눈다. 각각의 subtask 들은 순차적으로 진행되며, forking 작업을 하면서 생성된 binary tree 형태의 task 들은 root 부터 차례대로 훑게 된다. 이 task 의 결과는 이 나눠진 subtask 들이 수행한 결과를 합한다. *그림 7.2 참조* + 퍼포먼스는 아래와 같은 코드로 측정할 수 있다+ ```java System.out.println("ForkJoin sum done in: " + measureSumPerf( ForkJoinSumCalculator::forkJoinSum, 10_000_000) + " msecs" ); ``` + 결과는 아마 "ForkJoin sum done in: 41 msecs" 로 도출될 것이다. + 병렬 stream 보다 못 미치는 수준이다. 하지만 ForkJoinSumCalculator task 에서 저 작업을 시도하기 전에 어쩔 수 없이 전~체 숫자 stream 을 long[] 에 넣어서 실행했기 때문이다. ##### fork/join 프레임워크를 활용하는 최고의 방법들 + 책..... ##### Work Stealing 알고리즘 + task 들이 ForkJoinPool 에서 전체 쓰레드에 나눠진다는 뜻인데, 각각의 쓰레드들은 double linked queue 로 짜여진 task 들을 가지고 있게 되며, task 하나를 끝내자마자 queue 의 head 로부터 하나를 땡겨와서 또 실행하기 시작한다. + 다른 쓰레드가 바쁘게 일하고 있더라도 queue 가 비어있을 가능성이 생기는데, idle 상태로 두는 대신, 쓰레드는 random 하게 다른 쓰레드의 queue 를 선택해서 task 를 "훔쳐오면" 된다. + 이 알고리즘은 pool 에서 일하는 thread 들 간의 작업 균형을 맞추기 위해 쓰인다. (그림 7.5 참조) + stream 을 나누는 데 사용되는 자동 메커니즘은 Spliterator 라고 하는데, 바로 이어서 설명하겠다. ### Spliterator + 일반적인 Iterator 처럼 원소들 전체를 살펴보는 데 사용된다. 하지만 "병렬로" 작업한다는 데 차이가 있다. + Java8 에서는 Collections 프레임워크에 포함된 모든 자료구조에 대해 기본적으로 Spliterator 를 지원하고 있다. + Collectino 인터페이스는 기본 함수인 spliterator() 를 제공하는데, Spliterator 오브젝트를 리턴한다. ```java public interface Spliterator<T> { boolean tryAdvance(Consumer<? super T> action); Spliterator<T> trySplit(); long estimateSize(); int characteristics(); } ``` #### 7.3.1 split 하는 과정 + 위의 함수는 아래 그림과 같이 반복하여 trySplit을 호출한다. + 그러다가 Spliterator 들이 null 을 리턴하는 순간 과정이 끝난다. + 이와 같은 과정들은 characteristics 메서드에 의해 선언되는 Spliterator의 특성에 영향을 받는다. ##### 7.3.2 직접 Spliterator 를 실행하기 + Iterative 버전으로 String의 단어 수를 카운팅하는 간단한 메서드를 작성해보자. ```java public int countWordsIteratively(String s) { int counter = 0; boolean lastSpace = true; for (char c : s.toCharArray()) { // 모든 캐릭터들을 하나씩 훑어 본다. if (Character.isWhitespace(c)) { lastSpace = true; } else { if (lastSpace) counter++; // 카운팅 숫자를 늘린다. lastSpace = false; } } return counter; } ``` + 만약 아래와 같은 스트링으로 단어의 개수를 뽑는다면 19개가 나올 것이다. ```javascript final String SENTENCE = " Nel mezzo del cammin di nostra vita " + "mi ritrovai in una selva oscura" + " ché la dritta via era smarrita "; System.out.println("Found " + countWordsIteratively(SENTENCE) + " words"); ``` + 그럼 이제 위의 함수를 병렬 처리하여 작업해보자. ##### 함수형 스타일로 word counting 을 다시 작성해보자 + 일단 String 을 stream 으로 변환하는 작업이 필요하다. 안타깝게도 int, long, double 에만 primitive stream 원시 스트림이 있기 때문에 Stream<Chatacter>를 써야할 것이다. ```java Stream<Character> stream = IntStream.range(0, SENTENCE.length()) .mapToObj(SENTENCE::charAt); ``` + 스트림을 나누면서, 단어의 수를 세는 int 변수와, 마지막 단어인지 아닌지를 체크하는 boolean 변수, 두 가지를 체크해야할 것이다. Java는 튜플 (wrapper 오브젝트 필요없이 서로 다른 원소들의 정렬된 리스트를 표현할 때 사용하는 자료구조)이 없기 때문에, WordCounter 라는 새로운 클래스를 만들어보자. 이 클래스가 위의 두 가지 변수들을 가지고 있을 것이다. ```java class WordCounter { private final int counter; private final boolean lastSpace; public WordCounter(int counter, boolean lastSpace) { this.counter = counter; this.lastSpace = lastSpace; } public WordCounter accumulate(Character c) { // String에서 단어를 하나씩 훑어보는 알고리즘 if (Character.isWhitespace(c)) { return lastSpace ? this : new WordCounter(counter, true); } else { return lastSpace ? new WordCounter(counter+1, false) : // 빈칸을 발견하거나 더 훑어볼 단어가 없으면 +1 한다. this; } } public WordCounter combine(WordCounter wordCounter) { // 두 개의 wordcounter를 합친다. counter를 더함. return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace); // counter의 합들을 사용함으로써 마지막 빈칸에 대해서는 신경을 안 써도 된다. } public int getCounter() { return counter; } } ``` + 여기서 accumulate 함수는 WordCounter의 상태가 어떻게 변하는지를 보여준다. 더 정확하게 말하면, 불변 클래스이기 때문에 새로운 WordCounter를 생성?? + 불변 클래스로 state를 증가시킴으로써 다음 단계에서 병렬화될 수 있는 것이다. + accumulate 메서드는 새로운 Character 스트림을 훑어볼 때마다 불린다. 특히, 빈칸이 없거나 마지막 단어의 다음이 빈칸일 때 counter가 증가한다. + 두 번째 메서드인 combine은 Character 스트림의 두 개의 각각 다른 서브 파트에서 작업한 WordCounter의 부분 결과를 합할 때 불린다. 그래서 내부 counter의 값을 더하는 것이다. + 지금까지 생각해봤던 로직들을 함수로 표현하면 아래와 같다. ```java private int countWords(Stream<Character> stream) { WordCounter wordCounter = stream.reduce(new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine); return wordCounter.getCounter(); } // 그리고 위에서 이용한 String 을 다시 넣어서 작업해보자 Stream<Character> stream = IntStream.range(0, SENTENCE.length()) .mapToObj(SENTENCE::charAt); System.out.println("Found " + countWords(stream) + " words"); ``` + 그러면 결과는 또 ```java Found 19 words ``` 라고 잘 나올것이다. ##### WordCounter를 병렬화시키자. ```java System.out.println("Found " + countWords(stream.parallel()) + " words"); ``` + 이렇게 코드를 수정하면 이상하게 결과는 Found 25 words 라고 나온다. + 원래 String 값이 나눠지는 부분이 랜덤하기 때문에, 때때로 단어는 두 개로 나눠지고 두 번 카운팅되기도 한다. 이렇게 String을 병렬처리로 작업하는 것이 항상 옳은 결과만 보여주지는 않는 다는 것을 얘기하기도 한다. 그래서 필요한게 Spliterator 다. ```java class WordCounterSpliterator implements Spliterator<Character> { private final String string; private int currentChar = 0; public WordCounterSpliterator(String string) { this.string = string; } @Override public boolean tryAdvance(Consumer<? super Character> action) { action.accept(string.charAt(currentChar++)); return currentChar < string.length(); } @Override public Spliterator<Character> trySplit() { int currentSize = string.length() - currentChar; if (currentSize < 10) { return null; } for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) { if (Character.isWhitespace(string.charAt(splitPos))) { Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos)); currentChar = splitPos; return spliterator; } } return null; } @Override public long estimateSize() { return string.length() - currentChar; } @Override public int characteristics() { return ORDERED + SIZED + SUBSIZED + NON-NULL + IMMUTABLE; } } ```