Airflow

Airflow OracleOperator Custom 하기

pepega 2022. 3. 17. 17:09

공식 문서를 기반으로 작성하였다.

 

PostgresHook

https://airflow.apache.org/docs/apache-airflow-providers-oracle/stable/_modules/airflow/providers/oracle/hooks/oracle.html#OracleHook

 

DbApi

https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/hooks/dbapi.html#DbApiHook.get_first

 

하단 코드는 아래에 있다.

https://github.com/GHGHGHKO/airflow/commit/47de1fd55ff2be812702ba4539adf54e6d9bd292

 

 

OracleOperator 사용 할 때

마음대로 custom하지 못하는 상황이 있었다.

 

예를 들어 난 dual 값을 불러오고 싶은데

OracleOperator를 사용하면 아래처럼 나온다.

 

    oracle_task = OracleOperator(
        task_id='oracle',
        oracle_conn_id='oracle_test',
        sql='SELECT * FROM DUAL',
        dag=dag,
    )
[2022-03-10, 12:48:15 UTC] {dbapi.py:225} INFO - Running statement: SELECT * FROM DUAL, parameters: None
[2022-03-10, 12:48:16 UTC] {dbapi.py:233} INFO - Rows affected: 0

 

그래서 OracleHook와 DbApi를 사용하기로 했다.

 

plugins/oracle_db_operator.py

class OracleDBOperator(BaseOperator):

    def __init__(self,
                 name: str,
                 oracle_conn_id: str,
                 sql: str,
                 **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.name = name
        self.oracle_conn_id = oracle_conn_id
        self.sql = sql

    def execute(self, context):
        hook = OracleHook(
            oracle_conn_id = self.oracle_conn_id,
        )
        result = hook.get_first(self.sql)
        print(result)
        return result

DbApi 중 get_first를 사용하였다.

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/hooks/dbapi/index.html#airflow.hooks.dbapi.DbApiHook.get_first

 

dags/oracle_custom_db_operator_test.py

oracle_db_task = OracleDBOperator(
    task_id="oracle-db-task",
    name="pepega",
    oracle_conn_id="oracle_test",
    sql="select * from dual",
)

작동 됐다.

 

쿼리문 조회 결과는 아래처럼 나왔다.

[2022-03-17, 07:01:42 UTC] {logging_mixin.py:109} INFO - ('X',)

 

XComs 또한 정상적으로 입력되었다.

return_value ['X'] 2022-03-17, 07:01:42 2022-02-28, 15:00:00 oracle-db-task  oracle_db_operator

 

다음 포스팅에는

XComs을 활용한

메일 전송을 포스팅 하겠다.

'Airflow' 카테고리의 다른 글

Airflow XComs Email 전송  (0) 2022.03.18
Airflow XComs puller 사용하기  (0) 2022.03.18
Airflow PostgresOperator Custom 하기  (0) 2022.03.17
Airflow custom Operator 만들기  (2) 2022.03.16
Airflow 메일 전송, EmailOperator, SMTP, Gmail  (0) 2022.03.15