GVKun编程网logo

如何在python-rq中的预定作业和排队作业之间创建``depends_on''关系

12

本文将分享如何在python-rq中的预定作业和排队作业之间创建``depends_on''关系的详细内容,此外,我们还将为大家带来关于append方法如何在python中工作?、ASP.NET网站中

本文将分享如何在python-rq中的预定作业和排队作业之间创建``depends_on''关系的详细内容,此外,我们还将为大家带来关于append方法如何在python中工作?、ASP.NET网站中的预定作业,无需购买专用服务器、docker-compose 中 links 和 depends_on 区别、docker_compose.yml 中的 links 和 depends_on 的区别的相关知识,希望对你有所帮助。

本文目录一览:

如何在python-rq中的预定作业和排队作业之间创建``depends_on''关系

如何在python-rq中的预定作业和排队作业之间创建``depends_on''关系

我有一个Web服务(Python 3.7,Flask 1.0.2),其工作流程包括3个步骤:

  • 步骤1:将远程计算作业提交到商业排队系统(IBM的LSF)
  • 步骤2:每61秒轮询一次远程计算作业状态(由于缓存了作业状态结果,所以每61秒轮询一次)
  • 步骤3:如果步骤2返回了远程计算作业状态==“ DONE”,则进行数据后处理

远程计算作业的长度是任意的(介于秒和天之间),并且每个步骤都取决于上一个步骤的完成:

with Connection(redis.from_url(current_app.config[''REDIS_URL''])):    q = Queue()    job1 = q.enqueue(step1)    job2 = q.enqueue(step2, depends_on=job1)    job3 = q.enqueue(step3, depends_on=job2)

但是,最终所有工作人员(4个工作人员)将进行轮询(4个客户请求中的第2步),而他们应继续执行其他传入请求的第1步以及已成功通过第2步的那些工作流的第3步。

每次民意调查后都应释放工人。他们应该定期返回步骤2进行下一次轮询(每个作业最多每61秒一次),如果远程计算作业轮询未返回“
DONE”,则重新排队该轮询作业。


在这一点上,我开始使用rq-scheduler(因为间隔和重新排队功能听起来很有希望):

with Connection(redis.from_url(current_app.config[''REDIS_URL''])):    q = Queue()    s = Scheduler(''default'')    job1 = q.enqueue(step1, REQ_ID)    job2 = Job.create(step2, (REQ_ID,), depends_on=job1)    job2.meta[''interval''] = 61    job2.origin = ''default''    job2.save()    s.enqueue_job(job2)    job3 = q.enqueue(step3, REQ_ID, depends_on=job2)

Job2已正确创建(包括与depends_onjob1
的关系,但s.enqueue_job()立即执行它,而忽略了其与job1的关系。(q.enqueue_job()函数doc-
string实际上说它是立即执行的…) 。

depends_on
当将job2放在调度程序中而不是队列中时,如何创建job1,job2和job3之间的关系?(或者,如何在不立即执行job2并等待job1完成的情况下将job2交给调度程序?)


为了进行测试,步骤如下所示:

def step1():    print(f''*** --> [{datetime.utcnow()}] JOB [ 1 ] STARTED...'', flush=True)    time.sleep(20)    print(f''    <-- [{datetime.utcnow()}] JOB [ 1 ] FINISHED'', flush=True)    return Truedef step2():    print(f''    --> [{datetime.utcnow()}] POLL JOB [ 2 ] STARTED...'', flush=True)    time.sleep(10)    print(f''    <-- [{datetime.utcnow()}] POLL JOB [ 2 ] FINISHED'', flush=True)    return Truedef step3():    print(f''    --> [{datetime.utcnow()}] JOB [ 3 ] STARTED...'', flush=True)    time.sleep(10)    print(f''*** <-- [{datetime.utcnow()}] JOB [ 3 ] FINISHED'', flush=True)    return True

我收到的输出是这样的:

worker_1     | 14:44:57 default: project.server.main.tasks.step1(1) (d40256a2-904f-4ce3-98da-6e49b5d370c9)worker_2     | 14:44:57 default: project.server.main.tasks.step2(1) (3736909c-f05d-4160-9a76-01bb1b18db58)worker_2     |     --> [2019-11-04 14:44:57.341133] POLL JOB [ 2 ] STARTED...worker_1     | *** --> [2019-11-04 14:44:57.342142] JOB [ 1 ] STARTED......

job2不等待job1完成…


#requirements.txtFlask==1.0.2Flask-Bootstrap==3.3.7.1Flask-Testing==0.7.1Flask-WTF==0.14.2redis==3.3.11rq==0.13rq_scheduler==0.9.1

答案1

小编典典

我对此问题的解决方案rq仅使用了(并且不再使用rq_scheduler):

  1. 升级到最新的python-rq软件包:

    # requirements.txt


    rq==1.1.0

  2. 为轮询作业创建专用队列,并相应地使作业入队(具有depends_on关系):

    with Connection(redis.from_url(current_app.config[''REDIS_URL''])):q = Queue(''default'')p = Queue(''pqueue'')job1 = q.enqueue(step1)job2 = p.enqueue(step2, depends_on=job1)  # step2 enqueued in polling queuejob3 = q.enqueue(step3, depends_on=job2)
  3. 派遣专职工作人员进行轮询队列。它继承自标准Worker类:

    class PWorker(rq.worker.Worker):def execute_job(self, *args, **kwargs):    seconds_between_polls = 65    job = args[0]    if ''lastpoll'' in job.meta:        job_timedelta = (datetime.utcnow() - job.meta["lastpoll"]).total_seconds()        if job_timedelta < seconds_between_polls:            sleep_period = seconds_between_polls - job_timedelta            time.sleep(sleep_period)    job.meta[''lastpoll''] = datetime.utcnow()    job.save_meta()    super().execute_job(*args, **kwargs)

PWorker execute_job通过向作业的元数据添加时间戳来扩展该方法''lastpoll''

如果有lastpoll时间戳记的轮询作业进入,工作人员将检查此后的时间间隔lastpoll是否大于65秒。如果是,它将当前时间写入
''lastpoll''并执行轮询。如果没有,它将一直hibernate直到65s结束,然后将当前时间写入''lastpoll''并执行轮询。没有lastpoll时间戳的进来的作业是第一次轮询,而工作人员创建时间戳并执行轮询。

  1. 创建一个专用异常(由task函数抛出)和一个异常处理程序来处理它:
        # exceptions.py    class PACError(Exception):        pass    class PACJobRun(PACError):        pass    class PACJobExit(PACError):        pass        # exception_handlers.py    def poll_exc_handler(job, exc_type, exc_value, traceback):        if exc_type is PACJobRun:            requeue_job(job.get_id(), connection=job.connection)            return False  # no further exception handling        else:            return True  # further exception handling        # tasks.py    def step2():        # GET request to remote compute job portal API for status        # if response == "RUN":        raise PACJobRun        return True

当定制异常处理程序捕获到定制异常(这意味着远程计算作业仍在运行)时,它将在轮询队列中重新排队该作业。

  1. 将定制异常处理程序放入异常处理层次结构中:
        # manage.py    @cli.command(''run_pworker'')    def run_pworker():        redis_url = app.config[''REDIS_URL'']        redis_connection = redis.from_url(redis_url)        with rq.connections.Connection(redis_connection):            pworker = PWorker(app.config[''PQUEUE''], exception_handlers=[poll_exc_handler])            pworker.work()

该解决方案的优点在于,它仅用几行额外的代码即可扩展python-rq的标准功能。另一方面,额外的队列和工作程序增加了复杂性……

append方法如何在python中工作?

append方法如何在python中工作?

我学习编程语言,测验问题和解决方案是这样的:

def foo(x):
   x.append (3)
   x = [8]
   return x

x=[1,5]
y= foo(x)
print x
print y

为什么打印如下:

[1 5 3 ]
[8]

为什么x不等于8?

解决方法

另外两个答案很棒.我建议你试试id来获取地址.

请参阅以下示例

def foo(x):
   x.append (3)
   print "global",id(x)
   x = [8]
   print "local ",id(x)
   return x

x=[1,5]
print "global",id(x)
y= foo(x)
print "global",id(x)
print x
print y

和输出

global 140646798391920
global 140646798391920
local  140646798392928
global 140646798391920
[1,5,3]
[8]

如您所见,变量x的地址在您操作时保持不变,但在使用=时会发生变化.函数内部的变量赋值使变量本地化为函数

ASP.NET网站中的预定作业,无需购买专用服务器

ASP.NET网站中的预定作业,无需购买专用服务器

如何在共享的托管服务器上按配置的计划时间执行各种任务(例如电子邮件警报/发送新闻信件)?

解决方法

这是一个Global.ascx.cs文件,我以前用过这种方式,使用缓存到期来触发计划任务:
public class Global : HttpApplication
{
    private const string CACHE_ENTRY_KEY = "ServiceMimicCacheEntry";
    private const string CACHE_KEY = "ServiceMimicCache";

    private void Application_Start(object sender,EventArgs e)
    {
        Application[CACHE_KEY] = HttpContext.Current.Cache;
        RegisterCacheEntry();
    }

    private void RegisterCacheEntry()
    {
        Cache cache = (Cache)Application[CACHE_KEY];
        if (cache[CACHE_ENTRY_KEY] != null) return;
        cache.Add(CACHE_ENTRY_KEY,CACHE_ENTRY_KEY,null,DateTime.MaxValue,TimeSpan.FromSeconds(120),CacheItemPriority.normal,new CacheItemRemovedCallback(CacheItemRemoved));
    }

    private void SpawnServiceActions()
    {
        ThreadStart threadStart = new ThreadStart(DoServiceActions);
        Thread thread = new Thread(threadStart);
        thread.Start();
    }

    private void DoServiceActions()
    {
        // do your scheduled stuff
    }

    private void CacheItemRemoved(string key,object value,CacheItemRemovedReason reason)
    {
        SpawnServiceActions();
        RegisterCacheEntry();
    }
}

目前,这会每2分钟触发一次您的操作,但这可以在代码中配置.

docker-compose 中 links 和 depends_on 区别

OSC 请你来轰趴啦!1028 苏州源创会,一起寻宝 AI 时代

以下的内容适用于 docker-compose 版本 version 2  version 3。先来看 Docker 官方文档中关于 Docker Compose and Django 的例子,可以使用 depends_on 来访问容器中的数据

version: ''2''
services:
  db:
    image: postgres
  web:
    build: .
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - .:/code
    ports:
      - "8000:8000"
    depends_on:
      - db

而比如下面使用

web:
  links:
   - db

则表示当启动 db 容器时会随机分配一个本地端口比如 32777 来连接容器 3306 端口,每一次修改或者重启容器都会改变该端口,使用 links 来保证每一次都能够连接数据库,而不需要知道具体端口是什么。比如说启动了一个 mysql 容器

docker run -d --name=my-mysql --env="MYSQL_ROOT_PASSWORD=mypassword" -P mysql
docker inspect <container-id> | grep HostPort

会显示该容器的本地端口。

 docker-compose 执行 V2 文件时会自动在容器间创建一个网络,每一个容器都能够立即通过名字来访问到另外的容器。 因此,不再需要 links,links 过去通常用来开始 db 容器和 web server 容器网络之间的通讯,但是这一步已经被 docker-compose 做了。1

当使用 depends_on 来定义服务之间的依赖关系时会造成下面的影响 2

  • docker-compose up 会依据依赖顺序启动服务
  • docker-compose up 启动时 SERVICE 会自动包括 SERVICE 的依赖

看这个例子:

version: ''2''
services:
  web:
    build: .
    depends_on:
      - db
      - redis
  redis:
    image: redis
  db:
    image: postgres

这个例子中 db ,redis 容器启动顺序要优先于 web 容器;当启动 web 容器时会自动创建 redis 和 db 容器。

不过需要注意的是, depends_on 不会等到 db 和 redis 容器 ready 再启动,web 容器仅仅等到 redis 和 db 容器启动就开始启动。具体可参考官网启动顺序了解。

reference

  • https://stackoverflow.com/a/39658359/1820217
  1. https://medium.com/@giorgioto/docker-compose-yml-from-v1-to-v2-3c0f8bb7a48e#.ukh8ajps0 ↩

  2. https://docs.docker.com/compose/compose-file/#/dependson ↩

docker_compose.yml 中的 links 和 depends_on 的区别

根据 Docker Compose 的compose-file 文档:

  • depends_on- 表达服务之间的依赖关系。
  • links- 链接到另一个服务中的容器,并以与depends_on相同的方式 表达服务之间的依赖关系

我不明白链接到其他容器的目的,所以两个选项之间的区别对我来说似乎仍然很困难。

如果有一个例子会容易得多,但我找不到任何例子。

我注意到,当我将容器 B 与容器 A 链接时,容器 B 将在容器 A 的外壳内“可ping”。

ping B我在容器A里面跑bash了,得到了这样的结果(仅供参考,图片来自互联网)

在此处输入图像描述

关于如何在python-rq中的预定作业和排队作业之间创建``depends_on''关系的介绍已经告一段落,感谢您的耐心阅读,如果想了解更多关于append方法如何在python中工作?、ASP.NET网站中的预定作业,无需购买专用服务器、docker-compose 中 links 和 depends_on 区别、docker_compose.yml 中的 links 和 depends_on 的区别的相关信息,请在本站寻找。

本文标签: