Airflow 動手玩:(一)簡介


前言

在資料科學盛行的今天,相信大家多少有碰過 Data Cleaning 或是 ETL(Extract-Transform-Load),像是把資料從 File 搬到 Database,或是把資料從 OLTP(on-line transaction processing)搬到 OLAP(on-line analytical processing)的 Warehouse 等等。過去在大學時,也曾經有個案子要做類似的事,我們要定期去資料庫撈資料以及做些前處理,把處理後的資料存成 JSON 檔,讓網頁方便讀取。那時還沒什麼好工具,只能用些簡單的做法,像是寫 Script 搭配 Crontab 來做到定期處理,這種做法雖然簡單,但會比較難維護,像是如何知道每次的處理有沒有成功?如果有錯又是錯在哪?如果要對接資料庫或是其他需要帳密才能讀取的系統,要如何存 Credential 資料?還好現在有了 Airflow,讓這些事情都變得容易許多,我們只要瞭解 Airflow 的使用方法,接下來專注在 ETL 的邏輯上即可!

安裝

Airflow 是由 Python 編寫的,可以透過 pip 或是其他 Python 套件管理安裝 Airflow,從 Airflow 的 GitHub Repostiory 上,會看到目前 Airflow 的版本支援 Python 2.7, 3.5, 3.6, 3.7,不過我用 Python 3.7 在不同電腦上安裝都沒有成功,所以這篇文章會使用的 Python 3.6 來做示範。
Airflow 支援的 Python 版本

> python -V
Python 3.6.8
> pip install apache-airflow

如果有安裝成功,執行 airflow -h 會看到 airflow 可以使用的指令。

> airflow -h
usage: airflow [-h]
               {backfill,list_dag_runs,list_tasks,clear,pause,unpause,trigger_dag,delete_dag,show_dag,pool,variables,kerberos,render,run,initdb,list_dags,dag_state,task_failed_deps,task_state,serve_logs,test,webserver,resetdb,upgradedb,checkdb,shell,scheduler,worker,flower,version,connections,create_user,delete_user,list_users,sync_perm,next_execution,rotate_fernet_key}
               ...

啟動

設定 AIRFLOW_HOME

啟動 Airflow 時,需要 Airflow 的設定檔 airflow.cfg,所以我們需要設定一個路徑,讓 Airflow 知道可以去哪裡讀取設定檔,至於設定檔則會在初始化 Database 時,自動產生預設的設定檔。

> mkdir ~/airflow
> export AIRFLOW_HOME=~/airflow

初始化 Database

Airflow 在運行需要 Database 紀錄每一次跑過的 Task 的結果,所以接下來我們要初始化 Database,Airflow 預設會使用 SQLite,之後我們也可以把 Database 變成 PostgreSQL 或是 MySQL 等等。

> airflow initdb

執行 airflow initdb 後,再回到 airflow 的資料夾看,會發現多了幾個檔案,像是設定檔 airflow.cfg 以及 SQLite Database airflow.db

> ls ~/airflow
airflow.cfg   airflow.db    logs          unittests.cfg

啟動 webserver

有了設定檔及資料庫之後,接下來要啟動 Airflow 的管理介面。

> airflow webserver -p 8080
____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-02-22 15:03:03,442] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-02-22 15:03:03,443] {dagbag.py:403} INFO - Filling up the DagBag from ~/airflow/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Timeout: 120
Logfiles: - -
=================================================================
[2020-02-22 15:03:04 +0800] [3831] [INFO] Starting gunicorn 19.10.0
[2020-02-22 15:03:04 +0800] [3831] [INFO] Listening at: http://0.0.0.0:8080 (3831)
[2020-02-22 15:03:04 +0800] [3831] [INFO] Using worker: sync
[2020-02-22 15:03:04 +0800] [3838] [INFO] Booting worker with pid: 3838
[2020-02-22 15:03:04 +0800] [3839] [INFO] Booting worker with pid: 3839
[2020-02-22 15:03:04 +0800] [3840] [INFO] Booting worker with pid: 3840
[2020-02-22 15:03:04 +0800] [3841] [INFO] Booting worker with pid: 3841
[2020-02-22 15:03:05,167] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-02-22 15:03:05,168] {dagbag.py:403} INFO - Filling up the DagBag from ~/airflow/dags
[2020-02-22 15:03:05,267] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-02-22 15:03:05,268] {dagbag.py:403} INFO - Filling up the DagBag from ~/airflow/dags
[2020-02-22 15:03:05,312] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-02-22 15:03:05,314] {dagbag.py:403} INFO - Filling up the DagBag from ~/airflow/dags
[2020-02-22 15:03:05,320] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-02-22 15:03:05,322] {dagbag.py:403} INFO - Filling up the DagBag from ~/airflow/dags

這時我們打開 http://localhost:8080 就會進到 Airflow 的管理頁面了!首頁標題是 DAGs,這是 Directed Acyclic Graph 的簡稱,也是 Airflow 區分不同 Data Pipeline 的方式,這裡我們要先知道的是一個 DAG 是由多個 Task 組成,每個 Task 是分開執行的,而 Task 是 Airflow 執行基礎單位,後面我們會在更深入的探討 DAG & Task 的關係。
Airflow DAGs 頁面,上面有兩行 Warning。

啟動 Scheduler

如果剛剛打開的網頁仔細看的話,會發現有兩行小小的 Warning,這段的意思是 - Airflow 找不到 Scheduler,所以沒辦法幫新的 Tasks 做排程,如果 DAG 的程式碼有更新的話,也沒辦法執行最新的程式碼。所以接下來要另外啟動 Scheduler。

> airflow scheduler
____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-02-22 15:48:31,019] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-02-22 15:48:31,024] {scheduler_job.py:1344} INFO - Starting the scheduler
[2020-02-22 15:48:31,024] {scheduler_job.py:1352} INFO - Running execute loop for -1 seconds
[2020-02-22 15:48:31,024] {scheduler_job.py:1353} INFO - Processing each file at most -1 times
[2020-02-22 15:48:31,024] {scheduler_job.py:1356} INFO - Searching for files in ~/airflow/dags
[2020-02-22 15:48:31,031] {scheduler_job.py:1358} INFO - There are 23 files in ~/airflow/dags
[2020-02-22 15:48:31,031] {scheduler_job.py:1409} INFO - Resetting orphaned tasks for active dag runs
[2020-02-22 15:48:31,039] {dag_processing.py:556} INFO - Launched DagFileProcessorManager with pid: 14517
[2020-02-22 15:48:31,047] {settings.py:54} INFO - Configured default timezone <Timezone [UTC]>

Airflow 介面

DAGs

成功開啟 Webserver 以及 Scheduler 後,我們再次進到 Airflow DAGs 頁面。

  • DAG 欄位代表的是每一個 DAG 的名稱,DAG 的名稱同時也是 DAG 的 id,所以之後我們在實作自己的 DAG 時,不能與已經有的 DAG 名稱重複。
  • Schedule 欄位代表什麼時間要執行 DAG,表示方式跟 Crontab 類似。
  • Owner 欄位則代表這個 DAG 是屬於誰的,實際開發中,可以把 Owner 欄位當成實作該 DAG 人員的名稱。
  • 最左邊有一排 On & Off 的開關,如果是 Off 的狀態代表 Scheduler 不會依照 Schedule 欄位的時間去排程,但我們一樣可以手動 trigger 一個 DAG 去執行。
    DAGs 頁面

Tree View

在 Airflow DAG 的頁面點進隨意一個 DAG,會進入一個 Tree View 的介面,雖然現在 Tree View 還沒有什麼東西,但未來我們會很常進入 Tree View 查看每一次 DAG 的執行狀況。
Tree View

Graph View

點進 Tree View 左邊的 Graph View 後,Airflow 會用圖形的方式呈現 Directed Acyclic Graph,在這一頁雖然沒辦法很明顯看到每次執行的情況, 但對於理解每個 DAG 執行的流程很有幫助。
Graph View

Code

在 Code 分頁可以看到每一個 DAG 實際的程式碼,至於實際如何寫一個 DAG 會在下篇文章討論。
Code

Connections

最後來看看 Airflow 如何管理 Credential,在最上面的 Admins 點開後,就會看到 Connections 的選項,近來後會發現已經有一些預設的 Connections。
Connections
在這個頁面的最上面有一個 Warning,意思是目前密碼都是用明文(plaintext)的方式儲存,如果想要用加密方式儲存的話,要在安裝 cryptography。安裝 cryptography 後,再重新執行 Webserver,這個 Warning 就會消失了。

> pip install apache-airflow[crypto]

執行 DAG

隨意在進入一個 DAG 的 Tree View,這裡以 example_branch_dop_operator_v3 做示範,進去後先點擊標題左邊的 Off 變成 On,過一陣子後再重新整理 Tree View,會發現 Scheduler 已經依照 Schedule */1 * * * * 開始執行 example_branch_dop_operator_v3 了。

點一下 Trigger DAG,這代表手動 trigger 一個 DAG,會發現手動 trigger 的執行是沒有黑色框框的,可以看圖片中最後一次的執行。Tree View 用這樣的方式呈現哪些是 Scheduler 依照 Schedule 自動排程的 DAG,哪些是使用者手動 trigger 的。
DAG 執行頁面

最後,隨意點擊一個有顏色的方形框框,右上角會有一個 View Log 按鈕,點進去後可以看到 Task 執行的 Log,透過這個功能,我們也可以查找每一次 Task 執行的紀錄。
Task 執行成果

結語

今天帶大家回顧做 Data Cleaning 或是 ETL 時可能會遇到的問題,並且向大家展示了 Airflow 從安裝、基礎功能到實際執行。從中我們也了解到了 Airflow 如何記錄每次執行的成果,以及如何儲存 Credential 資料。明天會帶大家動手做 DAG,相信在寫過 DAG 後,大家會對 Airflow 的功能更有感受!

#Airflow #Data Pipeline #Data Cleaning #ETL







你可能感興趣的文章

AppWorks School Batch #16 Front-End Class 學習筆記&心得(駐點階段二:專題研討+協作練習專案)

AppWorks School Batch #16 Front-End Class 學習筆記&心得(駐點階段二:專題研討+協作練習專案)

Laravel x Vue Conf TW 2021

Laravel x Vue Conf TW 2021

使用 node.js 寫出串接 API 的程式

使用 node.js 寫出串接 API 的程式






留言討論