공식문서를 기반으로 만들었다.
아래 내용을 토대로 만들었다.
전체 코드는 아래에 있다.
https://github.com/GHGHGHKO/airflow/commit/7509b3c13816629c450afc8f8a325143c1da1d65
기존에는 get_records 메서드를 활용했다.
def execute(self, context):
hook = PostgresHook(
postgres_conn_id = self.postgres_conn_id,
)
result = hook.get_records(self.sql)
print(result)
return result
csv 데이터를 추출하기 위해
get_records -> get_pandas_df로 수정하였다.
def execute(self, context):
hook = PostgresHook(
postgres_conn_id = self.postgres_conn_id,
)
get_records = hook.get_pandas_df(self.sql)
get_records.to_csv('/home/airflow/names_df.csv')
쿼리 내용은 아래와 같다.
export_csv = PostgresCsvExportOperator(
task_id="postgres-csv-export",
name="pepega",
postgres_conn_id="postgres_test",
sql="select * from account;",
)
airflow=# select * from account;
user_id | username | email
---------+----------+--------
test | ko | pepega
cuphead | teapot | cup
mugman | teapot1 | cup1
(3 rows)
실행은 됐다.
내용을 확인해보자
$ cat names_df.csv
,user_id,username,email
0,test,ko,pepega
1,cuphead,teapot,cup
2,mugman,teapot1,cup1
잘 추출됐다.
'Airflow' 카테고리의 다른 글
Airflow XComs Email 전송 (0) | 2022.03.18 |
---|---|
Airflow XComs puller 사용하기 (0) | 2022.03.18 |
Airflow OracleOperator Custom 하기 (0) | 2022.03.17 |
Airflow PostgresOperator Custom 하기 (0) | 2022.03.17 |
Airflow custom Operator 만들기 (2) | 2022.03.16 |