Airflow

Airflow XComs puller 사용하기

pepega 2022. 3. 18. 18:00

공식 문서를 토대로 활용하였다.

 

XComs

https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html#working-with-custom-xcom-backends-in-containers

 

xcom_pull

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html?highlight=xcom_pull#airflow.models.baseoperator.BaseOperator.xcom_pull 

 

Examples

https://github.com/apache/airflow/blob/main/airflow/example_dags/example_xcom.py

 

블로그 코드는

https://github.com/GHGHGHKO/airflow/blob/master/dags/xcom_test.py

 

 

들어가기 앞서

 apache airflow에 따르면

 

They can have any (serializable) value, but they are only designed for small amounts of data

 

작은 양의 데이터만 사용할 수 있도록 만들었으니

크기가 큰 데이터프레임이나 무거운 쿼리를 저장하지 않는게 좋을 듯 하다.

 

저번 포스팅에서

XComs에 쿼리 결과를 push 했다.

 

Postgres 버전

https://pepega.tistory.com/49

 

Oracle 버전

https://pepega.tistory.com/50

 

 

Postgres 버전으로 진행하겠다.

 

현재 XComs에는 아래와 같은 데이터가 들어가있다.

Key Value Timestamp Logical Date Task Id Dag Id
return_value [['test', 'ko', 'pepega'], ['cuphead', 'teapot', 'cup'], ['mugman', 'teapot1', 'cup1']] 2022-03-17, 09:02:09 2022-02-28, 15:00:00 postgres-db-task  postgres_db_operator

 

puller, PythonOperator를 사용하여

위 XCom의 Value를 가져올 것이다.

 

def puller(task_instance=None):
    print(task_instance.xcom_pull(dag_id='postgres_db_operator', task_ids='postgres-db-task'))

xcom_pull 의 parameter는 xcom_pull의 공식 문서를 참고했다.

puller는 airflow/apache github의 example 코드를 참고하였다.

 

    python_xcom = PythonOperator(
        task_id="xcom-python",
        python_callable=puller,
    )

 

PythonOperator로 함수를 호출하면 된다.

 

[2022-03-18, 08:47:40 UTC] {logging_mixin.py:109} INFO - [['test', 'ko', 'pepega'], ['cuphead', 'teapot', 'cup'], ['mugman', 'teapot1', 'cup1']]
[2022-03-18, 08:47:40 UTC] {python.py:175} INFO - Done. Returned value was: None

잘 작동된다.

 

다음 포스팅에는

 

XComs Value를 호출하여

 

메일을 전송할 것이다.

'Airflow' 카테고리의 다른 글

Airflow query csv export, 쿼리로 csv export  (0) 2022.03.20
Airflow XComs Email 전송  (0) 2022.03.18
Airflow OracleOperator Custom 하기  (0) 2022.03.17
Airflow PostgresOperator Custom 하기  (0) 2022.03.17
Airflow custom Operator 만들기  (2) 2022.03.16