Airflow

Airflow query csv export, 쿼리로 csv export

pepega 2022. 3. 20. 03:51

공식문서를 기반으로 만들었다.

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/hooks/dbapi/index.html#airflow.hooks.dbapi.DbApiHook.get_pandas_df

 

아래 내용을 토대로 만들었다.

https://pepega.tistory.com/49

 

전체 코드는 아래에 있다.

https://github.com/GHGHGHKO/airflow/commit/7509b3c13816629c450afc8f8a325143c1da1d65

 

기존에는 get_records 메서드를 활용했다.

    def execute(self, context):
        hook = PostgresHook(
            postgres_conn_id = self.postgres_conn_id,
        )
        result = hook.get_records(self.sql)
        print(result)
        return result

 

csv 데이터를 추출하기 위해

get_records -> get_pandas_df로 수정하였다.

 

    def execute(self, context):
        hook = PostgresHook(
            postgres_conn_id = self.postgres_conn_id,
        )
        get_records = hook.get_pandas_df(self.sql)
        get_records.to_csv('/home/airflow/names_df.csv')

 

쿼리 내용은 아래와 같다.

    export_csv = PostgresCsvExportOperator(
        task_id="postgres-csv-export",
        name="pepega",
        postgres_conn_id="postgres_test",
        sql="select * from account;",
    )
airflow=# select * from account;
 user_id | username | email
---------+----------+--------
 test    | ko       | pepega
 cuphead | teapot   | cup
 mugman  | teapot1  | cup1
(3 rows)

실행은 됐다.

내용을 확인해보자

 

$ cat names_df.csv
,user_id,username,email
0,test,ko,pepega
1,cuphead,teapot,cup
2,mugman,teapot1,cup1

 

잘 추출됐다.

'Airflow' 카테고리의 다른 글

Airflow XComs Email 전송  (0) 2022.03.18
Airflow XComs puller 사용하기  (0) 2022.03.18
Airflow OracleOperator Custom 하기  (0) 2022.03.17
Airflow PostgresOperator Custom 하기  (0) 2022.03.17
Airflow custom Operator 만들기  (2) 2022.03.16