Zer0e's Blog

初探celery任务队列

字数统计: 2.9k阅读时长: 12 min
2020/03/24 Share

前言

之前讲过,在自己开发的公众号后台中有一个比较费时的请求操作,由于微信限制5秒内返回,无法立即返回结果,因此我开始寻找解决办法,最终决定使用任务队列来进行实现。

实践

有些文章认为任务队列=消息队列,是一种解决方案,但我认为消息队列是任务队列的一部分,任务队列需要消息队列作支持。完整的任务队列不仅需要消息中间件(即消息队列),还需要生产者,消费者来提出任务与解决任务。
任务队列在架构中十分常见,常见的使用场景有

  • 费时操作交由任务队列系统异步执行,提高响应时间与吞吐量。
  • 将请求的部分操作转交给任务队列执行,加快操作,如推送消息,发送验证码/邮件等。
  • 定时任务。在生产环境下会有许多定时任务,如定时清理,定时备份等等,如果定时任务太多,又或者服务器数量过多,定时任务的管理就十分困难,此时任务队列就可以帮助管理员管理定时任务。

市面上也有许多消息中间件供开发者使用,如rabbitMQ,rocketMQ等等,当然也可以使用redis作为消息中间件。
当然光有消息中间件是无法实现一个任务队列的,接下来我们手动实现一个任务队列。

任务队列雏形

我们采用redis做消息中间件,利用redis的list数据结构,我们可以直接将list当作消息队列,作为消息存放处。同时redis还有lpush与rpop,生产者使用lpush将任务消息存放在list中,而消费者使用rpop或者使用阻塞的brpop来获取任务消息,并处理消息,之后将处理结果存在redis中,当需要结果时,从redis中取出结果。如此我们便实现了一个简单的任务队列。
我们假设生产者产生了一串数字,而消费者则是将数字进行简单hash处理。
我们可以实现一下代码
生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import redis,random,time
r = redis.Redis(host='localhost', port=6379, decode_responses=True)


def producer():
print("producer开始工作")
while True:
random_num = random.randint(1,10000)
print(random_num)
r.lpush("task_list",random_num)
time.sleep(10)


if __name__ == "__main__":
producer()

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import redis,random,time,hashlib
r = redis.Redis(host='localhost', port=6379, decode_responses=True)


def get_md5(str1):
str1 = str(str1)
h = hashlib.md5()
h.update(str1.encode(encoding='utf-8'))
return h.hexdigest()

def worker():
print("worker开始工作")
while True:
random_num = r.brpop("task_list")[1]
num_md5 = get_md5(random_num)
r.set(random_num,num_md5)
print("{} hash is: {}".format(random_num,r.get(random_num)))


if __name__ == "__main__":
worker()

如此我们简单实现一个最原始的任务队列,接下来就是让生产者从redis中取出结果,实现方法很简单,将每个任务分配任务id,根据任务id去获取结果。我们改进代码,让producer将任务id整合进任务中。这里我们使用json将序列化对象存入list中,然后结果也是用json数据存储。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def producer():
print("producer开始工作")
task_id = 0 #初始化任务id
task = {'task_id':task_id,'random_num':0} #定义task结构
while True:
random_num = random.randint(1,10000)
new_task = copy.copy(task)
new_task['task_id'] = (task_id+=1)
new_task['random_num'] = random_num
print(new_task)
r.lpush("task_list",json.dumps(new_task))
time.sleep(10)

def worker():
print("worker开始工作")
while True:
task = json.loads(r.brpop("task_list")[1])
print(task)
task_id = task['task_id']
random_num = task['random_num']
num_md5 = get_md5(random_num)
result = {'task_id':task_id,'result':num_md5}
r.set(task_id,json.dumps(result))

get_result = json.loads(r.get(task_id))['result']
print("{} hash is: {}".format(random_num,get_result))

如此我们就可以根据任务id获取任务处理结果。

Celery

上面讲了很多,是时候谈谈本篇文章的重点,celery。它是一个由python语言实现分布式的任务队列,其中包括以下几个组成部分

  • Task。即用户定义的任务部分。
  • Broker。即上文所讲的消息中间件,一般采用redis或者rabbitMQ。
  • Worker。任务消费者,实时监控任务队列并处理。
  • Beat。定时任务调度器,将定时任务发送至broker,等待worker进行处理。
  • Backend。即任务执行的结果,worker将任务处理完成后将结果存放于此。

安装

我们采用pip安装,并采用redis作为消息中间件
pip install -U "celery[redis]"

第一个celery实例

这里我们假设要向某api请求搜索数据,这个操作耗时需要10秒以上。

1
2
3
4
5
6
7
8
9
10
11
from celery import Celery
import time

broker = 'redis://localhost:6379/0'
celery_app = Celery('main',broker=broker)

@celery_app.task
def get_result_from_remote(keyword):
print("正在搜索关键字为{}的数据".format(keyword))
time.sleep(10)
return "success"

注意创建celery实例时,第一个参数表示celery app所在的脚本名称,即入口点。
接下来我们同样在这个脚本中定义需要调用此任务的函数。

1
2
3
4
5
6
7
8
def search_from_remote():
keyword = input("请输入关键字:")
start = time.time()
get_result_from_remote.delay(keyword)
print("一共耗时{}秒".format(time.time()-start))

if __name__ == "__main__":
search_from_remote()

worker

此时我们就可以启动worker监听是否有任务存在于broker中。
celery worker -A main --loglevel=info
-A 参数表示celery的任务在哪个脚本当中,然后定义了日志级别。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 -------------- celery@ubuntu v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Linux-4.4.0-105-generic-x86_64-with-Ubuntu-16.04-xenial 2020-03-24 21:02:29
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x7f5ba09f8ef0
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery


[tasks]
. main.get_result_from_remote

[2020-03-24 21:16:24,764: INFO/MainProcess] Connected to redis://localhost:6379/0
[2020-03-24 21:16:24,804: INFO/MainProcess] mingle: searching for neighbors
[2020-03-24 21:16:25,865: INFO/MainProcess] mingle: all alone
[2020-03-24 21:16:25,954: INFO/MainProcess] celery@ubuntu ready.

由于还没启动生产者,所以worker不进行工作。或者说worker没有工作可做。
接着我们运行这个脚本
python3 main.py

1
2
请输入关键字:123
一共耗时0.3693723678588867秒

可以发现定义在任务中延时并不会影响主线程运行,再看看worker的日志

1
2
3
[2020-03-24 21:16:33,985: INFO/MainProcess] Received task: main.get_result_from_remote[1d4196cf-12e2-49be-acf1-0163b1b0d522]  
[2020-03-24 21:16:33,990: WARNING/ForkPoolWorker-1] 正在搜索关键字为123的数据
[2020-03-24 21:16:44,005: INFO/ForkPoolWorker-1] Task main.get_result_from_remote[1d4196cf-12e2-49be-acf1-0163b1b0d522] succeeded in 10.015596589073539s: 'success'

十分完美,我们成功使用了celery做任务队列来处理耗时操作。

获取返回结果

接着我们看看在主线程中,任务的返回值究竟是什么。

1
2
3
4
5
6
7
def search_from_remote():
keyword = input("请输入关键字:")
start = time.time()
result = get_result_from_remote.delay(keyword)
print(result)
print("一共耗时{}秒".format(time.time()-start))

发现返回了一串类似于任务id的数据.f75232b5-e3be-4a6f-a077-0b7bf9bd8111
通过查看官方文档,我找到了使用方法
第一种办法是通过任务的AsyncResult方法来获取结果:

1
2
task_id = "f75232b5-e3be-4a6f-a077-0b7bf9bd8111"
print(get_result_from_remote.AsyncResult(task_id).get())

另一种方式是通过celery.result的AsyncResult方法:

1
2
3
task_id = "f75232b5-e3be-4a6f-a077-0b7bf9bd8111"
from celery.result import AsyncResult
AsyncResult(task_id).get()

添加backend

但是由于我们没有启动backend,所以无法获取结果。我们依旧把redis当作backend。

1
2
redis_server = 'redis://localhost:6379/0'
celery_app = Celery('main',broker=redis_server,backend=redis_server)

我们重启worker,运行脚本之后获取任务id的结果

1
2
3
4
5
6
task_id = "ed9e6f8a-913f-49a5-a3b6-99444003352a"
print("result:" + get_result_from_remote.AsyncResult(task_id).get())
from celery.result import AsyncResult
print("result:" + AsyncResult(task_id).get())
#result:success
#result:success

beat

celery的几个组件就剩下beat。beat组件可以配置自动任务。我们另起一个新脚本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# mybeat.py
from celery import Celery
import datetime

redis_server = 'redis://localhost:6379/0'
celery_app = Celery('mybeat',broker=redis_server,backend=redis_server)

celery_app.conf.beat_schedule = {
'task-10-seconds': {
'task': 'test',
'schedule': datetime.timedelta(seconds=10),
'args': (1, 1),
},
}

@celery_app.task(name='test')
def test(x, y):
print('定时任务开始运行')
return x + y

我们先启动worker进程celery worker -A mybeat -l info
然后启动beat程序celery beat -A mybeat
之后每10秒worker就会计算一遍1+1的结果并将结果缓存。

其他

尽管我们初步学习了celery的使用,但其实在项目中celery的配置是较为复杂的,在网上找了几篇文章,celery项目结构普遍为

1
2
3
4
├── __init__.py
├── celeryconfig.py
├── celery.py
└── tasks.py

其中celery.py为主程序,作为整个celery的入口点,celeryconfig为配置存放处,而tasks则是各种任务。
在本篇文章的代码中,我只是设置了一个任务队列,celery支持多个任务队列,通过设置queue与route,并且制定任务名称,根据名称匹配路由,路由再将任务送至响应的queue中,实现不同类型的任务有不同的任务队列来处理。这里需要使用kombu对queue进行处理。简单给出代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from celery import Celery
from kombu import Exchange, Queue
#broker 与 backend可以使用不同的redis数据库,如0和1
redis_server = 'redis://localhost:6379/0'
celery_app = Celery('celery',broker=redis_server,backend=redis_server)

'''使用redis做broker可以无需配置exchange'''
queue = (
Queue('default', exchange=Exchange('default', type='direct'), routing_key='default'),
Queue('task1', exchange=Exchange('task1', type='direct'), routing_key='task1'),
Queue('task2', exchange=Exchange('task2', type='direct'), routing_key='task2'),
)
route = {
'task*': {'queue': 'task1', 'routing_key': 'task1'},

'task2': {'queue': 'task2', 'routing_key': 'task2'},
'*': {'queue': 'default', 'routing_key': 'default'},
}
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Asia/Shanghai',
enable_utc=False,
# 任务过期时间(超时时间)
# result_expires=3600,
# Number of CPU cores.
# worker_concurrency=4,
task_queues=queue,
task_routes=route,
)

#tasks.py
@app.task(name="task2")
def task2(arg):
return "celery_task2,执行结果为:{}".format(arg)

通过指定任务的name,celery选择合适的任务队列,如果需要worker配合,则需要使用-Q参数 指定队列的名称celery worker -A tasks -l info -Q task1此时这个worker只会处理这个队列中的任务。
但就一个小项目而言,则无需配置queue,将项目简单化,上面的代码就已经够用。

小结

至此我们成功完成了一个celery任务队列的模板,包括设置celery实例,添加了消息中间件与消息结果件,并且成功让worker执行。之后设置了beat定时任务,定时执行某些任务。
有没有发现,celery与我们所写的任务队列雏形十分相像?如果设置celery的任务与结果序列化方式为json的话,核心逻辑就是我们所写的demo。
另外这里需要注意的是,celery4.x在win10存在不兼容问题,所以我采用Ubuntu作为测试环境。

后话

这次学习了celery的基本使用方法,这篇文章的知识在简单和中型开发中已然够用。
celery虽然在python中很火,但是却无法使用在其他语言中,也限制了它的流行度,官网中的文档只有入门例子才有中文。网上的文章很多讲述地又不是很清楚,因此我总结了这篇文章,希望能更好地了解开发,不仅是代码,更是对架构的理解。

CATALOG
  1. 1. 前言
  2. 2. 实践
    1. 2.1. 任务队列雏形
    2. 2.2. Celery
      1. 2.2.1. 安装
      2. 2.2.2. 第一个celery实例
      3. 2.2.3. worker
      4. 2.2.4. 获取返回结果
      5. 2.2.5. 添加backend
      6. 2.2.6. beat
      7. 2.2.7. 其他
      8. 2.2.8. 小结
  3. 3. 后话