前言 之前讲过,在自己开发的公众号后台中有一个比较费时的请求操作,由于微信限制5秒内返回,无法立即返回结果,因此我开始寻找解决办法,最终决定使用任务队列 来进行实现。
实践 有些文章认为任务队列=消息队列,是一种解决方案,但我认为消息队列是任务队列的一部分,任务队列需要消息队列作支持 。完整的任务队列不仅需要消息中间件(即消息队列),还需要生产者,消费者来提出任务与解决任务。 任务队列在架构中十分常见,常见的使用场景有
费时操作交由任务队列系统异步执行,提高响应时间与吞吐量。
将请求的部分操作转交给任务队列执行,加快操作,如推送消息,发送验证码/邮件等。
定时任务。在生产环境下会有许多定时任务,如定时清理,定时备份等等,如果定时任务太多,又或者服务器数量过多,定时任务的管理就十分困难,此时任务队列就可以帮助管理员管理定时任务。
市面上也有许多消息中间件供开发者使用,如rabbitMQ,rocketMQ等等,当然也可以使用redis作为消息中间件。 当然光有消息中间件是无法实现一个任务队列的,接下来我们手动实现一个任务队列。
任务队列雏形 我们采用redis做消息中间件,利用redis的list数据结构,我们可以直接将list当作消息队列,作为消息存放处。同时redis还有lpush与rpop,生产者使用lpush将任务消息存放在list中,而消费者使用rpop或者使用阻塞的brpop来获取任务消息,并处理消息,之后将处理结果存在redis中,当需要结果时,从redis中取出结果。如此我们便实现了一个简单的任务队列。 我们假设生产者产生了一串数字,而消费者则是将数字进行简单hash处理。 我们可以实现一下代码 生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import redis,random,timer = redis.Redis(host='localhost' , port=6379 , decode_responses=True ) def producer (): print ("producer开始工作" ) while True : random_num = random.randint(1 ,10000 ) print (random_num) r.lpush("task_list" ,random_num) time.sleep(10 ) if __name__ == "__main__" : producer()
消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import redis,random,time,hashlibr = redis.Redis(host='localhost' , port=6379 , decode_responses=True ) def get_md5 (str1 ): str1 = str (str1) h = hashlib.md5() h.update(str1.encode(encoding='utf-8' )) return h.hexdigest() def worker (): print ("worker开始工作" ) while True : random_num = r.brpop("task_list" )[1 ] num_md5 = get_md5(random_num) r.set (random_num,num_md5) print ("{} hash is: {}" .format (random_num,r.get(random_num))) if __name__ == "__main__" : worker()
如此我们简单实现一个最原始的任务队列,接下来就是让生产者从redis中取出结果,实现方法很简单,将每个任务分配任务id,根据任务id去获取结果。我们改进代码,让producer将任务id整合进任务中。这里我们使用json将序列化对象存入list中,然后结果也是用json数据存储。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 def producer (): print ("producer开始工作" ) task_id = 0 task = {'task_id' :task_id,'random_num' :0 } while True : random_num = random.randint(1 ,10000 ) new_task = copy.copy(task) new_task['task_id' ] = (task_id+=1 ) new_task['random_num' ] = random_num print (new_task) r.lpush("task_list" ,json.dumps(new_task)) time.sleep(10 ) def worker (): print ("worker开始工作" ) while True : task = json.loads(r.brpop("task_list" )[1 ]) print (task) task_id = task['task_id' ] random_num = task['random_num' ] num_md5 = get_md5(random_num) result = {'task_id' :task_id,'result' :num_md5} r.set (task_id,json.dumps(result)) get_result = json.loads(r.get(task_id))['result' ] print ("{} hash is: {}" .format (random_num,get_result))
如此我们就可以根据任务id获取任务处理结果。
Celery 上面讲了很多,是时候谈谈本篇文章的重点,celery。它是一个由python语言实现分布式的任务队列,其中包括以下几个组成部分
Task。即用户定义的任务部分。
Broker。即上文所讲的消息中间件,一般采用redis或者rabbitMQ。
Worker。任务消费者,实时监控任务队列并处理。
Beat。定时任务调度器,将定时任务发送至broker,等待worker进行处理。
Backend。即任务执行的结果,worker将任务处理完成后将结果存放于此。
安装 我们采用pip安装,并采用redis作为消息中间件pip install -U "celery[redis]"
第一个celery实例 这里我们假设要向某api请求搜索数据,这个操作耗时需要10秒以上。
1 2 3 4 5 6 7 8 9 10 11 from celery import Celeryimport timebroker = 'redis://localhost:6379/0' celery_app = Celery('main' ,broker=broker) @celery_app.task def get_result_from_remote (keyword ): print ("正在搜索关键字为{}的数据" .format (keyword)) time.sleep(10 ) return "success"
注意创建celery实例时,第一个参数表示celery app所在的脚本名称,即入口点。 接下来我们同样在这个脚本中定义需要调用此任务的函数。
1 2 3 4 5 6 7 8 def search_from_remote (): keyword = input ("请输入关键字:" ) start = time.time() get_result_from_remote.delay(keyword) print ("一共耗时{}秒" .format (time.time()-start)) if __name__ == "__main__" : search_from_remote()
worker 此时我们就可以启动worker监听是否有任务存在于broker中。celery worker -A main --loglevel=info
-A 参数表示celery的任务在哪个脚本当中,然后定义了日志级别。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 -------------- celery@ubuntu v4.4.2 (cliffs) --- ***** ----- -- ******* ---- Linux-4.4.0-105-generic-x86_64-with-Ubuntu-16.04-xenial 2020-03-24 21:02:29 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: tasks:0x7f5ba09f8ef0 - ** ---------- .> transport: redis://localhost:6379/0 - ** ---------- .> results: disabled:// - *** --- * --- .> concurrency: 1 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . main.get_result_from_remote [2020-03-24 21:16:24,764: INFO/MainProcess] Connected to redis://localhost:6379/0 [2020-03-24 21:16:24,804: INFO/MainProcess] mingle: searching for neighbors [2020-03-24 21:16:25,865: INFO/MainProcess] mingle: all alone [2020-03-24 21:16:25,954: INFO/MainProcess] celery@ubuntu ready.
由于还没启动生产者,所以worker不进行工作。或者说worker没有工作可做。 接着我们运行这个脚本python3 main.py
1 2 请输入关键字:123 一共耗时0.3693723678588867秒
可以发现定义在任务中延时并不会影响主线程运行,再看看worker的日志
1 2 3 [2020-03-24 21:16:33,985: INFO/MainProcess] Received task: main.get_result_from_remote[1d4196cf-12e2-49be-acf1-0163b1b0d522] [2020-03-24 21:16:33,990: WARNING/ForkPoolWorker-1] 正在搜索关键字为123的数据 [2020-03-24 21:16:44,005: INFO/ForkPoolWorker-1] Task main.get_result_from_remote[1d4196cf-12e2-49be-acf1-0163b1b0d522] succeeded in 10.015596589073539s: 'success'
十分完美,我们成功使用了celery做任务队列来处理耗时操作。
获取返回结果 接着我们看看在主线程中,任务的返回值究竟是什么。
1 2 3 4 5 6 7 def search_from_remote (): keyword = input ("请输入关键字:" ) start = time.time() result = get_result_from_remote.delay(keyword) print (result) print ("一共耗时{}秒" .format (time.time()-start))
发现返回了一串类似于任务id的数据.f75232b5-e3be-4a6f-a077-0b7bf9bd8111
通过查看官方文档,我找到了使用方法 第一种办法是通过任务的AsyncResult方法来获取结果:
1 2 task_id = "f75232b5-e3be-4a6f-a077-0b7bf9bd8111" print (get_result_from_remote.AsyncResult(task_id).get())
另一种方式是通过celery.result的AsyncResult方法:
1 2 3 task_id = "f75232b5-e3be-4a6f-a077-0b7bf9bd8111" from celery.result import AsyncResultAsyncResult(task_id).get()
添加backend 但是由于我们没有启动backend,所以无法获取结果。我们依旧把redis当作backend。
1 2 redis_server = 'redis://localhost:6379/0' celery_app = Celery('main' ,broker=redis_server,backend=redis_server)
我们重启worker,运行脚本之后获取任务id的结果
1 2 3 4 5 6 task_id = "ed9e6f8a-913f-49a5-a3b6-99444003352a" print ("result:" + get_result_from_remote.AsyncResult(task_id).get())from celery.result import AsyncResult print ("result:" + AsyncResult(task_id).get())
beat celery的几个组件就剩下beat。beat组件可以配置自动任务。我们另起一个新脚本。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from celery import Celeryimport datetimeredis_server = 'redis://localhost:6379/0' celery_app = Celery('mybeat' ,broker=redis_server,backend=redis_server) celery_app.conf.beat_schedule = { 'task-10-seconds' : { 'task' : 'test' , 'schedule' : datetime.timedelta(seconds=10 ), 'args' : (1 , 1 ), }, } @celery_app.task(name='test' ) def test (x, y ): print ('定时任务开始运行' ) return x + y
我们先启动worker进程celery worker -A mybeat -l info
然后启动beat程序celery beat -A mybeat
之后每10秒worker就会计算一遍1+1的结果并将结果缓存。
其他 尽管我们初步学习了celery的使用,但其实在项目中celery的配置是较为复杂的,在网上找了几篇文章,celery项目结构普遍为
1 2 3 4 ├── __init__.py ├── celeryconfig.py ├── celery.py └── tasks.py
其中celery.py为主程序,作为整个celery的入口点,celeryconfig为配置存放处,而tasks则是各种任务。 在本篇文章的代码中,我只是设置了一个任务队列,celery支持多个任务队列,通过设置queue与route,并且制定任务名称,根据名称匹配路由,路由再将任务送至响应的queue中,实现不同类型的任务有不同的任务队列来处理。这里需要使用kombu对queue进行处理。简单给出代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 from celery import Celeryfrom kombu import Exchange, Queueredis_server = 'redis://localhost:6379/0' celery_app = Celery('celery' ,broker=redis_server,backend=redis_server) '''使用redis做broker可以无需配置exchange''' queue = ( Queue('default' , exchange=Exchange('default' , type ='direct' ), routing_key='default' ), Queue('task1' , exchange=Exchange('task1' , type ='direct' ), routing_key='task1' ), Queue('task2' , exchange=Exchange('task2' , type ='direct' ), routing_key='task2' ), ) route = { 'task*' : {'queue' : 'task1' , 'routing_key' : 'task1' }, 'task2' : {'queue' : 'task2' , 'routing_key' : 'task2' }, '*' : {'queue' : 'default' , 'routing_key' : 'default' }, } app.conf.update( task_serializer='json' , accept_content=['json' ], result_serializer='json' , timezone='Asia/Shanghai' , enable_utc=False , task_queues=queue, task_routes=route, ) @app.task(name="task2" ) def task2 (arg ): return "celery_task2,执行结果为:{}" .format (arg)
通过指定任务的name,celery选择合适的任务队列,如果需要worker配合,则需要使用-Q参数 指定队列的名称celery worker -A tasks -l info -Q task1
此时这个worker只会处理这个队列中的任务。 但就一个小项目而言,则无需配置queue,将项目简单化,上面的代码就已经够用。
小结 至此我们成功完成了一个celery任务队列的模板,包括设置celery实例,添加了消息中间件与消息结果件,并且成功让worker执行。之后设置了beat定时任务,定时执行某些任务。 有没有发现,celery与我们所写的任务队列雏形十分相像?如果设置celery的任务与结果序列化方式为json的话,核心逻辑就是我们所写的demo。 另外这里需要注意的是,celery4.x在win10存在不兼容问题,所以我采用Ubuntu作为测试环境。
后话 这次学习了celery的基本使用方法,这篇文章的知识在简单和中型开发中已然够用。 celery虽然在python中很火,但是却无法使用在其他语言中,也限制了它的流行度,官网中的文档只有入门例子才有中文。网上的文章很多讲述地又不是很清楚,因此我总结了这篇文章,希望能更好地了解开发,不仅是代码,更是对架构的理解。