Airflow
- Airflow는 스케쥴, workflow 모니터 플랫폼이다.
- 작업의 단위는 DAG(Directed acyclic graphs)로 표현한다.
- Python 언어로 DAG File를 구성하고, 그 내부에는 여러개의 Task가 존재를 한다.
특징
- Dynamic : Airflow는 Pipeline의 설정은 코드(Python) 언어로 작성이 되기 때문에 동적으로 작동되는 코드를 작성을 할수가 있다.
- Extensible : operators, executors, ...
- Elegant : 간결하고, 명확, jinja 템플릿 엔진을 이용한 템플릿 기능 등.
- Scalable : 스케일 아웃이 가능하다. message queue를 이용하여 task를 worker에게 분배를 한다. (celery, dask 등을 이용하여 scaling out이 가능하다.)
Quick Start
# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow
# install from pypi using pip
pip install apache-airflow
# initialize the database
airflow initdb
# start the web server, default port is 8080
airflow webserver -p 8080
Tutorial
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
# Default Arguments
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('tutorial', default_args=default_args)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
- DAG 정의 Python Code
- Default Argument
- Operator Task을 기능을 정의
- Jinja를 이용한 Template 기능
- set_upstream를 이용한 taks의 Dependencies설정
DAG Schedule
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'schedule_interval' : None ## Schedule config
}
- schedule_interval
- None - Schedule 없는 경우
- 'None'이 아닌 파이썬 객체 None이다.
- @once- 한번만 실행
- @hourly
- @daily
- @weekly
- @monthly
- @yearly
- cron : * * * * * (min, hour, date, month, day)
Variables
from airflow.models import Variable
foo = Variable.get("foo")
bar = Variable.get("bar", deserialize_json=True)
Jinja Template
date = "{{ ds }}"
t = BashOperator(
task_id='test_env',
bash_command='/tmp/test.sh ',
dag=dag,
env={'EXECUTION_DATE': date})
UI
DAGs View
Tree View
Graph View
Reference
- Apache Airflow (https://airflow.apache.org/)
- Airflow UI (https://airflow.apache.org/ui.html)
'배치 데이터 처리' 카테고리의 다른 글
Jenkins을 Batch Scheduler로 활용하기 (0) | 2018.06.17 |
---|---|
Spring batch (0) | 2017.07.13 |