본문 바로가기
Data Analysis/Python

[Airflow] 기본 정리

by Hagrid 2023. 1. 23.
반응형

 

  • Apache Airflow의 장점
    • Python 기반
      • 데이터 분석을 하는 분들도 쉽게 코드를 작성할 수 있음
    • Scheduling : 특정 간격으로 계속 실행
      • Airflow 콘솔이 따로 존재해 Task 관리를 서버에서 들어가 관리하지 않아도 됨
      • 각 작업별 시간이 나오기 때문에 bottleneck을 찾을 때에도 유용함
    • Backfill : 과거 작업 실행
      • 캐치업 Catch up  ->  그동안 밀린거 다 실행할건지 여부 / True - YES! 
    • 특정 Task 실패시 => Task만 재실행 / DAG 재실행 등 실패 로직도 있음
    • 데이터 엔지니어링에서 많이 사용됨
    • Google Cloud Platform에 있는 대부분의 기능을 지원
    • Google Cloud Platform엔 Managed Service(관리형 서비스)인 Composer 존재
 
  • UTC
    • 협정 세계시로 1972년 1월 1일부터 시행된 국제 표준시
    • 서버에서 시간 처리할 땐, 거의 UTC를 사용함
    • 한국 시간은 UTC+9 hour
      • SQL에서 구현하려면 date_time + interval 9 hour 혹은 Data_add()함수 사용
    • Airflow에서 UTC를 사용하기 때문에, CRON 표시할 때 UTC 기준으로 작성
      • 예 : UTC 30 1 * * * => 한국은 30 10 * * * => 한국 오전 10시 30분
  • Airflow 실행
    • airflow webserver와 airflow scheduler 2개 실행해야 함
    • 터미널 1개에 webserver를 띄우고, command+t로 새로운 터미널을 띄워서 scheduler를 띄우기
airflow webserver
airflow scheduler
  • webserver는 웹 서버를 담당하고,
  • scheduler가 DAG들을 스케줄링(실행)함
airflow initdb
# airflow initdb는 처음 db를 생성하는 작업을 함
 
  • Airflow Architecture
    • https://zzsza.github.io/kyle-school/week6/#/2/8
    • Airflow Webserver
      • 웹 UI를 표현하고, workflow 상태 표시하고 실행, 재시작, 수동 조작, 로그 확인 등 가능
    • Airflow Scheduler
      • 작업 기준이 충족되는지 여부를 확인
      • 종속 작업이 성공적으로 완료되었고, 예약 간격이 주어지면 실행할 수 있는 작업인지, 실행 조건이 충족되는지 등
      • 위 충족 여부가 DB에 기록되면, task들이 worker에게 선택되서 작업을 실행함
 
  • DAG
    • Airflow의 DAG으로 모델링됨: Directed Acyclic Graphs
    • 방향이 있는 비순환 그래프
    • 비순환이기 때문에 마지막 Task가 다시 처음 Task로 이어지지 않음

코드로 보는 DAG

-

코드를 dags 폴더 아래에 test.py로 저장하고 웹서버에서 test DAG 옆에 있는 toggle 버튼을 ON으로 변경

 

  • 1) Default Argument 정의
    • start_date가 중요! 과거 날짜를 설정하면 그 날부터 실행
    • retries, retry_delay : 실패할 경우 몇분 뒤에 재실행할지?
    • priority_weight : 우선 순위
    • 외에도 다양한 옵션이 있는데, 문서 참고

 

  • 2) DAG 객체 생성
    • 첫 인자는 dag_id인데 고유한 id 작성
    • default_args는 위에서 정의한 argument를 넣고
    • schedule_interval은 crontab 표현 사용
      • schedule_interval='@once'는 한번만 실행. 디버깅용으로 자주 사용
      • 5 4 * * * 같은 표현을 사용
      • 더 궁금하면 crontab guru 참고
      dag = DAG('bash_dag', default_args=default_args, schedule_interval='@once'))
  • 3) Operator로 Task 정의
    • Operator를 사용해 Task를 정의함
      • Operator가 Instance가 되면 Task라 부름
      • BashOperator : Bash Command 실행
      • PythonOperator : Python 함수 실행
      • BigQueryOperator : BigQuery 쿼리 날린 후 Table 저장
    • 외에도 다양한 operator가 있고, operator마다 옵션이 다름
    • mysql_to_hive 등도 있음
      • Operator는 unique한 task_id를 가져야 하고, 오퍼레이터별 다른 파라미터를 가지고 있음
        • 이름 (id) 다르게 안하면 구분을 못함 
      • Airflow 2.0 부터는 Task group 으로 관리함 
 

Benefits of the Airflow Refactored Scheduler - Astronomer

Dive into the horizontally scalable Apache Airflow 2.0 Scheduler. Learn how to leverage Airflow’s scheduling functionality to start tasks quick and easy.

www.astronomer.io

 


Airflow 2 마이그레이션

기존 Airflow 1 버전은 DAG 개수가 늘어나면서 연속적인 Task Instance 스케줄링 간 Lag이 길었고 기타 자잘한 버그들이 있었습니다. Airflow 2에서 대표적으로 개선된 부분들은 아래와 같습니다.

  • 스케줄링 퍼포먼스가 개선되었습니다.
    • Airflow 2는 DAG Serialization과 Fast-Follow를 도입하여 Scheduler의 반복적인 DAG 파싱 작업을 줄이고 Task Scheduling 과정을 개선하였습니다. Astronomer 블로그에서 벤치마크 테스트를 했을 때 10배 이상의 성능 개선이 있었다고 합니다. 팀에서 경험하는 문제였던 Task instance 스케줄링 간 Lag 현상도 많이 줄었습니다.
  • Scheduler HA(High Availability)를 지원해서 스케줄러의 Scale Out이 용이합니다.
    • Active-Active 클러스터 형태로 Scheduler의 HA를 지원합니다. 이때 Meta DB는 SELECT ... FOR UPDATE로 Row Level Lock이 걸리게 됩니다.
  • 웹 서버 사용성이 개선되었습니다.
    • DAG Serialization을 통해 더 빠르게 웹 UI에서 더 빠르게 DAG 정보를 불러올 수 있으며 Auto Refresh 기능 등 사용성이 개선되었습니다. 저희 팀에서도 사용자의 Airflow 사용성을 위해 지속적으로 하위호환성을 고려하며 버전을 업그레이드하고 있습니다. 글을 쓰는 시점인 2.3 버전은 더 직관적인 UI를 제공해 주어 저희 팀에서도 2.3 버전으로 업그레이드를 완료하였습니다.

이 외에도 TaskFlow API 도입, Airflow Core Component에서 Provider 분리, Task Group, Smart Sensor 도입 등등 많은 변화가 있습니다. 더 궁금하신 분들은 해당 글을 읽어보시면 도움이 될 것 같습니다.


Airflow 명령어

  • DAG 파일을 추가하고, 웹서버에 보이기까지 시간이 좀 걸릴 수 있음
  • DAG 파일들을 확인하고 싶은 경우
    • airflow의 dags 폴더 아래에 있는 dag들을 출력함
    •   airflow list_dags
      
  • 특정 DAG의 task를 출력하고 싶은 경우
    • test라는 DAG의 task 출력
    •   airflow list_tasks test
      
    • Tree 형태로 출력
    •   airflow list_tasks test --tree
      
  • 특정 Task를 test하고 싶은 경우
    • date 날짜로 실행함
    •   airflow test [DAG id] [Task id] [date]
      
    • 예시 : airflow test test print_date 2020-02-09
  • Airflow scheduler 실행
    • DAG들이 실행됨
    •   airflow scheduler
      
  • Airflow 관련 help 명령어
  airflow -h

 

  • Airflow에서 Jupyter Notebook의 특정 값만 바꾸며 실행하고 싶은 경우
  • UTC 시간대를 항상 생각하는 습관 갖기
  • execution_date이 너무 헷갈림
    • 2017년에 Airflow 처음 사용할 때 매우 헷갈렸던 개념
    • 박호균님의 블로그 참고
    • 추후 다른 글로 정리할 예정
  • Task가 실패했을 경우 슬랙 메세지 전송하기
  • Hook이란?
    • Hook은 외부 플랫폼, 데이터베이스(예: Hive, S3, MySQL, Postgres, Google Cloud Platfom 등)에 접근할 수 있도록 만든 인터페이스
    • 대부분 Operator가 실행되기 전에 Hook을 통해 통신함
    • 공식 문서 참고
  • 머신러닝에서 사용한 예시는 Github 참고
  • airflow Github에 많은 예제 파일이 있음
  • Context Variable이나 Jinja Template의 ds를 사용해 Airflow에서 날짜를 컨트롤 하는 경우, Backfill을 사용할 수 있음
    • 과거값 기준으로 재실행
    • 단, 쿼리에 CURRENT_DATE() 등을 쓰면 Airflow에서 날짜를 컨트롤하지 않고 쿼리에서 날짜 컨트롤하는 경우라 backfill해도 CURRENT_DATE()이 있어서 현재 날짜 기준으로 될 것임
      airflow backfill -s 2020-01-05 -e 2020-01-10 dag_id
    
    • backfill은 기본적으로 실행되지 않은 Task들만 실행함. DAG을 아예 다시 실행하고 싶다면 --reset_dagruns를 같이 인자로 줘야 함
      airflow backfill -s 2020-01-05 -e 2020-01-10 --reset_dagruns dag_id
    
    • 실패한 Task만 재실행하고 싶다면 --rerun_failed_task를 사용함
      airflow backfill -s 2020-01-05 -e 2020-01-10 --reset_failed_task dag_id
    
  • airflow backfill이 아닌 강제로 trigger를 하고 싶다면 다음 명령어 사용
    •   airflow trigger_dag dag_id -e 2020-01-01
      

       

  • 재시도시 Exponential하게 실행되길 원하면
    • retry_exponential_backoff 참고
 
 
 
 
반응형

댓글