Executor
Executor란 ?
Airflow에서 이야기하는 Executor은 task가 실행되는 매커니즘으로
Executor을 어떻게 설정하느냐에 따라 task 실행방식이 달라집니다.
Airflow에서 제공하는 전체 Executor은 아래의 링크에서 확인할 수 있습니다.
https://airflow.apache.org/docs/apache-airflow/stable/executor/index.html
Sequential Executor
Sequential Executor은 Airflow에서 제공하는 기본 Executor입니다.
sqlite와 함께 사용할 수 있는 Executor로 한번에 하나의 task만 실행할 수 있어
병렬로 실행이 안된다고 볼 수 있습니다. 그렇기 때문에 실제로 운영 서비스에 넣기에는 적합하지 않습니다.
Local Executor
Local Executor은 Sequential Executor과 달리 task를 병렬로 실행하는 것이 가능합니다.
옵션값을 통해 최대 몇 개의 task를 병렬로 실행할지 설정하는 것이 가능합니다.
self.parallelism 옵션 값을 이용해 설정하며 이 설정값을 0으로 설정하는 경우 Local Executor는 task를 제한없이 무제한으로 실행하게 됩니다. 이를 Unlimited parallelism 이라고 합니다.
반대로 self.parallelism을 지정하는 경우에는 Limited parallelism 형식으로 동작합니다.
Celery Executor
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/celery.html
Celery Executor 역시 Local Executor과 마찬가지로 task를 병렬로 실행할 수 있습니다.
Celery는 추가적으로 Redis나 RabbitMQ같은 MQ(셀러리 백엔드)를 추가적으로 필요로 하는데 이는 Celery Executor가 클러스터 형식으로 구성되고 MQ에 있는 task를 실행하는 구조로 동작하기 때문입니다.
redis란?
Key, Value 구조의 비정형 데이터를 저장하고 관리하기 위한 오픈 소스 기반의 비관계형 데이터 베이스 관리 시스템 (DBMS)이다.데이터베이스, 캐시, 메세지 브로커로 사용되며 인메모리 데이터 구조를 가진 저장소이다.
레디스는 아래와 같은 특징을 지니고 있다.
한 번에 하나의 명령만 처리할 수 있으며, Key-Value 구조이기 때문에 쿼리를 사용할 필요가 없다.데이터를 디스크에 쓰는 구조가 아니라 메모리에서 데이터를 처리하기 때문에 속도가 빠른편이다.
따라서 Celery Executor 클러스터 형식으로 구성할 수 있어 Executor에 대한 HA 구성과 Scale out이 자연스럽게 가능하며 LocalExecutor보다 실제 운영환경에 적합하다고 판단됩니다.
다만 DAG 파일 역시 Celery Executor로 사용하고 있는 모든 Worker에 배포되어야 하기 때문에 git을 이용해 DAG를 관리하고 배포하는 시스템을 구축해야 한다.
Local Executor 는 Worker의 수가 1개인데 Celery Executor 는 Worker의 수가 N개 일 수 있습니다.
LocalExecutor와 CeleryExecutor의 아키텍처를 비교해보겠습니다.
LocalExecutor와 달리 CeleryExecutor는 N개의 Worker가 각각 subprocess를 가질 수 있습니다. 또한 scheduler가 subprocess를 할당한다기보다는, 각 Worker가 queue에 들어있는 task를 polling해가는 형태의 구조를 갖고 있는 것을 볼 수 있습니다(Executor가 task를 실행시킨다는 표현도 틀린 표현은 아닙니다). queue가 꼭 1개일거란 보장도 없습니다.
n개의 Executor가 각자 독립적으로 queue에서 task를 빼가는 방식인데, 비동기 방식으로 구동된다면 어떻게 될까요?
- 1번 executor가 queue에서 Task를 인지하고 실행합니다.
- task의 상태는 실행되었다고 DB에 기록합니다.
- 변경된 task는 queue에서 빠지게 되고, 그 다음 task를 executor가 실행합니다. 어떤 executor일지는 따로 설정하지 않으면 알 수 없습니다.
여기서 비동기 방식을 지원한다면 task의 상태가 변경되기 전에 다른 Executor에서 동일한 task를 실행할 수 있게 됩니다. queue의 FIFO 방식이 원활하게 보장되려면 task의 상태 변경이 된 이후에 그 다음 task를 실행한다는 순서가 보장되어야 합니다.
그렇기 때문에 task 할당으로 Queue를 사용하는 CeleryExecutor는 비동기 방식으로 task를 실행하지 않습니다.
Queue에는 어떤 것들이 들어갈까요?
# Task instance that is sent over Celery queues
# TaskInstanceKey, Command, queue_name, CallableTask
TaskInstanceInCelery = Tuple[TaskInstanceKey, CommandType, Optional[str], Task]
Queue에는 Tuple이 들어가고,task instance key / command, queue_name, task(Callable) 이 들어있습니다. 여기에서는
- task가 실행 가능한 무언가(command, python function..)이겠구나
- queue 이름을 지정할 수 있는걸로 보아 queue는 여러개일수도 있겠구나
정도를 생각해볼 수 있겠습니다.
중간 정리를 해보자면,
- 사용자가 Webserver에 올린 Task를 DB에서 읽어들인다.
- scheduler는 실행 전 상태인 Task를 DB에서 읽어 queue에 넣는다.( task queue(message broker)에 작업을 순차적으로 적재)
- queue에 있는 task를 worker가 실행시킨다.(task들을 queue에서 하나씩 빼서 worker node(subscriber)에 할당하는 방식)
- task 상태를 변경한다.
CeleryExecutor의 task 실행 순서는 이렇게 정리해볼 수 있습니다.
참고
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/celery.html
https://dydwnsekd.tistory.com/98
https://eprj453.github.io/airflow/2022/05/01/Airflow-Executor-Deep-Dive-2-1-CeleryExecutor/
'Data Analysis > Python' 카테고리의 다른 글
Installing Apache Spark (0) | 2023.02.16 |
---|---|
[Airflow] Remove DAG examples DAG 예시 파일 제거 방법 (0) | 2023.01.24 |
[Airflow] 기본 정리 (2) | 2023.01.23 |
[Pandas] 조건걸고 새로운 칼럼 추가하기 (0) | 2023.01.07 |
[Airflow] The important views of the Airflow UI (1) | 2023.01.07 |
댓글