Ch 07. 병렬 데이터 처리와 성능

새로운 스트림 인터페이스를 이용해서 데이터 컬렉션을 선언형으로 제어하는 방법을 살펴보았다. 또한 외부 반복을 내부 반복으로 바꾸면 네이티브 자바 라이브러리가 스트림 요소의 처리를 제어할 수 있음을 확인했다.

자바 7이 등장 하기 전에는 데이터 컬렉션을 병렬로 처리하기가 어려웠다.

  1. 우선 데이터를 서브파트로 분할해야 한다.

  2. 분할된 서브파트를 각각의 스레드로 할당한다.

  3. 스레드로 할당한 다음에는 의도치 않은 레이스 컨디션이 발생하지 않도록 적절한 동기화를 추가해야 한다.

  4. 마지막으로 부분결과를 합쳐야 한다.

자바7은 더 쉽게 병렬화를 수행하면서 에러를 최소화할 수 있도록 포크/조인 프레임워크라는 기능을 제공한다.

이번 장에서는 스트림으로 데이터 컬렉션 관련 동작을 얼마나 쉽게 병렬로 실행할 수 있는지 설명한다.

7.1 병렬 스트림

컬렉션에 parallelStream을 호출하면 병렬 스트립이 생성된다. 병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다. 따라서 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있다.

숫자 n을 인수로 받아서 1부터 n까지의 모든 숫자의 합계를 반환하는 메서드를 구현한다고 가정하자

public static long iterativeSum(long n) {
    long result = 0;
    for(long i = 1L; i<n; i++) {
        result += i;
    }
    ret urn result;
}

특히 n이 커진다면 이 연산을 병렬로 처리하는 것이 좋을 것이다. 그렇다면 무엇을 고민해야 할까?

  • 무엇부터 수정해야 할까?

  • 결과 변수는 어떻게 동기화해야 할까?

  • 몇 개의 스레드를 사용해야 할까?

  • 숫자는 어떻게 생성할까?

  • 생성된 숫자는 누가 더할까?

병렬 스트림을 이용하면 걱정, 근심 없이 모든 문제를 쉽게 해결할 수 있다.

7.1.1 순차 스트림을 병렬 스트림으로 변환하기

순차 스트림에 parallel 메서드를 호출하면 기존의 함수형 리듀싱 연산(숫자 합계 계산)이 병렬로 처리된다.

public static long parallelSum(long n) {
    return Stream.iterate(1L, i => i + 1)
                                .limit(n)
                                .**parallel()** // 스트림을 병렬 스트림으로 변환
                                .reduce(0L, Long::sum);
}

내부적으로는 parallel을 호출하면 이후 연산이 병렬로 수행해야 함을 의미하는 불린 플래그가 설정된다. 반대로 sequential로 병렬 스트림을 순차 스트림으로 바꿀 수 있다. 이 두 메서드를 이용해서 어떤 연산을 병렬로 실행하고 어떤 연산을 순차로 실행할지 제어할 수 있다.

Stream.**parallel()**
      .filter(...)
      .**sequential()** //전체 파이프라인이 어떻게 처리될지를 결정
      .parallel()
			.**reduce()**;

parallel과 sequential 두 메서드 중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미친다. 이 예제에서 파이프라인의 마지막 호출은 parallel이므로 파이프라인은 전체적으로 병렬로 실행된다.

7.1.2 스트림 성능 측정

병렬화를 이용하면 순차나 반복 형식에 비해 성능이 더 좋아질 것이라 추측했다.

성능을 최적화할 때는 측정이 중요하다.

(교제에서는 자바 마이크로벤치마크 하니스(JMH)라는 라이브러리를 이용해 작은 벤치 마크를 구현한다.)

순차 덧셈 함수를 이용해서 천만 개 숫자의 합계에 걸리는 시간을 계산해보자

private static final long N = 10_000_000L;

// 일반 스트림으로 수행한 결과(sequentialSum)
public logn sequentialSum() {
	return Stream.iterate(1L, i -> i + 1).limit(N)
							 .reduce(0L, Long::sum);
}

// 결과값
Sequential sum done in: 121.843 msecs

--- 

// for loop로 실행한 결과(iterativeSum)
public logn sequentialSum() {
	long result = 0;
	for (long i = 1L; i <= N; i++) {
		result += i;
	}
	return result;	
}

// 결과값
Iterative sum done in: 3.278 msecs

---

// 병렬 스트림으로 실행환 결과(parallelSum)
public static long parallelSum(long n) {
    return Stream.iterate(1L, i => i + 1)
                                .limit(n)
                                .parallel()
                                .reduce(0L, Long::sum);
}

// 결과값
Iterative sum done in: 604.059 msecs

병렬 버전이 순차 버전보다 느리다는 것을 확인할 수 있다. 무엇이 문제일까?

  • iterate가 박싱된 객체를 생성하므로 숫자를 더하려면 다시 언박싱을 해야한다.

  • iterate는 병렬로 수행할 수 있는 독립 단위(청크)로 분할하기가 어렵다.

우리에겐 병렬로 수행될 수 있는 스트림 모델이 필요하기 때문이다. 특히 이전 연산의 결과에 따라 다음 함수의 입력이 달라지기 때문에 iterate 연산을 청크로 분할하기가 어렵다.

리듀싱 과정을 시작하는 시점에 전체 숫자 리스트가 준비되지 않았으므로 스트림을 병렬로 처리할 수 있도록 청크로 분할할 수 없다.

스트림이 병렬로 처리되도록 지시했고 각각의 합계가 다른 스레드에서 수행되었지만 결국 순차처리 방식과 크게 다른 점이 없으므로 스레드를 할당하는 오버헤드만 증가하게 된다.

더 특화된 메서드 사용

그렇다면 멀티코어 프로세서를 활용해서 효과적으로 합계 연산을 병렬로 실행하려면 어떻게 해야 할까?

  • LongStream.rangeClosed는 기본형 long을 직접 사용하므로 박싱과 언박싱 오버헤드가 사라진다.

  • LongStream.rangeClosed는 쉽게 철크로 분할할 수 있는 숫자 범위를 생산한다. 예를 들어 1-20 범위의 숫자를 각각 1-5, 6-10,11-15,16-20 범위의 숫자로 분할할 수 있다.

언박싱과 관련한 오버헤드가 얼마나 될까?

다음과 같은 순차 스트림을 처리하는 시간을 측정하자

public static long rangedSum(long n) {
    return LongStream.rangeClosed(1, n)
        .reduce(0L, Long::sum);
}

// 결과값
Ranged sum done in: 5.315 msecs

기존의 iterate 팩토리 메서드로 생성한 순차 버전에 비해 처리속도가 더 빠르다. 특화되지 않은 스트림을 처리할 때는 오토박싱, 언박싱 등의 오버헤드를 수반하기 때문이다.

상황에 따라서는 어떤 알고리즘을 병렬화하는 것보다 적절한 자료구조를 선택하는 것이 더 중요하다.

하지만 다음과 같은 새로운 버전에 병렬 스트림을 적용하면 무슨 일이 일어날까?

public static long parallelRangedSum(long n) {
    return LongStream.rangeClosed(1, n)
        .parallel()
        .reduce(0L, Long::sum);
}

// 결과값
Parallel range sum done in: 2.677 msecs

드디오 순차 실행보다 빠른 성능을 갖는 병렬 리듀싱을 만들었다.

하지만 병렬화가 완전 공짜는 아니라는 사실을 기억하자. 병렬화를 이용하려면 스트림을 재귀적으로 분할해야 하고, 각 서브스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고, 이들 결과를 하나의 값으로 합쳐야 한다.

따라서 코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는 것이 바람직하다.

7.1.3 병렬 스트림의 올바른 사용법

병렬 스트림을 잘못 사용하면서 발생하는 많은 문제는 공유된 상태를 바꾸는 알고리즘을 사용하기 때문에 일어난다.

public static long sideEffectSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).forEach(accumulator::add);
    return accumulator.total;
}

public class Accumulator {
    public long total = 0; //<- 데이터 레이스 문제 발생
    public void add(long value) { total += value; }
}

코드에 무슨 문제라도 있는가?

위 코드는 본질적으로 순차 실행할 수 있도록 구현되어 있으므로 병렬로 실행하면 참사가 일어난다. 특히 total을 접근할 때마다 (다수의 스레드에서 동시에 데이터에 접근하는) 데이터 레이스 문제가 일어난다. 동기화로 문제를 해결하다보면 결국 병렬화라는 특성이 없어져 버릴 것이다.

지금까지 병렬 스트림과 병렬 계산에서는 공유된 가변 상태를 피해야 한다는 사실을 확인했다.

우선은 병렬 스트림이 올바로 동작하려면 공유된 가변 상태를 피해야 한다는 사실만 기억하자

병렬 스트림 효과적으로 사용하기

어떤 상황에서 병렬 스트림을 사용할 것인지 약간의 수량적 힌트를 정하는 것이 도움이 될 때도 있다.

  • 확신이 서지 않는다면 직접 측정하라. 순차 스트림과 병렬 스트림 중 어떤 것이 좋을지 모르겠다면 적절한 벤치마크로 직접 성능을 측정하는 것이 바람직하다

  • 박싱을 주의하라. 자동 박싱과 언박싱은 성능을 크게 저하시킬 수 있는 요소다. 되도록이면 기본형 특화 스트림(IntStream, LongStream, DoubleStream)을 사용하는 것이 좋다

  • 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다. 특히 limit나 findFirst처럼 요소의 순서에 의존하는 연산을 병렬 스트림에서 수행하려면 비싼 비용을 치러야 한다.

  • 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라. 처리해야 할 요소 수가 N이고 하나의 요소를 처리하는 데 드는 비용을 Q하 하면 스트림 파이프라인 처리 비용은 N*Q로 예상할 수 있다. Q가 높아진다는 것은 병렬 스트림으로 성능을 개선할 수 있는 가능성이 있음을 의미한다.

  • 소량의 데이터에서는 병렬 스트림이 도움 되지 않는다.

  • 스트림을 구성하는 자료구조가 적절한지 확인하라. LinkedList를 분할하려면 모든 요소를 탐색해야 하지만 Arraylist는 요소를 탐색하지 않고도 리스트를 분할할 수 있기 때문이다.

  • 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다. 필터 연산이 있으면 스트림의 길이를 예측할 수 없으므로 효과적으로 스트림을 병렬 처리할 수 있을지 알 수 없게 된다.

  • 최종 연산의 병합 과정(예를 들면 Collector의 combiner 메서드) 비용을 살펴보라. 병합 과정의 비용이 비싸다면 병렬 스트림으로 얻은 성능의 이익이 서브스트림의 부분결과를 합치는 과정에서 상쇄될 수 있다.

  • 병렬 스트림이 수행되는 내부 인프라구조도 살펴봐야 한다.

7.2 포크/조인 프레임워크

포크/조인 프레임워크는 병렬화 할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었다.

7.2.1 RecursiveTask 활용

스레드 풀을 이용하려면 RecursiveTask의 서브클래스를 만들어야 한다. 여기서 R은 병렬화된 태스크가 생성하는 결과 형식 또는 결과가 없을 때는 RecursiveAction 형식이다. RecursiveTask를 정의하려면 추상 메서드 compute를 구현해야 한다.

protected abstract R compute();

compute 메서드는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘을 정의한다.

if(태스크가 충분히 작거나 더 이상 분할할 수 없으면){
    순차적으로 태스크 계산
} else {
    태스크를 두 서브태스크로 분할
    태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출함
    모든 서브태스크의 연산이 완료될 때까지 기다림
    각 서브태스크의 결과를 합침
}

이 알고리즘은 분할 후 정복 알고리즘의 병렬화 버전이다.

[예제 7-2]의 ForkJoinSumCalculator 코드에서 보여주는 것처럼 먼저 RecursiveTask를 구현해야 한다.

일반적으로 애플리케이션에서는 둘 이상의 ForkJoinPool을 사용하지 않는다. 즉, 소프트웨어의 필요한 곳에서 언제든 가져다 쓸 수 있도록 ForkJoinPool을 한 번만 인스턴스화해서 정적필드에 싱클턴으로 저장한다.

포크/조인 프레임워크를 제대로 사용하는 방법

포크/조인 프레임워크는 쉽게 사용할 수 있는 편이지만 항상 주의를 기울여야 한다.

  • Join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때까지 호출자를 블록시킨다. 따라서 두 서브태스크가 모두 시작된 다음에 join을 호출해야 한다.

  • RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드를 사용하지 말아야 한다. 대신 computed나 fork 메서드를 직접 호출할 수 있따. 순차 코드에서 병렬 계산을 시작할 때만 invoke를 사용한다.

  • 서브태스크에 fork 메서드를 호출해서 ForkJoinPool의 일정을 조절할 수 있다.

    한쪽 작업에는 fork를 호출하는 것보다는 compute를 호출하는 것이 효율적이다. 그러면 두 서브테스크 한 태스크에는 같은 스레드를 재사용할 수 있으므로 풀에서 불필요한 태스크를 할당하는 오버헤드를 피할 수 있다.

  • 포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅하기 어렵다.

    포크/조인 프레임워크에서는 fork라 불리는 다른 스레드에서 compute를 호출하므로 스택 트레이스가 도움이 되지 않는다.

  • 병렬 스트림에서 살펴본 것처럼 멀티코어에 포크/조인 프레임워크를 사용하는 것이 순차처리보다 무조건 빠를 거라는 생각은 버려야 한다.

    각 서브태스크의 실행시간은 새로운 태스크를 포킹하는 데 드는 시간보다 길어야 한다.

    JIT 컴파일러에 의해 최적화되려면 몇 차례의 ‘준비 과정' 또는 실행 과정을 거쳐야 한다. 따라서 성능을 측정할 때는 여러 번 프로그램을 실행한 결과를 측정해야 한다.

포크/조인 분할 전략에서는 주어진 서브태스크를 더 분할할 것인지 결정할 기준을 정해야 한다. 다음 절에서는 분할 기준과 몇 가지 힌트를 제공한다.

7.2.3 작업 훔치기

기준값을 바꿔가면서 실험해보는 방법 외에는 좋은 기준을 찾을 뾰족한 방법이 없다.

대부분의 기기에는 코어가 네 개뿐이므로 천 개 이상의 서브태스크는 자원만 낭비하는 것 같아 보일 수 있다. 실제로는 코어 개수와 관계없이 적절한 크기로 분할도니 많은 태스크를 포킹하는 것이 바람직하다.

각각의 서브태스크의 작업완료 시간이 크게 달라질 수 있다. 포크/조인 프레임워크에서는 작업 훔치기라는 기법으로 이 문제를 해결한다. 작업 훔치기 기법에서는 ForkJoinPool의 모든 스레드를 거의 공정하게 분할한다. 할일이 없어진 스레드는 유휴 상태로 바뀌는 것이 아니라 다른 스레드 큐의 꼬리에서 작업을 훔쳐온다. 모든 태스크가 작업을 끝낼 때 까지, 즉 모든 큐가 빌 때까지 이 과정을 반복한다. 따라서 태스크의 크기를 작게 나누어야 작업자 스레드 간의 작업부하를 비슷한 수준으로 유지할 수 있다.

다음 절에서는 자동으로 스크림을 분할하는 기법인 Spliterator를 설명한다.

7.3 Spliterator

자바 8에서는 Spliterator라는 새로운 인터페이스를 제공한다. Iterator처럼 Spliterator는 소스의 요소 탐색 기능을 제공한다는 점은 같지만 Spliterator는 병렬 작업에 특화되어 있다

Last updated