Airflow 개요


Airflow에 대하여

아파치 에어플로우

  • Workflow Management Tool

    • 에어비엔비에서 만듬
    • Workflow는 Task들의 연결이라고 볼 수 있음
  • 활용가능 영역

    • ETL 파이프라인
      • 데이터를 source에서 가져와서 데이터 마트, 데이터 웨어하우스 등에 저장
    • 머신러닝 엔지니어링
      • 머신러닝 모델 주기적인 학습(1주 간격), 예측(30분 간격)
      • 실시간 API가 아닌 Batch성 예측
    • 간단한 cron 작업 (스케줄러)
  • 여러 작업들의 연결성(디펜던씨) Management

    • 앞의 작업이 성공해야 뒤 작업을 하도록 설정

    • 여러가지 작업을 효율적으로 관리(시각화 등)

Apache Airflow의 장점

  • Python 기반이고 Scheduling : 특정 간격으로 계속 실행
  • Backfill : 과거 작업 실행
  • 특정 Task 실패시 => Task만 재실행 / DAG 재실행 등 실패 로직도 있음
  • UI 예시 (출처 : https://assets.astronomer.io/website/img/guides/dags_dashboard.png)

  • Graph View ? (출처: https://miro.medium.com/max/3200/0*jdPLCqkK_XiQYM8X.png )

  • UTC란 무엇인가?

    • 협정 세계시로 1972년 1월 1일부터 시행된 국제 표준시
    • 서버에서 시간 처리할 땐, 거의 UTC를 사용하며 한국은 UTC+9hour 임
    • Airflow에서 UTC를 사용하기 때문에, CRON 표시할 때 UTC 기준으로 작성
      • 예 : UTC 30 1 * * * => 한국은 30 10 * * * => 한국 오전 10시 30분
  • Airflow 실행

    • airflow webserver와 airflow scheduler 2개를 모두 실행 필요

    • 터미널 1개에 webserver를 띄우고, command+t로 새로운 터미널 탭을 켜고 scheduler를 띄울 수 있다.

    • 명령어

      airflow webserver
      airflow scheduler
      
    • tutorial DAG을 실행(Links 아래에 있는 재생 버튼)

      • ValueError: unknown locale: UTF-8 에러가 날경우 ~/.zshrc 또는 ~/.bash_profile에 아래 설정 추가
        export LC_ALL=en_US.UTF-8
        export LANG=en_US.UTF-8
      
      • 그 후 터미널에서 아래 커맨드 실행하고 webserver 다시 실행
        source ~/.zshrc
        # 또는 source ~/.bash_profile
      
  • 아키텍쳐

  • Airflow Webserver

    • 웹 UI를 표시해주고, workflow 상태도 표시 및 실행, 재시작, 수동 조작, 로그 확인 등 가능
  • Airflow Scheduler

    • 작업 기준이 충족되는지 여부를 확인
  • DAG란? (출처: https://hazelcast.com/wp-content/uploads/2019/08/diagram-DirectedAcrylicGraph-400x314.png)

    • Directed Acyclic Graphs
    • 방향이 있는 비순환 그래프를 의미하고 이러한 이유로 마지막 Task가 다시 처음 Task로 이어지지 않음
  • Python Code로 알아봅시다

    • 예시1)
  • 1) Default Argument 정의

    • start_date가 중요! 과거 날짜를 설정하면 그 날부터 실행

    • retries, retry_delay : 실패할 경우 몇분 뒤에 재실행할지?

    • priority_weight : 우선 순위

    • 외에도 다양한 옵션이 있는데, 문서 참고

      default_args = {
        'owner': 'your_name',
        'depends_on_past': False,
        'start_date': datetime(2018, 12, 1),
        'email': ['your@mail.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        'priority_weight': 10,
        'end_date': datetime(2018, 12, 3),
        # end_date가 없으면 계속 진행함
      }
      
  • 2) DAG 객체 생성

    • 첫 인자는 dag_id인데 고유한 id 작성

    • default_args는 위에서 정의한 argument를 넣고

    • schedule_interval은 crontab 표현 사용

      • schedule_interval=’@once’는 한번만 실행. 디버깅용으로 자주 사용
      • 5 4 * * * 같은 표현을 사용
      • 더 궁금하면 crontab guru 참고
      dag = DAG('bash_dag', default_args=default_args, schedule_interval='@once'))
      
  • 3) Operator로 Task 정의

    • Operator가 Instance가 되면 Task라 부름

    • BashOperator : Bash Command 실행

    • PythonOperator : Python 함수 실행

    • BigQueryOperator : BigQuery 쿼리 날린 후 Table 저장

    • 외에도 다양한 operator가 있고, operator마다 옵션이 다름

    • Airflow Document, Integration Operator 참고

    • mysql_to_hive 등도 있음

      task1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)
          
      task2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
        retries=2,
        dag=dag)
          
      task3 = BashOperator(
        task_id='pwd',
        bash_command='pwd',
        dag=dag)
      
  • 4) task 의존 설정

    • task1 후에 task2를 실행하고 싶다면

      • task1.set_downstream(task2)
      • task2.set_upstream(task1)
    • 더 편해지면서 >><< 사용 가능

    • task1 » task2로 사용 가능

    • task1 » [task2, task3]는 task1 후에 task2, task3 병렬 실행을 의미

      task1 >> task2
      task1 >> task3
      
  • 5) DAG 파일을 DAG 폴더에 저장해 실행되는지 확인
    • DAG 폴더에 넣고 바로 Webserver에 반영되진 않고 약간의 시간이 필요함
    • 수정하고 싶으면 ~/airflow/airflow.cfg에서 dagbag_import_timeout, dag_file_processor_timeout 값을 수정하면 됨

6) 디버깅

  • DAG이 실행되는지 확인 => 실행이 안된다면 DAG의 start_date를 확인
  • 실행되서 초록색 불이 들어오길 기도
  • 만약 초록이 아닌 빨간불이면 Task를 클릭해서 View log 클릭

Airflow BashOperator 사용하기

  • 01-bash_operator.py 참고

  • 앞에서 예제로 보여준 BashOperator 내용을 타이핑해보기 (5분)

    • default_argument에서 start_date는 datetime(2019, 2, 13)
    • DAG의 schedule_interval은 0 10 * * * 입력
  • 파일명은 airflow_test.py

  • (따로 설정 안했다면)

    ~/airflow/dags
    

    에 저장하면 됨

    • dags 폴더가 없다면 생성
  • dags에 airflow_test.py 저장

  • 지금은 간단한 bash command를 사용했지만, bash로 파이썬 파일도 실행할 수 있으니 활용 포인트가 무궁무진함

  • 재실행하고 싶으면 Task 클릭 후 Clear 클릭

  • PythonOperator(task_id, python_callable, op_args, dag, provide_context, templates_dict)로 사용함
    • task_id는 task의 id(예 : print_current_date)
    • python_callable는 호출 수 있는 python 함수를 인자로 넣음
    • op_args : callable 함수가 호출될 때 사용할 함수의 인자
    • dag : DAG 정의한 객체 넣으면 됨
    • provide_context : True로 지정하면 Airflow에서 기본적으로 사용되는 keyword arguments 등이 사용 가능하게 됨
    • templates_dict : op_args 등과 비슷하지만 jinja template이 변환됨
  • Jinja 템플릿쓰기
    • 04-python_operator_with_jinja.py 참고
    • "" 이런 형태로 사용함 : execution_date
    • PythonOperator는 기본 context 변수 사용이 더 쉽지만, 다른 Operator는 Jinja Template이 편함
    • PythonOperator는 templates_dict에 변수를 넣어서 사용
    • Macros Default Variables Document에 정의되어 있음
  • 백필
    • Context Variable이나 Jinja Template을 사용하면 Backfill을 제대로 사용할 수 있음
    • Backfill : 과거 날짜 기준으로 실행
    • airflow backfill -s START_DATE -e END_DATE dag_id
    • 아래 명령어를 입력해보고 Webserver에 가봅시다
airflow backfill -s 2020-01-05 -e 2020-01-10 python_dag_with_jinja

해당 포스트는 카일스쿨자료를 인용하였음을 밝힙니다. 감사합니다.!

출처:https://zzsza.github.io/kyle-school/week6/#/2/25

이 글이 도움이 되셨다면 추천 클릭을 부탁드립니다 :)

Buy me a coffeeBuy me a coffee





© 2020 modified by Tae You Kim

Powered by "shoman2"