공식 문서를 기반으로 작성하였다.
PostgresHook
DbApi
하단 코드는 아래에 있다.
https://github.com/GHGHGHKO/airflow/commit/47de1fd55ff2be812702ba4539adf54e6d9bd292
OracleOperator 사용 할 때
마음대로 custom하지 못하는 상황이 있었다.
예를 들어 난 dual 값을 불러오고 싶은데
OracleOperator를 사용하면 아래처럼 나온다.
oracle_task = OracleOperator(
task_id='oracle',
oracle_conn_id='oracle_test',
sql='SELECT * FROM DUAL',
dag=dag,
)
[2022-03-10, 12:48:15 UTC] {dbapi.py:225} INFO - Running statement: SELECT * FROM DUAL, parameters: None
[2022-03-10, 12:48:16 UTC] {dbapi.py:233} INFO - Rows affected: 0
그래서 OracleHook와 DbApi를 사용하기로 했다.
plugins/oracle_db_operator.py
class OracleDBOperator(BaseOperator):
def __init__(self,
name: str,
oracle_conn_id: str,
sql: str,
**kwargs
) -> None:
super().__init__(**kwargs)
self.name = name
self.oracle_conn_id = oracle_conn_id
self.sql = sql
def execute(self, context):
hook = OracleHook(
oracle_conn_id = self.oracle_conn_id,
)
result = hook.get_first(self.sql)
print(result)
return result
DbApi 중 get_first를 사용하였다.
dags/oracle_custom_db_operator_test.py
oracle_db_task = OracleDBOperator(
task_id="oracle-db-task",
name="pepega",
oracle_conn_id="oracle_test",
sql="select * from dual",
)
작동 됐다.
쿼리문 조회 결과는 아래처럼 나왔다.
[2022-03-17, 07:01:42 UTC] {logging_mixin.py:109} INFO - ('X',)
XComs 또한 정상적으로 입력되었다.
return_value | ['X'] | 2022-03-17, 07:01:42 | 2022-02-28, 15:00:00 | oracle-db-task | oracle_db_operator |
다음 포스팅에는
XComs을 활용한
메일 전송을 포스팅 하겠다.
'Airflow' 카테고리의 다른 글
Airflow XComs Email 전송 (0) | 2022.03.18 |
---|---|
Airflow XComs puller 사용하기 (0) | 2022.03.18 |
Airflow PostgresOperator Custom 하기 (0) | 2022.03.17 |
Airflow custom Operator 만들기 (2) | 2022.03.16 |
Airflow 메일 전송, EmailOperator, SMTP, Gmail (0) | 2022.03.15 |