apache/airflow github commit을 활용했다.
활용한 전체 소스코드는
github에 올렸다.
https://github.com/GHGHGHKO/airflow/blob/master/dags/postgres_test.py
Airflow에서 제공하는
BranchSQLOperator를 활용한 예제이다.
쿼리의 결과(true or false)에 따라
TAG를 활용 할 수 있다.
추후 이 예시를 통해
테이블 스페이스가 가득 찰 경우
알람 메일이 전송되는 스케줄을 등록 할 것이다.
true_branch_operator = BranchSQLOperator(
task_id="true_sql_branch_id",
conn_id="postgres_test",
sql="sql/account_count.sql",
follow_task_ids_if_true="true_operator_1st",
follow_task_ids_if_false="false_operator_1st",
)
브랜치 되기 전 DAG이다.
account_count.sql 내용과 결과는 아래와 같다.
select count(*) from account;
count
-------
1
(1 row)
위 DAG 실행 결과는
아래처럼 나온다.
false인 경우도 실행하였다.
false_branch_operator = BranchSQLOperator(
task_id="false_sql_branch_id",
conn_id="postgres_test",
sql="sql/zero_count.sql",
follow_task_ids_if_true="true_operator_2nd",
follow_task_ids_if_false="false_operator_2nd",
)
브랜치 되기 전 DAG이다.
zero_count.sql 내용과 결과는 아래와 같다.
select count(*) from account
where email = 'pepe';
count
-------
0
(1 row)
위 DAG 실행 결과는
아래처럼 나온다.
결과 별로 브랜치를 나눠봤다.
다음 글에서는
EmailOperator를 활용하여 메일을 보내도록 하겠다.
'Airflow' 카테고리의 다른 글
Airflow PostgresOperator Custom 하기 (0) | 2022.03.17 |
---|---|
Airflow custom Operator 만들기 (2) | 2022.03.16 |
Airflow 메일 전송, EmailOperator, SMTP, Gmail (0) | 2022.03.15 |
Docker Airflow Oracle 활용하기 (0) | 2022.03.10 |
Docker Airflow 설치하기 (2) | 2022.03.09 |