본문 바로가기
👩🏻‍💻TECH/개념정리

[개념정리] Airflow: DAG, Task, Operator

by Alicia03 2024. 12. 2.

 

💡 이번 포스팅에서는 이런 것들을 다뤄요
-Airflow의 간단한 소개 
-Airflow의 DAG 개념과 예시를 살펴봅니다. 

-Airflow의 Task 개념과 예시를 살펴봅니다.
-Airflow의 Operator 개념과 예시를 살펴봅니다. 

 

🏃들어가며

DAG와 Task는 Airflow의 핵심 구성요소 중 하나이다. 여기서는 Airflow의 목적에 대해 간단히 소개하고 DAG와 Task, Operator에 대해서 설명하고자 한다. 

 

🌊Airflow란?

Apache Airflow는 데이터 파이프라인을 작성하고 관리하는 플랫폼이다. 여러 작업을 순차적으로 혹은 병렬로 실행할 수 있는 Workflow를 설계하고 이를 자동화하고 모니터링하는 데에 유용한 툴이다. 데이터 엔지니어링, ETL(Extract, Transform, Load), 데이터 분석 파이프라인 구축 등에 사용된다. 

 

🗺️DAG(Directed Acyclic Graph) 란?

DAG는 작업(Task) 간의 의존 관계와 실행 순서를 나타내는 그래프이다. DAG는 방향성을 가지며 순환(자기 자신으로 돌아가는 경로)이 없도록 설계된다. 

 

다시말해 DAG는 작업의 순서를 결정하는 작업의 설계도 역할을 한다고 보면 된다. 더불어 Airflow에만 쓰이는 용어가 아니고 그래프의 이름이니 유의하자!

 

DAG의 주요 속성은 다음과 같다. 

  • 작업간 관계 정의: Python 파일로 정의되며, 작업 간의 관계를 코드로 표현한다.
  • 스케줄링: 특정 시간 간격으로 워크플로를 실행하도록 설정할 수 있다.
  • 의존 관계 관리: 작업의 실행 순서를 명확히 설정할 수 있다.

DAG는 다음과 같은 구조로 작성될 수 있다. 

import datetime, pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator

# DAG 작성
with DAG(
    dag_id="01_tutorial_dag",
    start_date=pendulum.datetime(2024, 11, 1, tz='Asia/Seoul'),
    schedule="30 10 * * *", # cron 표현식
    tags = ['20241127'],
    default_args = default_args
) as dag:

# Task 작성
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3")
    task4 = EmptyOperator(task_id="task4")
    task5 = EmptyOperator(task_id="task5")
    
    # task1 >> task2 >> task3
    task1 >> task2 >> task4
    task1 >> task3 >> task5

 

✅TASK란? 

Task는 DAG 내에서 실행되는 개별 작업 단위를 의미한다. Task는 Airflow Operator에 의해 정의되며, 각 Operator는 특정 작업 유형을 처리한다. 예를 들어, Python 코드를 실행하거나, 데이터를 이동시키거나, Bash 명령어를 실행하는 등의 역할을 수행할 수 있다.

 

 Task는 다음과 같은 구조로 작성될 수 있다. 

 #예시: BashOperator
 t1 = BashOperator(
        task_id="print_first_date",
        bash_command="date",
    )
    
    # 5초간 대기
    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    
    # 현재 날짜 및 시간 출력
    t3 = BashOperator(
        task_id="print_second_date",
        bash_command="date",
    )

 

🕹️Operator란?

Operator는 Airflow에서 작업(Task)을 정의하는 가장 기본적인 단위다. 
DAG는 작업 간의 관계와 순서를 정의하고, 작업(Task)은 실제로 수행할 일을 지정한다. 이때, Operator는 작업의 구체적인 유형과 동작을 정의한다. 즉, Operator는 "무엇을 해야 하는가?"를 결정하는 역할을 한다.

 

Operator의 종류

Operator의 종류는 다양하다. 다음은 대표적인 Operator들이다. 

 

1) PythonOperator

: Python 함수를 실행한다.

from airflow.operators.python_operator import PythonOperator

def my_function():
    print("Executing a Python function!")

task = PythonOperator(
    task_id='run_python_function',  # Task의 고유 식별자
    python_callable=my_function,  # 실행할 Python 함수
    dag=dag  # 이 Task가 속할 DAG
)

 

2) BashOperator

: Bash 명령어를 실행한다.

from airflow.operators.bash_operator import BashOperator

task = BashOperator(
    task_id='run_bash_command',  # Task의 고유 식별자
    bash_command='echo "Hello, Airflow!"',  # 실행할 Bash 명령어
    dag=dag  # 이 Task가 속할 DAG
)

 

3) EmailOperator

: 이메일을 전송한다.

from airflow.operators.email_operator import EmailOperator

task = EmailOperator(
    task_id='send_email',  # Task의 고유 식별자
    to='example@example.com',  # 수신자 이메일 주소
    subject='Airflow Test Email',  # 이메일 제목
    html_content='This is a test email from Airflow.',  # 이메일 본문 (HTML 가능)
    dag=dag  # 이 Task가 속할 DAG
)

 

4) PostgresOperator

: PostgreSQL 데이터베이스에서 SQL 쿼리를 실행한다.

from airflow.providers.postgres.operators.postgres import PostgresOperator

task = PostgresOperator(
    task_id='execute_sql_query',  # Task의 고유 식별자
    postgres_conn_id='my_postgres_conn',  # Airflow에 등록된 PostgreSQL 연결 ID
    sql='SELECT * FROM my_table;',  # 실행할 SQL 쿼리
    dag=dag  # 이 Task가 속할 DAG
)

 

5) Sensor

: 특정 조건이 충족될 때까지 기다린다. Sensor는 작업(Task) 실행 전에 의존성이나 외부 이벤트가 완료되었는지 확인하는 역할을 한다.

from airflow.sensors.filesystem import FileSensor

task = FileSensor(
    task_id='wait_for_file',  # Task의 고유 식별자
    filepath='/path/to/file',  # 확인할 파일 경로
    poke_interval=30,  # 파일을 확인할 주기 (초 단위)
    timeout=600,  # 최대 대기 시간 (초 단위)
    mode='poke',  # 실행 모드: 'poke' 또는 'reschedule'
    dag=dag  # 이 Task가 속할 DAG
)

 

6) BigQueryOperator

: Google BigQuery에서 SQL 쿼리를 실행한다. 클라우드 환경에서 데이터를 분석하거나 변환하는 데 주로 사용된다.

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

task = BigQueryInsertJobOperator(
    task_id='execute_bigquery_query',  # Task의 고유 식별자
    gcp_conn_id='my_gcp_connection',  # Airflow에 등록된 GCP 연결 ID
    configuration={
        "query": {
            "query": "SELECT * FROM `my_project.my_dataset.my_table`",  # 실행할 SQL 쿼리
            "useLegacySql": False,  # Standard SQL 사용 여부
        }
    },
    dag=dag  # 이 Task가 속할 DAG
)

 

7) S3FileTransformOperator

: AWS S3 버킷에서 데이터를 가져오고, 변환을 수행한 후 결과를 다시 S3에 저장한다. ETL(Extract, Transform, Load) 작업에서 유용하다.

from airflow.providers.amazon.aws.transfers.s3_file_transform import S3FileTransformOperator

task = S3FileTransformOperator(
    task_id='transform_s3_file',  # Task의 고유 식별자
    source_s3_key='s3://my-bucket/input/data.csv',  # 원본 파일 경로
    dest_s3_key='s3://my-bucket/output/data_transformed.csv',  # 결과 파일 경로
    transform_script='/path/to/transform_script.py',  # 데이터 변환 스크립트
    aws_conn_id='my_aws_connection',  # Airflow에 등록된 AWS 연결 ID
    replace=True,  # 기존 파일 대체 여부
    dag=dag  # 이 Task가 속할 DAG
)

 

8) SSHOperator

: 원격 서버에 SSH로 접속해 명령어를 실행한다. 데이터 처리, 파일 조작, 기타 서버 작업을 자동화할 때 유용하다.

from airflow.providers.ssh.operators.ssh import SSHOperator

task = SSHOperator(
    task_id='run_remote_command',  # Task의 고유 식별자
    ssh_conn_id='my_ssh_connection',  # Airflow에 등록된 SSH 연결 ID
    command='ls -l /path/to/directory',  # 실행할 명령어
    timeout=10,  # 명령어 실행 제한 시간 (초)
    dag=dag  # 이 Task가 속할 DAG
)

 

9) MySQLToGCSOperator

: MySQL 데이터베이스에서 데이터를 추출하여 Google Cloud Storage(GCS)에 저장한다. ETL 파이프라인에서 MySQL 데이터를 클라우드로 이동할 때 사용된다.

from airflow.providers.google.cloud.transfers.mysql_to_gcs import MySQLToGCSOperator

task = MySQLToGCSOperator(
    task_id='mysql_to_gcs',  # Task의 고유 식별자
    mysql_conn_id='my_mysql_connection',  # Airflow에 등록된 MySQL 연결 ID
    sql='SELECT * FROM my_table',  # 실행할 SQL 쿼리
    bucket_name='my-gcs-bucket',  # 데이터를 저장할 GCS 버킷 이름
    object_name='data/exported_data.json',  # 저장할 파일 경로 및 이름
    export_format='json',  # 데이터 저장 형식 (예: JSON, CSV)
    field_delimiter=',',  # 필드 구분자 (CSV일 경우)
    dag=dag  # 이 Task가 속할 DAG
)

 

10) GCSToBigQueryOperator

: Google Cloud Storage(GCS)에서 데이터를 가져와 Google BigQuery에 로드한다. 데이터 분석 파이프라인에서 클라우드 스토리지 데이터를 BigQuery로 전송할 때 유용하다.

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

task = GCSToBigQueryOperator(
    task_id='load_gcs_to_bq',  # Task의 고유 식별자
    bucket='my-gcs-bucket',  # 데이터가 저장된 GCS 버킷 이름
    source_objects=['data/exported_data.json'],  # GCS에서 가져올 파일 경로(리스트 형태)
    destination_project_dataset_table='my_project.my_dataset.my_table',  # BigQuery 대상 테이블
    write_disposition='WRITE_TRUNCATE',  # 기존 데이터를 덮어쓸지 여부
    source_format='NEWLINE_DELIMITED_JSON',  # 원본 데이터 형식 (예: JSON, CSV 등)
    autodetect=True,  # 데이터 스키마 자동 감지 여부
    dag=dag  # 이 Task가 속할 DAG
)