서론
현재 저는 Hadoop / Spark / Airflow / Kafka를 공부하고 있습니다!! 이번 포스팅부터는 현재 공부 중인 "스파크 완벽 가이드"라는 도서를 읽고 Spark라는 Tool이 무엇인지, 어떻게 작동되는지, 어떤 Data Type이 있고 어떤 특징들을 가지고 있는지 3~4차례에 나눠서 기록해보고자 합니다.
대략적인 Keyword는 아래와 같습니다.
"Spark 정의" -> "Spark 최적화" -> "Spark Streaming" (-> "Spark ML")
우선, Spark를 공부함에 있어 해당 책을 고른 이유는 딥러닝을 공부하면서 "ORELLY" 출판사 책을 굉장히 많이 읽었었는데 내용이 너무 좋았었던 기억이 있어서 이번에도 믿고 구매를 해봤습니다. 그리고, 소프티어 교육을 들으면서 같이 공부하시던 분 중 한 분이 저 책으로 공부하시던 기억이 나서 저도 구매를 해버렸습니다.
구매 후 느낀 점은.... 책이 굉장히 두껍네요 (다 읽을 수 있을려나)
시작이 반이라는 말이 있듯이, Spark를 정복하는 그날까지! 화이팅 해보겠습니다!!
Spark가 나오게 된 배경
빅데이터하면 가장 먼저 떠오르는 두 가지 도구를 선택하라고 하면, 저는 Hadoop과 Spark라고 생각합니다. 분산 환경에서 데이터를 저장하고 처리하는 도구가 나오게 된 가장 핵심 요인은 "데이터를 저장하는 비용이 저렴해졌다"라는 부분입니다. 과거에는 데이터를 저장하는 비용이 굉장히 비쌌기 때문에, 데이터 저장 및 처리 방식에 좀 더 신중할 수 밖에 없었던 것이죠.
하지만, Cloud 기술의 발전과 더불어 Storage 저장 비용이 급격하게 하락하면서 대량의 데이터를 저장하는 것이 가능해졌고, 궁극적으로 데이터를 처리하는 비용보다 저렴해졌습니다. 다양한 기업에서는 이러한 이점으로 인하여 더 많은 데이터를 수집하고 분석하려는 수요가 많아지게 되었고, 이에 맞게 데이터를 효율적으로 빠르게 처리할 수 있는 도구가 필요해진 상황에 직면했습니다.
데이터 저장 비용이 저렴하다 보니 아마도 "xxx님, 일단 그냥 다 저장하고 처리는 나중에 생각해요~"로 이어졌을 것입니다. 근데, 데이터를 모아두고 보니 "어떻게 처리하지?"라는 관점이 생겼고, 이러한 상황에서 Spark라는 Tool이 개발되지 않았나 생각합니다.
(그래서 Spark가 뭔데 씹덕아?)
Spark란?
Spark를 설명하기에 앞서, Hadoop의 개념을 잠시 정리하고 가보겠습니다. Hadoop은 크게 Storage Layer / Resource Management Layer / Application Layer 총 3단계로 나뉩니다. 이 3가지 Layer 중 Spark는 Application Layer에 속하게 됩니다. Application Layer는 데이터를 처리하는 단계를 의미합니다. 즉, 분산되어있는 대량의 데이터를 병렬로 처리함으로써 데이터의 효율성을 극대화할 수 있습니다.
이러한 관점에서 Spark를 바라본다면...
Spark는 대규모 데이터 처리 및 분석을 위해 설계된 오픈 소스 분산 컴퓨팅 시스템입니다. 기본적으로, 컴퓨팅 기능 (In-Memory Caching)을 활용하여 빠른 데이터 처리 속도를 제공하며, 대규모 데이터셋에 대해 복잡한 쿼리 / 분석을 효율적으로 수행할 수 있도록 도와줍니다. 여기서 한 가지 의문이 들 수 있습니다.
Hadoop에도 데이터를 처리 / 집계하는 MapReduce가 있는데, 왜 굳이 Spark를 사용해야 할까요?
해당 질문에 대한 답변은 MapReduce와 Spark의 데이터 처리 방식을 이해하면 납득이 갑니다. "A->B->C->D" 작업을 순차적으로 수행하며, 이전 작업이 현재 작업에 영향을 미치는 상황이라고 가정해보면,
Hadoop의 MapReduce는 한 번의 작업이 수행될 때마다 HDFS에서 File I/O를 수행해야한다. File I/O는 물리적인 Storage에 읽기 / 쓰기 하는 작업이기에, 상당히 느리다. 매 단계에서의 결과를 HDFS에 다시 기록하고, 다음 단계에서 이를 활용해야 한다. 이로 인하여 대규모 데이터 처리에선 적합하지만, 실시간으로 데이터를 처리하는 관점에선 한계점이 존재하게 되는 것이죠.
반면, Spark는 MapReduce와 다르게 중간 결과를 HDFS에 저장하지 않고 메모리에 저장하는 방식을 활용하여 빠르게 데이터를 처리할 수 있다는 장점이 있습니다. 물론, 메모리에 캐싱 느낌으로 저장해서 관리하기 때문에 MapReduce에 비해 빠르지만 Cost가 높은 연산입니다.
결국 핵심은...
"Spark가 메모리 기반의 빠른 데이터 처리를 가능하게 하여, 실시간 데이터 처리 및 복잡한 분석 작업에서 더 뛰어난 성능을 발휘한다"
입니다.
Spark는 어떻게 작동할까?
Driver 생성 및 리소스 요청
Spark 애플리케이션이 시작되면 가장 먼저 Cluster 노드 중 하나에 Spark Driver가 생성됩니다. 그 후에, Yarn / Mesos / Kubenetes 등과 같은 Cluster Manager와 통신하며 작업에 필요한 리소스를 요청합니다. 그 후, Cluster Manager는 CPU / RAM / Executor 수 등 작업에 필요한 리소스를 할당해줍니다.
여기서 Cluster Manager는 Hadoop에서의 Resource Management Layer 역할을 수행하고, Spark Driver가 설치된 Cluster는 Hadoop에서 Application Layer 역할을 수행합니다.
Spark Context 생성
Spark Driver는 Cluster와의 연결을 관리하고, 애플리케이션 실행에 필요한 환경을 설정하는 Spark Context를 생성합니다. Spark Context는 사용자가 입력한 코드에서 Transformation 연산을 수행하며, 해당 연산은 RDD Lineage하게 표현됩니다. Action이 호출되기 전까지 Spark Context는 데이터를 실제로 처리하지 않는다는 부분이 제일 중요한 Key Point 입니다.
Action 호출 (연산 수행)
Action이 호출되면, Spark Context는 이때까지 생성된 RDD Lineage를 기반으로 DAG를 생성합니다. Spark Context는 DAG를 분석해서 Spark Job을 생성합니다. 하나의 Job은 Wide Tranformation 연산에 따라 여러 Stage로 나뉠 수 있으며, 각 Stage는 여러 Task로 나뉠 수 있습니다. 여기서 주목할 만한 부분은 "Stage가 나뉜다"입니다. 그 이유는 ...
"Stage가 나뉜다" = "하나의 Executor에서 데이터를 처리한 결과가 2개 이상이다" = "Cluster 간 Shuffle이 발생한다." = "데이터의 네트워크 이동이 필요하다" = "연산 비용이 증가한다"로 이어질 수 있습니다.
결국엔, count()나 collect()와 같은 Wide Transformation 작업을 얼마나 적게 수행하는지는 실제 Spark Job 연산에 큰 영향을 미칠 수 있습니다.
뭔가 작업을 처리해야하는 상황에서 최적화를 할려면 .... 중요하겠죠?
데이터는 어떻게 저장될까?
Spark는 다양한 데이터 형식으로 데이터를 저장하고 활용할 수 있습니다.
JSON(JavaScript Object Notation)은 가독성이 좋고, 구조화된 데이터를 표현하는 데 적합한 자료구조입니다. 특히, CSV에서는 표현이 불가능한 "데이터 간 계층적인 구조"를 표현하기에 매우 적합합니다. JSON은 주로 비정형 데이터 저장에 많이 활용하는데, 저장 공간이 상대적으로 크다는 단점을 가지고 있습니다.
CSV(Comma Seperated Values)는 텍스트 파일 형식으로, 쉼표(,)를 기준으로 구분되어 저장된 구조를 의미합니다. CSV는 간단한 데이터 저장 및 전송에 적합하며, 대부분의 데이터 처리 도구에서 지원하기 때문에 널리 사용됩니다. 그러나 데이터 타입을 명시할 수 없고, 대규모 데이터셋에서는 성능이 저하될 수 있습니다.
Parquet는 컬럼 기반 저장 형식으로, 대규모 데이터 처리에 최적화되어 있습니다. Apache Hadoop과의 호환성이 뛰어나며, 스키마를 명확하게 정의할 수 있어 대량의 데이터를 효율적으로 처리할 수 있습니다. Parquet는 데이터 압축과 효율적인 I/O를 지원하여 성능이 우수합니다. AWS Redshift가 컬럼 기반 저장 형식의 예시입니다.
Spark에서 메모리는 어떻게 관리하는가?
Spark는 크게 Reserved Memory / User Memory / Spark Memory 총 3개로 나뉩니다.
Reserved Memory는 말 그대로 "예약 메모리"입니다. OS를 설치할 때 기본적으로 잡아먹는 메모리가 존재하듯이, Spark를 작동시키기 위한 시스템 프로세스나 기타 메모리 관련 Overhead를 처리하는데 사용되는 메모리입니다. Default로는 300MB로 설정되어 있습니다.
User Memory는 Java Heap에서 Reserved Memory를 제외한 영역 중 일부를 차지하며, Default는 25%로 설정되어 있습니다. 사용자 정의 객체(User Defined Function)나 사용자가 정의한 변수, 외부 라이브러리를 사용할 때 해당 메모리 영역을 사용하게 됩니다.
Spark Memory는 User Memory를 제외한 나머지 영역을 의미합니다. 해당 메모리는 Execution Memory와 Storage Memory로 나뉘어 활용됩니다. Execution Memory에서는 Spark Job 실행 중에 사용되는 메모리로써, Shuffle / Join / Aggregation 연산으로 인해 생성된 중간 결과를 저장하는데 사용되는 메모리입니다. 작업 실행 중 "Java Heap에서 Out Of Memory"가 발생하면, 해당 메모리 영역이 부족하다는 것을 의미합니다. Storage Memory에서는 RDD나 DataFrame 등을 저장하는 데 사용하는 메모리로, Spark Session에서 .cache()나 .persist()를 사용할 때 활용되는 메모리입니다.
이러한 메모리 영역은 Spark Session을 생성할 때 config로써 변경하여 활용할 수 있습니다. 특정 사용자가 Spark를 사용할 때 처리해야 할 데이터의 규모나 연산량을 예측할 수 있다면, 그에 맞게 메모리를 변경하여 사용하는 것 또한 최적화의 일부이지 않을까 하는 생각이 듭니다.
Dataset vs DataFrame
Dataset과 DataFrame은 Spark에서 고수준 API를 통해 제공되는 데이터 구조로, 대규모 데이터를 처리하고 분석하기 위한 최적의 방식을 제공합니다. 두 데이터 구조를 분석해보자면 ...
Dataset는 RDD와 DataFrame의 장점을 결합한 구조라고 합니다. DataFrame과의 큰 차이점은 "타입 안정성" 측면입니다. DataFrame은 실제 데이터를 활용할 때(RunTime) 저장된 데이터의 Type을 확인하는 반면, Dataset은 컴파일 단계에서 데이터의 Type읠 확인하기 때문에 상대적으로 안정성이 높습니다. 그렇지만, Dataset은 Java나 Scala에서만 활용이 가능하고, Python과 R에서는 지원하지 않는다는 특징을 가지고 있습니다.
DataFrame은 데이터가 행/열로 구조화 된 형태이며, 비정형 및 반정형 데이터를 처리할 때도 효율적으로 활용할 수 있습니다. 그리고, RDD보다 좀 더 높은 수준의 API로 데이터에 대한 추상화를 제공한다는 장점을 가지고 있습니다. DataFrame은 Java / Scala / Python / R에서 모두 제공한다는 특징을 가지고 있습니다.
고수준 API의 공통적인 특징은 "빠르고 편리하다"라고 생각합니다. SQL 쿼리와 같이 친숙한 방식으로써 데이터에 접근할 수 있으며, Spark 내부의 Catalyst Optimizer와 Tungsten을 통해 최적화된 Spark Job을 실행할 수 있다는 장점이 있습니다. 그래서인지, 실습할 때 RDD보다는 DataFrame을 많이 사용했던 것 같네요 ㅎㅎ.
RDD (Resilient Distributed Data)
RDD는 Spark에서 사용되는 저수준 API로써, 탄력적인 분산 데이터셋이라는 이름을 가지고 있습니다. 다수의 서버(Cluster, Node)에 걸쳐 분산 형태로 저장된 요소들의 집합을 RDD라고 이야기합니다. RDD는 크게 3가지 특징을 가지고 있습니다.
첫 번째로, RDD는 추상화된 형태로 제공됩니다. 실제로는 여러 Cluster에 분산되어 데이터가 존재하지만, 사용자가 데이터를 활용할 때는 하나의 통합된 데이터로 생각하여 사용할 수 있습니다. 두 번째로, RDD는 탄력적이며 불변성을 가지고 있습니다. 즉, 네트워크 장애로 인해 연산을 정상적으로 종료하지 못했을 때, 이전 RDD 연산으로 이동하여 스스로 복구할 수 있는 내성을 가지고 있습니다. 마지막으로, RDD를 포함한 Spark 연산은 Lazy Evaluation을 제공합니다. Transformation 연산이 사용자에 의해 호출되더라도, count(), collect() 등의 Action 연산이 호출되기 전까지 작업을 수행하지 않습니다. Action 연산이 호출되었을 때 비로소 RDD Lineage하게 구성된 DAG 형태의 Spark Job이 각 Executor에서 수행됩니다.
RDD는 고수준 API인 DataFrame에 비해 연산 속도가 많이 느립니다. 그 이유는, Dataset이나 DataFrame은 Spark 내부의 Catalyst Optimizer에 의해 최적화된 상태로 작업을 수행하게 되지만 RDD는 그렇지 않습니다. 여기서도 한 가지 의문이 생길 수 있습니다.
"DataFrame보다 느린데, 왜 써야되죠?"
최적화가 되어 있지 않지만, RDD를 사용해야하는 혹은 사용할 수 있는 상황이 몇가지 존재합니다. 예를 들어, 처리하고자 하는 데이터의 형태가 Text / Binary와 같은 비정형 데이터일 때는, DataFrame에 비해 효율적일 수 있습니다. 웹 서버 로그 파일에서 특정 IP 주소에서만 발생한 요청 수를 계산하는 상황에서는 오히려 RDD가 빠를 수 있습니다.
결국엔, 처리하고자하는 상황에 맞는 방법을 선택하여 활용하는 것이 중요해 보입니다.
마무리하며
오늘은 Spark가 나오게 된 배경과 더불어, 데이터 저장 방식 및 API에 대해서 정리하는 글을 작성해봤습니다. 저 또한 Spark를 배워나가는 입장이고, 앞으로도 Spark를 사용할 일이 굉장히 많을 것으로 예상되는데요. 그러한 입장에서, 새로 Spark를 공부하시는 분들이 제 게시물을 읽고 많은 도움이 되었으면 좋겠다는 생각이 들었습니다.
다음 게시글은 Spark 최적화와 관련된 내용으로 다시 오겠습니다!!
긴 글 읽어주셔서 감사합니다. :)
Reference
- [스파크 완벽 가이드]
- [스파크 메모리 정리] https://0x0fff.com/spark-memory-management/
'Apache Spark' 카테고리의 다른 글
[Spark 완벽 가이드] Adaptive Query Execution이란? (0) | 2025.01.02 |
---|---|
[Spark 완벽 가이드] Catalyst Optimizer란? (2) | 2024.11.18 |