任务调度之Celery

场景:当你在某些程序执行过程中需要另外执行一些不需马上返回结果的任务,这个任务可以交给其他进程去执行,以提高原程序的效率,这时就需要使用一些任务调度服务,比如这次的主角“celery”。

Celery 是一个py开发的模块,安装啥的就不说了。 Github 开源之

安装之后你则发现命令行下有一个新的可执行命令:celery

路径一般为:/usr/local/bin/celery

这就是执行任务调度的主要worker程序了

写个简易的任务试试:tasks.py

    # coding=utf8    from celery import Celery    celery = Celery('tasks', backend='redis', broker='redis://localhost:6379/0')    @celery.task(bind=True, max_retries=3, default_retry_delay=1*6)    def do(self, word):        print 'hi:::', word        try:            raise ValueError, 'eee'        except Exception as e:            raise self.retry(exc=e, countdown=60)

如何运行?

就是刚才说到的celery worker

命令行到tasks.py所在目录,执行命令:

    celery -A tasks worker --loglevel=warn

这就是Celery的Terminal了,可以看见任务调度的过程以及相应日志,可以将该worker运行到后台,也可以用管道重定向输出到日志文件,这些就不细说了。

worker有了,如何发起调度?

单独执行py函数一样

    # python    >>> from tasks import do    >>> r = do.delary('nic')    >>> r.status    'RETRY'

来从头到脚解释一下这个程序引入模块,创建实例,使用redis作为任务调度队列,(可以使用其他,比如amqp)

实例化Celery,源码在 https://github.com/celery/celery/blob/master/celery/app/base.py # 108行,关于参数的说明注释实在少…

其中参数,第一个大概是名字,在worker进程输出日志的时候能看到,

第三个参数则是任务队列使用的服务协议以及地址

至于第二个参数,GitHub上的官方示例中,是不填这参数的,经过实验,不带该参数,是能正常调度任务的,执行什么的都没问题,但是一旦使用到里面关于执行结果的函数或属性,则会报错

    # AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'

光看代码,一头雾水,查了好多资料以及源码,发现是跟这个backend参数有关系,参考资料:

Keeping Results 请mark之

If you want to keep track of the tasks’ states, Celery needs to store or send the states somewhere. There are several built-in result backends to choose from: SQLAlchemy/Django ORM, Memcached, Redis, AMQP (RabbitMQ), and MongoDB – or you can define your own.

For this example you will use the amqp result backend, which sends states as messages. The backend is specified via the backend argument to Celery, (or via the CELERY_RESULT_BACKEND setting if you choose to use a configuration module):

    # 大概意思就是,如果要查询结果的话就需要一个消息后端来传递执行结果,如果使用跟任务队列一样的服务作为消息服务,则直接写协议名    app = Celery('tasks', backend='amqp', broker='amqp://')

Or if you want to use Redis as the result backend, but still use RabbitMQ as the message broker (a popular combination):

    # 大概意思就是,如果要想将消息队列与任务队列区分开来,则可以另外设置不同的服务后端    app = Celery('tasks', backend='redis://localhost', broker='amqp://')

然后就是task的常用参数:

1.bind 有点资料上涉及到这个参数,但官方也没说清楚是什么,经过试验,默认值是False,如果bind=True,被装饰的函数第一个参数为task 实例对象本身,所以像例子上的“self”,有啥用?不急…

2.max_retries 从名字上可以看出,最大重试次数

3.default_retry_delay 默认重试间隔时间,秒,名字还是很好理解的

往下看,基于测试,写了个try except,报了个Error,这时终于使用到了 “self”了,对,需要重试执行任务,则使用到了self.retry 函数

也就是说若你需要保持你自己的任务函数的干净度,将上述bind=False时,则使这个任务函数完全不包含关于celery的任何代码侵入,当然同时就没法使用celery.task带来的重试机制的便利。

retry函数中的常用参数:

1.exc 异常堆栈的保存

2.countdown 可选重试间隔时间,否则使用default_retry_delay 设定,秒

执行任务调度:

    >>> do.delay(...)    >>> type(do)    <class 'celery.local.PromiseProxy'>

经过装饰的函数,其实已经不是函数了,而是一个对象,经过试验,当然你仍然可以直接执行 do(…)以此实现在当前进程中执行原函数,因为这是一个callbale对象。

要通过任务调度方式来执行任务,则使用该对象的delary函数,通过名字看出来是“延迟”的意思。

执行发现,重试执行的期间不会打印该异常堆栈,只会打印retry等一些普通信息,知道最后一次执行仍然失败,才会打印出异常堆栈,执行成功则不打印。

最后来解释一下上述r.status >>> ‘RETRY’

通过上述代码发现,其实这个任务是永远不会执行成功的,只会一直报错,celery worker会一直重试直到重试机会花光,

所以在重试期间执行status 时发现它是RETRY 状

当然还有一个状态函数就是 ready()表示是否执行完毕,重试期间它的结果是 False

重试完了最后结果是

    >>> r.status    'FAILURE' # 最后结果为失败    >>> r.ready()    True #真正的执行完了

当然,这个任务对象可以通过dir(r)来获取可以使用的函数,相信功能强大的它能满足你对任务调度的需求。

任务调度之Celery

相关文章:

你感兴趣的文章:

标签云: