공식 문서를 기반으로 글과 코드를 작성하였다.
https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html
코드는 아래에 있다.
https://github.com/GHGHGHKO/airflow/commit/37966fd4e8774e04c95f6393fd796b86573d4dc2
예시를 들어서 내용을 적도록 하겠다.
테이블 스페이스가 부족할 때
경고 메일을 보내는 스케줄을 추가하려고 한다.
메일 내용에
특정 테이블 스페이스 이름을 넣은 채로
메일을 보내고 싶은데
--테이블 스페이스 이름 조회
select name from table_space;
Airflow에서 제공하는
SQLOperator는 Airflow의 XCOM으로 return 할 수 없다.
그래서 해결 할 방법으로
Operator를 custom하는 방법이다.
원하는 Operator를 상속 받아서 만들면 된다.
dags, logs, plugins 디렉터리 중 plugins 디렉터리 안에 생성했다.
plugins/hello_operator.py
from airflow.models.baseoperator import BaseOperator
class HelloOperator(BaseOperator):
def __init__(self, name: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
message = f"Hello {self.name}"
print(message)
return message
아래와 같이 사용할 수 있다.
import datetime
import pendulum
from airflow import DAG
from hello_operator import HelloOperator
KST = pendulum.timezone("Asia/Seoul")
default_args = {
'owner': 'pepega',
}
with DAG(
dag_id="custom_operator",
start_date=datetime.datetime(2022, 3, 1, tzinfo=KST),
schedule_interval="@once",
catchup=True,
tags=['pepega'],
) as dag:
hello_task = HelloOperator(task_id="sample-task", name="foo_bar")
잘 작동한다.
다음 글은
Hook를 사용하도록 하겠다.
'Airflow' 카테고리의 다른 글
Airflow OracleOperator Custom 하기 (0) | 2022.03.17 |
---|---|
Airflow PostgresOperator Custom 하기 (0) | 2022.03.17 |
Airflow 메일 전송, EmailOperator, SMTP, Gmail (0) | 2022.03.15 |
Airflow BranchSQLOperator 활용 (0) | 2022.03.14 |
Docker Airflow Oracle 활용하기 (0) | 2022.03.10 |