在上一篇動手寫 DAG 中,我們用了 Operator 來定義不同的 Task,像是執行 Bash 的 BashOperator
,或是執行 Python 的 PythonOperator
,不過如果剛好有我們想要的功能是 Airflow 沒有定義的,該怎麼辦呢?還好 Airflow 讓我們可以自定義 Operator,讓 Airflow 更有擴展性!
實作 Operator
設置 Operator 路徑
與 DAG 類似,自定義的 Operator 也會需要一個指定的資料夾存放,這個資料夾預設是在 {AIRFLOW_HOME}/plugins
,資料夾的位置也可以在 Airflow 的設定檔裡調整,在第五篇《Airflow 設定檔簡介》裡會再詳細的跟大家介紹。
> mkdir ~/airflow/plugins
> touch ~/airflow/plugins/my_operator.py
目標
這次要實作的 Operator 可以傳入使用者的名字,並且在執行時紀錄跟使用者打招呼的時間。
Constructor
自定義的 Operator 會繼承 BaseOperator
,所以我們只需要在 Constructor 中定義要傳入的參數,並實作 execute
函式就能完成 Operator 的基礎功能。
在 Constructor 中,我們可以使用 Airflow 定義好的裝飾器 @apply_defaults
,@apply_defaults
可以幫我們的 Operator 載入 Default Arguments,並存放在 kwargs['params']
,讓我們可以在 constructor 中調用。
class GreetOperator(BaseOperator):
@apply_defaults
def __init__(
self,
name: str,
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.name = name
Execute
execute
函式帶有 context
變數,context 帶有一些好用的功能,像是 ti
或 task_instance
代表 TaskInstance
,透過 TaskInstance
我們可以跟 XCom
互動,上一篇我們展示了 BashOperator
透過 xcom_push
參數,把 bash 執行的結果存入 XCom
。
在 Operator 中,執行結束回傳的值會被記錄在 XCom
的 return_value
裡,如果我們想要存更多東西在 XCom
裡,可以怎麼做呢?我們可以透過 TaskInstance
的 xcom_push
功能,存入其他的值,像是下面的例子,我們除了回傳 message
存入 return_value
外,我們另外還存入了現在的 timestamp 在 time
裡面。
def execute(self, context):
message = "Hello {}".format(self.name)
context['ti'].xcom_push(key='time', value=datetime.now().timestamp())
print(message)
return message
定義 Plugin
最後定義 AirflowPlugin
,讓我們待會再可以在 DAG 中引入我們的 Operator。
class AirflowTestPlugin(AirflowPlugin):
name = 'greet_operator'
operators = [GreetOperator]
完整的 Operator Code
from datetime import datetime
from airflow.plugins_manager import AirflowPlugin
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class GreetOperator(BaseOperator):
@apply_defaults
def __init__(
self,
name: str,
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.name = name
def execute(self, context):
message = "Hello {}".format(self.name)
context['ti'].xcom_push(key='time', value=datetime.now().timestamp())
print(message)
return message
class AirflowTestPlugin(AirflowPlugin):
name = 'greet_operator'
operators = [GreetOperator]
再次實作 DAG
目標
這次我們要實作的 DAG 會使用剛剛定義的 Operator 來打招呼,並且把在我們 Operator 中存入的 time
印出來。
完整 DAG Code
由於上一篇講解過如何實作 DAG,所以這邊就直接給完整的 Code,大家可以先在 {AIRFLOW_HOME}/dags
底下創立另一個 DAG 檔案。
> touch ~/airflow/dags/my_operator_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.greet_operator import GreetOperator
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'someone',
'depends_on_past': False,
'start_date': datetime(2020, 2, 25),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(seconds=10),
}
dag = DAG(
dag_id='my_operator_dag',
description='my operator dag',
default_args=default_args,
schedule_interval='*/1 * * * *'
)
greet = GreetOperator(
task_id='greet',
name='someone',
dag=dag
)
def get_timestamp(**context):
print(f"greeting time: {context['task_instance'].xcom_pull(task_ids='greet', key='time')}")
show_time = PythonOperator(
task_id='show_time',
python_callable=get_timestamp,
provide_context=True,
dag=dag
)
greet >> show_time
查詢 XCom
最後大家可以再次開始 Webserver 以及 Scheduler,執行剛剛的 DAG 後,我們可以在 Tree View 裡點擊任一個 greet
方框,之後點選 Task Instance Details。
接下來再點選 XCom
,我們就可以在介面上看到 GreetOperator
存入的值。
總結
這次我們從頭實作了自定義的 Operator,透過自定義的 Operator,我們也可以讓 Airflow 有更多擴展,像是在 Airflow 裡面有預設的 redshift_to_s3_operator
,但有些公司用的 Warehouse 並不是 Redshift,可能是 Snowflake,這時就可以自定義一個 snowflake_to_s3_operator
。
到這邊我們介紹了 Airflow 的基礎功能,接下來我們要瞭解 Airflow 背後是如何運作的,以便我們未來在 Prod 環境部署 Airflow 時,可以更好的做 Troubleshooting。