본문 바로가기
Data Analysis/Data Engineering

Airflow에서 데코레이터(decorator)를 사용하는 이유

by Hagrid 2024. 4. 19.
반응형

 

 

사용이유

코드를 간결하게 하고, 재사용성을 높이며, 함수나 메소드에 추가 기능을 손쉽게 적용하기 위해서입니다. Airflow에서 특히 자주 사용되는 데코레이터는 @task 데코레이터인데, 이는 몇 가지 중요한 이유로 사용됩니다

  1. 코드의 간결성: @task 데코레이터를 사용하면, 일반 Python 함수를 Airflow 태스크로 변환할 수 있습니다. 이는 DAG 내에서 태스크를 정의할 때 보다 간결하고 명확한 코드를 작성할 수 있도록 도와줍니다.
  2. 재사용성 증가: 함수를 사용하여 태스크 로직을 정의하면, 이 함수를 다른 DAG에서도 재사용할 수 있습니다. 데코레이터는 이런 함수들을 태스크로 쉽게 변환해주므로 코드 재사용성이 높아집니다.
  3. 추가 기능의 적용: Airflow의 @task 데코레이터는 태스크의 실행 방법을 설정하는 여러 파라미터를 제공합니다. 예를 들어, retry, timeout, pool 등의 태스크 실행 옵션을 데코레이터를 통해 손쉽게 설정할 수 있습니다.
  4. 유지보수성 향상: 데코레이터를 사용하면, 태스크의 행동을 변경하거나 확장하는 것이 더 쉬워집니다. 예를 들어, 새로운 로깅 데코레이터를 추가하여 모든 태스크 실행 시 로깅을 자동화할 수 있습니다.

 

. 코드의 간결성

일반적인 Airflow 코드는 태스크를 정의할 때 PythonOperator와 같은 연산자를 사용합니다. 그러나 @task 데코레이터를 사용하면 함수 자체가 태스크가 되어 코드를 훨씬 간결하게 만들 수 있습니다.

전통적인 방법:

 
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def my_function():
    print("Hello from my function!")

dag = DAG('my_dag', start_date=datetime(2021, 1, 1))

task = PythonOperator(
    task_id='my_task',
    python_callable=my_function,
    dag=dag
)

@task 데코레이터 사용:

from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date=datetime(2021, 1, 1))
def my_dag():
    @task
    def my_function():
        print("Hello from my function!")

    my_function()

dag = my_dag()
 

2. 재사용성 증가

태스크로 사용될 함수를 정의하고, 이 함수를 다양한 DAG에서 호출하여 재사용할 수 있습니다.

재사용 가능한 태스크 함수:

from airflow.decorators import task

@task
def process_data(data):
    # 데이터 처리 로직
    return modified_data

# 이 함수는 여러 DAG에서 재사용 가능
 

3. 추가 기능의 적용

@task 데코레이터를 통해 태스크의 여러 실행 옵션을 설정할 수 있습니다.

예시:

from airflow.decorators import task

@task(retries=3, timeout=300)
def my_task():
    # 복잡한 연산 수행
    pass

이 함수는 실패 시 최대 3번 재시도하고, 실행 시간이 300초를 초과하면 타임아웃됩니다.

4. 유지보수성 향상

새로운 로깅 기능을 태스크에 추가하고자 할 때, 로깅 데코레이터를 만들어 적용할 수 있습니다.

로깅 데코레이터 예시:

from functools import wraps

def logging_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print(f"Calling {func.__name__}")
        result = func(*args, **kwargs)
        print(f"{func.__name__} completed")
        return result
    return wrapper

@task
@logging_decorator
def my_task():
    print("Executing my task")

이 예시에서 logging_decorator는 함수가 호출될 때와 완료될 때 로그를 출력합니다. 이런 방식으로 특정 기능을 태스크에 쉽게 추가하고 관리할 수 있습니다.

 
반응형

댓글