Apache Spark

[Spark 완벽 가이드] Adaptive Query Execution이란?

binary-kim 2025. 1. 2. 17:02

서론

이번 포스팅에서는 Spark 3.0 버전부터 등장한 AQE(Adaptive Query Execution)에 대해서 알아가보는 시간을 가져보겠습니다. 이전 Spark 글에서 배경 및 기초 개념 / Catalyst Optimizer에 대한 내용을 공유드렸었는데, 혹여나 아직 보지 않으신 분들은 먼저 보고 오시는 것을 추천드립니다. 그 이유는 AQE의 배경이나 작동 방식 등을 이해하기 위해서는 어느 정도의 개념을 인지하고 있어야 하기 때문입니다. 아마도 보고오시면 해당 포스팅을 이해하는 데 도움이 될 것입니다.

 

https://binary-kim.tistory.com/22

 

[Spark 완벽 가이드] Catalyst Optimizer란?

서론저번 포스팅에서는 Spark가 나오게 된 배경 및 기본 개념을 위주로 정리해보는 시간을 가졌습니다. 이번 시간에는 Spark에서 Job을 제출했을 때 어떻게 최적화를 진행하는지를 시작으로 하여,

binary-kim.tistory.com

 

 

AQE (Adaptive Query Execution)

탄생 배경

Spark 2.xx 버전에서는 Rule-Based로 Logical Plan을 최적화하고, Cost-Based를 기반으로 Physical Plan을 최적화하여 사용자가 입력한 Spark Job의 효율적인 실행을 도왔습니다. 한 번 최적의 Physical Plan이 결정되면, 그 순서대로 쿼리가 수행되며 결과를 사용자에게 전달합니다.

 

그렇다면, 이런 경우도 한 번 생각해보겠습니다.

  • 예상했던 데이터의 분포와 실제 실행할 때의 데이터 분포가 다른 경우
  • UDF(User Defined Function)에서 특정 조건을 활용하여 Spark가 정확한 데이터 분포에 대한 정확한 예측을 할 수 없는 경우 

위 두 가지 Case를 겪게 되면 어떤 결과를 얻을까요? 뭔가 최적의 결과를 얻을 것 같지 않다는 생각이 드시나요? 예상했던 데이터의 분포를 기반으로 최적의 Physical Plan을 생성했는데, 예상했던 분포와 다르면 연산에 있어 지연 현상으로 이어지게 될 것입니다. Spark 2 버전에서 이러한 문제가 발생하는 것을 확인했고, 개선할 필요가 있다고 느꼈겟죠? Cost-Based Model 처럼 컴파일 시간이 아닌, 런타임에 사용자가 입력한 쿼리를 보다 효율적으로 변경시키는 방법론이 Spark 3버전부터 적용된 "AQE (Adaptive Query Execution)"입니다.

 

배경과는 상관 없는 내용이긴 하지만, 

 

저는 Optimizer 개념을 처음 학습할 때, "그럼 실제 분포를 잘 예측할 수 있는 Optimizer를 만들면 되지 않을까?" 하는 생각이 들었습니다. 매 상황에 맞는 Optimizer를 개발하여 활용하는 것이 성능에 좋을 수도 있지만, 매 작업에서 Optimizer를 만드는데 소요되는 시간과 인력이 투입되어야 합니다. 조금만 신중하게 생각해보면, Spark라는 Tool을 활용하는 개발자 입장에서 이런 부분들이 비효율적인 작업이라는 것을 깨닫는 데 오래 걸리지 않았던 것 같아요.

 

 

정의

https://blog.knoldus.com/adaptive-query-execution-aqe-in-spark-3-0/

 

AQE는 SparkSQL에 도입된 쿼리 최적화 엔진으로, 데이터의 런타임 통계를 기반으로 실제 실행 계획을 동적(Dynamic)으로 조정하여 쿼리 성능을 향상시키는 기능입니다. 즉, 데이터의 특성에 따라 런타임에 Logical Plan을 변경하는 작업을 수행하고, 이에 따라 Physical Plan 또한 변경시켜 최적화된 쿼리를 수행할 수 있도록 도와주는 기능입니다.

 

기존에 생성했던 Physical Plan대로 작업을 수행하면서 런타임에 데이터의 분포를 바탕으로 최적화가 필요하다고 느꼈을 때, AQE가 실행되어 다시 Logical Plan 단계로 돌아가 실제 데이터 분포에 대한 최적의 Physical Plan을 추출하는 작업이 진행됩니다.

 

공식 문서에 따르면, AQE에서 지원하는 기능은 3가지가 존재합니다. 

  • Coalesce Post-Shuffle Partition
  • Sort-Merge Join To Broadcast Join
  • Skew Join Optimization

위 세 가지에 대해서 자세하게 알아가보겠습니다.

 

 

Coalesce Post-Shuffle Partition

 

첫 번째로, 셔플 작업 이후에 생성되는 Partition의 수를 동적으로 조정하는 방식입니다. 동적으로 조정한다는 방식이 텍스트를 읽었을 때 잘 와닿지 않을 수 있기에, MapReduce 작업을 수행한다고 가정하며 해당 Case의 예시를 들어보겠습니다.

 

Reduce 작업을 수행할 때 Job의 수가 너무 작으면, 하나의 Partition의 크기가 매우 커져 부하가 걸리게 되고 실제 Executor의 메모리가 Out Of Memory에 빠질 수 있게 됩니다. Reduce 할 때 Job의 수가 너무 많은 반대의 경우는 어떨까요? Executor의 수보다 더 많은 Job이 존재한다고 할 때, 한 번에 Executor가 모든 Job을 수행하지 못하게 됩니다. 이 때 오버헤드 또한 발생하게 되어 성능 하락의 원인이 되는 것이죠.

 

이러한 Partition의 수에 따른 장/단점들을 AQE에서는 동적으로 적절하게 조절함으로써, 하나의 Executor간 수행하는 작업의 균형을 이룰 수 있게 되고 I/O 작업 수가 감소하는 효과를 얻을 수 있습니다.

 

 

Sort-Merge Join To Broadcast Join

https://jeevan-madhur22.medium.com/spark-3-0-aqe-dynamically-choosing-join-strategy-explained-part-2-66d0cfb28c42

 

Spark에서는 기본적으로 사용되는 조인 전략은 "Sort-Merge Join" 입니다. Sort-Merge Join은 Join할 두 Table을 Partition이 될 Key를 기준으로 각각 Sort 한 후, Merge하는 방식으로 대용량 데이터 처리에 적합합니다. 그렇지만, 모든 데이터가 셔플될 때 네트워크 이동이 필연적이며, 이에 비용이 많이 소요되는 작업입니다. 이러한 Join 전략을 사용하지 않고, 특정 Case에서 Broadcast Join을 사용하는 이유는 과연 무엇일까요?

 

Broadcast Join은 한쪽 데이터셋의 크기가 매우 작을 때 효율적으로 사용 가능한 조인 전략으로, 작은 데이터셋을 큰 데이터셋이 분포되어있는 모든 클러스터에 broadcast하여 셔플 작업을 최소화하는 방식입니다. Broadcast Join이 Sort-Merge Join보다 효율적인 이유는 한 테이블만 이동을 수행하면 되기 때문에, 네트워크 이동에서 소요되는 비용을 줄일 수 있습니다.

 

Spark에서는 작은 데이터셋의 크기가 평균적으로 10MB ~ 100MB이하일 때 Sort-Merge Join을 Broadcast Join으로 변경하는 기준으로 삼고 있습니다. 내가 만약 Broadcast Join이 자동으로 일어나게 하는 기준을 정하고 임의로 정하고 싶다면, Spark Session을 생성할 때 "spark.sql.autoBroadcastJoinThreshold" 옵션에 적절한 값을 주면 적용 가능합니다.

 

 

Skew Join Optimization

이전 포스팅에서도 반복적으로 언급했다싶이, Partition간 Data 분포가 불균형(Skew) 할 수록 셔플 작업이 일어나는 연산(Join)시 쿼리 성능을 저하시킬 수 있습니다. AQE는 런타임에 Data Skew를 감지해서 특정 Partition의 데이터를 여러 작은 Partition으로 분할하거나, 혹은 적절한 Join 전략을 적용하여 처리하는 방식을 의미합니다. 위에서 설명한 두 가지를 적절하게 섞어서 활용하다는 의미로 보입니다. 

 

셔플 과정에서 발생하는 병목 현상을 줄이고, 이로써 전체 작업 성능을 향상시키는 것을 목표로 하는 것이 바로 AQE의 큰 특징이라고 알아두시면 좋을 것 같습니다.

 

 

활용

Spark 3.0버전에서 AQE를 활용하려면 "spark.sql.adaptive.enabled=true"로 Spark Session 생성 시에 설정하면 되며, Spark 3.2 버전 이후로는 AQE가 Default Option으로 설정되어 자동으로 적용된다고 합니다.

 

저는 프로젝트를 수행할 때 클러스터 작업 간 병목 현상이 발생하는지에 대한 여부를 "Spark Web UI"를 통해 인지했습니다. 당시, 100만 건 좀 안되는 데이터에 적절한 연산을 더하여 사용자들의 관심도를 측정하는 프로젝트를 수행했었는데, Data Skew로 인해 AWS EMR에서 평균 30분 정도의 시간이 소요됬습니다. 간혹 45분의 시간도 걸렸던 적이 있어 1시간을 주기로 돌아가는 파이프라인을 구축한 상황에서 난감했던 상황이 있었습니다.

 

당시 해당 문제를 해결해보고자 여러가지 방법을 찾아보던 중, "어 나도 AQE 한 번 써볼까? 그러면 병목 현상을 좀 줄일 수 있겠네?" 하는 생각에 바로 Google에 검색을 호다닥 진행했는데....

 

"나도 모르게 이미 AQE를 쓰고있었군아... 였습니다 ㅎㅎ". 아마 프로젝트를 수행하던 시점에서 Spark 3.2버전보다 업그레이드 된 버전을 사용했기에, 저절로 적용되었던 것이었습니다. 그래서 "머쓱타드" 한 잔 적신 후에 다른 Optimizing 기법들 (Salting / Repartitioning ...)을 적절하게 활용하여 해결했던 경험이 있습니다. 

 

 

개인적인 생각

"AQE를 무조건 사용하는 것이 좋은 판단일까요?" 라는 질문에 어떻게 생각하시나요? 

 

런타임에 실제 데이터 분포를 활용하여 Physical Plan을 다시 추출한다는 것은 "그 만큼의 추가 시간 및 자원이 소요된다"라는 의미와도 연결됩니다. Spark Tool을 연구/개발하시는 분들이 생각하시기에 추가적인 Cost가 발생한 것보다 그 이상의 훨씬 큰 장점을 가지고 있기에 AQE를 만들었다고 생각합니다. 또한, 최신 Spark 버전에서는 Default로 제공하는데는 이유가 있지 않을까 생각합니다.

(킹리적 갓심)

 

그렇기 때문에 저는 .... 그냥 사용하겠습니다!!

 

 

마무리하며

이번 글까지 작성하며 Spark에 대한 간단한 개념은 정리되었다고 생각합니다. 이론과 실습은 또 다른 영역이지만, 개념을 잘 알면 그만큼 활용할 수 있는 능력치가 증가할 수 있겠죠? ㅎㅎ

 

다음 기술 포스팅은 어떤 것을 진행해볼까요? 아직 정해둔 것은 없지만, Hadoop 아니면 Trino 쪽을 정리해보지 않을까 생각합니다.

 

감사합니다.