Airflow

Airflow PostgresOperator Custom 하기

pepega 2022. 3. 17. 16:58

공식 문서를 기반으로 작성하였다.

 

PostgresHook

https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/_api/airflow/providers/postgres/hooks/postgres/index.html#airflow.providers.postgres.hooks.postgres.PostgresHook.conn_name_attr

 

DbApi

https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/hooks/dbapi.html#DbApiHook.get_first

 

하단 코드는 아래에 있다.

https://github.com/GHGHGHKO/airflow/commit/aef9f2f8934223f3285443b0011b5cfb5fd0d712

 

 

PostgresOperator 사용 할 때

마음대로 custom하지 못하는 상황이 있었다.

 

예를 들어 난 account에 있는 email을 XComs으로 내보내고 싶은데

PostgresOperator를 사용하면 아래처럼 나온다.

get_all_account = PostgresOperator(
    task_id="get_all_account",
    postgres_conn_id="postgres_test",
    sql="sql/all_get_account.sql",
)
[2022-03-17, 07:25:06 UTC] {dbapi.py:225} INFO - Running statement: select email from account;, parameters: None
[2022-03-17, 07:25:06 UTC] {dbapi.py:233} INFO - Rows affected: 3

 

그래서 PostgresHook와 DbApi를 사용하기로 했다.

 

plugins/postgres_db_operator.py

class PostgresDBOperator(BaseOperator):

    def __init__(self,
                 name: str,
                 postgres_conn_id: str,
                 sql: str,
                 **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.name = name
        self.postgres_conn_id = postgres_conn_id
        self.sql = sql

    def execute(self, context):
        hook = PostgresHook(
            postgres_conn_id = self.postgres_conn_id,
        )
        result = hook.get_records(self.sql)
        print(result)
        return result

DbApi 중 get_records를 사용하였다.

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/hooks/dbapi/index.html#airflow.hooks.dbapi.DbApiHook.get_records

 

 

dags/postgres_custom_db_operator_test.py

hello_db_task = PostgresDBOperator(
    task_id="postgres-db-task",
    name="pepega",
    postgres_conn_id="postgres_test",
    sql="select * from account;",
)

 

예시에서는 모든 내용을 출력했다.

 

작동 됐다.

 

쿼리문 조회 결과는 아래처럼 나왔다.

[2022-03-17, 06:12:24 UTC] {logging_mixin.py:109} INFO - [('test', 'ko', 'pepega'), ('cuphead', 'teapot', 'cup'), ('mugman', 'teapot1', 'cup1')]

 

 

XComs 또한 정상적으로 입력되었다.

return_value [['test', 'ko', 'pepega'], ['cuphead', 'teapot', 'cup'], ['mugman', 'teapot1', 'cup1']] 2022-03-17, 06:07:43 2022-02-28, 15:00:00 postgres-db-task  postgres_custom_db_operator

 

다음 포스팅에는

OracleOperator를 custom 하겠다.