Python 异步任务框架Celery 使用总结

简介

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,它是一个专注于实时处理的任务队列,同时也支持任务调度。Celery 用消息通信,通常使用中间人(Broker)在客户端和职程间斡旋。这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程,一般我们可以使用RabbitMQ或redis ,官方推荐使用RabbitMQ,而处理结果我们可以使用redis。消息队列的输入是工作的一个单元,称为任务,独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务。 Celery 系统可包含多个职程和中间人,以此获得高可用性和横向扩展能力。

Celery 是用Python 编写的,但协议可以用任何语言实现。迄今,已有 Ruby 实现的 RCelery 、node.js 实现的 node-celery 以及一个 PHP 客户端。

基本原理

Celery作为一个异步任务框架,其基本原理都是使用任务队列来实现,Celery本身不包含消息队列,在使用中我们一般选择RabbitMQ来实现。通常使用一个叫broker(中间人)来协调client(任务的发出者)和worker(任务的处理者),client发出消息到队列中,broker将队列中的信息派发给worker来处理,一个Celery系统可以包含很多的worker和broker,可增强横向扩展性和高可用性能。

安装

我们可以一次执行安装多个celery 所需的依赖,像broker, backend以及常用的序列化工具,执行以下命令进行安装:

  pip install celery[librabbitmq,redis,auth,msgpack]

Celery使用rabbitmq作为broker, 我们需要创建一个rabbitmq用户, 一个虚拟主机, 并且允许这个用户访问虚拟主机:

cd /opt/

yum install -y epel-release

yum install -y erlang

wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.10/rabbitmq-server-3.6.10-1.el7.noarch.rpm

yum install -y rabbitmq-server-3.6.10-1.el7.noarch.rpm

systemctl start rabbitmq-server

systemctl enable rabbitmq-server

rabbitmq-plugins enable rabbitmq_management # 启动rabbitmq的web端监控界面

# 添加rabbitmq的admin用户,授予最高权限

sudo rabbitmqctl add_user admin admin

sudo rabbitmqctl set_user_tags admin administrator

sudo rabbitmqctl set_permissions -p / admin “.*” “.*” “.*”

配置

 # 使用RabbitMQ作为消息代理
 BROKER_URL = 'amqp://admin:admin@localhost:5672/'

 # celery 启动时要导入的模块列表
 CELERY_IMPORTS = ('myapp.tasks', )

 # 把任务结果存在Redis
 CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

 # 任务序列化和反序列化使用msgpack方案
 CELERY_TASK_SERIALIZER = 'msgpack' 

 # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
 CELERY_RESULT_SERIALIZER = 'json' 

 # 指定接受的内容类型
 CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] 

以上是我们使用Celery 的常用配置信息,更多配置详情请参考Celery配置

定义一个应用

Celery库必须在使用前进行实例化,此实例称为应用程序(或简称app),该应用程序是线程安全的,因此具有不同配置,组件和任务的多个Celery应用程序可以在同一个进程空间中共存。

 from celery import Celery

 # 定义一个应用并用async_tasks 模块中加载所有的任务,‘async_tasks’ 模块名称您可以根据您需要进行命名
 app = Celery("async_tasks")

 # 加载celery 配置
 app.config_from_object("path_to_celery_config")

 # 定义一个 task
 @app.taskdef add(x, y):
return x + y

if __name__ == ‘__main__’:

app.start()

调用Task

celery 调用task 有三种方式:

1. apply_async(args[, kwargs[, …]]) ,这种方式会往消息队列发送消息, 并支持各种参数使用

2. delay(*args, **kwargs) ,是apply_async 一种简明调用方式,但是不支持很多额外的参数

3. calling ( __call__) , 应用支持调用API的对象(例如add(2,2) )意味着任务将在当前进程中执行,而不是由worker执行(不会发送消息)

示例:

 说明:本例中的T 即为 task 名称

 # 相当于apply_async 的简单调用方式
 T.delay(arg, kwarg=value)

 T.apply_async((arg, ), {'kwarg': value})

 # 任务会在10s 后开始执行
 T.apply_async(countdown=10)

 # 任务会在 now 之后的10秒开始执行
 T.apply_async(eta=now + timedelta(seconds=10))

 # 任务会在一分钟之后执行,在两分钟后过期
 T.apply_async(countdown=60, expires=120)

 # 任务会在now之后的两天过期
 T.apply_async(expires=now + timedelta(days=2))

LinkingCelery支持将任务链接(Linking)在一起,以便一个任务跟随另一个。回调任务将与父任务的结果一起作为部分参数应用:

 # 这里第一个任务的结果(4), 将被发送到一个新的任务中,并与第二个任务中的16得值相加然后计算出最后的结果20,形成类似于(2 + 2) + 16 = 20
 add.apply_async((2, 2), link=add.s(16))

消息发送重试(Message Sending retry)如果连接失败,Celery将自动重试发送消息,并且可以配置重试策略, 例如重试频率, 最大重试次数, 或者一起禁用。

add.apply_async((2, 2), retry=True, retry_policy={
 'max_retries': 3,
 'interval_start': 0,
 'interval_step': 0.2,
 'interval_max': 0.2,

})

获取任务处理结果一般我们使用异步任务来处理那些比较耗时的工作,可能不关心处理结果,但有时候我们还需要任务处理结果来进行一些其他的处理。要获取任务处理结果,我们首先要拿到任务执行的task_id, 然后根据这个id我们在获取结果。

 task_result = None

 task_id = add.delay(2, 2)
 result = AsyncResult(task_id)
 if result.successful():
task_result = result.get()