GVKun编程网logo

无法启动Airflow Worker /鲜花,需要澄清Airflow架构以确认安装正确

16

在这篇文章中,我们将为您详细介绍无法启动AirflowWorker/鲜花,需要澄清Airflow架构以确认安装正确的内容。此外,我们还会涉及一些关于Airflow-PythonOperator在Win

在这篇文章中,我们将为您详细介绍无法启动Airflow Worker /鲜花,需要澄清Airflow架构以确认安装正确的内容。此外,我们还会涉及一些关于Airflow - PythonOperator 在 Window 10 上需要 termios 模块、airflow FAQ、airflow 介绍、airflow 安装的知识,以帮助您更全面地了解这个主题。

本文目录一览:

无法启动Airflow Worker /鲜花,需要澄清Airflow架构以确认安装正确

无法启动Airflow Worker /鲜花,需要澄清Airflow架构以确认安装正确

在其他计算机上运行工作程序会导致以下指定的错误。我已按照配置说明进行操作,并已同步dags文件夹。

我还想确认,RabbitMQ和PostgreSQL仅需要安装在Airflow核心计算机上,而不需要安装在工作线程上(工作线程仅连接到核心)。

该安装程序的规格详细说明如下:

气流核心/服务器计算机

已安装以下内容:

  • Python 2.7与
    • 气流(AIRFLOW_HOME =〜/气流)
    • 芹菜
    • psycogp2
  • 兔子MQ
  • PostgreSQL的

airflow.cfg中进行的配置:

  • sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.1.2:5432/airflow
  • executor = CeleryExecutor
  • broker_url = amqp://username:password@192.168.1.2:5672//
  • celery_result_backend = postgresql+psycopg2://username:password@192.168.1.2:5432/airflow

进行的测试:

  • RabbitMQ正在运行
  • 可以连接到PostgreSQL并已确认Airflow已创建表
  • 可以启动和查看Web服务器(包括自定义dag)

气流工作者计算机

已安装以下内容:

  • Python 2.7与
    • 气流(AIRFLOW_HOME =〜/气流)
    • 芹菜
    • psycogp2

airflow.cfg中进行的配置与服务器中的配置完全相同:

  • sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.1.2:5432/airflow
  • executor = CeleryExecutor
  • broker_url = amqp://username:password@192.168.1.2:5672//
  • celery_result_backend = postgresql+psycopg2://username:password@192.168.1.2:5432/airflow

在工作计算机上运行的命令的输出:

运行时airflow flower

ubuntu@airflow_client:~/airflow$ airflow flower[2016-06-13 04:19:42,814] {__init__.py:36} INFO - Using executor CeleryExecutorTraceback (most recent call last):  File "/home/ubuntu/anaconda2/bin/airflow", line 15, in <module>    args.func(args)  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/airflow/bin/cli.py", line 576, in flower    os.execvp("flower", [''flower'', ''-b'', broka, port, api])  File "/home/ubuntu/anaconda2/lib/python2.7/os.py", line 346, in execvp    _execvpe(file, args)  File "/home/ubuntu/anaconda2/lib/python2.7/os.py", line 382, in _execvpe    func(fullname, *argrest)OSError: [Errno 2] No such file or directory

运行时airflow worker

ubuntu@airflow_client:~$ airflow worker[2016-06-13 04:08:43,573] {__init__.py:36} INFO - Using executor CeleryExecutor[2016-06-13 04:08:43,935: ERROR/MainProcess] Unrecoverable error: ImportError(''No module named postgresql'',)Traceback (most recent call last):  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start    self.blueprint.start(self)  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start    self.on_start()  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/apps/worker.py", line 169, in on_start    string(self.colored.cyan('' \n'', self.startup_info())),  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/apps/worker.py", line 230, in startup_info    results=self.app.backend.as_uri(),  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/kombu/utils/__init__.py", line 325, in __get__    value = obj.__dict__[self.__name__] = self.__get(obj)  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/app/base.py", line 626, in backend    return self._get_backend()  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/app/base.py", line 444, in _get_backend    self.loader)  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/backends/__init__.py", line 68, in get_backend_by_url    return get_backend_cls(backend, loader), url  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/backends/__init__.py", line 49, in get_backend_cls    cls = symbol_by_name(backend, aliases)  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/kombu/utils/__init__.py", line 96, in symbol_by_name    module = imp(module_name, package=package, **kwargs)  File "/home/ubuntu/anaconda2/lib/python2.7/importlib/__init__.py", line 37, in import_module    __import__(name)ImportError: No module named postgresql

celery_result_backend被更改为默认db+mysql://airflow:airflow@localhost:3306/airflowairflowworker被再次运行结果是:

ubuntu@airflow_client:~/airflow$ airflow worker                                                                             [2016-06-13 04:17:32,387] {__init__.py:36} INFO - Using executor CeleryExecutor -------------- celery@airflow_client2 v3.1.23 (Cipater)---- **** ----- --- * ***  * -- Linux-3.19.0-59-generic-x86_64-with-debian-jessie-sid-- * - **** --- - ** ---------- [config]- ** ---------- .> app:         airflow.executors.celery_executor:0x7f5cb65cb510- ** ---------- .> transport:   amqp://username:**@192.168.1.2:5672//- ** ---------- .> results:     mysql://airflow:**@localhost:3306/airflow- *** --- * --- .> concurrency: 16 (prefork)-- ******* ---- --- ***** ----- [queues] -------------- .> default          exchange=default(direct) key=celery[2016-06-13 04:17:33,385] {__init__.py:36} INFO - Using executor CeleryExecutorStarting flask[2016-06-13 04:17:33,737] {_internal.py:87} INFO -  * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)[2016-06-13 04:17:34,536: WARNING/MainProcess] celery@airflow_client2 ready.

我想念什么?如何进一步诊断?

答案1

小编典典

ImportError: No module namedpostgresql错误是由于您使用的无效的前缀celery_result_backend。将数据库用作Celery后端时,连接URL必须以前缀db+。参见
https://docs.celeryproject.org/en/stable/userguide/configuration.html#conf-
database-result-
backend

因此,请替换:

celery_result_backend = postgresql+psycopg2://username:password@192.168.1.2:5432/airflow

与类似:

celery_result_backend = db+postgresql://username:password@192.168.1.2:5432/airflow

Airflow - PythonOperator 在 Window 10 上需要 termios 模块

Airflow - PythonOperator 在 Window 10 上需要 termios 模块

termios 仅适用于 Unix 系统:https://docs.python.org/3/library/tty.html

Airflow 不支持 Windows(请参阅 GitHub 上的 open issue)

出于开发目的,您可以使用 Breeze with docker WSL 2

在 Windows 上使用 Airflow

airflow FAQ

airflow FAQ

1、关于airflow设置环境变量

在BashOperator的env(dict类型)参数中添加环境变量,当然也可以在声明DAG时的default_args中添加env的声明,但需要注意,如果设置了envairflow就不再访问系统的环境变量,所以这里设置的env一定要包含程序运行所需的所有环境变量,否则会出错

import os
local_env = os.environ
local_env['PATH'] = os.environ['PATH'] + ":" + Variable.get('PATH')
local_env['JAVA_HOME'] = Variable.get('JAVA_HOME')
 
在dag的default_args中添加'env':dict(local_env)

2、mark_success

当task执行完成,但只是返回值为失败的时候,可以不rerun该task,而是marksuccess,然后继续执行下面的task

UI中的操作暂时未成功,点击总是提示“No task instances to markas successful ”,目前可以通过强制rerun下一个task,使下一个task成功,虽然失败的task的状态未改变,但已经不影响下面的task执行

强制rerun的命令为airflowrun -f -i dag_id task_id execute_time

3、rerun

在UI界面中clear某个task,则airflow会自动rerun该task

当使用run按钮时,要求使用CeleryExecutor才可执行,因需要使用redis数据库,故暂时未尝试

4、hold某个dag或task

暂时只支持hold(pause)某个dag,可以使用命令airflow pause/unpause dagid来pause或unpause某个dag,也可在页面中点击dag前面的On/Off按钮来unpause/pause某个dag,若当前dag有某个task已经启动,则无法停止,只能保证下一个task不执行

暂时无法hold或pause某个task,只支持以dag为单位pause

5、当使用BashOperator时,command需要调用脚本时,脚本后需要有个空格,否则报错,暂时不清楚原因,但加空格后可以正常执行,如下例,run.sh后需加空格

t1 = BashOperator(
       task_id='process_rankinglist',bash_command='/home/rankinglist_processor/run.sh ',dag=dag)

6、airflow提供了很多Macros Variables,可以直接使用jinja模板语言调用宏变量

templated_command = """
echo "dag_run:{{ dag_run }}"
echo "run_id:{{ run_id }}"
echo "execution_date:{{ execution_date }}"
echo "ts:{{ ts }}"
echo "ti:{{ ti }}"
sleep 3
"""
 
t1 = BashOperator(
       task_id='xcom',bash_command=templated_command,dag=dag)


但是需要注意,其中的execution_date并不是task的真正执行时间,而是上一周期task的执行时间。

即可以理解为“actual_execution_date= execution_date +schedual_interval”,或者我们换一种说法,我们在airflow上看到一个任务是6am执行的,而且interval=4hours,那么execution_date的值是2am,而不是6am,所以获取某个task的真正执行时间,需要获取execution_date的下个执行周期时间,即使用dag.following_schedule(execution_date)

7、使用Xcom在task之间传参

可以直接使用jinja模板语言,在{{}}中调用ti的xcom_push和xcom_pull方法,下面的例子为t1使用xcom_push推出了一个kv,t2通过taskid和key来接收

dag = DAG(
   dag_id='xcomtest',default_args=default_args,schedule_interval='*/2 * ** *')
 
t1 = BashOperator(
   task_id='xcom',bash_command='''{{ ti.xcom_push(key='aaa',value='bbb') }}''',dag=dag)
 
t2 = BashOperator(
   task_id='xcom2',bash_command='''echo"{{ ti.xcom_pull(key='aaa',task_ids='xcom') }}" ''',dag=dag)
t2.set_upstream(t1)

airflow 介绍

airflow 介绍

OSC 请你来轰趴啦!1028 苏州源创会,一起寻宝 AI 时代

1, 简介

​ Airflow 是一个可编程,调度和监控的工作流平台,基于有向无环图 (DAG),airflow 可以定义一组有依赖的任务,按照依赖依次执行。airflow 提供了丰富的命令行工具用于系统管控,而其 web 管理界面同样也可以方便的管控调度任务,并且对任务运行状态进行实时监控,方便了系统的运维和管理。

2,执行器 (Executor)

​ Airflow 本身是一个综合平台,它兼容多种组件,所以在使用的时候有多种方案可以选择。比如最关键的执行器就有四种选择:

SequentialExecutor:单进程顺序执行任务,默认执行器,通常只用于测试

LocalExecutor:多进程本地执行任务

CeleryExecutor:分布式调度,生产常用

DaskExecutor :动态任务调度,主要用于数据分析

在当前项目使用 CeleryExecutor 作为执行器。

celery 是一个分布式调度框架,其本身无队列功能,需要使用第三方组件,比如 redis 或者 rabbitmq,当前项目使用的是 rabbitmq,系统整体结构如下所示:

其中:

turing 为外部系统

GDags 服务帮助拼接成 dag

master 节点 webui 管理 dags、日志等信息

scheduler 负责调度,只支持单节点

worker 负责执行具体 dag 中的 task, worker 支持多节点

在整个调度系统中,节点之间的传递介质是消息,而消息的本质内容是执行脚本的命令,也就是说,工作节点的 dag 文件必须和 master 节点的 dag 文件保持一致,不然任务的执行会出问题。

3,任务处理器

airflow 内置了丰富的任务处理器,用于实现不同类型的任务:

BashOperator : 执行 bash 命令

PythonOperator : 调用 python 代码

EmailOperator : 发送邮件

HTTPOperator : 发送 HTTP 请求

SqlOperator : 执行 SQL 命令

除了这些基本的构建块之外,还有更多的特定处理器:DockerOperatorHiveOperatorS3FileTransferOperatorPrestoToMysqlOperatorSlackOperator ...

在当前项目使用了 HTTPOperator 作为执行器,用于调用 JAVA 服务,整体结构图如下:

关于 airflow 的环境搭建可以参考另外一篇博客: https://www.cnblogs.com/cord/p/9226608.html

4,基本使用

4.1,常用命令

$ airflow webserver -D		 守护进程运行webserver
$ airflow scheduler -D		 守护进程运行调度器
$ airflow worker -D		     守护进程运行调度器
$ airflow worker -c 1 -D 	 守护进程运行celery worker并指定任务并发数为1
$ airflow pause dag_id  	暂停任务
$ airflow unpause dag_id 	 取消暂停,等同于在管理界面打开off按钮
$ airflow list_tasks dag_id  查看task列表
$ airflow clear dag_id       清空任务实例
$ airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE  运行整个dag文件
$ airflow run dag_id task_id execution_date       运行task

4.2,web 管控界面的使用

启动 web 管控界面需要执行 airflow webserver -D 命令,默认访问端口是 8080

http://110.55.63.51:8080/admin/

(1) 任务启动暂停开关

(2) 任务运行状态

(3) 待执行,未分发的任务

(4) 手动触发执行任务

(5) 任务管控界面

选择对应 dag 栏目,点击 (5) Graph View 即可进入任务管控界面

点击对应的任务,会弹出一个任务管控台,主要几个功能如下:

View Log : 查看任务日志

Run : 运行选中任务

Clear:清空任务队列

Mark Success : 标记任务为成功状态

4.3 通过定义 DAG 文件实现创建定时任务

1) 普通任务

from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = { #默认参数
    ''owner'': ''jifeng.si'', #dag拥有者,用于权限管控
    ''depends_on_past'': False,  #是否依赖上游任务
    ''start_date'': datetime(2018, 5, 2), #任务开始时间,默认utc时间
    ''email'': [''123456789@qq.com''], #告警通知邮箱地址
    ''email_on_failure'': False,
    ''email_on_retry'': False,
    ''retries'': 1,
    ''retry_delay'': timedelta(minutes=5),
}

dag = DAG(
    ''example_hello_world_dag'',  #dag的id
    default_args=default_args,
    description=''my first DAG'', #描述
    schedule_interval=''*/25 * * * *'', # crontab
    start_date=datetime(2018, 5, 28) #开始时间,覆盖默认参数
)

def print_hello():
    return ''Hello world!''

dummy_operator = DummyOperator(task_id=''dummy_task'', dag=dag)

hello_operator = BashOperator(   #通过BashOperator定义执行bash命令的任务
    task_id=''sleep_task'',
    depends_on_past=False,
    bash_command=''echo `date` >> /home/py/test.txt'',
    dag=dag
)

dummy_operator >> hello_operator #设置任务依赖关系
#dummy_operator.set_downstream(hello_operator)

2) 定义 http 任务并使用本地时间

import os
from datetime import timedelta, datetime
import pytz
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.models import DAG

default_args = {
    ''owner'': ''cord'',
    # ''depends_on_past'': False,
    ''depends_on_past'': True,
    ''wait_for_downstream'': True,
    ''execution_timeout'': timedelta(minutes=3),
    ''email'': [''123456789@qq.com''],
    ''email_on_failure'': False,
    ''email_on_retry'': False,
    ''retries'': 1,
    ''retry_delay'': timedelta(minutes=5),
}

#将本地时间转换为utc时间,再设置为start_date
tz = pytz.timezone(''Asia/Shanghai'')
dt = datetime(2018, 7, 26, 12, 20, tzinfo=tz)
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)

os.environ[''AIRFLOW_CONN_HTTP_TEST'']=''http://localhost:9090''

dag = DAG(
    ''bm01'',
    default_args=default_args,
    description=''my DAG'',
    schedule_interval=''*/2 * * * *'',
    start_date=utc_dt
)

#通过SimpleHttpOperator定义http任务
task1 = SimpleHttpOperator(
    task_id=''get_op1'',
    http_conn_id=''http_test'',
    method=''GET'',
    endpoint=''test1'',
    data={},
    headers={},
    dag=dag)

task2 = SimpleHttpOperator(
    task_id=''get_op2'',
    http_conn_id=''http_test'',
    method=''GET'',
    endpoint=''test2'',
    data={},
    headers={},
    dag=dag)

task1 >> task2

4.4 crontab 语法

crontab 格式如下所示:

# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12)
# │ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday;
# │ │ │ │ │                                       7 is also Sunday on some systems)
# │ │ │ │ │
# │ │ │ │ │
# * * * * *  command to execute
是否必须 取值范围 可用特殊符号 备注
Minutes Yes 0–59 * , -  
Hours Yes 0–23 * , -  
Day of month Yes 1–31 * , - ? L W ? L W 部分实现可用
Month Yes 1–12 or JAN–DEC * , -  
Day of week Yes 0–6 or SUN–SAT * , - ? L # ? L W 部分实现可用
Year No 1970–2099 * , - 标准实现里无这一项

特殊符号功能说明:

逗号 (,)
​ 逗号用于分隔一个列表里的元素,比如 "MON,WED,FRI" 在第五域 (day of week) 表示 Mondays, Wednesdays and Fridays。

连字符 (-)
​ 连字符用于表示范围,比如 2000–2010 表示 2000 到 2010 之间的每年,包括这两年 (闭区间)。

百分号 (%)
​ 用于命令 (command) 中的格式化

L
​ 表示 last,最后一个,比如第五域,5L 表示当月最后一个星期五

W
 W 表示 weekday(Monday-Friday),指离指定日期附近的工作日,比如第三域设置为 15L ,这表示临近当月 15 附近的工作日,假如 15 号是星期六,那么定时器会在 14 号执行,如果 15 号是星期天,那么定时器会在 16 号执行,也就是说只会在离指定日期最近的那天执行。

井号#
 #用于第五域(day of week),# 后面跟着一个 1~5 之间的数字,这个用于表示第几个星期,比如 5#3 表示第三个星期五

?
​ 在有些实现里面,* 的功能相同,还有一些实现里面 ? 表示 cron 的启动时间,比如 当 cron 服务在 8:25am 启动,则 ? ? * * * * 会更新为 25 8 * * * *, 直到下一次 cron 服务重新启动,定时器会再次更新。

/
 / 一般与 * 组合使用,后面跟着一个数字,表示频率,比如在第一域 (Minutes) 中 */5 表示每 5 分钟,是普通列表表示 5,10,15,20,25,30,35,40,45,50,55,00 的缩写

参考链接:
https://segmentfault.com/a/1190000012803744?utm_source=tuicool&utm_medium=referral

https://en.wikipedia.org/wiki/Cron

airflow 安装

airflow 安装

1、安装

pip install apache-airflow -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com

用豆瓣镜像,默认的镜像报错

今天的关于无法启动Airflow Worker /鲜花,需要澄清Airflow架构以确认安装正确的分享已经结束,谢谢您的关注,如果想了解更多关于Airflow - PythonOperator 在 Window 10 上需要 termios 模块、airflow FAQ、airflow 介绍、airflow 安装的相关知识,请在本站进行查询。

本文标签: