Airflow

Airflow custom Operator 만들기

pepega 2022. 3. 16. 20:15

공식 문서를 기반으로 글과 코드를 작성하였다.

https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html

 

코드는 아래에 있다.

https://github.com/GHGHGHKO/airflow/commit/37966fd4e8774e04c95f6393fd796b86573d4dc2

 

예시를 들어서 내용을 적도록 하겠다.

 

테이블 스페이스가 부족할 때

경고 메일을 보내는 스케줄을 추가하려고 한다.

 

메일 내용에

특정 테이블 스페이스 이름을 넣은 채로

메일을 보내고 싶은데

--테이블 스페이스 이름 조회
select name from table_space;

 

Airflow에서 제공하는

SQLOperator는 Airflow의 XCOM으로 return 할 수 없다.

 

그래서 해결 할 방법으로

 

Operator를 custom하는 방법이다.

 

원하는 Operator를 상속 받아서 만들면 된다.

 

dags, logs, plugins 디렉터리 중 plugins 디렉터리 안에 생성했다.

 

plugins/hello_operator.py

from airflow.models.baseoperator import BaseOperator

class HelloOperator(BaseOperator):
    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        message = f"Hello {self.name}"
        print(message)
        return message

 

아래와 같이 사용할 수 있다.

import datetime
import pendulum

from airflow import DAG

from hello_operator import HelloOperator

KST = pendulum.timezone("Asia/Seoul")

default_args = {
    'owner': 'pepega',
}

with DAG(
    dag_id="custom_operator",
    start_date=datetime.datetime(2022, 3, 1, tzinfo=KST),
    schedule_interval="@once",
    catchup=True,
    tags=['pepega'],
) as dag:
    hello_task = HelloOperator(task_id="sample-task", name="foo_bar")

잘 작동한다.

 

다음 글은

Hook를 사용하도록 하겠다.