공식 문서를 토대로 활용하였다.
XComs
xcom_pull
Examples
https://github.com/apache/airflow/blob/main/airflow/example_dags/example_xcom.py
블로그 코드는
https://github.com/GHGHGHKO/airflow/blob/master/dags/xcom_test.py
들어가기 앞서
apache airflow에 따르면
They can have any (serializable) value, but they are only designed for small amounts of data
작은 양의 데이터만 사용할 수 있도록 만들었으니
크기가 큰 데이터프레임이나 무거운 쿼리를 저장하지 않는게 좋을 듯 하다.
저번 포스팅에서
XComs에 쿼리 결과를 push 했다.
Postgres 버전
Oracle 버전
Postgres 버전으로 진행하겠다.
현재 XComs에는 아래와 같은 데이터가 들어가있다.
Key | Value | Timestamp | Logical Date | Task Id | Dag Id |
return_value | [['test', 'ko', 'pepega'], ['cuphead', 'teapot', 'cup'], ['mugman', 'teapot1', 'cup1']] | 2022-03-17, 09:02:09 | 2022-02-28, 15:00:00 | postgres-db-task | postgres_db_operator |
puller, PythonOperator를 사용하여
위 XCom의 Value를 가져올 것이다.
def puller(task_instance=None):
print(task_instance.xcom_pull(dag_id='postgres_db_operator', task_ids='postgres-db-task'))
xcom_pull 의 parameter는 xcom_pull의 공식 문서를 참고했다.
puller는 airflow/apache github의 example 코드를 참고하였다.
python_xcom = PythonOperator(
task_id="xcom-python",
python_callable=puller,
)
PythonOperator로 함수를 호출하면 된다.
[2022-03-18, 08:47:40 UTC] {logging_mixin.py:109} INFO - [['test', 'ko', 'pepega'], ['cuphead', 'teapot', 'cup'], ['mugman', 'teapot1', 'cup1']]
[2022-03-18, 08:47:40 UTC] {python.py:175} INFO - Done. Returned value was: None
잘 작동된다.
다음 포스팅에는
XComs Value를 호출하여
메일을 전송할 것이다.
'Airflow' 카테고리의 다른 글
Airflow query csv export, 쿼리로 csv export (0) | 2022.03.20 |
---|---|
Airflow XComs Email 전송 (0) | 2022.03.18 |
Airflow OracleOperator Custom 하기 (0) | 2022.03.17 |
Airflow PostgresOperator Custom 하기 (0) | 2022.03.17 |
Airflow custom Operator 만들기 (2) | 2022.03.16 |