공식 문서를 기반으로 작성하였다.
PostgresHook
DbApi
하단 코드는 아래에 있다.
https://github.com/GHGHGHKO/airflow/commit/aef9f2f8934223f3285443b0011b5cfb5fd0d712
PostgresOperator 사용 할 때
마음대로 custom하지 못하는 상황이 있었다.
예를 들어 난 account에 있는 email을 XComs으로 내보내고 싶은데
PostgresOperator를 사용하면 아래처럼 나온다.
get_all_account = PostgresOperator(
task_id="get_all_account",
postgres_conn_id="postgres_test",
sql="sql/all_get_account.sql",
)
[2022-03-17, 07:25:06 UTC] {dbapi.py:225} INFO - Running statement: select email from account;, parameters: None
[2022-03-17, 07:25:06 UTC] {dbapi.py:233} INFO - Rows affected: 3
그래서 PostgresHook와 DbApi를 사용하기로 했다.
plugins/postgres_db_operator.py
class PostgresDBOperator(BaseOperator):
def __init__(self,
name: str,
postgres_conn_id: str,
sql: str,
**kwargs
) -> None:
super().__init__(**kwargs)
self.name = name
self.postgres_conn_id = postgres_conn_id
self.sql = sql
def execute(self, context):
hook = PostgresHook(
postgres_conn_id = self.postgres_conn_id,
)
result = hook.get_records(self.sql)
print(result)
return result
DbApi 중 get_records를 사용하였다.
dags/postgres_custom_db_operator_test.py
hello_db_task = PostgresDBOperator(
task_id="postgres-db-task",
name="pepega",
postgres_conn_id="postgres_test",
sql="select * from account;",
)
예시에서는 모든 내용을 출력했다.
작동 됐다.
쿼리문 조회 결과는 아래처럼 나왔다.
[2022-03-17, 06:12:24 UTC] {logging_mixin.py:109} INFO - [('test', 'ko', 'pepega'), ('cuphead', 'teapot', 'cup'), ('mugman', 'teapot1', 'cup1')]
XComs 또한 정상적으로 입력되었다.
return_value | [['test', 'ko', 'pepega'], ['cuphead', 'teapot', 'cup'], ['mugman', 'teapot1', 'cup1']] | 2022-03-17, 06:07:43 | 2022-02-28, 15:00:00 | postgres-db-task | postgres_custom_db_operator |
다음 포스팅에는
OracleOperator를 custom 하겠다.
'Airflow' 카테고리의 다른 글
Airflow XComs puller 사용하기 (0) | 2022.03.18 |
---|---|
Airflow OracleOperator Custom 하기 (0) | 2022.03.17 |
Airflow custom Operator 만들기 (2) | 2022.03.16 |
Airflow 메일 전송, EmailOperator, SMTP, Gmail (0) | 2022.03.15 |
Airflow BranchSQLOperator 활용 (0) | 2022.03.14 |