Airflow

Airflow XComs puller 사용하기

pepega 2022. 3. 18. 18:00

공식 문서를 토대로 활용하였다.

 

XComs

https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html#working-with-custom-xcom-backends-in-containers

 

xcom_pull

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html?highlight=xcom_pull#airflow.models.baseoperator.BaseOperator.xcom_pull 

 

Examples

https://github.com/apache/airflow/blob/main/airflow/example_dags/example_xcom.py

 

블로그 코드는

https://github.com/GHGHGHKO/airflow/blob/master/dags/xcom_test.py

 

 

들어가기 앞서

 apache airflow에 따르면

 

They can have any (serializable) value, but they are only designed for small amounts of data

 

작은 양의 데이터만 사용할 수 있도록 만들었으니

크기가 큰 데이터프레임이나 무거운 쿼리를 저장하지 않는게 좋을 듯 하다.

 

저번 포스팅에서

XComs에 쿼리 결과를 push 했다.

 

Postgres 버전

https://pepega.tistory.com/49

 

Oracle 버전

https://pepega.tistory.com/50

 

 

Postgres 버전으로 진행하겠다.

 

현재 XComs에는 아래와 같은 데이터가 들어가있다.

Key Value Timestamp Logical Date Task Id Dag Id
return_value [['test', 'ko', 'pepega'], ['cuphead', 'teapot', 'cup'], ['mugman', 'teapot1', 'cup1']] 2022-03-17, 09:02:09 2022-02-28, 15:00:00 postgres-db-task  postgres_db_operator

 

puller, PythonOperator를 사용하여

위 XCom의 Value를 가져올 것이다.

 

def puller(task_instance=None):
    print(task_instance.xcom_pull(dag_id='postgres_db_operator', task_ids='postgres-db-task'))

xcom_pull 의 parameter는 xcom_pull의 공식 문서를 참고했다.

puller는 airflow/apache github의 example 코드를 참고하였다.

 

    python_xcom = PythonOperator(
        task_id="xcom-python",
        python_callable=puller,
    )

 

PythonOperator로 함수를 호출하면 된다.

 

[2022-03-18, 08:47:40 UTC] {logging_mixin.py:109} INFO - [['test', 'ko', 'pepega'], ['cuphead', 'teapot', 'cup'], ['mugman', 'teapot1', 'cup1']]
[2022-03-18, 08:47:40 UTC] {python.py:175} INFO - Done. Returned value was: None

잘 작동된다.

 

다음 포스팅에는

 

XComs Value를 호출하여

 

메일을 전송할 것이다.