celery之Chains, Groups, Chords, Map & Starmap, Chunks

celery是一个独立的server进程,可以用redis、rabbitmq、mongo作为queue,flower是可视化的管理后台。
如下调用task装饰的func.apply_async就是异步执行,或者调用send_task。

1
2
3
4
5
6
7
8
9
[lavenderuni@~]# celery multi start w1 -A asynctask -l info -c 1
celery multi v3.1.23 (Cipater)
> Starting nodes...
<class 'asynctask.lib.DefaultRegistry'>
> w1@LavenderUnideMacBook-Pro.local: OK
[lavenderuni@~]# celery multi stop w1 -A asynctask -l info
celery multi v3.1.23 (Cipater)
> w1@LavenderUnideMacBook-Pro.local: DOWN

以上是启动停用命令,项目实例参照asynctask,-l参数是日志输出等级控制(debug是调试模式,会输出更多执行详情信息,生产环境不要轻易开启debug模式),-c是并发进程数目控制。

通过celery的Chains, Groups, Chords, Map & Starmap, Chunks,可以组合出各种复杂的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
app = Celery("task", backend="redis://localhost:6379/1", broker="redis://localhost:6379/0")
@app.task
def add(x, y):
return x + y
@app.task
def tsum(numbers):
return sum(numbers)
@app.task
def trange(limit):
return range(limit)
result = add.apply_async((2, 2), link=add.s(16))

Chains

串行调用,可以将signature任务按照顺序执行,前一个任务的输出是后一个任务的输入,结果是最后一个signature任务的输出。

1
2
3
4
5
6
7
8
9
10
11
from celery import chain
result = chain(add.s(2, 2), add.s(4), add.s(8))() # 2 + 2 + 4 + 8
result.get() # 16
result.parent.parent.graph
# cd959635-47d2-4368-bdf1-ab969f9ce0e4(1)
# 6681c6b6-bc34-44b4-8c9f-7ad132ffa5f3(0)
# 6681c6b6-bc34-44b4-8c9f-7ad132ffa5f3(0)
# 87eac0e5-1f1b-4c0d-a27a-dbf7e7ccd925(2)
# cd959635-47d2-4368-bdf1-ab969f9ce0e4(1)
# 6681c6b6-bc34-44b4-8c9f-7ad132ffa5f3(0)

Groups

并行调用,可以让signature并行执行,返回的结果是所有signature任务返回结果组成的数组。

1
2
3
4
from celery import group
result = group(add.s(2, 2), add.s(4, 4), add.s(8, 8))() # 2 + 2, 4 + 4, 8 + 8
result.get() # [14, 8, 16]

Chords

先并行调用,再串行调用,把并行signature任务的结果列表输入到串行调用,进行汇总,是reduce过程。

1
2
3
4
5
6
7
from celery import group, chain, chord
result = chain(group(add.s(2, 2), add.s(4, 4), add.s(8, 8)), tsum.s())() # sum([2 + 2, 4 + 4, 8 + 8])
result.get() # 28
result = chord((add.s(2, 2), add.s(4, 4), add.s(8, 8)), tsum.s())()
result.get() # 28

Map & Starmap

  • Map,对并行调用的结果各自汇总

    1
    2
    3
    ~tsum.map([range(2), range(2), range(2)]) # [1, 1, 1]
    result = chain(trange.s(2), group(tsum.s(), tsum.s(), tsum.s()))()
    result.get() # [1, 1, 1]
  • Starmap,对并行调用的结果各自汇总,汇总参数是tuple,相当于*args

    1
    ~add.starmap(zip(range(10), range(10))) # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Chunks

对大任务进行分割,分成小块执行,提高性能,将结果收集成列表。

1
2
3
4
5
6
7
8
9
10
11
12
result = add.chunks(zip(range(100), range(100)), 10)()
result.get()
#[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
# [20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
# [40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
# [60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
# [80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
# [100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
# [120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
# [140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
# [160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
# [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]