0%

Airflow入门篇

Airflow是啥?

Airflow is a platform to programmatically author, schedule and monitor workflows.
Airflow 是一个以编程方式创作、安排和监控工作流的平台。

  • Airflow 可以将工作流编排为任务的有向无环图 (DAG)。
  • Airflow 调度器调度任务在一组work上执行,同时满足指定的依赖项。
  • Airflow 拥有大量的命令行工具,可以轻松在DAG上执行复杂的操作。
  • Airflow 界面友好,管道、进度、问题一目了然。

更多内容参考:Airflow官方文档

Airflow视频教程:Airflow从零到神

安装和初始化

安装配置airflow,主要参考Airflow本地快速运行Airflow Installation

前置条件:安装配置好python3.6,详情参考《MacOS上软件配置》

1、安装airflow

1
pip install "apache-airflow[celery]==2.0.0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.0.0/constraints-3.6.txt"

2、初始化

1
2
3
4
5
6
7
airflow db init
airflow users create \
--username admin \
--firstname Peter \
--lastname Parker \
--role Admin \
--email spiderman@superhero.org

默认安装目录为 /Users/voidking/airflow,如果要指定安装目录的话,可以在执行初始化命令前配置 AIRFLOW_HOME 变量。

启停Airflow

1、启动airflow

1
2
airflow webserver -D  # 启动web
airflow scheduler -D # 启动调度器

2、停止airflow,参考Airflow任务调度系统

1
2
3
4
5
6
7
8
9
10
# 停止airflow webserver
ps -ef | grep 'airflow' | grep 'webserver' | awk '{print $2}' | xargs kill -9
cd $AIRFLOW_HOME
rm -rf airflow-webserver.pid
rm -rf airflow-webserver-monitor.pid

# 停止airflow scheduler
ps -ef | grep 'airflow' | grep 'scheduler' | awk '{print $2}' | xargs kill -9
cd $AIRFLOW_HOME
rm -rf airflow-scheduler.pid

3、封装一个启停脚本
脚本链接

常用命令

1
airflow dags list

实用配置

修改airflow.cfg:

1
2
3
4
5
6
7
8
9
10
11
# 指定dags目录
dags_folder = /Users/voidking/airflow/dags

# dags目录扫描时间间隔
dag_dir_list_interval = 30

# 启用api请求
auth_backend = airflow.api.auth.backend.basic_auth

# 日志路径和日志文件名
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}.log

更多内容参考Configuration Reference

Hello DAGs

1、启动airflow
启动airflow webserver和airflow scheduler,然后访问本地8080端口,看到airflow管理页面

2、编写dags脚本
编写第一个dags脚本,命名为 hello.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import datetime
from airflow.models import DAG
from airflow.operators.bash import BashOperator

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
}

dag = DAG(
dag_id='hello',
default_args=default_args,
description='The first DAG',
catchup=False,
schedule_interval=None,
start_date=datetime.datetime(2021, 1, 1),
tags=['example'],
)

t1 = BashOperator(
task_id='print_hello',
bash_command='echo "Hello dags!"',
dag=dag
)

t2 = BashOperator(
task_id='print_hello_again',
bash_command='echo "Hello dags again!"',
dag=dag
)

t1 >> t2

简单测试dags脚本:python hello.py

3、dags脚本放入到airflow dags目录
hello.py 放入 /Users/voidking/airflow/dags 目录(或者创建软链),使airflow可以扫描读取到它(前提是启动scheduler)。
注意,新建的dags任务并不会马上出现在界面上,默认需要5分钟。
如果页面头部出现报错信息,请按照提示修改dags脚本。

另外,dags脚本也可以放入到 /Users/voidking/.pyenv/versions/3.6.4/lib/python3.6/site-packages/airflow/example_dags/ 目录,和示例脚本放到一起。

4、触发运行


这里的传参为空,因为不需要额外参数。

5、查看执行日志
点击查看hello的dags详情,查看执行日志。


以上,第一个dags脚本运行完成。

重跑任务

1、进入dags详情页面

2、点击需要重跑的task的小方块

3、点击Clear或者Run,重跑任务

TODO

tutorial.py详解、传参、接口调用
airflow + django

  • 本文作者: 好好学习的郝
  • 本文链接: https://www.voidking.com/dev-airflow-start/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!源站会及时更新知识点及修正错误,阅读体验也更好。欢迎分享,欢迎收藏~