在上一篇動手寫 DAG 中,我們用了 Operator 來定義不同的 Task,像是執行 Bash 的 BashOperator,或是執行 Python 的 PythonOperator,不過如果剛好有我們想要的功能是 Airflow 沒有定義的,該怎麼辦呢?還好 Airflow 讓我們可以自定義 Operator,讓 Airflow 更有擴展性!
實作 Operator
設置 Operator 路徑
與 DAG 類似,自定義的 Operator 也會需要一個指定的資料夾存放,這個資料夾預設是在 {AIRFLOW_HOME}/plugins,資料夾的位置也可以在 Airflow 的設定檔裡調整,在第五篇《Airflow 設定檔簡介》裡會再詳細的跟大家介紹。
> mkdir ~/airflow/plugins
> touch ~/airflow/plugins/my_operator.py
目標
這次要實作的 Operator 可以傳入使用者的名字,並且在執行時紀錄跟使用者打招呼的時間。
Constructor
自定義的 Operator 會繼承 BaseOperator,所以我們只需要在 Constructor 中定義要傳入的參數,並實作 execute 函式就能完成 Operator 的基礎功能。
在 Constructor 中,我們可以使用 Airflow 定義好的裝飾器 @apply_defaults ,@apply_defaults 可以幫我們的 Operator 載入 Default Arguments,並存放在 kwargs['params'],讓我們可以在 constructor 中調用。
class GreetOperator(BaseOperator):
@apply_defaults
def __init__(
self,
name: str,
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.name = name
Execute
execute 函式帶有 context 變數,context 帶有一些好用的功能,像是 ti 或 task_instance 代表 TaskInstance,透過 TaskInstance 我們可以跟 XCom 互動,上一篇我們展示了 BashOperator 透過 xcom_push 參數,把 bash 執行的結果存入 XCom。
在 Operator 中,執行結束回傳的值會被記錄在 XCom 的 return_value 裡,如果我們想要存更多東西在 XCom 裡,可以怎麼做呢?我們可以透過 TaskInstance 的 xcom_push 功能,存入其他的值,像是下面的例子,我們除了回傳 message 存入 return_value 外,我們另外還存入了現在的 timestamp 在 time 裡面。
def execute(self, context):
message = "Hello {}".format(self.name)
context['ti'].xcom_push(key='time', value=datetime.now().timestamp())
print(message)
return message
定義 Plugin
最後定義 AirflowPlugin,讓我們待會再可以在 DAG 中引入我們的 Operator。
class AirflowTestPlugin(AirflowPlugin):
name = 'greet_operator'
operators = [GreetOperator]
完整的 Operator Code
from datetime import datetime
from airflow.plugins_manager import AirflowPlugin
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class GreetOperator(BaseOperator):
@apply_defaults
def __init__(
self,
name: str,
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.name = name
def execute(self, context):
message = "Hello {}".format(self.name)
context['ti'].xcom_push(key='time', value=datetime.now().timestamp())
print(message)
return message
class AirflowTestPlugin(AirflowPlugin):
name = 'greet_operator'
operators = [GreetOperator]
再次實作 DAG
目標
這次我們要實作的 DAG 會使用剛剛定義的 Operator 來打招呼,並且把在我們 Operator 中存入的 time 印出來。
完整 DAG Code
由於上一篇講解過如何實作 DAG,所以這邊就直接給完整的 Code,大家可以先在 {AIRFLOW_HOME}/dags 底下創立另一個 DAG 檔案。
> touch ~/airflow/dags/my_operator_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.greet_operator import GreetOperator
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'someone',
'depends_on_past': False,
'start_date': datetime(2020, 2, 25),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(seconds=10),
}
dag = DAG(
dag_id='my_operator_dag',
description='my operator dag',
default_args=default_args,
schedule_interval='*/1 * * * *'
)
greet = GreetOperator(
task_id='greet',
name='someone',
dag=dag
)
def get_timestamp(**context):
print(f"greeting time: {context['task_instance'].xcom_pull(task_ids='greet', key='time')}")
show_time = PythonOperator(
task_id='show_time',
python_callable=get_timestamp,
provide_context=True,
dag=dag
)
greet >> show_time
查詢 XCom
最後大家可以再次開始 Webserver 以及 Scheduler,執行剛剛的 DAG 後,我們可以在 Tree View 裡點擊任一個 greet 方框,之後點選 Task Instance Details。

接下來再點選 XCom,我們就可以在介面上看到 GreetOperator 存入的值。

總結
這次我們從頭實作了自定義的 Operator,透過自定義的 Operator,我們也可以讓 Airflow 有更多擴展,像是在 Airflow 裡面有預設的 redshift_to_s3_operator,但有些公司用的 Warehouse 並不是 Redshift,可能是 Snowflake,這時就可以自定義一個 snowflake_to_s3_operator。
到這邊我們介紹了 Airflow 的基礎功能,接下來我們要瞭解 Airflow 背後是如何運作的,以便我們未來在 Prod 環境部署 Airflow 時,可以更好的做 Troubleshooting。


