凯时集团凯时集团


凯时官网

AirFlow简介

1, 简介

​ Airflow是一个可编程,调度和监控的工作流平台,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行。airflow提供了丰富的命令行工具用于系统管控,而其web管理界面同样也可以方便的管控调度任务,并且对任务运行状态进行实时监控,方便了系统的运维和管理。

2,执行器(Executor)

​ Airflow本身是一个综合平台,它兼容多种组件,所以在使用的时候有多种方案可以选择。比如最关键的执行器就有四种选择:

SequentialExecutor:单进程顺序执行任务,默认执行器,通常只用于测试

LocalExecutor:多进程本地执行任务

CeleryExecutor:分布式调度,生产常用

DaskExecutor :动态任务调度,主要用于数据分析

在当前项目使用CeleryExecutor作为执行器。

celery是一个分布式调度框架,其本身无队列功能,需要使用第三方组件,比如redis或者rabbitmq,当前项目使用的是rabbitmq,系统整体结构如下所示:

其中:

turing为外部系统

GDags服务帮助拼接成dag

master节点webui管理dags、日志等信息

scheduler负责调度,只支持单节点

worker负责执行具体dag中的task, worker支持多节点

在整个调度系统中,节点之间的传递介质是消息,而消息的本质内容是执行脚本的命令,也就是说,工作节点的dag文件必须和master节点的dag文件保持一致,不然任务的执行会出问题。

3,任务处理器

airflow内置了丰富的任务处理器,用于实现不同类型的任务:

BashOperator : 执行bash命令

PythonOperator : 调用python代码

EmailOperator : 发送邮件

HTTPOperator : 发送 HTTP 请求

SqlOperator : 执行 SQL 命令

除了这些基本的构建块之外,还有更多的特定处理器:DockerOperatorHiveOperatorS3FileTransferOperatorPrestoToMysqlOperatorSlackOperator ...

在当前项目使用了HTTPOperator作为执行器,用于调用JAVA服务,整体结构图如下:

4,基本使用

4.1,常用命令

$ airflow webserver -D 守护进程运行webserver

$ airflow scheduler -D 守护进程运行调度器

$ airflow worker -D 守护进程运行调度器

$ airflow worker -c 1 -D 守护进程运行celery worker并指定任务并发数为1

$ airflow pause dag_id  暂停任务

$ airflow unpause dag_id 取消暂停,等同于在管理界面打开off按钮

$ airflow list_tasks dag_id 查看task列表

$ airflow clear dag_id 清空任务实例

$ airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE 运行整个dag文件

$ airflow run dag_id task_id execution_date 运行task

4.2,web管控界面的使用

启动web管控界面需要执行airflow webserver -D命令,默认访问端口是8080

http://110.55.63.51:8080/admin/

(1) 任务启动暂停开关

(2) 任务运行状态

(3) 待执行,未分发的任务

(4) 手动触发执行任务

(5) 任务管控界面

选择对应dag栏目,点击(5) Graph View即可进入任务管控界面

点击对应的任务,会弹出一个任务管控台,主要几个功能如下:

View Log : 查看任务日志

Run : 运行选中任务

Clear:清空任务队列

Mark Success : 标记任务为成功状态

4.3 通过定义DAG文件实现创建定时任务

1) 普通任务

from datetime import timedelta, datetimeimport airflowfrom airflow import DAGfrom airflow.operators.bash_operator import BashOperatorfrom airflow.operators.dummy_operator import DummyOperatordefault_args = { #默认参数 "owner": "jifeng.si", #dag拥有者,用于权限管控 "depends_on_past": False, #是否依赖上游任务 "start_date": datetime(2018, 5, 2), #任务开始时间,默认utc时间 "email": ["123456789@qq.com"], #告警通知邮箱地址 "email_on_failure": False, "email_on_retry": False, "retries": 1, "retry_delay": timedelta(minutes=5),}dag = DAG( "example_hello_world_dag", #dag的id default_args=default_args, description="my first DAG", #描述 schedule_interval="*/25 * * * *", # crontab start_date=datetime(2018, 5, 28) #开始时间,覆盖默认参数)def print_hello(): return "Hello world!"dummy_operator = DummyOperator(task_id="dummy_task", dag=dag)hello_operator = BashOperator( #通过BashOperator定义执行bash命令的任务 task_id="sleep_task", depends_on_past=False, bash_command="echo `date` >> /home/py/test.txt", dag=dag)dummy_operator >> hello_operator #设置任务依赖关系#dummy_operator.set_downstream(hello_operator)

2) 定义http任务并使用本地时间

import osfrom datetime import timedelta, datetimeimport pytzfrom airflow.operators.http_operator import SimpleHttpOperatorfrom airflow.models import DAGdefault_args = { "owner": "cord", # "depends_on_past": False, "depends_on_past": True, "wait_for_downstream": True, "execution_timeout": timedelta(minutes=3), "email": ["123456789@qq.com"], "email_on_failure": False, "email_on_retry": False, "retries": 1, "retry_delay": timedelta(minutes=5),}#将本地时间转换为utc时间,再设置为start_datetz = pytz.timezone("Asia/Shanghai")dt = datetime(2018, 7, 26, 12, 20, tzinfo=tz)utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)os.environ["AIRFLOW_CONN_HTTP_TEST"]="http://localhost:9090"dag = DAG( "bm01", default_args=default_args, description="my DAG", schedule_interval="*/2 * * * *", start_date=utc_dt)#通过SimpleHttpOperator定义http任务task1 = SimpleHttpOperator( task_id="get_op1", http_conn_id="http_test", method="GET", endpoint="test1", data={}, headers={}, dag=dag)task2 = SimpleHttpOperator( task_id="get_op2", http_conn_id="http_test", method="GET", endpoint="test2", data={}, headers={}, dag=dag)task1 >> task2

4.4 crontab语法

crontab格式如下所示:

# ┌───────────── minute (0 - 59)# │ ┌───────────── hour (0 - 23)# │ │ ┌───────────── day of month (1 - 31)# │ │ │ ┌───────────── month (1 - 12)# │ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday;# │ │ │ │ │ 7 is also Sunday on some systems)# │ │ │ │ │# │ │ │ │ │# * * * * * command to execute

是否必须取值范围可用特殊符号备注
MinutesYes0–59* , -
HoursYes0–23* , -
Day of monthYes1–31* , - ? L W? L W部分实现可用
MonthYes1–12 or JAN–DEC* , -
Day of weekYes0–6 or SUN–SAT* , - ? L #? L W 部分实现可用
YearNo1970–2099* , -标准实现里无这一项

特殊符号功能说明:

逗号(,)​ 逗号用于分隔一个列表里的元素,比如 "MON,WED,FRI" 在第五域(day of week)表示Mondays, Wednesdays and Fridays。

连字符(-)​ 连字符用于表示范围,比如2000–2010表示2000到2010之间的每年,包括这两年(闭区间)。

百分号(%)​ 用于命令(command)中的格式化

L​ 表示last,最后一个,比如第五域,5L表示当月最后一个星期五

WW表示weekday(Monday-Friday),指离指定日期附近的工作日,比如第三域设置为15L ,这表示临近当月15附近的工作日,假如15号是星期六,那么定时器会在14号执行,如果15号是星期天,那么定时器会在16号执行,也就是说只会在离指定日期最近的那天执行。

井号##用于第五域(day of week),#后面跟着一个1~5之间的数字,这个用于表示第几个星期,比如5#3表示第三个星期五

?​ 在有些实现里面,*的功能相同,还有一些实现里面?表示cron的启动时间,比如 当cron服务在8:25am启动,则? ? * * * *会更新为25 8 * * * *, 直到下一次cron服务重新启动,定时器会再次更新。

//一般与*组合使用,后面跟着一个数字,表示频率,比如在第一域(Minutes)中*/5表示每5分钟,是普通列表表示5,10,15,20,25,30,35,40,45,50,55,00的缩写

参考链接:https://segmentfault.com/a/1190000012803744?utm_source=tuicool&utm_medium=referral

https://en.wikipedia.org/wiki/Cron

欢迎阅读本文章: 李万兵

凯时国际注册地址

凯时官网