💡 이번 포스팅에서는 이런 것들을 다뤄요
-Airflow의 간단한 소개
-Airflow의 DAG 개념과 예시를 살펴봅니다.
-Airflow의 Task 개념과 예시를 살펴봅니다.
-Airflow의 Operator 개념과 예시를 살펴봅니다.
🏃들어가며
DAG와 Task는 Airflow의 핵심 구성요소 중 하나이다. 여기서는 Airflow의 목적에 대해 간단히 소개하고 DAG와 Task, Operator에 대해서 설명하고자 한다.
🌊Airflow란?
Apache Airflow는 데이터 파이프라인을 작성하고 관리하는 플랫폼이다. 여러 작업을 순차적으로 혹은 병렬로 실행할 수 있는 Workflow를 설계하고 이를 자동화하고 모니터링하는 데에 유용한 툴이다. 데이터 엔지니어링, ETL(Extract, Transform, Load), 데이터 분석 파이프라인 구축 등에 사용된다.
🗺️DAG(Directed Acyclic Graph) 란?
DAG는 작업(Task) 간의 의존 관계와 실행 순서를 나타내는 그래프이다. DAG는 방향성을 가지며 순환(자기 자신으로 돌아가는 경로)이 없도록 설계된다.
다시말해 DAG는 작업의 순서를 결정하는 작업의 설계도 역할을 한다고 보면 된다. 더불어 Airflow에만 쓰이는 용어가 아니고 그래프의 이름이니 유의하자!
DAG의 주요 속성은 다음과 같다.
- 작업간 관계 정의: Python 파일로 정의되며, 작업 간의 관계를 코드로 표현한다.
- 스케줄링: 특정 시간 간격으로 워크플로를 실행하도록 설정할 수 있다.
- 의존 관계 관리: 작업의 실행 순서를 명확히 설정할 수 있다.
DAG는 다음과 같은 구조로 작성될 수 있다.
import datetime, pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
# DAG 작성
with DAG(
dag_id="01_tutorial_dag",
start_date=pendulum.datetime(2024, 11, 1, tz='Asia/Seoul'),
schedule="30 10 * * *", # cron 표현식
tags = ['20241127'],
default_args = default_args
) as dag:
# Task 작성
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3")
task4 = EmptyOperator(task_id="task4")
task5 = EmptyOperator(task_id="task5")
# task1 >> task2 >> task3
task1 >> task2 >> task4
task1 >> task3 >> task5
✅TASK란?
Task는 DAG 내에서 실행되는 개별 작업 단위를 의미한다. Task는 Airflow Operator에 의해 정의되며, 각 Operator는 특정 작업 유형을 처리한다. 예를 들어, Python 코드를 실행하거나, 데이터를 이동시키거나, Bash 명령어를 실행하는 등의 역할을 수행할 수 있다.
Task는 다음과 같은 구조로 작성될 수 있다.
#예시: BashOperator
t1 = BashOperator(
task_id="print_first_date",
bash_command="date",
)
# 5초간 대기
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
# 현재 날짜 및 시간 출력
t3 = BashOperator(
task_id="print_second_date",
bash_command="date",
)
🕹️Operator란?
Operator는 Airflow에서 작업(Task)을 정의하는 가장 기본적인 단위다.
DAG는 작업 간의 관계와 순서를 정의하고, 작업(Task)은 실제로 수행할 일을 지정한다. 이때, Operator는 작업의 구체적인 유형과 동작을 정의한다. 즉, Operator는 "무엇을 해야 하는가?"를 결정하는 역할을 한다.
Operator의 종류
Operator의 종류는 다양하다. 다음은 대표적인 Operator들이다.
1) PythonOperator
: Python 함수를 실행한다.
from airflow.operators.python_operator import PythonOperator
def my_function():
print("Executing a Python function!")
task = PythonOperator(
task_id='run_python_function', # Task의 고유 식별자
python_callable=my_function, # 실행할 Python 함수
dag=dag # 이 Task가 속할 DAG
)
2) BashOperator
: Bash 명령어를 실행한다.
from airflow.operators.bash_operator import BashOperator
task = BashOperator(
task_id='run_bash_command', # Task의 고유 식별자
bash_command='echo "Hello, Airflow!"', # 실행할 Bash 명령어
dag=dag # 이 Task가 속할 DAG
)
3) EmailOperator
: 이메일을 전송한다.
from airflow.operators.email_operator import EmailOperator
task = EmailOperator(
task_id='send_email', # Task의 고유 식별자
to='example@example.com', # 수신자 이메일 주소
subject='Airflow Test Email', # 이메일 제목
html_content='This is a test email from Airflow.', # 이메일 본문 (HTML 가능)
dag=dag # 이 Task가 속할 DAG
)
4) PostgresOperator
: PostgreSQL 데이터베이스에서 SQL 쿼리를 실행한다.
from airflow.providers.postgres.operators.postgres import PostgresOperator
task = PostgresOperator(
task_id='execute_sql_query', # Task의 고유 식별자
postgres_conn_id='my_postgres_conn', # Airflow에 등록된 PostgreSQL 연결 ID
sql='SELECT * FROM my_table;', # 실행할 SQL 쿼리
dag=dag # 이 Task가 속할 DAG
)
5) Sensor
: 특정 조건이 충족될 때까지 기다린다. Sensor는 작업(Task) 실행 전에 의존성이나 외부 이벤트가 완료되었는지 확인하는 역할을 한다.
from airflow.sensors.filesystem import FileSensor
task = FileSensor(
task_id='wait_for_file', # Task의 고유 식별자
filepath='/path/to/file', # 확인할 파일 경로
poke_interval=30, # 파일을 확인할 주기 (초 단위)
timeout=600, # 최대 대기 시간 (초 단위)
mode='poke', # 실행 모드: 'poke' 또는 'reschedule'
dag=dag # 이 Task가 속할 DAG
)
6) BigQueryOperator
: Google BigQuery에서 SQL 쿼리를 실행한다. 클라우드 환경에서 데이터를 분석하거나 변환하는 데 주로 사용된다.
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
task = BigQueryInsertJobOperator(
task_id='execute_bigquery_query', # Task의 고유 식별자
gcp_conn_id='my_gcp_connection', # Airflow에 등록된 GCP 연결 ID
configuration={
"query": {
"query": "SELECT * FROM `my_project.my_dataset.my_table`", # 실행할 SQL 쿼리
"useLegacySql": False, # Standard SQL 사용 여부
}
},
dag=dag # 이 Task가 속할 DAG
)
7) S3FileTransformOperator
: AWS S3 버킷에서 데이터를 가져오고, 변환을 수행한 후 결과를 다시 S3에 저장한다. ETL(Extract, Transform, Load) 작업에서 유용하다.
from airflow.providers.amazon.aws.transfers.s3_file_transform import S3FileTransformOperator
task = S3FileTransformOperator(
task_id='transform_s3_file', # Task의 고유 식별자
source_s3_key='s3://my-bucket/input/data.csv', # 원본 파일 경로
dest_s3_key='s3://my-bucket/output/data_transformed.csv', # 결과 파일 경로
transform_script='/path/to/transform_script.py', # 데이터 변환 스크립트
aws_conn_id='my_aws_connection', # Airflow에 등록된 AWS 연결 ID
replace=True, # 기존 파일 대체 여부
dag=dag # 이 Task가 속할 DAG
)
8) SSHOperator
: 원격 서버에 SSH로 접속해 명령어를 실행한다. 데이터 처리, 파일 조작, 기타 서버 작업을 자동화할 때 유용하다.
from airflow.providers.ssh.operators.ssh import SSHOperator
task = SSHOperator(
task_id='run_remote_command', # Task의 고유 식별자
ssh_conn_id='my_ssh_connection', # Airflow에 등록된 SSH 연결 ID
command='ls -l /path/to/directory', # 실행할 명령어
timeout=10, # 명령어 실행 제한 시간 (초)
dag=dag # 이 Task가 속할 DAG
)
9) MySQLToGCSOperator
: MySQL 데이터베이스에서 데이터를 추출하여 Google Cloud Storage(GCS)에 저장한다. ETL 파이프라인에서 MySQL 데이터를 클라우드로 이동할 때 사용된다.
from airflow.providers.google.cloud.transfers.mysql_to_gcs import MySQLToGCSOperator
task = MySQLToGCSOperator(
task_id='mysql_to_gcs', # Task의 고유 식별자
mysql_conn_id='my_mysql_connection', # Airflow에 등록된 MySQL 연결 ID
sql='SELECT * FROM my_table', # 실행할 SQL 쿼리
bucket_name='my-gcs-bucket', # 데이터를 저장할 GCS 버킷 이름
object_name='data/exported_data.json', # 저장할 파일 경로 및 이름
export_format='json', # 데이터 저장 형식 (예: JSON, CSV)
field_delimiter=',', # 필드 구분자 (CSV일 경우)
dag=dag # 이 Task가 속할 DAG
)
10) GCSToBigQueryOperator
: Google Cloud Storage(GCS)에서 데이터를 가져와 Google BigQuery에 로드한다. 데이터 분석 파이프라인에서 클라우드 스토리지 데이터를 BigQuery로 전송할 때 유용하다.
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
task = GCSToBigQueryOperator(
task_id='load_gcs_to_bq', # Task의 고유 식별자
bucket='my-gcs-bucket', # 데이터가 저장된 GCS 버킷 이름
source_objects=['data/exported_data.json'], # GCS에서 가져올 파일 경로(리스트 형태)
destination_project_dataset_table='my_project.my_dataset.my_table', # BigQuery 대상 테이블
write_disposition='WRITE_TRUNCATE', # 기존 데이터를 덮어쓸지 여부
source_format='NEWLINE_DELIMITED_JSON', # 원본 데이터 형식 (예: JSON, CSV 등)
autodetect=True, # 데이터 스키마 자동 감지 여부
dag=dag # 이 Task가 속할 DAG
)
'👩🏻💻TECH > 개념정리' 카테고리의 다른 글
[개념정리] On-premise, Cloud, Serverless 데이터 웨어하우스 (0) | 2024.12.09 |
---|---|
[GCP] BigQuery 성능 최적화 (0) | 2024.12.08 |
[개념정리] 데이터 모델링: 모델링의 단계, 구성요소 (0) | 2024.11.24 |
[개념정리] 데이터베이스 정규화 (3) | 2024.11.22 |
[개념정리] K-폴드 교차검증 (0) | 2024.10.28 |