반응형
- Apache Airflow의 장점
- Python 기반
- 데이터 분석을 하는 분들도 쉽게 코드를 작성할 수 있음
- Scheduling : 특정 간격으로 계속 실행
- Airflow 콘솔이 따로 존재해 Task 관리를 서버에서 들어가 관리하지 않아도 됨
- 각 작업별 시간이 나오기 때문에 bottleneck을 찾을 때에도 유용함
- Backfill : 과거 작업 실행
- 캐치업 Catch up -> 그동안 밀린거 다 실행할건지 여부 / True - YES!
- 특정 Task 실패시 => Task만 재실행 / DAG 재실행 등 실패 로직도 있음
- 데이터 엔지니어링에서 많이 사용됨
- Google Cloud Platform에 있는 대부분의 기능을 지원
- Google Cloud Platform엔 Managed Service(관리형 서비스)인 Composer 존재
- Python 기반
- UTC
- 협정 세계시로 1972년 1월 1일부터 시행된 국제 표준시
- 서버에서 시간 처리할 땐, 거의 UTC를 사용함
- 한국 시간은 UTC+9 hour
- SQL에서 구현하려면 date_time + interval 9 hour 혹은 Data_add()함수 사용
- Airflow에서 UTC를 사용하기 때문에, CRON 표시할 때 UTC 기준으로 작성
- 예 : UTC 30 1 * * * => 한국은 30 10 * * * => 한국 오전 10시 30분
- Airflow 실행
- airflow webserver와 airflow scheduler 2개 실행해야 함
- 터미널 1개에 webserver를 띄우고, command+t로 새로운 터미널을 띄워서 scheduler를 띄우기
airflow webserver
airflow scheduler
- webserver는 웹 서버를 담당하고,
- scheduler가 DAG들을 스케줄링(실행)함
airflow initdb
# airflow initdb는 처음 db를 생성하는 작업을 함
- Airflow Architecture
- Airflow Webserver
- 웹 UI를 표현하고, workflow 상태 표시하고 실행, 재시작, 수동 조작, 로그 확인 등 가능
- Airflow Scheduler
- 작업 기준이 충족되는지 여부를 확인
- 종속 작업이 성공적으로 완료되었고, 예약 간격이 주어지면 실행할 수 있는 작업인지, 실행 조건이 충족되는지 등
- 위 충족 여부가 DB에 기록되면, task들이 worker에게 선택되서 작업을 실행함
- DAG
- Airflow의 DAG으로 모델링됨: Directed Acyclic Graphs
- 방향이 있는 비순환 그래프
- 비순환이기 때문에 마지막 Task가 다시 처음 Task로 이어지지 않음
코드로 보는 DAG
-
- 1) Default Argument 정의
- start_date가 중요! 과거 날짜를 설정하면 그 날부터 실행
- retries, retry_delay : 실패할 경우 몇분 뒤에 재실행할지?
- priority_weight : 우선 순위
- 외에도 다양한 옵션이 있는데, 문서 참고
- 2) DAG 객체 생성
- 첫 인자는 dag_id인데 고유한 id 작성
- default_args는 위에서 정의한 argument를 넣고
- schedule_interval은 crontab 표현 사용
- schedule_interval='@once'는 한번만 실행. 디버깅용으로 자주 사용
- 5 4 * * * 같은 표현을 사용
- 더 궁금하면 crontab guru 참고
dag = DAG('bash_dag', default_args=default_args, schedule_interval='@once'))
- 3) Operator로 Task 정의
- Operator를 사용해 Task를 정의함
- Operator가 Instance가 되면 Task라 부름
- BashOperator : Bash Command 실행
- PythonOperator : Python 함수 실행
- BigQueryOperator : BigQuery 쿼리 날린 후 Table 저장
- 외에도 다양한 operator가 있고, operator마다 옵션이 다름
- mysql_to_hive 등도 있음
- Operator는 unique한 task_id를 가져야 하고, 오퍼레이터별 다른 파라미터를 가지고 있음
- 이름 (id) 다르게 안하면 구분을 못함
- Airflow 2.0 부터는 Task group 으로 관리함
- Operator는 unique한 task_id를 가져야 하고, 오퍼레이터별 다른 파라미터를 가지고 있음
- Operator를 사용해 Task를 정의함
Airflow 2 마이그레이션
기존 Airflow 1 버전은 DAG 개수가 늘어나면서 연속적인 Task Instance 스케줄링 간 Lag이 길었고 기타 자잘한 버그들이 있었습니다. Airflow 2에서 대표적으로 개선된 부분들은 아래와 같습니다.
- 스케줄링 퍼포먼스가 개선되었습니다.
- Airflow 2는 DAG Serialization과 Fast-Follow를 도입하여 Scheduler의 반복적인 DAG 파싱 작업을 줄이고 Task Scheduling 과정을 개선하였습니다. Astronomer 블로그에서 벤치마크 테스트를 했을 때 10배 이상의 성능 개선이 있었다고 합니다. 팀에서 경험하는 문제였던 Task instance 스케줄링 간 Lag 현상도 많이 줄었습니다.
- Scheduler HA(High Availability)를 지원해서 스케줄러의 Scale Out이 용이합니다.
- Active-Active 클러스터 형태로 Scheduler의 HA를 지원합니다. 이때 Meta DB는 SELECT ... FOR UPDATE로 Row Level Lock이 걸리게 됩니다.
- 웹 서버 사용성이 개선되었습니다.
- DAG Serialization을 통해 더 빠르게 웹 UI에서 더 빠르게 DAG 정보를 불러올 수 있으며 Auto Refresh 기능 등 사용성이 개선되었습니다. 저희 팀에서도 사용자의 Airflow 사용성을 위해 지속적으로 하위호환성을 고려하며 버전을 업그레이드하고 있습니다. 글을 쓰는 시점인 2.3 버전은 더 직관적인 UI를 제공해 주어 저희 팀에서도 2.3 버전으로 업그레이드를 완료하였습니다.
이 외에도 TaskFlow API 도입, Airflow Core Component에서 Provider 분리, Task Group, Smart Sensor 도입 등등 많은 변화가 있습니다. 더 궁금하신 분들은 해당 글을 읽어보시면 도움이 될 것 같습니다.
Airflow 명령어
- DAG 파일을 추가하고, 웹서버에 보이기까지 시간이 좀 걸릴 수 있음
- DAG 파일들을 확인하고 싶은 경우
- airflow의 dags 폴더 아래에 있는 dag들을 출력함
-
airflow list_dags
- 특정 DAG의 task를 출력하고 싶은 경우
- test라는 DAG의 task 출력
-
airflow list_tasks test
- Tree 형태로 출력
-
airflow list_tasks test --tree
- 특정 Task를 test하고 싶은 경우
- date 날짜로 실행함
-
airflow test [DAG id] [Task id] [date]
- 예시 : airflow test test print_date 2020-02-09
- Airflow scheduler 실행
- DAG들이 실행됨
-
airflow scheduler
- Airflow 관련 help 명령어
airflow -h
- Airflow에서 Jupyter Notebook의 특정 값만 바꾸며 실행하고 싶은 경우
- Papermill 사용
- 참고 문서
- UTC 시간대를 항상 생각하는 습관 갖기
- execution_date이 너무 헷갈림
- 2017년에 Airflow 처음 사용할 때 매우 헷갈렸던 개념
- 박호균님의 블로그 참고
- 추후 다른 글로 정리할 예정
- Task가 실패했을 경우 슬랙 메세지 전송하기
- Integrating Slack Alerts in Airflow 참고하면 잘 나와있음
- Hook이란?
- Hook은 외부 플랫폼, 데이터베이스(예: Hive, S3, MySQL, Postgres, Google Cloud Platfom 등)에 접근할 수 있도록 만든 인터페이스
- 대부분 Operator가 실행되기 전에 Hook을 통해 통신함
- 공식 문서 참고
- 머신러닝에서 사용한 예시는 Github 참고
- airflow Github에 많은 예제 파일이 있음
- Context Variable이나 Jinja Template의 ds를 사용해 Airflow에서 날짜를 컨트롤 하는 경우, Backfill을 사용할 수 있음
- 과거값 기준으로 재실행
- 단, 쿼리에 CURRENT_DATE() 등을 쓰면 Airflow에서 날짜를 컨트롤하지 않고 쿼리에서 날짜 컨트롤하는 경우라 backfill해도 CURRENT_DATE()이 있어서 현재 날짜 기준으로 될 것임
airflow backfill -s 2020-01-05 -e 2020-01-10 dag_id
- backfill은 기본적으로 실행되지 않은 Task들만 실행함. DAG을 아예 다시 실행하고 싶다면 --reset_dagruns를 같이 인자로 줘야 함
airflow backfill -s 2020-01-05 -e 2020-01-10 --reset_dagruns dag_id
- 실패한 Task만 재실행하고 싶다면 --rerun_failed_task를 사용함
airflow backfill -s 2020-01-05 -e 2020-01-10 --reset_failed_task dag_id
- airflow backfill이 아닌 강제로 trigger를 하고 싶다면 다음 명령어 사용
-
airflow trigger_dag dag_id -e 2020-01-01
-
- 재시도시 Exponential하게 실행되길 원하면
- retry_exponential_backoff 참고
반응형
'Data Analysis > Python' 카테고리의 다른 글
[Airflow] Remove DAG examples DAG 예시 파일 제거 방법 (0) | 2023.01.24 |
---|---|
[Airflow]. Airflow Local Executor와 Celery Executor (0) | 2023.01.23 |
[Pandas] 조건걸고 새로운 칼럼 추가하기 (0) | 2023.01.07 |
[Airflow] The important views of the Airflow UI (1) | 2023.01.07 |
[Python] 대용량 데이터csv 읽어오기 (PyArrow) (0) | 2023.01.07 |
댓글