Airflow 動手玩:(三)動手寫 Operator


在上一篇動手寫 DAG 中,我們用了 Operator 來定義不同的 Task,像是執行 Bash 的 BashOperator,或是執行 Python 的 PythonOperator,不過如果剛好有我們想要的功能是 Airflow 沒有定義的,該怎麼辦呢?還好 Airflow 讓我們可以自定義 Operator,讓 Airflow 更有擴展性!

實作 Operator

設置 Operator 路徑

與 DAG 類似,自定義的 Operator 也會需要一個指定的資料夾存放,這個資料夾預設是在 {AIRFLOW_HOME}/plugins,資料夾的位置也可以在 Airflow 的設定檔裡調整,在第五篇《Airflow 設定檔簡介》裡會再詳細的跟大家介紹。

> mkdir ~/airflow/plugins
> touch ~/airflow/plugins/my_operator.py

目標

這次要實作的 Operator 可以傳入使用者的名字,並且在執行時紀錄跟使用者打招呼的時間。

Constructor

自定義的 Operator 會繼承 BaseOperator,所以我們只需要在 Constructor 中定義要傳入的參數,並實作 execute 函式就能完成 Operator 的基礎功能。
在 Constructor 中,我們可以使用 Airflow 定義好的裝飾器 @apply_defaults@apply_defaults 可以幫我們的 Operator 載入 Default Arguments,並存放在 kwargs['params'],讓我們可以在 constructor 中調用。

class GreetOperator(BaseOperator):

    @apply_defaults
    def __init__(
            self,
            name: str,
            *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.name = name

Execute

execute 函式帶有 context 變數,context 帶有一些好用的功能,像是 titask_instance 代表 TaskInstance,透過 TaskInstance 我們可以跟 XCom 互動,上一篇我們展示了 BashOperator 透過 xcom_push 參數,把 bash 執行的結果存入 XCom

在 Operator 中,執行結束回傳的值會被記錄在 XComreturn_value 裡,如果我們想要存更多東西在 XCom 裡,可以怎麼做呢?我們可以透過 TaskInstancexcom_push 功能,存入其他的值,像是下面的例子,我們除了回傳 message 存入 return_value 外,我們另外還存入了現在的 timestamp 在 time 裡面。

    def execute(self, context):
        message = "Hello {}".format(self.name)
        context['ti'].xcom_push(key='time', value=datetime.now().timestamp())
        print(message)
        return message

定義 Plugin

最後定義 AirflowPlugin,讓我們待會再可以在 DAG 中引入我們的 Operator。

class AirflowTestPlugin(AirflowPlugin):
    name = 'greet_operator'
    operators = [GreetOperator]

完整的 Operator Code

from datetime import datetime

from airflow.plugins_manager import AirflowPlugin
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults


class GreetOperator(BaseOperator):

    @apply_defaults
    def __init__(
            self,
            name: str,
            *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        message = "Hello {}".format(self.name)
        context['ti'].xcom_push(key='time', value=datetime.now().timestamp())
        print(message)
        return message


class AirflowTestPlugin(AirflowPlugin):
    name = 'greet_operator'
    operators = [GreetOperator]

再次實作 DAG

目標

這次我們要實作的 DAG 會使用剛剛定義的 Operator 來打招呼,並且把在我們 Operator 中存入的 time 印出來。

完整 DAG Code

由於上一篇講解過如何實作 DAG,所以這邊就直接給完整的 Code,大家可以先在 {AIRFLOW_HOME}/dags 底下創立另一個 DAG 檔案。

> touch ~/airflow/dags/my_operator_dag.py
from datetime import datetime, timedelta


from airflow import DAG
from airflow.operators.python_operator import PythonOperator


from airflow.operators.greet_operator import GreetOperator
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'someone',
    'depends_on_past': False,
    'start_date': datetime(2020, 2, 25),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=10),
}


dag = DAG(
    dag_id='my_operator_dag',
    description='my operator dag',
    default_args=default_args,
    schedule_interval='*/1 * * * *'
)


greet = GreetOperator(
    task_id='greet',
    name='someone',
    dag=dag
)


def get_timestamp(**context):
    print(f"greeting time: {context['task_instance'].xcom_pull(task_ids='greet', key='time')}")


show_time = PythonOperator(
    task_id='show_time',
    python_callable=get_timestamp,
    provide_context=True,
    dag=dag
)


greet >> show_time

查詢 XCom

最後大家可以再次開始 Webserver 以及 Scheduler,執行剛剛的 DAG 後,我們可以在 Tree View 裡點擊任一個 greet 方框,之後點選 Task Instance Details。

Task Instance Details

接下來再點選 XCom,我們就可以在介面上看到 GreetOperator 存入的值。

XCom

總結

這次我們從頭實作了自定義的 Operator,透過自定義的 Operator,我們也可以讓 Airflow 有更多擴展,像是在 Airflow 裡面有預設的 redshift_to_s3_operator,但有些公司用的 Warehouse 並不是 Redshift,可能是 Snowflake,這時就可以自定義一個 snowflake_to_s3_operator
到這邊我們介紹了 Airflow 的基礎功能,接下來我們要瞭解 Airflow 背後是如何運作的,以便我們未來在 Prod 環境部署 Airflow 時,可以更好的做 Troubleshooting。

#Airflow #Data Pipeline #Data Cleaning #ETL







你可能感興趣的文章

什麼是 樣板引擎(Template Engine) ?

什麼是 樣板引擎(Template Engine) ?

Follow Along Links

Follow Along Links

How to Request an AWS SSL/TLS Certificate

How to Request an AWS SSL/TLS Certificate






留言討論