這篇會聊聊 Airflow Best Practices,雖然說是 Best Practices,但是工程不像科學,工程只有適不適合,所以大家並不一定要完全採納,只要適合的就是好的。
Tips
使用 Default Arguments
大多數人第一次寫 DAG,最早碰到的應該就是 Default Arguments,使用 Default Arguments 可以大大減少不斷地在 Task 重複同樣的設定參數。
default_args = {
'owner': 'someone',
'depends_on_past': False,
'start_date': datetime(2020, 2, 24),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'end_date': datetime(2020, 2, 29),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
}
使用 params
雖然在實作中沒講到,但是透過 params 可以傳遞自定義的參數進去 Task,而這個參數如果在很多 Task 都重複時,放在 params 就很方便,因為不用每個 Task 都重複一遍。在 Task 中,我們可以從 context.params
拿到這些參數。
dag = DAG(
...,
params={
"s3_bucket": "test"
}
}
使用 Variables,但有限制的使用
一樣在實作中沒講到,但是我們可以在 WebServer 的 Admin → Variables 頁面看到,這裡可以使用 key value 形式創造 Variable。標題寫有限制的使用是因為 value 也可以是 json 的格式,所以我們可以用一個 Variable 就設定好一個 DAG 所需的全部參數。
然後在 DAG 裡面,我們可以透過 Variable.get("some key")
拿到 Variable,習慣上還會把 Variable 當成 params,因為像上面的 s3_bucket 可能就會隨環境而變。
dag = DAG(
...,
params=Variable.get("test", deserialize_json=True)
)
使用 Connections 儲存敏感資料
如果從第一篇看過來,應該會覺得用 Connections 儲存連線資訊很正常,但如果還不了解 Airflow 的功能,可能會想要透過環境參數傳連線資訊,所以這裡還是提醒一下用 Connections 儲存敏感資料,除了可以用 Connections 連線之外,如果想在 Task 中拿到敏感資訊也很容易。
from airflow.hooks.base_hook import BaseHook
redis_password = BaseHook.get_connection('redis_default').password
使用 context manager
上面幾點都是有關設定的,接下來看看有關 coding 的。每個 Task 我們都要設定 dag=dag
,有時 Task 一多就可能忘記,所以 Airflow 也提供了一個方便的功能 context manager,讓在 context manager 下的 Task 都屬於同一個 DAG。
default_args = {...}
with DAG(dag_id='test', default_args=args, schedule_interval='0/1 0 * * *') as dag:
task_1 = BashOperator(
task_id='task_1',
bash_command='echo "say hi"')
task_2 = DummyOperator(task_id='task_2')
task_1 >> task_2
同一個 level 的 Task 使用 List 表示
使用 List 表示,可以簡化程式碼。
get_timestamp >> branching >> [store_in_redis, skip]
總結
感謝大家看到這邊,總算完成這次的寫作松了,可以開瓶慶祝了🍾🍾🍾。不過最重要的還是希望在這七篇中可以更了解如何使用 Airflow!謝謝大家!