Airflow 可以通过 KubernetesExecutor 来让任务在 k8s 上运行。KubernetesExecutor 是 Airflow 的一个执行器,它可以将任务转换为 Kubernetes 资源对象并提交给 k8s 集群运行。
下面是使用 KubernetesExecutor 在 k8s 上运行 Airflow 任务的步骤:
安装 Kubernetes 运行环境。可以使用 Minikube 或者其他 k8s 发行版。
配置 KubernetesExecutor。在 Airflow 的配置文件中设置以下参数:
# 使用 KubernetesExecutor
executor = KubernetesExecutor
# Kubernetes 集群的 API 地址
kube_config_path = /path/to/kube/config
# 指定 Docker 镜像仓库地址和镜像名称
image_pull_secrets = [{"name": "my-registry-key"}]
image = dockerhub/my-image:latest
# Kubernetes 的 Namespace 名称
namespace = default
# Kubernetes 的 ServiceAccount 名称
service_account_name = airflow
- 编写 DAG 文件。在 DAG 中定义任务和依赖关系,并指定对应的 Kubernetes 配置,如下所示:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'retries': 0
}
with DAG('my_dag', default_args=default_args, schedule_interval=timedelta(days=1)) as dag:
task1 = BashOperator(
task_id='task1',
bash_command='echo "Hello World!"',
executor_config={
'KubernetesExecutor': {
'request_memory': '128Mi',
'limit_memory': '256Mi',
'image_pull_policy': 'Always'
}
}
)
task2 = BashOperator(
task_id='task2',
bash_command='echo "Task 2 completed"',
executor_config={
'KubernetesExecutor': {
'request_memory': '128Mi',
'limit_memory': '256Mi',
'image_pull_policy': 'Always'
}
},
trigger_rule='all_done'
)
task1 >> task2
- 启动 Airflow。使用以下命令启动 Airflow:
airflow scheduler --daemon
airflow webserver --daemon
运行 DAG。在 Airflow 的 Web UI 中启用 DAG,并手动触发任务运行。
查看任务状态。可以通过 k8s 的命令或者 Dashboard 来查看任务的运行状态和日志输出。
注意:在使用 KubernetesExecutor 时,需要确保 Airflow 所在的 Pod 具有足够的权限来访问 k8s 集群。可以通过定义 ServiceAccount 和 RoleBinding 来授权。