[Airflow] AirFlow 란?
0. 관련 포스팅
- [Airflow] Airflow 란?
- [Airflow] Airflow 설치 (with. docker compose)
1. 개요
Airflow는 배치 중심 WorkFlow를 개발, 예약 및 모니터링하기 위한 오픈 소스 플랫폼이며,
2014년 10월 에어비앤비에서 점차 복잡해지는 WorkFlow를 관리하기 위한 해결책으로서 시작하였다.
Airflow의 확장 가능한 Python 프레임워크를 사용하면 거의 모든 기술과 연결되는 WorkFlow를 구축할 수 있다.
제공되는 웹 인터페이스는 작업 흐름 상태를 관리 하는데 유용하고, 단일 프로세스부터 분산 설정까지 다양한 방식으로 배포 할수 있다.
2. 특징
- Dynamic
: Airflow 파이프라인이 Python 코드로 구성되어 동적 파이프라인 생성이 가능 - Extensible
: Airflow 프레임워크에는 다양한 기술과 연결하는 연산자가 포함되어 있다. 모든 Airflow 구성요소는 확장 가능하여 환경에 맞게 쉽게 조정할 수 있다. - Flexible
: Jinja 템플릿 엔진을 활용하여 WorkFlow 매개변수화가 내장되어있다.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:
# Tasks are represented as operators
hello = BashOperator(task_id="hello", bash_command="echo hello")
@task()
def airflow():
print("airflow")
# Set dependencies between tasks
hello >> airflow()
3. 기본개념
a. DAG (Directed Acyclic Graph)
방향성 비순환 그래프의 줄임말이며 용어 자체는 서로 순환하지 않는 구조로 짜여진 그래프를 의미한다.
Airflow에서의 DAG는 작업 자체가 아닌 실행방법(실행 순서, 재시도 횟수, 시간 초과 등..)에 초점이 맞춰져있다.
하나 이상의 Task 언제, 얼마나, 어떤 조건에서 동작할지를 나타내고 Task의 흐름을 정의하고 흐름 제어의 조건은 아래와 같이 있다.
- Branching : 조건에 따라 이동할 작업 선택
- Trigger Rules : DAG 작업이 실행되는 조건을 설정
- Setup and Teardown : 설정 및 해제 관계 정의
- Last Only : 현재 시점에서 실행되는 DAG에서만 실행되는 분기
- Depends On Past : 이전 작업이 성공했을 경우에만 작업이 실행
b. Task
Task는 Airflow의 기본 실행 단위이며, DAG에서 순서가 정의되며 DAG 사이에 업스트림 및 다운스트림 종속성을 설정한다.
Task의 기본 종류는 Operator, Sensor, TaskFlow-decorator 가 있지만 내부적으로는 BaseOperator의 하위 클래스이며 Task와 Operator의 개념이 어느 정도 유사하지만 별도의 개념으로 이해하는 것이 편하다고 한다.
c. Operator
Operator는 개념적으로 사전 정의된 Task에 대한 템플릿으로 DAG 내에서 선언적으로 정의할 수 있다.
with DAG("my-dag") as dag:
ping = SimpleHttpOperator(endpoint="http://example.com/update/")
email = EmailOperator(to="admin@example.com", subject="Update complete")
ping >> email
BashOperator, PythonOperator, EmailOperator 등이 존재하고
Database, Docker, File, Slack 등 다양한 Operator 를 제공하여 템플릿에 맞게 데이터를 입력하여 간단하게 Task를 만들어 줄수 있다.
d. Executor
Executor는 Task Instance를 실행하기 위한 매커니즘이다.
Airflow에서 사용하는 Executor의 종류가 몇 가지 존재하며 common API와 pluggable 하기 때문에 상황에 따라 교체 설치하여 사용이 가능하다.
Executor의 타입은 크게 두가지로 나누어져있는데,
내부 방식(scheduler processor)과 원격 방식(pool of worker) 방식이 있다.
(문서에는 내부 방식을 Local Executor로 표기하고 있는데 원격 방식에도 Local 단어가 들어가는 Executor가 존재하니 내부 방식이라 이 포스팅에서는 부르겠다.)
내부 방식의 경우 Airflow 프로세스 안에서 작동하기 때문에 느리고 병렬 처리가 안되는 Executor도 있기에 원격 방식을 많이 사용하는 편이다.
Airflow 사용 예시에서는 Local Executor, Celery Executor, Kubernetes Excutor 등이 주로 언급되는듯 하다.
각 Executor 에 대한 특징들은 다른 블로거 분이 잘 정리해주신 글이 있어서 아래 링크로 첨부하겠다.
https://magpienote.tistory.com/225
[Airflow] Executor의 종류와 사용가이드라인 알아보기(Sequential, Local, Celery, Kubernetes)
Airflow Executor란? executor란 작업자들에게 작업을 실행 시키는 역할을 한다. 종류로는 SequentialExecutor, LocalExecutor, CeleryExecutor, KubernetesExecutor등이 있다. LocalExecutor는 병렬처리까지만 가능하고 클러스
magpienote.tistory.com
4. 아키텍처
아래 이미지는 Airflow의 아키텍처 구성도이다.
명칭이 직관적인 편이여서 쉽게 이해할 수 있겠지만
기본 개념을 어느정도 살펴보고 보면 조금 더 이해하기 편할듯하여 4번째 순서에 두었다.
- DAG Directory
: DAG 파일이 담긴 디렉토리 (개발자가 작성하는 파일 모음을 생각하면 된다.) - Excutor, Scheduler
: DAG 파일을 실행하는 주체이다. - Worker
: 실질적인 실행을 담당하며, 설정에 따라 병렬처리로 작업이 되기도 한다. (Celery Executor로 실행이 되는 경우 병렬 작업을 조율하기 위해 Redis가 필요로 하지만 다른 MQ 로도 대체 가능해 보인다.) - Webserver
: 작성한 DAG 파일과 기타 정보들을 확인 가능하도록 Webserver, User Interface가 구성되어 있으며 외부 통신 담당을 한다. - Metadata Database
: Airflow의 모든 데이터가 저장된다. Database가 구성되어야하며 일반적으로 많이 사용되는 MySQL, MariaDB, Postgresql 등이 사용되며 각 DB 별 사용 가능한 버전이 정의되어 있으니 확인하고 설치해야한다.
Airflow를 설치하면 아래처럼 Metadata에 대한 테이블과 예제 데이터가 들어가 있는 볼 수 있고,
Web UI 상에서 확인이 가능한 것을 볼 수 있다.
5. 마무리
이전에 Airflow를 사용 검토 했었던 적이 있는데 당시에는 Spring Batch + Scheduler 구성으로 작업하였어서 사이드 플젝 진행하면서 이번에는 Airflow를 조금 더 살펴보고 공부해보기로 하였다.
자료를 찾아보니깐 국내에는 오늘의집(버킷플레이스)에서 사용하고 있어서 해당 포스팅 참고해도 좋을거 같고 사용법이나 용어가 생소한 편이라 참고해가면서 정리해나가야할 것 같다.
(Spring Cloud Data Flow 와 유사한듯하니 함께보면서 비교해봐도 좋을 듯 하다.)
다음 포스팅은 Docker Compose로 Airflow 설치에 관한 포스팅 예정이다.
6. 참고자료
What is Airflow™? — Airflow Documentation
airflow.apache.org