Airflow

Airflow BranchSQLOperator 활용

pepega 2022. 3. 14. 18:42

apache/airflow github commit을 활용했다.

https://github.com/apache/airflow/blob/39e395f9816c04ef2f033eb0b4f635fc3018d803/airflow/operators/sql.py#L470

 

활용한 전체 소스코드는

github에 올렸다.

https://github.com/GHGHGHKO/airflow/blob/master/dags/postgres_test.py

 

 

Airflow에서 제공하는

BranchSQLOperator를 활용한 예제이다.

 

쿼리의 결과(true or false)에 따라

TAG를 활용 할 수 있다.

 

 

추후 이 예시를 통해

 

테이블 스페이스가 가득 찰 경우

알람 메일이 전송되는 스케줄을 등록 할 것이다.

 

 

    true_branch_operator = BranchSQLOperator(
        task_id="true_sql_branch_id",
        conn_id="postgres_test",
        sql="sql/account_count.sql",
        follow_task_ids_if_true="true_operator_1st",
        follow_task_ids_if_false="false_operator_1st",
    )

 

브랜치 되기 전 DAG이다.

 

account_count.sql 내용과 결과는 아래와 같다.

select count(*) from account;
 count
-------
     1
(1 row)

 

위 DAG 실행 결과는

아래처럼 나온다.

 

 

false인 경우도 실행하였다.

    false_branch_operator = BranchSQLOperator(
        task_id="false_sql_branch_id",
        conn_id="postgres_test",
        sql="sql/zero_count.sql",
        follow_task_ids_if_true="true_operator_2nd",
        follow_task_ids_if_false="false_operator_2nd",
    )

브랜치 되기 전 DAG이다.

 

zero_count.sql 내용과 결과는 아래와 같다.

select count(*) from account
where email = 'pepe';
 count
-------
     0
(1 row)

 

위 DAG 실행 결과는

아래처럼 나온다.

 

 

결과 별로 브랜치를 나눠봤다.

 

다음 글에서는

EmailOperator를 활용하여 메일을 보내도록 하겠다.

'Airflow' 카테고리의 다른 글

Airflow PostgresOperator Custom 하기  (0) 2022.03.17
Airflow custom Operator 만들기  (2) 2022.03.16
Airflow 메일 전송, EmailOperator, SMTP, Gmail  (0) 2022.03.15
Docker Airflow Oracle 활용하기  (0) 2022.03.10
Docker Airflow 설치하기  (2) 2022.03.09