前幾篇實作 DAG & Operator,大家應該可以感受到 Airflow 透過 Operator 讓 Data Pipeline 更有彈性,我們可以透過不同功能的 Operator 跟 Trigger Rule 實作 DAG,甚至實作自己的 Operator。接下來這篇會帶大家瞭解 Airflow 如何透過架構安排讓部屬更有彈性!
架構
在聊聊架構之前,大家可以先想想前幾天我們是如何啟動 Airflow 的?沒錯,就是 WebServer & Scheduler,所以 Airflow 的架構至少有這兩部分。
WebServer 是單純的 UI 介面,但卻可以看到跟 Scheduler 一樣的 DAG、Connections 等等,這代表 WebServer 跟 Scheduler 之間一定連到同一個儲存介面,所以才會拿到一樣的值,這個儲存介面就是 Metadata Database。
回想第一篇,我們在做 airflow initdb
時有產生一個 SQLite 的資料庫 airflow.db
,如果連進去看可以發現有幾個 Table 紀錄著我們熟悉的資訊,像是 DAG 資訊的 dag
,每一次 Task 運行資訊的 task_instance
等等。
> sqlite3 ~/airflow/airflow.db
SQLite version 3.28.0 2019-04-15 14:49:49
Enter ".help" for usage hints.
sqlite> .tables
alembic_version job slot_pool
chart known_event task_fail
connection known_event_type task_instance
dag kube_resource_version task_reschedule
dag_pickle kube_worker_uuid users
dag_run log variable
dag_tag serialized_dag xcom
import_error sla_miss
接下來可以看一下 Airflow 的架構圖,主要分成四個部分 WebServer, Scheduler, Executor 以及 Worker,我們分別來看看這四個 Components。
WebServer
WebServer 顧名思義,就是提供網頁功能的伺服器,WebServer 是可以單獨運作的,所以可以被分開部署,但如果只有 WebServer 而沒有下面三個 Components,也就只能查看 DAG、Task 及 Connections 等一些資訊而已。
Scheduler
Scheduler 負責排程,這裡的排程並不是直接指定哪個 Task 給哪個 Worker 運行,而是從 Metadata Database 中找尋 DAG 跟 Task 的狀態,並判斷是否將哪些 Task 傳送給 Executor 安排執行。
Executor
Executor 是一個 Queue Process,從 Scheduler 接收要執行的 Task,並將這些資訊存進 Queue,並從 Queue 中取出 Task 安排給閒置的 Worker 執行。如果仔細觀察 WebServer 或是 Scheduler 運行時的資訊,會發現 Terminal 有時會印出 INFO - Using executor SequentialExecutor
,代表我們使用的是 SequentialExecutor
,SequentialExecutor
通常用於 debug,一次只能運行一個 Task,也是唯一可以跟 SQLite 配合的 Executor,因為 SQLite 不支援多個連線。
除了 SequentialExecutor
還有其他幾個 Executor,可以在 Airflow 的 Source Code 看到,像是 LocalExecutor
、CeleryExecutor
、KubernetesExecutor
等。
LocalExecutor
:單機版的 Executor,雖然是單機版,但還是可以透過 MultiProcess 同時運行多個 Worker。CeleryExecutor
:使用 Celery 分散式的 Task Queue 當作 Executor,所以可以在多台電腦同時運行多個 Worker。KubernetesExecutor
:使用 Kubernetes 當作 Executor,所以每個 Task 都會被包裝成一個 Pod 運行,下下篇我們要將 Airflow 架設在 Kuberentes 上就會用到KubernetesExecutor
。
Worker
Worker 負責實際執行Task。
Task 狀態
最後來看一下,Task 在 Airflow 裡從開始到結束會有哪些狀態。
- No status: 當我們手動 Trigger DAG 或是 Scheduler 排程 DAG 後,這時 DAG 的 Task 會先被創造成 Task Instance 並寫進 Database ,這時 Task 的狀態就是 no status。
- Scheduled: 當 Scheduler 確認某個 Task 需要被執行時,這時 Task 的狀態就會變成 Scheduled,像是當我們使用
BranchPythonOperator
,執行結束後,Scheduler 會就依照執行的結果決定下一個 Task,這時那個 Task 的狀態就會是 Scheduled。 - Queued: 當 Scheduler 把確定要執行的 Task 發送給 Executor 時,相當於把 Task 放入 Queue 裡,所以 Task 的狀態會變成 Queued。
- Running: 當 Executor 把 Task 發送給閒置的 Worker 時,Task 的狀態就會變成 Running。
- 最後依據 Workere 執行的結果,Executor 會把 Task 標示成 Success 或是 Failed。
總結
今天我們看了 Airflow 的架構圖,以及 Task 在 Airflow 中執行的各種狀態,對這兩點有了基礎的瞭解後,未來如果我們部署的 Airflow 有問題,相信大家能更快定位出問題可能出在哪。像是 Task 已經到了 Queued 的狀態了,卻一直遲遲沒有進到 Running,這時候我們就可以看看是不是 Worker 哪裡出問題了。
下一篇會帶大家看過 Airflow 的設定檔,讓大家知道 Airflow 有哪些可調控的部分。