在这篇文章中,我们将为您详细介绍无法启动AirflowWorker/鲜花,需要澄清Airflow架构以确认安装正确的内容。此外,我们还会涉及一些关于Airflow-PythonOperator在Win
在这篇文章中,我们将为您详细介绍无法启动Airflow Worker /鲜花,需要澄清Airflow架构以确认安装正确的内容。此外,我们还会涉及一些关于Airflow - PythonOperator 在 Window 10 上需要 termios 模块、airflow FAQ、airflow 介绍、airflow 安装的知识,以帮助您更全面地了解这个主题。
本文目录一览:- 无法启动Airflow Worker /鲜花,需要澄清Airflow架构以确认安装正确
- Airflow - PythonOperator 在 Window 10 上需要 termios 模块
- airflow FAQ
- airflow 介绍
- airflow 安装
无法启动Airflow Worker /鲜花,需要澄清Airflow架构以确认安装正确
在其他计算机上运行工作程序会导致以下指定的错误。我已按照配置说明进行操作,并已同步dags文件夹。
我还想确认,RabbitMQ和PostgreSQL仅需要安装在Airflow核心计算机上,而不需要安装在工作线程上(工作线程仅连接到核心)。
该安装程序的规格详细说明如下:
气流核心/服务器计算机
已安装以下内容:
- Python 2.7与
- 气流(AIRFLOW_HOME =〜/气流)
- 芹菜
- psycogp2
- 兔子MQ
- PostgreSQL的
airflow.cfg中进行的配置:
sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.1.2:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.2:5672//
celery_result_backend = postgresql+psycopg2://username:password@192.168.1.2:5432/airflow
进行的测试:
- RabbitMQ正在运行
- 可以连接到PostgreSQL并已确认Airflow已创建表
- 可以启动和查看Web服务器(包括自定义dag)
。
。
气流工作者计算机
已安装以下内容:
- Python 2.7与
- 气流(AIRFLOW_HOME =〜/气流)
- 芹菜
- psycogp2
airflow.cfg中进行的配置与服务器中的配置完全相同:
sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.1.2:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.2:5672//
celery_result_backend = postgresql+psycopg2://username:password@192.168.1.2:5432/airflow
在工作计算机上运行的命令的输出:
运行时airflow flower
:
ubuntu@airflow_client:~/airflow$ airflow flower[2016-06-13 04:19:42,814] {__init__.py:36} INFO - Using executor CeleryExecutorTraceback (most recent call last): File "/home/ubuntu/anaconda2/bin/airflow", line 15, in <module> args.func(args) File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/airflow/bin/cli.py", line 576, in flower os.execvp("flower", [''flower'', ''-b'', broka, port, api]) File "/home/ubuntu/anaconda2/lib/python2.7/os.py", line 346, in execvp _execvpe(file, args) File "/home/ubuntu/anaconda2/lib/python2.7/os.py", line 382, in _execvpe func(fullname, *argrest)OSError: [Errno 2] No such file or directory
运行时airflow worker
:
ubuntu@airflow_client:~$ airflow worker[2016-06-13 04:08:43,573] {__init__.py:36} INFO - Using executor CeleryExecutor[2016-06-13 04:08:43,935: ERROR/MainProcess] Unrecoverable error: ImportError(''No module named postgresql'',)Traceback (most recent call last): File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start self.blueprint.start(self) File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start self.on_start() File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/apps/worker.py", line 169, in on_start string(self.colored.cyan('' \n'', self.startup_info())), File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/apps/worker.py", line 230, in startup_info results=self.app.backend.as_uri(), File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/kombu/utils/__init__.py", line 325, in __get__ value = obj.__dict__[self.__name__] = self.__get(obj) File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/app/base.py", line 626, in backend return self._get_backend() File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/app/base.py", line 444, in _get_backend self.loader) File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/backends/__init__.py", line 68, in get_backend_by_url return get_backend_cls(backend, loader), url File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/backends/__init__.py", line 49, in get_backend_cls cls = symbol_by_name(backend, aliases) File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/kombu/utils/__init__.py", line 96, in symbol_by_name module = imp(module_name, package=package, **kwargs) File "/home/ubuntu/anaconda2/lib/python2.7/importlib/__init__.py", line 37, in import_module __import__(name)ImportError: No module named postgresql
当celery_result_backend
被更改为默认db+mysql://airflow:airflow@localhost:3306/airflow
和airflowworker
被再次运行结果是:
ubuntu@airflow_client:~/airflow$ airflow worker [2016-06-13 04:17:32,387] {__init__.py:36} INFO - Using executor CeleryExecutor -------------- celery@airflow_client2 v3.1.23 (Cipater)---- **** ----- --- * *** * -- Linux-3.19.0-59-generic-x86_64-with-debian-jessie-sid-- * - **** --- - ** ---------- [config]- ** ---------- .> app: airflow.executors.celery_executor:0x7f5cb65cb510- ** ---------- .> transport: amqp://username:**@192.168.1.2:5672//- ** ---------- .> results: mysql://airflow:**@localhost:3306/airflow- *** --- * --- .> concurrency: 16 (prefork)-- ******* ---- --- ***** ----- [queues] -------------- .> default exchange=default(direct) key=celery[2016-06-13 04:17:33,385] {__init__.py:36} INFO - Using executor CeleryExecutorStarting flask[2016-06-13 04:17:33,737] {_internal.py:87} INFO - * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)[2016-06-13 04:17:34,536: WARNING/MainProcess] celery@airflow_client2 ready.
我想念什么?如何进一步诊断?
答案1
小编典典该ImportError: No module namedpostgresql
错误是由于您使用的无效的前缀celery_result_backend
。将数据库用作Celery后端时,连接URL必须以前缀db+
。参见
https://docs.celeryproject.org/en/stable/userguide/configuration.html#conf-
database-result-
backend
因此,请替换:
celery_result_backend = postgresql+psycopg2://username:password@192.168.1.2:5432/airflow
与类似:
celery_result_backend = db+postgresql://username:password@192.168.1.2:5432/airflow
Airflow - PythonOperator 在 Window 10 上需要 termios 模块
termios 仅适用于 Unix 系统:https://docs.python.org/3/library/tty.html
Airflow 不支持 Windows(请参阅 GitHub 上的 open issue)
出于开发目的,您可以使用 Breeze with docker WSL 2
在 Windows 上使用 Airflowairflow FAQ
1、关于airflow设置环境变量
在BashOperator的env(dict类型)参数中添加环境变量,当然也可以在声明DAG时的default_args中添加env的声明,但需要注意,如果设置了env,airflow就不再访问系统的环境变量,所以这里设置的env一定要包含程序运行所需的所有环境变量,否则会出错
import os local_env = os.environ local_env['PATH'] = os.environ['PATH'] + ":" + Variable.get('PATH') local_env['JAVA_HOME'] = Variable.get('JAVA_HOME') 在dag的default_args中添加'env':dict(local_env)
2、mark_success
当task执行完成,但只是返回值为失败的时候,可以不rerun该task,而是marksuccess,然后继续执行下面的task
UI中的操作暂时未成功,点击总是提示“No task instances to markas successful ”,目前可以通过强制rerun下一个task,使下一个task成功,虽然失败的task的状态未改变,但已经不影响下面的task执行
强制rerun的命令为airflowrun -f -i dag_id task_id execute_time
3、rerun
在UI界面中clear某个task,则airflow会自动rerun该task
当使用run按钮时,要求使用CeleryExecutor才可执行,因需要使用redis数据库,故暂时未尝试
4、hold某个dag或task
暂时只支持hold(pause)某个dag,可以使用命令airflow pause/unpause dagid来pause或unpause某个dag,也可在页面中点击dag前面的On/Off按钮来unpause/pause某个dag,若当前dag有某个task已经启动,则无法停止,只能保证下一个task不执行
暂时无法hold或pause某个task,只支持以dag为单位pause
5、当使用BashOperator时,command需要调用脚本时,脚本后需要有个空格,否则报错,暂时不清楚原因,但加空格后可以正常执行,如下例,run.sh后需加空格
t1 = BashOperator( task_id='process_rankinglist',bash_command='/home/rankinglist_processor/run.sh ',dag=dag)
6、airflow提供了很多Macros Variables,可以直接使用jinja模板语言调用宏变量
templated_command = """ echo "dag_run:{{ dag_run }}" echo "run_id:{{ run_id }}" echo "execution_date:{{ execution_date }}" echo "ts:{{ ts }}" echo "ti:{{ ti }}" sleep 3 """ t1 = BashOperator( task_id='xcom',bash_command=templated_command,dag=dag)
但是需要注意,其中的execution_date并不是task的真正执行时间,而是上一周期task的执行时间。
即可以理解为“actual_execution_date= execution_date +schedual_interval”,或者我们换一种说法,我们在airflow上看到一个任务是6am执行的,而且interval=4hours,那么execution_date的值是2am,而不是6am,所以获取某个task的真正执行时间,需要获取execution_date的下个执行周期时间,即使用dag.following_schedule(execution_date)
7、使用Xcom在task之间传参
可以直接使用jinja模板语言,在{{}}中调用ti的xcom_push和xcom_pull方法,下面的例子为t1使用xcom_push推出了一个kv,t2通过taskid和key来接收
dag = DAG( dag_id='xcomtest',default_args=default_args,schedule_interval='*/2 * ** *') t1 = BashOperator( task_id='xcom',bash_command='''{{ ti.xcom_push(key='aaa',value='bbb') }}''',dag=dag) t2 = BashOperator( task_id='xcom2',bash_command='''echo"{{ ti.xcom_pull(key='aaa',task_ids='xcom') }}" ''',dag=dag) t2.set_upstream(t1)
airflow 介绍

1, 简介
Airflow 是一个可编程,调度和监控的工作流平台,基于有向无环图 (DAG),airflow 可以定义一组有依赖的任务,按照依赖依次执行。airflow 提供了丰富的命令行工具用于系统管控,而其 web 管理界面同样也可以方便的管控调度任务,并且对任务运行状态进行实时监控,方便了系统的运维和管理。
2,执行器 (Executor)
Airflow 本身是一个综合平台,它兼容多种组件,所以在使用的时候有多种方案可以选择。比如最关键的执行器就有四种选择:
SequentialExecutor:单进程顺序执行任务,默认执行器,通常只用于测试
LocalExecutor:多进程本地执行任务
CeleryExecutor:分布式调度,生产常用
DaskExecutor :动态任务调度,主要用于数据分析
在当前项目使用 CeleryExecutor
作为执行器。
celery 是一个分布式调度框架,其本身无队列功能,需要使用第三方组件,比如 redis 或者 rabbitmq,当前项目使用的是 rabbitmq,系统整体结构如下所示:
其中:
turing 为外部系统
GDags 服务帮助拼接成 dag
master 节点 webui 管理 dags、日志等信息
scheduler 负责调度,只支持单节点
worker 负责执行具体 dag 中的 task, worker 支持多节点
在整个调度系统中,节点之间的传递介质是消息,而消息的本质内容是执行脚本的命令,也就是说,工作节点的 dag 文件必须和 master 节点的 dag 文件保持一致,不然任务的执行会出问题。
3,任务处理器
airflow 内置了丰富的任务处理器,用于实现不同类型的任务:
BashOperator : 执行 bash 命令
PythonOperator : 调用 python 代码
EmailOperator : 发送邮件
HTTPOperator : 发送 HTTP 请求
SqlOperator : 执行 SQL 命令
除了这些基本的构建块之外,还有更多的特定处理器:DockerOperator
,HiveOperator
,S3FileTransferOperator
,PrestoToMysqlOperator
,SlackOperator
...
在当前项目使用了 HTTPOperator
作为执行器,用于调用 JAVA 服务,整体结构图如下:
关于 airflow 的环境搭建可以参考另外一篇博客: https://www.cnblogs.com/cord/p/9226608.html
4,基本使用
4.1,常用命令
$ airflow webserver -D 守护进程运行webserver
$ airflow scheduler -D 守护进程运行调度器
$ airflow worker -D 守护进程运行调度器
$ airflow worker -c 1 -D 守护进程运行celery worker并指定任务并发数为1
$ airflow pause dag_id 暂停任务
$ airflow unpause dag_id 取消暂停,等同于在管理界面打开off按钮
$ airflow list_tasks dag_id 查看task列表
$ airflow clear dag_id 清空任务实例
$ airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE 运行整个dag文件
$ airflow run dag_id task_id execution_date 运行task
4.2,web 管控界面的使用
启动 web 管控界面需要执行 airflow webserver -D
命令,默认访问端口是 8080
http://110.55.63.51:8080/admin/
(1) 任务启动暂停开关
(2) 任务运行状态
(3) 待执行,未分发的任务
(4) 手动触发执行任务
(5) 任务管控界面
选择对应 dag 栏目,点击 (5) Graph View 即可进入任务管控界面
点击对应的任务,会弹出一个任务管控台,主要几个功能如下:
View Log : 查看任务日志
Run : 运行选中任务
Clear:清空任务队列
Mark Success : 标记任务为成功状态
4.3 通过定义 DAG 文件实现创建定时任务
1) 普通任务
from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
default_args = { #默认参数
''owner'': ''jifeng.si'', #dag拥有者,用于权限管控
''depends_on_past'': False, #是否依赖上游任务
''start_date'': datetime(2018, 5, 2), #任务开始时间,默认utc时间
''email'': [''123456789@qq.com''], #告警通知邮箱地址
''email_on_failure'': False,
''email_on_retry'': False,
''retries'': 1,
''retry_delay'': timedelta(minutes=5),
}
dag = DAG(
''example_hello_world_dag'', #dag的id
default_args=default_args,
description=''my first DAG'', #描述
schedule_interval=''*/25 * * * *'', # crontab
start_date=datetime(2018, 5, 28) #开始时间,覆盖默认参数
)
def print_hello():
return ''Hello world!''
dummy_operator = DummyOperator(task_id=''dummy_task'', dag=dag)
hello_operator = BashOperator( #通过BashOperator定义执行bash命令的任务
task_id=''sleep_task'',
depends_on_past=False,
bash_command=''echo `date` >> /home/py/test.txt'',
dag=dag
)
dummy_operator >> hello_operator #设置任务依赖关系
#dummy_operator.set_downstream(hello_operator)
2) 定义 http 任务并使用本地时间
import os
from datetime import timedelta, datetime
import pytz
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.models import DAG
default_args = {
''owner'': ''cord'',
# ''depends_on_past'': False,
''depends_on_past'': True,
''wait_for_downstream'': True,
''execution_timeout'': timedelta(minutes=3),
''email'': [''123456789@qq.com''],
''email_on_failure'': False,
''email_on_retry'': False,
''retries'': 1,
''retry_delay'': timedelta(minutes=5),
}
#将本地时间转换为utc时间,再设置为start_date
tz = pytz.timezone(''Asia/Shanghai'')
dt = datetime(2018, 7, 26, 12, 20, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)
os.environ[''AIRFLOW_CONN_HTTP_TEST'']=''http://localhost:9090''
dag = DAG(
''bm01'',
default_args=default_args,
description=''my DAG'',
schedule_interval=''*/2 * * * *'',
start_date=utc_dt
)
#通过SimpleHttpOperator定义http任务
task1 = SimpleHttpOperator(
task_id=''get_op1'',
http_conn_id=''http_test'',
method=''GET'',
endpoint=''test1'',
data={},
headers={},
dag=dag)
task2 = SimpleHttpOperator(
task_id=''get_op2'',
http_conn_id=''http_test'',
method=''GET'',
endpoint=''test2'',
data={},
headers={},
dag=dag)
task1 >> task2
4.4 crontab 语法
crontab
格式如下所示:
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12)
# │ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday;
# │ │ │ │ │ 7 is also Sunday on some systems)
# │ │ │ │ │
# │ │ │ │ │
# * * * * * command to execute
域 | 是否必须 | 取值范围 | 可用特殊符号 | 备注 |
---|---|---|---|---|
Minutes | Yes | 0–59 | * , - |
|
Hours | Yes | 0–23 | * , - |
|
Day of month | Yes | 1–31 | * , - ? L W |
? L W 部分实现可用 |
Month | Yes | 1–12 or JAN–DEC | * , - |
|
Day of week | Yes | 0–6 or SUN–SAT | * , - ? L # |
? L W 部分实现可用 |
Year | No | 1970–2099 | * , - |
标准实现里无这一项 |
特殊符号功能说明:
逗号 (,
)
逗号用于分隔一个列表里的元素,比如 "MON,WED,FRI" 在第五域 (day of week) 表示 Mondays, Wednesdays and Fridays。
连字符 (-
)
连字符用于表示范围,比如 2000–2010 表示 2000 到 2010 之间的每年,包括这两年 (闭区间)。
百分号 (%
)
用于命令 (command) 中的格式化
L
表示 last
,最后一个,比如第五域,5L
表示当月最后一个星期五
W
W
表示 weekday(Monday-Friday),指离指定日期附近的工作日,比如第三域设置为 15L
,这表示临近当月 15 附近的工作日,假如 15 号是星期六,那么定时器会在 14 号执行,如果 15 号是星期天,那么定时器会在 16 号执行,也就是说只会在离指定日期最近的那天执行。
井号#
#
用于第五域(day of week),# 后面跟着一个 1~5 之间的数字,这个用于表示第几个星期,比如 5#3
表示第三个星期五
?
在有些实现里面,?
与 *
的功能相同,还有一些实现里面 ?
表示 cron 的启动时间,比如 当 cron 服务在 8:25am 启动,则 ? ? * * * *
会更新为 25 8 * * * *
, 直到下一次 cron 服务重新启动,定时器会再次更新。
/
/
一般与 *
组合使用,后面跟着一个数字,表示频率,比如在第一域 (Minutes) 中 */5
表示每 5 分钟,是普通列表表示 5,10,15,20,25,30,35,40,45,50,55,00 的缩写
参考链接:
https://segmentfault.com/a/1190000012803744?utm_source=tuicool&utm_medium=referral
https://en.wikipedia.org/wiki/Cron
airflow 安装
1、安装
pip install apache-airflow -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com
用豆瓣镜像,默认的镜像报错
今天的关于无法启动Airflow Worker /鲜花,需要澄清Airflow架构以确认安装正确的分享已经结束,谢谢您的关注,如果想了解更多关于Airflow - PythonOperator 在 Window 10 上需要 termios 模块、airflow FAQ、airflow 介绍、airflow 安装的相关知识,请在本站进行查询。
本文标签: