Advanced Celery Examples
Published: 2019-06-18


Concepts

There are a few concepts to know here: task, worker, and broker.

As the names suggest, a task is a piece of work your boss hands you, and a worker is one of the people on your team who actually does it.

So what is a broker?

When the boss assigns you a task, you need to write it down somewhere. That "somewhere" can be the notebook you carry around, a text file or spreadsheet on your computer, or any other time-management tool.

The broker is where Celery records its tasks.

As the task manager, you write the work (tasks) that the boss (the frontend program) assigns you into your notebook (the broker). Your programmers (the workers) then go to the notebook (the broker) to pick up the work (the tasks).
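The analogy can be sketched in plain Python, with no Celery involved: a `queue.Queue` stands in for the broker, an ordinary function is the task, and a background thread plays the worker. All names here are illustrative, not Celery API.

```python
import queue
import threading

broker = queue.Queue()        # the "notebook": pending tasks wait here
results = []

def add(x, y):                # a task: a plain unit of work
    return x + y

def worker():
    # the worker keeps pulling tasks from the broker until told to stop
    while True:
        item = broker.get()
        if item is None:      # sentinel: no more work
            break
        func, args = item
        results.append(func(*args))

t = threading.Thread(target=worker)
t.start()
broker.put((add, (1, 3)))     # the "boss" records a task with the broker
broker.put(None)
t.join()
print(results)                # -> [4]
```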

1. RabbitMQ as the broker

#tasks.py

```python
from celery import Celery

app = Celery('tasks', broker='amqp://admin:admin@localhost:5672')

@app.task
def add(x, y):
    return x + y
```

Start the worker:

```shell
celery -A tasks worker --loglevel=info
```

Run:

```python
>>> from tasks import add
>>> add(1, 3)
4
>>> add.delay(1, 3)
>>>
```

Note: delay is the asynchronous call; it pushes the task onto the message queue and immediately returns an AsyncResult. Calling the task function directly runs it locally and never touches the queue.

Since the file is named tasks.py, the code reads app = Celery('tasks', broker=...). Celery's first argument is the name of the application's main module, and the worker is started accordingly with celery -A tasks worker --loglevel=info.

Comparison

Note: to send a task to a specific queue, use apply_async; delay does not accept routing options (it forwards extra keyword arguments to the task itself): add.apply_async((1, 3), queue='queue_add1')

test_2.py

```python
from celery import Celery

# include is the list of modules to import when the worker starts
app = Celery('proj', broker='amqp://admin:admin@localhost:5672', include=['test_2'])

@app.task
def add(x, y):
    return x + y
```

2. Starting via python + filename

Example 1:

#test.py

```python
from celery import Celery
import time

app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672')

@app.task
def add(x, y):
    print("------>")
    time.sleep(5)
    print("<--------------")
    return x + y

if __name__ == "__main__":
    app.start()
```

Start the worker:

```shell
python test.py worker
```

By default Celery starts one worker process per CPU core. To set the number explicitly, use the -c option, e.g.:

```shell
python test.py worker -c 2
```

Example 2:

#test.py

```python
from celery import Celery
import time

app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672')

@app.task
def add(x, y):
    print("------>")
    time.sleep(2)
    print("<--------------")
    return x + y

if __name__ == "__main__":
    app.start()
```

#eg.py

```python
from test import *
import time

rev = []
for i in range(3):
    rev.append(add.delay(1, 3))
print("len rev:", len(rev))

# spin until every AsyncResult is ready
while 1:
    tag = 1
    for key in rev:
        if not key.ready():
            tag = 0
            time.sleep(1)
            print("sleep 1")
    if tag:
        break
print("_____________________>")
```
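The waiting loop in eg.py can be illustrated with the standard library alone; here a ThreadPoolExecutor stands in for the Celery worker pool and Future.done() plays the role of AsyncResult.ready(). This is a sketch of the polling pattern, not Celery code.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def add(x, y):
    time.sleep(0.2)                  # simulate a slow task
    return x + y

pool = ThreadPoolExecutor(max_workers=2)
rev = [pool.submit(add, 1, 3) for _ in range(3)]   # like add.delay(1, 3)

# poll until every pending "task" is finished, like the key.ready() loop
while not all(f.done() for f in rev):
    time.sleep(0.05)

print([f.result() for f in rev])     # -> [4, 4, 4]
pool.shutdown()
```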

3. Redis as the broker

#test_redis.py

```python
from celery import Celery
import time

#app = Celery('test_redis', backend='amqp', broker='redis://100.69.201.116:7000')
app = Celery('test_redis', backend='redis', broker='redis://100.69.201.116:7000')

@app.task
def add(x, y):
    print("------>")
    time.sleep(5)
    print("<--------------")
    return x + y

if __name__ == "__main__":
    app.start()
```

Start the worker:

```shell
python test_redis.py worker -c 2
```

Test:

```python
from celery import group
from test_redis import *

g = group(add.s(2, 3)).apply_async()
g = group(add.s(2, 3)).apply_async()
g = group(add.s(2, 3)).apply_async()
g = group(add.s(2, 3)).apply_async()
g = group(add.s(2, 3)).apply_async()

for ret in g.get():
    print(ret)
print("end-----------------------------------")
```

Result:

```
5
end-----------------------------------
```

Only a single 5 is printed: g is rebound on every call, so g.get() returns the results of the last group, which contains just one add(2, 3) task.

4. Two queues (Redis)

#test_redis.py

```python
from celery import Celery
import time

#app = Celery('test_redis', backend='amqp', broker='redis://100.69.201.116:7000')
app = Celery('test_redis', backend='redis', broker='redis://100.69.201.116:7000')

@app.task
def add(x, y):
    print("------>")
    time.sleep(5)
    print("<--------------")
    return x + y

if __name__ == "__main__":
    app.start()
```

#test_redis_2.py

```python
from celery import Celery
import time

#app = Celery('test_redis', backend='amqp', broker='redis://100.69.201.116:7000')
app = Celery('test_redis_2', backend='redis', broker='redis://100.69.201.116:7001')

@app.task
def add_2(x, y):
    print("=======>")
    time.sleep(5)
    print("<=================")
    return x + y

if __name__ == "__main__":
    app.start()
```

Test:

```python
from celery import group
from test_redis import *
from test_redis_2 import *

ll = [(1, 2), (3, 4), (5, 6)]
g = group(add.s(key[0], key[1]) for key in ll).apply_async()
for ret in g.get():
    print(ret)
print("end redis_1 -----------------------------------")

ll = [(1, 2), (3, 4), (5, 6)]
g = group(add_2.s(key[0], key[1]) for key in ll).apply_async()
for ret in g.get():
    print(":", ret)
print("end redis_2 -----------------------------------")
```

Result:

```
3
7
11
end redis_1 -----------------------------------
: 3
: 7
: 11
end redis_2 -----------------------------------
```

5. Two queues (same RabbitMQ)

Note: the queues (the RabbitMQ vhosts used below) need to be set up in advance.

Example 1

#test.py

```python
from celery import Celery
import time

app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672//')

@app.task
def add(x, y):
    print("------>")
    time.sleep(5)
    print("<--------------")
    return x + y

if __name__ == "__main__":
    app.start()
```

#test_2.py

```python
from celery import Celery
import time

app = Celery('test_2', backend='amqp', broker='amqp://admin:admin@localhost:5672//hwzh')

@app.task
def add_2(x, y):
    print("=====>")
    time.sleep(5)
    print("<==========")
    return x + y

if __name__ == "__main__":
    app.start()
```

Test:

```python
from celery import group
from test import *
from test_2 import *

ll = [(1, 2), (3, 4), (7, 8)]
g = group(add.s(key[0], key[1]) for key in ll).apply_async()
for ret in g.get():
    print(ret)

ll = [(1, 2), (3, 4), (7, 8)]
g = group(add_2.s(key[0], key[1]) for key in ll).apply_async()
for ret in g.get():
    print(ret)
```

Result:

```
3
7
15
3
7
15
```

Example 2

#test.py

```python
from celery import Celery
import time

app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672//mq4')

@app.task
def add(x, y):
    print("------>")
    time.sleep(2)
    print("<--------------")
    return x + y

@app.task
def sum(x, y):    # note: shadows the built-in sum inside this module
    print("------>")
    time.sleep(2)
    print("<--------------")
    return x + y

if __name__ == "__main__":
    app.start()
```

#eg2.py

```python
from test import *
import time

rev = []
for i in range(3):
    rev.append(add.delay(1, 3))
for i in range(3):
    rev.append(sum.delay(1, 3))
print("len rev:", len(rev))

# spin until every AsyncResult is ready
while 1:
    tag = 1
    for key in rev:
        if not key.ready():
            tag = 0
            time.sleep(1)
            print("sleep 1")
    if tag:
        break
print("_____________________>")
```

6. Saving results

#tasks_1.py

```python
from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://admin:admin@localhost')

@app.task
def add(x, y):
    return x + y
```

Start the worker:

```shell
celery -A tasks_1 worker --loglevel=info
```

Unlike the previous examples, the startup banner now lists a results backend:

```
- ** ---------- [config]
- ** ---------- .> app:       tasks:0x7f8057931810
- ** ---------- .> transport: amqp://admin:**@localhost:5672//
- ** ---------- .> results:   amqp
```

Run:

```python
>>> from tasks_1 import add
>>> result = add.delay(1, 3)
>>> result.ready()
True
>>> result.get()
4
```

7. Multiple queues

```python
from celery import Celery
from kombu import Exchange, Queue

BROKER_URL = 'amqp://admin:admin@localhost//'

app = Celery('tasks', backend='amqp', broker=BROKER_URL)
app.conf.update(
    CELERY_ROUTES={
        "add1": {"queue": "queue_add1"},
        "add2": {"queue": "queue_add2"},
        "add3": {"queue": "queue_add3"},
        "add4": {"queue": "queue_add4"},
    },
)

@app.task
def add1(x, y):
    return x + y

@app.task
def add2(x, y):
    return x + y

@app.task
def add3(x, y):
    return x + y

@app.task
def add4(x, y):
    return x + y
```

8. Message routing

File: tasks.py

```python
from celery import Celery, platforms
import time
import os

app = Celery('proj', broker='amqp://admin:admin@ip:5672',
             include=['tasks'])

app.conf.update(
    CELERY_ROUTES={
        'tasks.fun_1': {'queue': "q_1"},
        'tasks.fun_2': {'queue': "q_2"},
    }
)

platforms.C_FORCE_ROOT = True

@app.task
def fun_1(n):
    print("(((((((((((((((func_1", n)
    return 1

@app.task
def fun_2(n):
    print(n, ")))))))))))))))")
    return 2

if __name__ == "__main__":
    app.start()
```

Start one worker per queue:

```shell
python tasks.py worker -c 2 -Q q_1
python tasks.py worker -c 2 -Q q_2
```

There are two message queues, q_1 and q_2. Example calls:

```python
>>> from tasks import *
>>> fun_1(1)
(((((((((((((((func_1 1
1
>>> fun_1.delay(1)
>>> fun_2.delay(2)
```

9. Starting multiple processes inside a worker

#tasks.py

```python
from celery import Celery
import time
import multiprocessing as mp

app = Celery('proj', broker='amqp://admin:admin@ip:5672', include=["tasks"])

def test_func(i):
    print("beg...:", i)
    time.sleep(5)
    print("....end:", i)
    return i * 5

@app.task
def fun_1(n):
    curr_proc = mp.current_process()
    curr_proc.daemon = False    # allow this process to create children
    p = mp.Pool(mp.cpu_count())
    curr_proc.daemon = True     # restore the daemon flag
    for i in range(n):
        p.apply_async(test_func, args=(i,))
    p.close()
    p.join()
    return 1

if __name__ == "__main__":
    app.start()
```

Explanation

Starting child processes directly will not work, because the worker runs tasks in daemonic processes (daemon=True), and daemonic processes are not allowed to have children. So before creating the pool, explicitly mark the current process as non-daemonic (curr_proc.daemon = False), then set it back to daemonic once the pool has been created.

#tasks_callback.py

from celery import Celeryimport timeimport multiprocessing as mpapp = Celery('proj', broker='amqp://admin:admin@ip:5672', include="tasks_callback")rev = []def test_func(i):    print "beg...:", i    time.sleep(5)    print "....end:", i    return i * 5def callback_log(rev_val):    rev.append(rev_val)@app.taskdef fun_1(n):    print "before rev:", rev    curr_proc = mp.current_process()    curr_proc.daemon = False    p = mp.Pool(mp.cpu_count())    curr_proc.daemon = True    for i in range(n):        p.apply_async(test_func, args=(i,), callback=callback_log)    p.close()    p.join()    print "after rev:", rev    return 1if __name__ == "__main__":    app.start()

10. Common configuration options

1. CELERYD_PREFETCH_MULTIPLIER

The number of messages a worker prefetches at a time. For example, with CELERYD_PREFETCH_MULTIPLIER=2, while one task on a worker is in the STARTED state, up to two more can sit in the RECEIVED state (if any are pending). This avoids the awkward situation where one worker grabs every queued message and workers that start later have nothing left to take.

Reference code:

```python
from celery import Celery, platforms
import time
import os

app = Celery('proj', broker='amqp://admin:admin@localhost:5672',
             include=['tasks'])

app.conf.update(
    CELERYD_PREFETCH_MULTIPLIER=2,
    CELERY_ROUTES={
        'tasks.fun_1': {'queue': "q_1"},
        'tasks.fun_2': {'queue': "q_2"},
    }
)

platforms.C_FORCE_ROOT = True

@app.task
def fun_1(n):
    print("(((((((((((((((func_1", n)
    time.sleep(20)
    return 1

@app.task
def fun_2(n):
    print(n, ")))))))))))))))")
    return 2
```

Invocation:

```python
>>> from tasks import *
>>> fun_1.delay(3)
>>> fun_1.delay(3)
>>> fun_1.delay(3)
>>> fun_1.delay(3)
```

 

Reference: http://windrocblog.sinaapp.com/?p=1585

Reposted from: http://mokyx.baihongyu.com/
