Aiflow

airflow 구축 하기(with celery executor)

susumong 2023. 2. 7. 16:14

version

  • airflow version 2.5.1
  • python 3.7.7
  • postgresql

구축 과정

  • airflow 공식 문서에서 제공하는 docker-compose.yml 파일

Running Airflow in Docker - Airflow Documentation

curl -LfO '<https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml>'

docker-compose up을 하기전에 실행 경로에 ./dags ./logs ./plugins 세개의 디렉토리를 생성한다.

만약 다른 경로에 만들고 싶다면 AIRFLOW_PROJ_DIR 를 수정해주면 된다.

데이터 마이그레이션을 실행하고 첫 번째 사용자 계정을 생성하기 위해 아래와 같은 명령어를 입력한다.

docker compose up airflow-init

초기화가 끝난면 다음과 같은 메시지를 볼 수 있다.

이후 docker-compose up 을 하면 celery executor를 airflow 환경을 확인 할 수 있다.

만약 docker log를 보고 싶지 않다면 -d 옵션을 주면 된다.

  • airflow webui

flower를 실행하기 위해 docker-compose up -d flower 를 한다.

  • flower

만약 뭔가 꼬여서 처음부터 다시 하고 싶다면..!

docker-compose down --volumes --remove-orphans

추가로 airflow cluster를 구성해 다른 서버에 worker를 띄우는 방법은 다음과 같다.

먼저 fernet_key를 생성한다.

그리고 이 fernet_key를 docker-compose.yml의 AIRFLOW__CORE__FERNET_KEY 에 넣어준다.

그리고 docker-compose를 --forece-recreate 옵션을 사용해 다시 실행한다.

다음으로 worker node용 Dockerfile을 작성한다.

FROM python:3.7.16-slim

# install airflow
ENV AIRFLOW_HOME="/root/airflow"
ENV AIRFLOW__WORKER__NAME="worker_node"

ENV AIRFLOW__MASTER__IP="HOST_IP"
ENV AIRFLOW__POSTGRESQL__HOST=${AIRFLOW__MASTER__IP}:5432
ENV AIRFLOW__REDIS__HOST=${AIRFLOW__MASTER__IP}:6379
ENV AIRFLOW__CORE__EXECUTOR="CeleryExecutor"
ENV AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql+psycopg2://airflow:airflow@${AIRFLOW__POSTGRESQL__HOST}/airflow"
ENV AIRFLOW__CORE__SQL_ALCHEMY_CONN="postgresql+psycopg2://airflow:airflow@${AIRFLOW__POSTGRESQL__HOST}/airflow"
ENV AIRFLOW__CELERY__RESULT_BACKEND="db+postgresql://airflow:airflow@${AIRFLOW__POSTGRESQL__HOST}/airflow"
ENV AIRFLOW__CELERY__BROKER_URL="redis://:@${AIRFLOW__REDIS__HOST}/0"
ENV AIRFLOW__LOGGING__BASE_LOG_FOLDER="${AIRFLOW_HOME}/log"
ENV AIRFLOW__SCHEDULER__CHILD_PROCESS_LOG_DIRECTORY="${AIRFLOW_HOME}/log/scheduler"
ENV AIRFLOW__LOGGING__DAG_PROCESSOR_MANAGER_LOG_LOCATION="${AIRFLOW_HOME}/log/dag_processor_manager/dag_processor_manager.log"
ENV AIRFLOW__CORE__HOSTNAME_CALLABLE="airflow.utils.net.get_host_ip_address"
ENV AIRFLOW__CORE__FERNET_KEY=""
ENV AIRFLOW__WEBSERVER__SECRET_KEY=""
ENV AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION="true"
ENV AIRFLOW__CORE__LOAD_EXAMPLES="false"
ENV AIRFLOW__API__AUTH_BACKENDS="airflow.api.auth.backend.basic_auth"
ENV _pip_ADDITIONAL_REQUIREMENTS="${_pip_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-google}"

# pip update
RUN apt-get update && apt-get install -y python3-distutils python3-setuptools
RUN python3 -m pip install pip --upgrade pip
# RUN apt-get install python3.7-dev libmysqlclient-dev gcc -y
RUN python3 -m pip install cffi

# install airflow
RUN pip install apache-airflow[celery]==2.5.0
RUN pip install psycopg2-binary
RUN pip install Redis
RUN airflow db init
RUN mkdir -p ./airflow/dags
RUN mkdir ./airflow/plugins

# healthcheck
HEALTHCHECK --interval=10s --timeout=10s --retries=5 CMD airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"

AIRFLOW__CORE__FERNET_KEY에 앞서 발급한 fernet_key를 넣어준다.

Dockerfile을 빌드하고 실행을 하면

docker build . worker:test
docker run -it worker:test airflow celery 

flower에서 worker node가 하나 더 생겼음을 확인할 수 있다.

worker node를 실행시키는 명령어 -H 와 -q 옵션을 통해 worker와 queue의 이름을 정할 수 있다.

참고 문서

https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html

 

Running Airflow in Docker — Airflow Documentation

 

airflow.apache.org

https://jaeyung1001.tistory.com/286

 

[Airflow] 다중 클러스터에서 Airflow Celery Worker 환경 구성하기

이번에 다중 클러스터(즉, 여러대의 컴퓨터)에서 Airflow Celery Worker환경을 세팅해야하는 업무가 생겨서 작업을했다. 과정중에 무수히 많은 시행착오가 있었고, 다음에는 이를 방지하기위해 글을

jaeyung1001.tistory.com