Learn to author, schedule and monitor data pipelines through practical examples using Apache Airflow
5. Why Airflow?
- ETL 할때 에어플로우 안쓰면 10시에 배치가 돈다고 가정했을때 API 필요하고 스노우플레이크 필요하고 Dbt 실수안해야함
- 추가로 100번 하려면 100개 필요함
6. What is Airflow?
- 데이터 파이프라인 만드는 오픈소스 (공짜) - 파이썬
- 스케쥴링 모니터링 가능함
- 스케이러블 함 = 많은 태스크 다루기 가능
- UI 괜찮음
- 익스텐스성 높음
7. Core Components
- 웹서버
- 스케쥴러
- 메타데이터베이스 스토어
- 트리거
- executor =
- Queue
- Worker
8. Core Concepts
Core Component
- Web Server
flask 서버 기반의 UI 환경이 주어진다. - Scheduler
백그라운드에서 스케쥴링된 workflow를 할당한다. - MetaStore
에어플로우에서 쓰는 메타데이터를 저장하는 데이터 베이스 - Executor
에어플로우에서 Task 인스턴스를 실행하는 주체 (이부분은 좀 복잡해서 나중에 따로 다루겠다.)
(celery excutor, kubernetes excuter가 대표적으로 존재) - Worker
Executor가 정해준 방식으로 task를 할당받아 작업하는 노드 또는 프로세서이다.
Dag
- A DAG means directed acyclic graph, and it's nothing more than a graph with nodes, directed edges and no cycles
DAG는 방향성 비순환 그래프를 의미하며 노드, 방향성 에지 edge 가 있고 주기가 없는 그래프에 불과
- cycle 하지 않는 이유 -> 순차대로 흘러가야 하니까 (아니면 오류 일어남)
Operator
Operator
- Action operator : 실제 환경에서 해당 작업을 명령할 내용을 실행시켜 주는 operator
구체적인 예시로 bashoperator, python operator등등이 존재한다.
- Transfer Operator : 원본 데이터를 다른 목적지로 이동시켜 주는 오퍼레이터
구체적인 예시로 mysql db에서 gcp bucket으로 옮기는 오퍼레이터. - Sensor Operator : 시간, 파일, 외부 이벤트를 기다리며 해당 조건을 충족해야만 이후의 작업을 진행할 수 있게 해주는 Airflow의 오퍼레이터
- It's nothing more than a task : 작업을 캡슐화 하는 방법
첫 번째는 작업 연산자입니다. 작업 연산자는 무언가를 실행합니다.
예를 들어 PythonOperator는 Python 함수를 실행하고 BashOperator는 bash 명령을 실행합니다.
+ 전송 연산자가 있습니다. 기본적으로 A지점에서 B지점으로 데이터를 전송할 수 있습니다. 예를 들어 MySQL에서 Redshift로 데이터를 전송하려고 할때 사용합니다.
마지막으로 센서sensors가 있고 센서는 다음 작업으로 이동하기 전에 어떤 일이 발생할 때까지 기다릴 수 있기 때문에 매우 유용합니다.
8. airflow에 대해 주의할 점
- airflow는 data streaming soltion이 아니다.
- airflow는 데이터 처리 프레임워크가 아니다
airflow는 데이터 처리를 위해 많은 메모리를 수용하지 못한다.
airflow를 통해 다른 프레임워크에 작업을 내려 해당 프레임워크 안에서 동작하게 워크 플로우를 작성해야 한다.
9. One 노드 아키텍쳐
Single Node Architecture
싱글 노드의 경우 Web server, Scheduler, Metastore, Executor가 단일 머신 안에서 동작한다.
모든 컴포넌트는 Metastore(Meta DB)를 통해 통신한다.
- 싱글 노드에서는 Queue는 Executor 내부에 존재하며, 어떠한 순서로 Task가 실행될지를 Queue를 통해 정의
- 싱글 노드는 단순한 테스트 용도로는 적합하지만, 어느 정도 이상의 task를 수행하기 위해서는 멀티 노드 구조가 필요하다.
Multi Node Architecture
마스터 노드와 메타 정보 노드와 워커 노드를 정의한 아키텍쳐
워커 노드를 통제하는 마스터 노드를 통해 여러가지 업무를 동시다발적으로 수행할 수 있다.
- Queue는 싱글 노드와 달리 Executor에서 분리되었으며, RabbitMQ나 Redis와 같은 서드파티 Queue를 사용한다.
- 또한, 싱글 노드와 달리 많은 수의 airflow worker를 가지게 된다
따라서, 노드#1-2가 아닌 다른 노드로 Task가 분산되어 처리된다. - Web server는 동일하게 Metastore의 정보를 fetch하여 보여주고
- Scheduler는 Executor와 Metastore에 새로운 task가 생겼음을 알려준다
- 각 Worker들은 Queue를 바라보고 있다가, Task가 생긴경우 fetch하여 실행한다.
멀티 노드 아키텍쳐에서의 역할
WEBSERVER: 메타 데이터와 상호작용 및 DAG trigger UI를 보여주는 역할.
Scheduler: Dag file들을 sync 해서 작업들의 스케쥴 할당하는 역할
Executor: task instance를 실행하는 주체 > 큐와 상호 작용 및 워커에 작업을 수행시킴.
how it works?
12.어떻게 동작하는가 ?
Airflow 실행 구조
- Task로 구성된 DAG를 작성
- Web Server와 Scheduler가 DAG를 파싱
- Scheduler는 Metadata Database를 통해 DAG Run Object 생성 및 실행
- DAG Run : 작성한 DAG의 인스턴스
- Scheduler는 Task Instance를 스케쥴링
- Trigger에 의해 Executor는 Task Instance를 실행
- Task를 수행 후 Metadata Database에 DAG의 상태를 완료로 변경
- Web Server와 Scheduler는 Metadata Database의 업데이트 된 정보를 확인
구체적인 스케줄 예시
- 새로운 파이프라인을 생성하여 dag.py를 dags 디렉토리에 추가하였다
- Web server와 Scheduler는 dag.py를 각각 Parse한다.
- Web server는 Parse하여 웹 UI에 노출시키고, Scheduler는 Metastore에 DagRun Object(DAG의 인스턴스)를 생성한다.
- Dag가 트리거되면, DagRun의 status가 Running으로 바뀌고, 진행해야할 Task가 TaskInstance로 Metastore에 생성된다.
- TaskInstance가 생성된 이후, TaskInstance는 Scheduler에 의해서 Executor로 전달된다.
- Executor가 TaskInstance를 실행한 이후에(Task 종료 이후) Metastore의 TaskInstance 상태를 업데이트한다.
- Scheduler는 지속적으로 Metastore를 확인하여 TaskInstance가 모두 종료되었는지 확인하고, 종료되었다면 DagRun의 status를 Completed로 업데이트
- Webserver는 DagRun의 상태를 보고 웹 UI에서 업데이트
Airflow 설치법(LOCAL) - 도커랑 vs code 필요
Installing Apache Airflow
Prerequisites
First, make sure you have installed Docker Desktop and Visual Studio. If not, take a look at these links:
Docker needs privilege rights to work, make sure you have them.
Follow the documentation first
If you have troubles to install these tools, here are some videos to help you
Install Apache Airflow with Docker





Well done, you've just installed Apache Airflow with Docker! 🎉
Open your web browser and go to localhost:8080

Troubleshoots
If you don't see this page, make sure you have nothing already running on the port 8080
Also, go back to your terminal on Visual Studio Code and check your application with docker-compose ps
All of your "containers" should be healthy as follow:

If a container is not healthy. You can check the logs with docker logs materials_name_of_the_container
Try to spot the error; once you fix it, restart Airflow with docker-compose down then docker-compose up -d
and wait until your container states move from starting to healthy.
If you still have trouble, reach me on the Q/A with your error.
참고 : https://velog.io/@hyunwoozz/airflow-DAG%EB%9E%80-%EB%AC%B4%EC%97%87%EC%9D%B8%EA%B0%80
[airflow] DAG에 대해 알아보고 만들어보기 #1
DAG에 대한 전반적인 설명 기술
velog.io
'Data Analysis > Python' 카테고리의 다른 글
[Python] Pandas: 한 셀의 데이터를 여러 행으로 나누기 (0) | 2023.01.06 |
---|---|
[Python] Python에서 youtube 불러오기 (0) | 2023.01.06 |
[Python] 왜 For 문 옆에는 : 을 붙여야 할까? (2) | 2023.01.05 |
[Airflow] Airflow 설치법(LOCAL) - 도커랑 vs code 필요 (0) | 2023.01.04 |
Pandas Cheatsheet: 125+ exercises (0) | 2022.12.29 |
댓글