---
title: "[Java8] 병렬처리"
date: 2020-05-23
tags: [Java]
---
*Java In Action 스터디 모임에서 학습한 내용*
### 들어가기 전에
+ Java7 이전에는 데이터를 병렬 처리 하려면 좀 귀찮았다. 일단 데이터를 쪼개야 하고, 각각의 쓰레드에 할당해야하고, 각 쓰레드들이 작업을 다 마치길 기다린 다음 나눠진 결과들을 다 합쳐야 했다. Java7 에서는 fork/join 이라는 프레임워크를 통해 이 작업들을 일관적으로, 에러가 덜 발생하는 방향으로 작업할 수 있도록 했다.
+ Streams 인터페이스는 연속 데이터 스트림을 병행하여 처리할 수 있도록 바꾸는 역할을 한다. 이 내부 동작을 파악하는 것이 중요한데, 이걸 모르고 사용하게 되면 잘못 사용함으로써 생기는 bad results들을 맞이하게 될 것이다.
+ 특히, 각각의 데이터 덩어리들을 병렬 처리하는 것 학습하기 전에 병렬 스트림이 어떻게 덩어리 덩어리로 나눠지는지 Spliterator 를 활용할 것이다.
### 7.1 병렬 스트림 개요
+ 이 전에 학습한 내용을 보면 parallelStream 메서드를 시용해서 컬렉션을 병렬 스트림으로 만들 수 있었다. parallel stream 은 컬렉션의 원소들을 여러 덩어리로 나누고, 그걸 각 쓰레드로 돌리는 것이다. 컴퓨터의 멀티 코어 프로세서들이 모두 공평하게/동일하게 바쁜 상태가 되도록 하는 것이다. (쉬는 녀석 없이.)
+ 예를 들어, n 이라는 숫자를 전달받아서 0부터 n까지의 합을 구하는 메서드를 생각해보자.
```java
public long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1) //1. 무한 스트림 생성하는 부분
.limit(n) //2. 숫자 n 까지로 제한하는 부분
.reduce(0L, Long::sum); //3. reduce 로 숫자들을 더하는 부분
}
// 혹은 전통적인 자바 코드라면 아래와 같다.
public long iterativeSum(long n) {
long result = 0;
for (long i = 1L; i <= n; i++) {
result += i;
}
return result;
}
```
+ 여기서 드는 추가 궁금증이 있다. n 이 아주 큰 숫자라면? 결과는 어떻게 동기화할까? 쓰레드는 몇 개 필요할까?
##### 7.1.1 연속 스트림을 병렬 스트림으로 바꿔보자
```java
public long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel() //아까의 코드에서 이 줄이 추가되었다.
.reduce(0L, Long::sum);
}
```
+ 위의 코드는 아래의 그림에서 보여주는 작업을 한다.
+ 내부적으로는 boolean 플래그가 parallel 작업을 하겠다는 신호탄으로 사용된다.
+ 반대로 parallel 작업을 sequential 로 바꾸는 것도 가능하다.
+ 추가설명필요!!
+ 아무튼 위와 같이 숫자들을 더하는 작업을 하는 데 세 가지 방법이 있다. interative style, sequential reduction, parallel reduction. 어떤게 더 빠른지 계속 비교해보자.
##### 참고) 병렬 스트림에 사용된 스레드 풀 조절하기
+ parallel 메서드를 보면 병렬 스트림에 사용된 스레드들이 어디서 오는지, 어떻게 프로세스를 커스터마이징하는지 궁금할 수 있다.
+ parallel 스트림은 내부적으로 기본 ForkJoinPool을 사용하는데, 이건 가지고 있는 프로세서 숫자만큼의 쓰레드를 가지고 있다. (Runtime.getRuntime().available-Processors() 를 돌려보면 그 값을 알 수 있다.)
+ 하지만 아래와 같은 코드를 이용하면 pool 사이즈 조정도 가능하다.
```java
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
```
+ 전역 설정이기 때문에 작동시키고 있는 코드의 모든 병렬 스트림에 영향을 미칠 것이다. 하지만 하나의 특정한 병렬 스트림의 값을 변경하는 것은 불가능하다. 그러므로 꼭 이걸 변경해야할 이유가 없다면 굳이 바꾸지 않기를 추천한다.
#### 7.1.2 스트림 퍼포먼스 측정하기
+ 소프트웨어 개발을 하면서 퍼포먼스를 정확하게 확인하고 싶으면 "측정하고/측정하고/측정해야 한다."
+ 그래서 우리는 Java Microbenchmark Harness (JMH) 이라는 라이브러리를 이용할 것이다. 이것은 annotation 베이스이고, 자바 프로그램의 안정적인 microbenchmark다. (벤치마크 : 컴퓨터 프로그램, 프로그램 세트 또는 기타 작업을 실행하여 오브젝트의 상대적 성능을 평가하는 활동). 사실, JVM 위에서 알맞고 의미있는 벤치마크 를 개발하는 것은 쉽지 않은 일이다. 왜냐면 bytecode를 최적화하기 위해 HotSpot 이 요구하는 warm-up 시간이나, 가비지 컬렉터로 인해 발생하는 오버헤드와 같은 요소들 등 고려해야할 것들이 엄청 많기 때문이다.
+ 만약 Maven과 같은 툴을 쓴다면 JMH를 사용하기 위해, (Maven 빌드 프로세스를 정의하는) pom.xml 파일에 몇가지 dependency 만 추가하면 된다. 아래의 xml 을 보자
```java
<dependency>
<groupId>org.openjdk.jmh</groupId> //JMH 코어를 실행하는 라이브러리
<artifactId>jmh-core</artifactId>
<version>1.17.4</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId> //JAR(Java Archive)를 생성하는 데 도움을 주는 annotation processor를 가지고 있다.
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.17.4</version>
</dependency>
```
+ 더불어, Maven 설정에 아래와 같은 코드를 추가하면 쉽게 벤치마크를 실행할 수 있다.
```java
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals><goal>shade</goal></goals>
<configuration>
<finalName>benchmarks</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.openjdk.jmh.Main</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
```
+ 저렇게 설정함으로써 sequentialSum 이라는 메서드를 벤치마크할 수 있다. 코드를 보자.
```java
@BenchmarkMode(Mode.AverageTime) //벤치마킹한 메서드에 걸리는 평균 시간을 측정한다는 의미
@OutputTimeUnit(TimeUnit.MILLISECONDS) //milliseconds 단위로 시간을 출력한다.
@Fork(2, jvmArgs={"-Xms4G", "-Xmx4G"}) //4Gb의 힙 저장 공간을 사용하여, 벤치마크를 두배 올린다.
public class ParallelStreamBenchmark {
private static final long N= 10_000_000L;
@Benchmark //sequentialSum 메서드를 벤치마크하겠다는 표시
public long sequentialSum() {
return Stream.iterate(1L, i -> i + 1).limit(N)
.reduce( 0L, Long::sum);
}
@TearDown(Level.Invocation) // 각각의 반복을 수행한 뒤, 가비지 컬렉터를 돌린다.
public void tearDown() {
System.gc();
}
}
```
+ 이 클래스를 돌리면 이전에 설정한 Maven 플러그인이 benchmarks.jar 이름을 가진 JAR 파일을 생성하는데, 아래와 같이 사용할 수 있다.
```java
java -jar ./target/benchmarks.jar ParallelStreamBenchmark
```
+ 그 이하 과정은 생략. 결과만 보면 sequentialSum은 121.843ms/op(응답시간 milliseconds per operation), parallelSum은 604.059ms/op, interativeSum은 3.278ms/op 가 걸린다. parallelSum이 sequentialSum보다 다섯배나 걸리다니. 쿼드 코어의 장점을 활용하지도 못했다. 하지만 여기에는 두 가지 이슈가 있다.
+ 첫째, iterate 는 boxed object (객체형) 를 만들기 때문에, 추가되기 전에 숫자로 unbox 되어야 할 필요가 있다. 둘째, iterate는 parallel을 실행하기 위해 독립적인 덩어리로 나누는 게 쉽지 않다. reduce 를 실행시킬 때 전체 숫자 리스트를 효율적으로 덩어리 덩어리로 나누는 것이 불가능하기 때문이다.
+ 둘째, iterate 와 같이 병행처리에 적합하지 않은 작업에 병행 처리를 하게 되면 전체적으로 프로그램의 퍼포먼스를 떨어뜨릴 수 있다.
+ 즉, 그림 7-1처럼 reduction 과정이 흘러가지 않았다. reduction 과정이 시작될 때 전체 숫자 리스트는 병렬 처리를 위한 chunking 을 하기에 부적절하다.
##### 그러면 더 특화된 함수를 사용해보자
+ 어떻게 효과적으로 병렬처리를 수행하고, 멀티코어 프로세서를 잘 활용할 수 있을까?
+ LongStream.rangeClosed 와 iterate 를 비교해 장점을 살펴보자 :
+ 1) LongStream.rangeClosed은 primitive type 을 사용하기 때문에 boxing, unboxing 오버헤드가 줄어든다. 2) LongStream.rangeClosed는 숫자 범위를 생성하기 때문에 독립 덩어리로 나누는 것이 쉽다. (예. 1-20까지의 숫자를 1-5, 6-10, 11-15, 16-20으로 나누기)
```java
@Benchmark
public long rangedSum() {
return LongStream.rangeClosed(1, N)
.reduce(0L, Long::sum);
}
```
+ 위와 같은 함수를 추가하여 unboxing관련 오버헤드가 sequential stream과 관계가 있는지 알 수 있다.
+ 결과를 보면 5.315ms/op 가 걸린다. 불필요한 boxing, unboxing 을 하지 않음으로써 시간이 이렇게 절약되었다. 병렬 처리를 무조건 하는 것보다 적절한 자료 구조를 선택하는 것이 더 중요하다는 것을 알 수 있다. 똑같이 병렬 스트림에 적용해보자.
```java
@Benchmark
public long parallelRangedSum() {
return LongStream.rangeClosed(1, N)
.parallel()
.reduce(0L, Long::sum);
}
```
+ 시간은 2.677ms/op 가 걸린다. 앞서 7.1에서 설명한 것처럼 작동하는 것이다. 기존의 iterative 보다 20% 더 빠르다.
+ 다만, 병렬 프로세스는 스트림을 반복적으로 partitioning(나누기)해야 하고, 서로 다른 쓰레드에 각각의 substream의 reduce작동을 할당하고, 각각의 결과를 취합해야한다. 하지만 멀티 코어들끼리 데이터를 주고받는 것은 생각보다 소비적(시간이 많이 걸리)이므로, 다른 한 코어에서 병렬처리가 되어야 하는 작업이 코어에서 코어로 데이터를 주고받는 것보다 시간이 더 걸린다.
##### 7.1.3 병렬 스트림 적절하게 사용하기
+ 병렬 스트림을 잘못 사용하면서 에러를 만드는 주요 원인 중의 하나는 공유하고 있는 state 를 mutate 하는 알고리즘을 사용하는 것이다.
+ 명령형 프로그래밍 imperative programming 에 익숙한 개발자들은 아래와 같은 코드에 익숙할 것이다.
```java
public long sideEffectSum(long n) {
// Accumulator 라는 클래스를 초기화하고, 하나씩 숫자를 돌려서 더하는 코드.
Accumulator accumulator = new Accumulator(); // accumulator를 초기화하고
LongStream.rangeClosed(1, n).forEach(accumulator::add); // 원소에 하나씩 접근해서, 더한다. (계속 total에 접근함.)
return accumulator.total; //
}
public class Accumulator {
public long total = 0;
public void add(long value) { total += value; }
}
// 위와 같은 코드를 나중에 동기화 작업으로 고치려고 하면, 병행처리를 하지 못하게 될 것이다.
// parallel 로 고치면 아래와 같다.
public long sideEffectParallelSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
return accumulator.total;
}
// 아래 코드를 이용해 각각의 결과를 출력해보면
System.out.println("SideEffect parallel sum done in: " +
measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) + "
msecs" );
// 이렇게 출력될 것이다.
Result: 5959989000692
Result: 7425264100768
Result: 6827235020033
Result: 7192970417739
Result: 6714157975331
Result: 7497810541907
Result: 6435348440385
Result: 6999349840672
Result: 7435914379978
Result: 7715125932481
SideEffect parallel sum done in: 49 msecs
```
+ 지금 퍼포먼스 시간은 중요하지 않다. 50000005000000.이 맞는 값인데, 보면 전부 그 값이 나오지 않고 있다. 멀티 쓰레드가 동시에 accumulator 에 접근하고 total += value 를 수행하고 있기 때문이다. 즉, 공유하고 있는 object 를 계속 mutating 하는 forEach 문에서 이 문제가 발생하고 있는 것이다.
##### 7.1.4 효과적으로 병렬 스트림을 사용하게 하는 몇 가지 팁들
+ 만약 의심이 들면, 측정해봐라. 항상 연속 스트림을 병렬 스트림으로 바꾸는 것은 현명하지 못하다. 미리 연속 스트림과 병렬 스트림 각각의 버전으로 측정해본 다음 퍼포먼스가 더 좋은 것을 선택해라
+ boxing 에 유의하라. 자동 boxing, unboxing 은 퍼포먼스에 중대한 영향력을 미친다. Java 8은 이런 작업들을 피하기 위해 원시 스트림을 가지고 있다.(예를 들어, IntStream, LongStream, DoubleStream 등). 그러므로 가능하다면 이것들을 사용해라.
+ 몇몇 동작들은 병렬 스트림보다 연결 스트림이 더 맞다. limit나 findFirst와 같이 원소들의 순서가 중요한 작업들은 병렬 처리로 하기에는 너무 소모적이다. 예를 들어 findAny보다 findFirst가 더 잘 작동할 것이다. unordered라는 메서드를 실행함으로써 정렬된 스트림을 비정렬된 스트림으로 바꿀 수 있다. 예를 들어 만약 스트림의 N 개의 원소가 필요한데 첫번째 N개의 원소는 필요없다면, 비정렬된 병렬 스트림에 limit을 사용함으로써 더 효과적으로 작업할 수 있을 것이다.
+ 스트림으로 수행되는 동작들의 파이프라인에 사용되는 비용적인 측면들을 고려해라. 예를 들면 몇 개의(P개) 원소들인지, 스트림 파이프라인에 그 중 하나의 원소를 처리하는 비용은 어느 정도인지(Q). P*Q를 곱하면 그 총 비용이 나온다. Q가 높을수록 병행처리에 적합하다고 볼 수 있겠다.
+ 적은 양의 데이터만 처리해야 한다면, 병행 처리를 꼭 사용할 필요가 없다.
+ 스트림을 구성하는 자료구조가 얼마나 잘 분해되는지 고려하라. 예를 들어, ArrayList는 LinkedList보다 더 잘 나눠진다. 왜냐하면 ArrayList는 전체를 훑어볼 필요없이 나눠지는데 비해, LinkedList는 전체를 훑어야 때문이다. 또한, range 팩토리 메서드로 만들어진 원소 스트림은 더 빨리 분해된다. 마지막으로, 7.3 에서 배우겠지만 개발자 자체의 Spliterator 를 사용해서 decomposition 분해 프로세스의 전체를 컨트롤할 수 있다.
+ 마지막으로, 병렬 스트림으로 인해 병렬 처리를 작동시키는 인프라는 fork/join 프레임워크라는 것을 중요하게 생각해야 한다.
*[Table 7.1] 참조*
### 7.2 form/join 프레임워크
+ 이 프레임워크는 반복적으로 task 를 작게 쪼개고, 그 subtask 들의 결과를 합치는 작업을 한다. ExecutorService 인터페이스의 실행이라고 할 수 있는데, ForkJoinPool 이라는 쓰레드 풀에서 워커 쓰레드로 task 들을 분배한다.
##### 7.2.1 RecursiveTask의 작동
+ 이 풀에 작업을 넣기 위해서는 RecursiveTask<R> 이라는 서브 클래스를 만들어야 한다. R 은 병행 작업으로 생성되는 결과의 타입, 혹은 작업의 결과가 없을 때 RecursiveAction의 타입을 말한다. RecursiveTask 를 정의하기 위해서는 compute 라는 하나의 추상 메서드만 실행하면 된다.
```java
protected abstract R compute();
```
+ 이 메서드는 여러 서브 작업으로 나누는 로직과 더이상 나눠질 수 없을 만큼의 작은 서브 태스크들로 나눌 수 없을 때 하나의 서브태스크 결과를 생산하는 알고리즘을 가지고 있다.
```java
// pseudo code 로 나타내보면
if(작업들이 충분히 작거나 더이상 나눠지지 않는다면){
작업들을 순서대로 수행한다
} else {
1. 작업을 두 개의 서브 태스크로 나눈다.
2. 반복해서 이 메서드를 부르고 각각의 서브 태스트들을 나눈다.
3. 모든 서브태스크들이 끝나길 기다린다.
4. 각 서브태스크들의 결과를 합친다.
}
```
+ 더 작은 태스크 단위로 나눌 수 있느냐에 특정한 기준은 없다. 하지만 어느정도 통용되는 규칙들은 7.2.2 에서 더 살펴볼 것이다.
*표 7.3 참조*
+ fork/join 프레임워크를 사용해서 병행처리로 합을 구해보자
```java
// 코드 7.2
public class ForkJoinSumCalculator
extends java.util.concurrent.RecursiveTask<Long> {
// fork/join 프레임워크와 사용할 수 있는 task를 만들기 위해 RecursiveTask 로 extend 했다.
private final long[] numbers; // 이 배열의 숫자들은 더해질 것이다.
private final int start; // subtask들이 가지고 있는 subarray의 첫번째/마지막 위치
private final int end;
public static final long THRESHOLD = 10_000; // subtask를 나누는 기준이 되는 threshold 값을 지정.
public ForkJoinSumCalculator(long[] numbers) {
// 주요 업무(main task)의 subtask 들을 생성하기 위해 필요한 private 생성자
this(numbers, 0, numbers.length);
}
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
// RecursiveTask의 추상 메서드를 override 한다.
protected Long compute() {
int length = end - start; // 이 task 를 수행한 뒤 모두 더한 subarray 의 크기
if (length <= THRESHOLD) {
return computeSequentially();
// 만약 사이즈가 threshoold 기준보다 작거나 같으면, 결과를 순차적으로 계산한다.
}
ForkJoinSumCalculator leftTask =
new ForkJoinSumCalculator(numbers, start, start + length/2);
// 배열을 반으로 나누고, 첫번째 반 의 합을 구하는 subtask를 생성한다.
leftTask.fork();
ForkJoinSumCalculator rightTask =
new ForkJoinSumCalculator(numbers, start + length/2, end);
// 두번째 반의 합을 구하는 subtask를 생성한다.
Long rightResult = rightTask.compute();
// 이 두번째 subtask를 동시에 실행한다. 사실 이 작업으로 split 을 더 허용하게 된다.
Long leftResult = leftTask.join();
// 첫번째 subtask 의 결과를 읽는다. 만약 다 끝나지 않았다면 기다린다.
return leftResult + rightResult;
// subtask 두 개의 결과를 합한다.
}
// threshold보다 적은 사이즈들을 구하는 간단한 순차 알고리즘이다.
private long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}
```
+ 첫번째 n개 자연수들의 합을 병렬로 구하는 메서드가 이제 깔끔해졌다. 아래의 ForkJoinSumCalculator 생성자에 원하는 숫자의 배열을 파라미터로 주면 된다.
```java
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
// 보면, LongStream 을 사용해서 첫번째 n 개의 자연수를 포함한 배열을 생성했다. 그리고나서 ForkJoinTask 를 생성했는데 (RecursiveTask의 슈퍼클래스이기도 하다.), 금방 위에서 본 코드 7.2 에서 본 ForkJoinSumCalculator의 public 생성자에 이 배열을 넘겨주었다.
// 그리고 ForkJoinPool 을 새로 만들어서 그것의 invoke 메서드에 해당 task 를 넘겨주었다. 마지막 메서드에서 반환한 값은 ForkJoinPool 내부에서 ForkJoinSumCalculator에 의해 정의된 task 가 실행되었을 때 얻어진 값이다.
```
+ 실제 프로그래밍을 할 때, 하나 이상의 ForkJoinPool 을 쓰는건 타당하지 않다. 그래서 한 번 instance 를 만들고 static 정적 공간에 놔둔뒤 싱글톤으로 사용해야 한다. 그래야 소프트웨어의 어느 파트에서든 편하게 재사용할 수 있다.
##### ForkJoinSumCalculator 돌리기
+ ForkJoinPool 로 ForkJoinSumCalculator task를 보내면, 이 task는 pool 의 쓰레드가 실행하게 되며, 이 task 의 compute 메서드를 부르게 된다. compute 메서드는 task 가 순차적으로 실행되기에 딱 적당하게 작은 크기인지를 체크한다. 그렇지 않다면 더해야 할 숫자들의 배열을 둘로 나눠서 ForkJoinSumCalculator 두 개를 각각 할당한다.
+ 이와 같은 과정은 반복되며, 더 이상 나눌 수 없거나 한 상태가 될 때까지, 원래의 task 를 둘로 나눈다. 각각의 subtask 들은 순차적으로 진행되며, forking 작업을 하면서 생성된 binary tree 형태의 task 들은 root 부터 차례대로 훑게 된다. 이 task 의 결과는 이 나눠진 subtask 들이 수행한 결과를 합한다.
*그림 7.2 참조*
+ 퍼포먼스는 아래와 같은 코드로 측정할 수 있다+
```java
System.out.println("ForkJoin sum done in: " + measureSumPerf(
ForkJoinSumCalculator::forkJoinSum, 10_000_000) + " msecs" );
```
+ 결과는 아마 "ForkJoin sum done in: 41 msecs" 로 도출될 것이다.
+ 병렬 stream 보다 못 미치는 수준이다. 하지만 ForkJoinSumCalculator task 에서 저 작업을 시도하기 전에 어쩔 수 없이 전~체 숫자 stream 을 long[] 에 넣어서 실행했기 때문이다.
##### fork/join 프레임워크를 활용하는 최고의 방법들
+ 책.....
##### Work Stealing 알고리즘
+ task 들이 ForkJoinPool 에서 전체 쓰레드에 나눠진다는 뜻인데, 각각의 쓰레드들은 double linked queue 로 짜여진 task 들을 가지고 있게 되며, task 하나를 끝내자마자 queue 의 head 로부터 하나를 땡겨와서 또 실행하기 시작한다.
+ 다른 쓰레드가 바쁘게 일하고 있더라도 queue 가 비어있을 가능성이 생기는데, idle 상태로 두는 대신, 쓰레드는 random 하게 다른 쓰레드의 queue 를 선택해서 task 를 "훔쳐오면" 된다.
+ 이 알고리즘은 pool 에서 일하는 thread 들 간의 작업 균형을 맞추기 위해 쓰인다. (그림 7.5 참조)
+ stream 을 나누는 데 사용되는 자동 메커니즘은 Spliterator 라고 하는데, 바로 이어서 설명하겠다.
### Spliterator
+ 일반적인 Iterator 처럼 원소들 전체를 살펴보는 데 사용된다. 하지만 "병렬로" 작업한다는 데 차이가 있다.
+ Java8 에서는 Collections 프레임워크에 포함된 모든 자료구조에 대해 기본적으로 Spliterator 를 지원하고 있다.
+ Collectino 인터페이스는 기본 함수인 spliterator() 를 제공하는데, Spliterator 오브젝트를 리턴한다.
```java
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
}
```
#### 7.3.1 split 하는 과정
+ 위의 함수는 아래 그림과 같이 반복하여 trySplit을 호출한다.
+ 그러다가 Spliterator 들이 null 을 리턴하는 순간 과정이 끝난다.
+ 이와 같은 과정들은 characteristics 메서드에 의해 선언되는 Spliterator의 특성에 영향을 받는다.
##### 7.3.2 직접 Spliterator 를 실행하기
+ Iterative 버전으로 String의 단어 수를 카운팅하는 간단한 메서드를 작성해보자.
```java
public int countWordsIteratively(String s) {
int counter = 0;
boolean lastSpace = true;
for (char c : s.toCharArray()) { // 모든 캐릭터들을 하나씩 훑어 본다.
if (Character.isWhitespace(c)) {
lastSpace = true;
} else {
if (lastSpace) counter++; // 카운팅 숫자를 늘린다.
lastSpace = false;
}
}
return counter;
}
```
+ 만약 아래와 같은 스트링으로 단어의 개수를 뽑는다면 19개가 나올 것이다.
```javascript
final String SENTENCE =
" Nel mezzo del cammin di nostra vita " +
"mi ritrovai in una selva oscura" +
" ché la dritta via era smarrita ";
System.out.println("Found " + countWordsIteratively(SENTENCE) + " words");
```
+ 그럼 이제 위의 함수를 병렬 처리하여 작업해보자.
##### 함수형 스타일로 word counting 을 다시 작성해보자
+ 일단 String 을 stream 으로 변환하는 작업이 필요하다. 안타깝게도 int, long, double 에만 primitive stream 원시 스트림이 있기 때문에 Stream<Chatacter>를 써야할 것이다.
```java
Stream<Character> stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
```
+ 스트림을 나누면서, 단어의 수를 세는 int 변수와, 마지막 단어인지 아닌지를 체크하는 boolean 변수, 두 가지를 체크해야할 것이다. Java는 튜플 (wrapper 오브젝트 필요없이 서로 다른 원소들의 정렬된 리스트를 표현할 때 사용하는 자료구조)이 없기 때문에, WordCounter 라는 새로운 클래스를 만들어보자. 이 클래스가 위의 두 가지 변수들을 가지고 있을 것이다.
```java
class WordCounter {
private final int counter;
private final boolean lastSpace;
public WordCounter(int counter, boolean lastSpace) {
this.counter = counter;
this.lastSpace = lastSpace;
}
public WordCounter accumulate(Character c) { // String에서 단어를 하나씩 훑어보는 알고리즘
if (Character.isWhitespace(c)) {
return lastSpace ?
this :
new WordCounter(counter, true);
} else {
return lastSpace ?
new WordCounter(counter+1, false) : // 빈칸을 발견하거나 더 훑어볼 단어가 없으면 +1 한다.
this;
}
}
public WordCounter combine(WordCounter wordCounter) { // 두 개의 wordcounter를 합친다. counter를 더함.
return new WordCounter(counter + wordCounter.counter,
wordCounter.lastSpace); // counter의 합들을 사용함으로써 마지막 빈칸에 대해서는 신경을 안 써도 된다.
}
public int getCounter() {
return counter;
}
}
```
+ 여기서 accumulate 함수는 WordCounter의 상태가 어떻게 변하는지를 보여준다. 더 정확하게 말하면, 불변 클래스이기 때문에
새로운 WordCounter를 생성??
+ 불변 클래스로 state를 증가시킴으로써 다음 단계에서 병렬화될 수 있는 것이다.
+ accumulate 메서드는 새로운 Character 스트림을 훑어볼 때마다 불린다. 특히, 빈칸이 없거나 마지막 단어의 다음이 빈칸일 때 counter가 증가한다.
+ 두 번째 메서드인 combine은 Character 스트림의 두 개의 각각 다른 서브 파트에서 작업한 WordCounter의 부분 결과를 합할 때 불린다. 그래서 내부 counter의 값을 더하는 것이다.
+ 지금까지 생각해봤던 로직들을 함수로 표현하면 아래와 같다.
```java
private int countWords(Stream<Character> stream) {
WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
WordCounter::accumulate,
WordCounter::combine);
return wordCounter.getCounter();
}
// 그리고 위에서 이용한 String 을 다시 넣어서 작업해보자
Stream<Character> stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
System.out.println("Found " + countWords(stream) + " words");
```
+ 그러면 결과는 또
```java
Found 19 words
```
라고 잘 나올것이다.
##### WordCounter를 병렬화시키자.
```java
System.out.println("Found " + countWords(stream.parallel()) + " words");
```
+ 이렇게 코드를 수정하면 이상하게 결과는 Found 25 words 라고 나온다.
+ 원래 String 값이 나눠지는 부분이 랜덤하기 때문에, 때때로 단어는 두 개로 나눠지고 두 번 카운팅되기도 한다. 이렇게 String을 병렬처리로 작업하는 것이 항상 옳은 결과만 보여주지는 않는 다는 것을 얘기하기도 한다. 그래서 필요한게 Spliterator 다.
```java
class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentChar = 0;
public WordCounterSpliterator(String string) {
this.string = string;
}
@Override
public boolean tryAdvance(Consumer<? super Character> action) {
action.accept(string.charAt(currentChar++));
return currentChar < string.length();
}
@Override
public Spliterator<Character> trySplit() {
int currentSize = string.length() - currentChar;
if (currentSize < 10) {
return null;
}
for (int splitPos = currentSize / 2 + currentChar;
splitPos < string.length(); splitPos++) {
if (Character.isWhitespace(string.charAt(splitPos))) {
Spliterator<Character> spliterator =
new WordCounterSpliterator(string.substring(currentChar,
splitPos));
currentChar = splitPos;
return spliterator;
}
}
return null;
}
@Override
public long estimateSize() {
return string.length() - currentChar;
}
@Override
public int characteristics() {
return ORDERED + SIZED + SUBSIZED + NON-NULL + IMMUTABLE;
}
}
```