Airflow 개요
in Data Science on Engineering
Airflow에 대하여
아파치 에어플로우
Workflow Management Tool
- 에어비엔비에서 만듬
- Workflow는 Task들의 연결이라고 볼 수 있음
활용가능 영역
- ETL 파이프라인
- 데이터를 source에서 가져와서 데이터 마트, 데이터 웨어하우스 등에 저장
- 머신러닝 엔지니어링
- 머신러닝 모델 주기적인 학습(1주 간격), 예측(30분 간격)
- 실시간 API가 아닌 Batch성 예측
- 간단한 cron 작업 (스케줄러)
- ETL 파이프라인
여러 작업들의 연결성(디펜던씨) Management
앞의 작업이 성공해야 뒤 작업을 하도록 설정
여러가지 작업을 효율적으로 관리(시각화 등)
Apache Airflow의 장점
- Python 기반이고 Scheduling : 특정 간격으로 계속 실행
- Backfill : 과거 작업 실행
- 특정 Task 실패시 => Task만 재실행 / DAG 재실행 등 실패 로직도 있음
UI 예시 (출처 : https://assets.astronomer.io/website/img/guides/dags_dashboard.png)
Graph View ? (출처: https://miro.medium.com/max/3200/0*jdPLCqkK_XiQYM8X.png )
UTC란 무엇인가?
- 협정 세계시로 1972년 1월 1일부터 시행된 국제 표준시
- 서버에서 시간 처리할 땐, 거의 UTC를 사용하며 한국은 UTC+9hour 임
- Airflow에서 UTC를 사용하기 때문에, CRON 표시할 때 UTC 기준으로 작성
- 예 : UTC
30 1 * * *
=> 한국은30 10 * * *
=> 한국 오전 10시 30분
- 예 : UTC
Airflow 실행
airflow webserver와 airflow scheduler 2개를 모두 실행 필요
터미널 1개에 webserver를 띄우고, command+t로 새로운 터미널 탭을 켜고 scheduler를 띄울 수 있다.
명령어
airflow webserver airflow scheduler
tutorial DAG을 실행(Links 아래에 있는 재생 버튼)
- ValueError: unknown locale: UTF-8 에러가 날경우
~/.zshrc
또는~/.bash_profile
에 아래 설정 추가
export LC_ALL=en_US.UTF-8 export LANG=en_US.UTF-8
- 그 후 터미널에서 아래 커맨드 실행하고 webserver 다시 실행
source ~/.zshrc # 또는 source ~/.bash_profile
- ValueError: unknown locale: UTF-8 에러가 날경우
아키텍쳐
Airflow Webserver
- 웹 UI를 표시해주고, workflow 상태도 표시 및 실행, 재시작, 수동 조작, 로그 확인 등 가능
Airflow Scheduler
- 작업 기준이 충족되는지 여부를 확인
DAG란? (출처: https://hazelcast.com/wp-content/uploads/2019/08/diagram-DirectedAcrylicGraph-400x314.png)
- Directed Acyclic Graphs
- 방향이 있는 비순환 그래프를 의미하고 이러한 이유로 마지막 Task가 다시 처음 Task로 이어지지 않음
Python Code로 알아봅시다
- 예시1)
-
1) Default Argument 정의
start_date가 중요! 과거 날짜를 설정하면 그 날부터 실행
retries, retry_delay : 실패할 경우 몇분 뒤에 재실행할지?
priority_weight : 우선 순위
외에도 다양한 옵션이 있는데, 문서 참고
default_args = { 'owner': 'your_name', 'depends_on_past': False, 'start_date': datetime(2018, 12, 1), 'email': ['your@mail.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), 'priority_weight': 10, 'end_date': datetime(2018, 12, 3), # end_date가 없으면 계속 진행함 }
2) DAG 객체 생성
첫 인자는 dag_id인데 고유한 id 작성
default_args는 위에서 정의한 argument를 넣고
schedule_interval은 crontab 표현 사용
- schedule_interval=’@once’는 한번만 실행. 디버깅용으로 자주 사용
5 4 * * *
같은 표현을 사용- 더 궁금하면 crontab guru 참고
dag = DAG('bash_dag', default_args=default_args, schedule_interval='@once'))
3) Operator로 Task 정의
Operator가 Instance가 되면 Task라 부름
BashOperator : Bash Command 실행
PythonOperator : Python 함수 실행
BigQueryOperator : BigQuery 쿼리 날린 후 Table 저장
외에도 다양한 operator가 있고, operator마다 옵션이 다름
mysql_to_hive 등도 있음
task1 = BashOperator( task_id='print_date', bash_command='date', dag=dag) task2 = BashOperator( task_id='sleep', bash_command='sleep 5', retries=2, dag=dag) task3 = BashOperator( task_id='pwd', bash_command='pwd', dag=dag)
4) task 의존 설정
task1 후에 task2를 실행하고 싶다면
- task1.set_downstream(task2)
- task2.set_upstream(task1)
더 편해지면서
>>
나<<
사용 가능task1 » task2로 사용 가능
task1 » [task2, task3]는 task1 후에 task2, task3 병렬 실행을 의미
task1 >> task2 task1 >> task3
- 5) DAG 파일을 DAG 폴더에 저장해 실행되는지 확인
- DAG 폴더에 넣고 바로 Webserver에 반영되진 않고 약간의 시간이 필요함
- 수정하고 싶으면
~/airflow/airflow.cfg
에서 dagbag_import_timeout, dag_file_processor_timeout 값을 수정하면 됨
6) 디버깅
- DAG이 실행되는지 확인 => 실행이 안된다면 DAG의 start_date를 확인
- 실행되서 초록색 불이 들어오길 기도
- 만약 초록이 아닌 빨간불이면 Task를 클릭해서 View log 클릭
Airflow BashOperator 사용하기
01-bash_operator.py 참고
앞에서 예제로 보여준 BashOperator 내용을 타이핑해보기 (5분)
- default_argument에서 start_date는 datetime(2019, 2, 13)
- DAG의 schedule_interval은
0 10 * * *
입력
파일명은 airflow_test.py
(따로 설정 안했다면)
~/airflow/dags
에 저장하면 됨
- dags 폴더가 없다면 생성
dags에 airflow_test.py 저장
지금은 간단한 bash command를 사용했지만, bash로 파이썬 파일도 실행할 수 있으니 활용 포인트가 무궁무진함
재실행하고 싶으면 Task 클릭 후 Clear 클릭
- PythonOperator(task_id, python_callable, op_args, dag, provide_context, templates_dict)로 사용함
- task_id는 task의 id(예 : print_current_date)
- python_callable는 호출 수 있는 python 함수를 인자로 넣음
- op_args : callable 함수가 호출될 때 사용할 함수의 인자
- dag : DAG 정의한 객체 넣으면 됨
- provide_context : True로 지정하면 Airflow에서 기본적으로 사용되는 keyword arguments 등이 사용 가능하게 됨
- templates_dict : op_args 등과 비슷하지만 jinja template이 변환됨
- Jinja 템플릿쓰기
- 04-python_operator_with_jinja.py 참고
""
이런 형태로 사용함 : execution_date- PythonOperator는 기본 context 변수 사용이 더 쉽지만, 다른 Operator는 Jinja Template이 편함
- PythonOperator는 templates_dict에 변수를 넣어서 사용
- Macros Default Variables Document에 정의되어 있음
- 백필
- Context Variable이나 Jinja Template을 사용하면 Backfill을 제대로 사용할 수 있음
- Backfill : 과거 날짜 기준으로 실행
- airflow backfill -s START_DATE -e END_DATE dag_id
- 아래 명령어를 입력해보고 Webserver에 가봅시다
airflow backfill -s 2020-01-05 -e 2020-01-10 python_dag_with_jinja
해당 포스트는 카일스쿨자료를 인용하였음을 밝힙니다. 감사합니다.!
출처:https://zzsza.github.io/kyle-school/week6/#/2/25
이 글이 도움이 되셨다면 추천 클릭을 부탁드립니다 :)