배치 데이터 처리

Apache Airflow

BUST 2018. 7. 7. 21:27

Airflow

- Airflow는 스케쥴, workflow 모니터 플랫폼이다.
- 작업의 단위는 DAG(Directed acyclic graphs)로 표현한다.
- Python 언어로 DAG File를 구성하고, 그 내부에는 여러개의 Task가 존재를 한다.

특징

- Dynamic : Airflow는 Pipeline의 설정은 코드(Python) 언어로 작성이 되기 때문에 동적으로 작동되는 코드를 작성을 할수가 있다.
- Extensible : operators, executors, ...
- Elegant : 간결하고, 명확, jinja 템플릿 엔진을 이용한 템플릿 기능 등.
- Scalable : 스케일 아웃이 가능하다. message queue를 이용하여 task를 worker에게 분배를 한다. (celery, dask 등을 이용하여 scaling out이 가능하다.)

Quick Start

# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow

# install from pypi using pip
pip install apache-airflow

# initialize the database
airflow initdb

# start the web server, default port is 8080
airflow webserver -p 8080

Tutorial

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


# Default Arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

- DAG 정의 Python Code

- Default Argument

- Operator Task을 기능을 정의

- Jinja를 이용한 Template 기능

- set_upstream를 이용한 taks의 Dependencies설정



DAG Schedule

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'schedule_interval' : None  ## Schedule config
}
  • schedule_interval
    • None - Schedule 없는 경우
      • 'None'이 아닌 파이썬 객체 None이다.
    • @once- 한번만 실행
    • @hourly
    • @daily
    • @weekly
    • @monthly
    • @yearly
    • cron : * * * * * (min, hour, date, month, day)

Variables

from airflow.models import Variable
foo = Variable.get("foo")
bar = Variable.get("bar", deserialize_json=True)


Jinja Template

date = "{{ ds }}"
t = BashOperator(
    task_id='test_env',
    bash_command='/tmp/test.sh ',
    dag=dag,
    env={'EXECUTION_DATE': date})

UI

DAGs View

_images/dags.png

Tree View

_images/tree.png

Graph View

_images/graph.png

Reference

- Apache Airflow (https://airflow.apache.org/)
- Airflow UI (https://airflow.apache.org/ui.html)


'배치 데이터 처리' 카테고리의 다른 글

Jenkins을 Batch Scheduler로 활용하기  (0) 2018.06.17
Spring batch  (0) 2017.07.13