Airflow
Airflow XComs puller 사용하기
pepega
2022. 3. 18. 18:00
공식 문서를 토대로 활용하였다.
XComs
xcom_pull
Examples
https://github.com/apache/airflow/blob/main/airflow/example_dags/example_xcom.py
블로그 코드는
https://github.com/GHGHGHKO/airflow/blob/master/dags/xcom_test.py
들어가기 앞서
apache airflow에 따르면
They can have any (serializable) value, but they are only designed for small amounts of data
작은 양의 데이터만 사용할 수 있도록 만들었으니
크기가 큰 데이터프레임이나 무거운 쿼리를 저장하지 않는게 좋을 듯 하다.
저번 포스팅에서
XComs에 쿼리 결과를 push 했다.
Postgres 버전
Oracle 버전
Postgres 버전으로 진행하겠다.
현재 XComs에는 아래와 같은 데이터가 들어가있다.
Key | Value | Timestamp | Logical Date | Task Id | Dag Id |
return_value | [['test', 'ko', 'pepega'], ['cuphead', 'teapot', 'cup'], ['mugman', 'teapot1', 'cup1']] | 2022-03-17, 09:02:09 | 2022-02-28, 15:00:00 | postgres-db-task | postgres_db_operator |
puller, PythonOperator를 사용하여
위 XCom의 Value를 가져올 것이다.
def puller(task_instance=None):
print(task_instance.xcom_pull(dag_id='postgres_db_operator', task_ids='postgres-db-task'))
xcom_pull 의 parameter는 xcom_pull의 공식 문서를 참고했다.
puller는 airflow/apache github의 example 코드를 참고하였다.
python_xcom = PythonOperator(
task_id="xcom-python",
python_callable=puller,
)
PythonOperator로 함수를 호출하면 된다.
[2022-03-18, 08:47:40 UTC] {logging_mixin.py:109} INFO - [['test', 'ko', 'pepega'], ['cuphead', 'teapot', 'cup'], ['mugman', 'teapot1', 'cup1']]
[2022-03-18, 08:47:40 UTC] {python.py:175} INFO - Done. Returned value was: None
잘 작동된다.
다음 포스팅에는
XComs Value를 호출하여
메일을 전송할 것이다.