Apache Spark

[Spark 완벽 가이드] Catalyst Optimizer란?

binary-kim 2024. 11. 18. 15:06

서론

저번 포스팅에서는 Spark가 나오게 된 배경 및 기본 개념을 위주로 정리해보는 시간을 가졌습니다. 이번 시간에는 Spark에서 Job을 제출했을 때 어떻게 최적화를 진행하는지를 시작으로 하여, 여러가지 성능 튜닝 기법들에 대해서 정리해보는 시간을 가져볼까 합니다. Spark에서 사용할 수 있는 여러가지 Method (map, filter, ...)들을 다루는 포스팅도 생각해봤지만, 해당 영역은 개념적인 부분보다 실습에 가까운 부분이기 때문에 생략을 해보려고 합니다.

 

Spark Optimization이 중요한 이유는 뭘까요? 그리고 왜 필요할까요? 해당 질문에 대해 생각해보면 ...

 

최적화라는 것은 주어진 자원 내에서 성능을 최대한 끌어올려 원하는 작업을 더 빠르게, 더 효율적으로 수행하도록 만드는 과정입니다. Spark는 분산 처리 시스템으로 대용량 데이터를 병렬 처리할 수 있는 장점이 있지만, Hadoop의 MapReduce에 비해 기본적으로 많은 자원을 소비합니다. 따라서 자원을 효율적으로 사용하지 않으면 예상보다 긴 시간과 높은 비용이 소모될 수 있습니다.

 

여기서 Spark Optimization을 적용하면 처리 속도 및 비용 절감이라는 이점을 가지고 오게 되는 것이죠.

 

대용량 데이터를 다루는 데이터 파이프라인에서 비효율적인 처리는 시스템 부하를 증가시키고, 응답 시간을 늦추며, 비용을 크게 증가시킵니다. 빅데이터를 기반으로 서비스를 제공하는 기업 측면에서는 이러한 최적화 결과에 따라 서비스의 품질 및 경쟁력을 확보할 수 있게 되는 것입니다.

 

본 게시물에서는 Catalyst Optimizer에 대해서 게시글을 작성해보고자 합니다.

 

Spark, Spark, Spark Let's go!!

 

 

 

Catalyst Optimizer

Catalyst Optimizer는 사용자가 입력한 쿼리와 DataFrame 연산을 최적화하여 더 나은 성능을 제공하기 위해 사용되는 최적화 기법 중 하나입니다. Optimizer 단계는 크게 4단계로 구성되어 있으며, 각 단계에서는 분석, 최적화, 실행 및 코드 생성을 순차적으로 수행합니다.  

 

전반적인 흐름은 위 그림과 같습니다. 각 단계를 순차적으로 알아보는 시간을 가져보겠습니다.

 

분석 (Analysis)

사용자가 DataFrame이나 SQL Query를 입력하면, Spark 엔진은 이를 Unresolved Logical Plan으로 변환하여 처리합니다. Unresolved라는 의미는 데이터 소스나 테이블 정보가 명확하게 정의되지 않은 상태이며, 추가적인 검증 및 해석 등의 과정이 필요하다는 것을 의미하죠.

 

Unresolved Logical Plan을 활용하여 Resolved Logical Plan을 만드는 과정에서 Spark 엔진 내부의 Catalog를 활용합니다. Catalog는 메타데이터를 저장하는 저장소 역할을 수행하며, 테이블, 컬럼, 로우, 데이터 타입 등의 정보를 관리합니다. 결과적으로, 사용자가 입력한 쿼리에 대해 여러가지의 Resolved Logical Plan이 생성됩니다. 

 

여기서 잠깐 생각해봅시다. Optimizing의 최종 목표는 동일한 작업에 대해 소모되는 Cost를 최대한으로 줄이는 것이 목표일 것입니다. 이 관점에서 생각해보면, 여러 가지 Resolved Logical Plan을 모두 실행시키는 것은 Cost적인 측면에서 상당히 비효율적이겠죠? 따라서, 가장 Optimized된 Logical Plan을 탐색하고 추출하는 과정을 추가로 수행하게 됩니다. 

 

Logical Plan Optimization

본 단계에서는 표준화된 최적화 방법에 따라 Rule-Based로 쿼리 최적화를 수행하며, 결과로 Optimized된 Logical Plan을 추출하는 작업을 수행합니다. 총 4가지의 기준을 바탕으로 Optimized된 Plan을 추출하는 작업을 수행하는데, 지금부터 알아가보겠습니다.

 

Constant Folding은 컴파일 시점에 상수 연산을 미리 계산하여 표현식을 단순화하는 방식입니다. 예를 들어, SELECT 2 + 3과 같은 쿼리는 실행 전에 SELECT 5로 변환되어 불필요한 연산을 방지합니다. 이를 통해 실행 시점의 불필요한 계산을 줄이고 성능을 높이는 데 기여할 수 있는 것이죠.

 

Predicate Pushdown은 필터링 조건을 데이터 소스와 최대한 가깝게 적용하여 데이터를 필터링하는 최적화 방법입니다. 이해를 돕기 위해 간단한 예를 들어보겠습니다.

 

만약 DataFrame을 불러온 후 특정 열의 값이 오늘 날짜와 일치하는 행만 추출한다고 가정해보겠습니다. DataFrame의 크기가 클수록 데이터 로드 및 필터링 과정에서 많은 부하가 발생할 수 있습니다. 그러나 필터링 조건을 데이터 소스에 가깝게 이동시켜 적용하면, 처리해야 할 데이터 양이 줄어들고, 네트워크 및 I/O 비용이 절감됩니다. 이러한 최적화는 Optimized Logical Plan을 생성하는 데 큰 역할을 합니다.

 

Join Reordering은 조인 순서를 최적화하여 중간 결과의 크기를 최소화하고 쿼리 처리 속도를 개선하는 기법입니다. Spark는 다양한 조인 전략을 고려하며, 데이터 크기와 분포에 따라 효율적인 순서를 선택하여 불필요한 리소스 사용을 줄일 수 있습니다.

 

Projection Pruning은 쿼리에서 필요하지 않은 열을 제거하여 처리량을 줄이는 최적화 방법입니다. 이로 인해 Spark는 메모리 사용량과 I/O 비용을 최소화하고, 필요한 데이터에만 집중할 수 있도록 최적화합니다. 예를 들어, 특정 열만 조회하는 쿼리에서는 불필요한 데이터를 무시하여 리소스를 절약합니다.

 

Resolved Logical Plan에 위 4가지 과정을 적용하여 하나의 Optimized된 Logical Plan을 추출하는 과정을 Logical Plan Optimization이라고 부릅니다.

 

Physical Plan Optimization

하나의 Optimized된 Logical Plan이 생성된 후에는 여러 개의 Physical Plan을 생성하는 과정과, 그 중 Optimized된 Physical Plan을 추출하는 과정을 수행합니다. Optimized한 Logical Plan을 추출하는 과정에서 Predicate Pushdown, ... 등이 활용되었다면, Optimized Physical Plan을 추출하는 과정에서는 Cost Model이 활용됩니다.

 

Cost Model은 각 Physical Plan의 "CPU 사용, 메모리 사용, I/O 크기" 등 연산을 수행할 때 소모되는 비용을 측정합니다.

 

Join 전략을 선택한다고 가정해보죠. Spark에서 제공하는 Join 전략은 여러가지가 존재합니다. Sort-Merge Join / Broadcast Join / Hash Join 등 여러가지 Join 전략 중 어떤 전략이 가장 Cost를 적게 사용할 수 있을지를 본 과정에서 확인하게 됩니다.

 

Data Skew (데이터 분포 편향)도 중요한 요소로 고려됩니다. 데이터가 일부 노드에 집중되거나 특정 파티션에만 존재하면 연산 부하가 집중되어 성능이 저하될 수 있습니다. Spark에서는 이를 방지하기 위해 데이터 분포를 최적화하거나 조인 전략을 변경하는 등 다양한 방법을 사용합니다.

 

이전 게시물에서 언급했던 것처럼, Spark에서는 Shuffle을 줄이는 것이 연산 비용을 줄이는 핵심적인 방법 중 하나입니다. 네트워크 간 데이터 이동이 최소화될수록 연산 비용과 I/O 비용이 절감되기 때문에, Spark는 이러한 요소를 최적화하여 최적의 Physical Plan을 생성합니다.

 

결과적으로, Physical Plan Optimization 단계는 다양한 Physical Plan 중 가장 효율적인 계획을 선택함으로써 데이터 처리 성능을 극대화하고 자원을 효율적으로 사용할 수 있도록 합니다. 이를 통해 Spark는 대규모 데이터를 빠르고 효과적으로 처리할 수 있게 됩니다.

 

Code Generation

Optimized된 Physical Plan은 Java Bytecode로 변환됩니다. Spark는 JVM(자바 가상 머신) 위에서 실행되는 도구이기 때문에, 최적화된 실행 계획이 Java Bytecode로 컴파일되어 Spark의 런타임에서 효율적으로 실행됩니다. 이를 통해 Spark는 데이터를 처리하는 동안 높은 성능을 유지하고, 불필요한 연산을 줄이며 자원 활용을 최적화할 수 있게 됩니다.

 

Tungsten이라는 엔진과 함께 Physical Plan을 단일 Java 함수로 변환하는 과정을 거치며, 이를 Whole-Stage Code Generation이라고 합니다. 하나의 Java 함수로 변환했을 때 장점이 뭘까 궁금해서 검색해보니 ... 여러가지 장점이 있다고 합니다!! (깊게 찾아보지는 않아 넘어가도록 하겠습니다)

 

 

마무리하며

Catalyst Optimizer는 현재 배포되어있는 Spark 3.xx 버전에서도 Optimization을 위해 활용되고 있습니다. 덕분에 분산 환경에서 데이터를 효율적으로 처리할 수 있게 됩니다. 근데, 여기서 한 가지 의문점이 들 수 있죠.

 

Catalyst Optimizer는 항상 Optimized된 Plan을 수행할까요? 

 

사실, Catalyst Optimizer가 모든 Case에서 최적의 실행 Plan을 보장하는 것은 아닙니다. 어떤 Optimizer든 Rule-Based + Cost-Based를 기반으로 최적화를 수행하지만, 실제 분산 환경에서 그렇지 않는 Case를 마주할 수도 있습니다. 이러한 문제를 해결하기 위해서 Spark 3.xx 버전부터는 AQE (Adaptive Query Execution)와 같은 기능이 추가되어, 더욱 효율적으로 Spark Job을 수행할 수 있게 되었습니다.

 

따라서, 다음 게시글은 AQE에 관련된 내용을 기술해보고자 합니다!!

 

긴 글 읽어주셔서 감사합니다. 다음 게시글에서 뵙겠습니다 :)

 

 

References