是的,Airflow 使用 KubernetesExecutor 通过 Kubernetes API 创建 Pod 来执行任务。KubernetesExecutor 包含以下功能:
将 Airflow 任务转换为 Kubernetes 的资源对象并提交到 k8s 集群运行。
监视任务的状态,并将状态信息写回 Airflow 数据库。
处理任务容器中的日志输出,并将日志写回 Airflow 数据库。
在任务完成后删除对应的 Pod 和相关资源。
使用 KubernetesExecutor 时,需要在 Airflow 的配置文件中设置 executor 参数为 KubernetesExecutor,并配置相应的 kube_config_path、image_pull_secrets、image、namespace、service_account_name 等参数来指定 k8s 集群和相关配置。
然后,在 DAG 文件中,可以通过 BashOperator 或者其他 Operator 来定义任务,并指定 executor_config 参数来配置任务所需的 Kubernetes 资源,如 requests_memory、limit_memory、image_pull_policy 等。
例如,以下代码片段展示了使用 KubernetesExecutor 运行简单 BashOperator 的示例:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'retries': 0
}
with DAG('my_dag', default_args=default_args, schedule_interval=timedelta(days=1)) as dag:
task1 = BashOperator(
task_id='task1',
bash_command='echo "Hello World!"',
executor_config={
'KubernetesExecutor': {
'request_memory': '128Mi',
'limit_memory': '256Mi',
'image_pull_policy': 'Always'
}
}
)
在执行 DAG 时,Airflow 将使用 KubernetesExecutor 在 k8s 集群上创建一个名为 task1 的 Pod,并在其中运行 echo “Hello World!” 命令。当任务完成后,Pod 和相关资源将被自动删除。
需要注意的是,在使用 KubernetesExecutor 运行任务时,需要确保 Airflow 所在的 Pod 具有足够的权限来访问 k8s 集群。可以通过定义 ServiceAccount 和 RoleBinding 来授权。