Tracking progress of a celery.group task?


Tinkering around in the shell (using IPython's tab auto-completion), I found that group_task (a celery.result.ResultSet object) has a method called completed_count, which returns how many of the group's tasks have completed successfully. That gave exactly what I needed.

I also found the documentation at http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.ResultSet.completed_count
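A minimal polling sketch, assuming group_task is the GroupResult you get back from applying a group. The helper name wait_with_progress and its on_progress callback are hypothetical; the ResultSet members it relies on are completed_count(), ready() and the results list. Because completed_count() only counts successful tasks, the loop stops on ready() instead of waiting for the count to reach the total.

```python
import time
from typing import Callable


def wait_with_progress(group_result, on_progress: Callable[[int, int], None],
                       poll_interval: float = 1.0) -> None:
    """Report progress of a GroupResult/ResultSet until all its tasks finish.

    Hypothetical helper. on_progress(done, total) is called on every poll and
    once more at the end. completed_count() counts only *successful* tasks, so
    the loop is driven by ready() (true once every task has finished, failures
    included) rather than by done == total, which never happens if a task fails.
    """
    total = len(group_result.results)
    while not group_result.ready():
        on_progress(group_result.completed_count(), total)
        time.sleep(poll_interval)
    # Final report after every task has finished.
    on_progress(group_result.completed_count(), total)
```

For example, wait_with_progress(group_task, lambda done, total: print(f"{done}/{total} done")) prints a line per poll; the callback could just as well update a tqdm bar.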


Here’s a full working example based on @dalore’s answer.

First, tasks.py:

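import time
from celery import Celery, group

app = Celery('tasks', broker='pyamqp://guest@', backend='redis://localhost')

@app.task
def add(x, y):
    time.sleep(1)  # Simulate one second of work so the progress bar visibly advances.
    return x + y

@app.task
def group_add(l1, l2):
    # Launch one add subtask per pair; the returned GroupResult is recorded
    # as a child of this task (Task.trail is enabled by default).
    return group(add.s(x1, x2) for x1, x2 in zip(l1, l2))()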

Start a Redis server using Docker: docker run --name my-redis -p 6379:6379 -d redis.

Start RabbitMQ using Docker: docker run -d --hostname my-rabbit --name my-rabbit -p 5672:5672 rabbitmq:alpine.

Start a single process celery worker in a separate shell: celery -A tasks worker --loglevel=info -c 1.

Then run the test script below.

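from tasks import group_add
from tqdm import tqdm

total = 10

# Use lists, not range objects: Celery's default JSON serializer cannot encode a range.
l1 = list(range(total))
l2 = list(range(total))
delayed_results = group_add.delay(l1, l2)
delayed_results.get()  # Wait for the parent task to finish dispatching the group.

# children[0] is the GroupResult started inside group_add; iterating over it
# gives one AsyncResult per add subtask, in submission order.
results = []
for result in tqdm(delayed_results.children[0], total=total):
    results.append(result.get())  # Blocks until this subtask has finished.
print(results)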

You should see something like the following, with the progress bar advancing by 10% every second:

50%|#####     | 5/10 [00:05<00:05,  1.01s/it]
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Finally, clean up your Redis and RabbitMQ containers.

docker stop my-rabbit my-redis
docker rm my-rabbit my-redis


Reading the documentation for AsyncResult, I found a collect method that collects results as they come in.


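from celery import group
from proj.celery import app

@app.task(trail=True)
def A(how_many):
    # Start one B task per number and return the resulting GroupResult.
    return group(B.s(i) for i in range(how_many))()

@app.task(trail=True)
def B(i):
    # Each B starts a pow2 task and returns its AsyncResult.
    return pow2.delay(i)

@app.task(trail=True)
def pow2(i):
    return i ** 2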

Example output:

>>> from celery.result import ResultBase
>>> from proj.tasks import A

>>> result = A.delay(10)
>>> [v for _, v in result.collect()
...  if not isinstance(v, (ResultBase, tuple, list))]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

The Task.trail option must be enabled so that the list of children is stored in result.children. This is the default, but it is enabled explicitly here for illustration.
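To make the role of result.children concrete, here is a hypothetical helper, iter_leaf_values, that walks the result tree depth-first and yields the values of tasks that have no children (the pow2 calls in the example above). It is a sketch of the idea only, not collect()'s actual implementation (collect() itself yields a (result, value) pair for every node), and it assumes each node exposes get() and children the way AsyncResult and GroupResult do.

```python
from typing import Any, Iterator


def iter_leaf_values(result) -> Iterator[Any]:
    """Depth-first walk of a result tree through its .children links.

    Hypothetical helper. Yields the value of every node without children,
    i.e. tasks that returned a plain value instead of starting more tasks.
    Children are only recorded when Task.trail is enabled. Each get()
    blocks until that node's task has finished.
    """
    value = result.get()
    # children can be None when a task started no subtasks.
    children = result.children or []
    if not children:
        yield value
        return
    for child in children:
        yield from iter_leaf_values(child)
```

With the tasks above, list(iter_leaf_values(A.delay(10))) should collect just the pow2 values, without needing the isinstance filter. Like collect(), it still blocks on each get().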


Upon further testing, I found that although collect is documented to collect results as they come in, it still waits. To get progress, you need to get the result of each child, like so:

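async_result = mygrouptask.delay()
async_result.get()  # Wait for mygrouptask to finish dispatching the group.
# With the default JSON serializer, get() returns the serialized tuple form,
# not a GroupResult, so reach the group through the recorded children instead.
group_result = async_result.children[0]
for result in tqdm(group_result.results, total=count):
    yield result.get()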

tqdm displays a progress bar in the console.

mygrouptask is a task that returns a celery group, like so:

return group(mytask.s(arg) for arg in args)()
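Getting the children in submission order means one slow task holds the bar back even when later tasks have already finished. A hypothetical alternative, iter_as_completed, polls ready() on every pending result and yields values in completion order. It assumes AsyncResult-like objects with ready() and get(); note that get() re-raises a failed task's exception by default.

```python
import time
from typing import Any, Iterable, Iterator


def iter_as_completed(results: Iterable, poll_interval: float = 0.5) -> Iterator[Any]:
    """Yield each task's value as soon as that task has finished.

    Hypothetical helper. A slow early task no longer delays values from
    later tasks that are already done. `results` is any iterable of
    AsyncResult-like objects exposing ready() and get().
    """
    pending = list(results)
    while pending:
        still_pending = []
        for result in pending:
            if result.ready():
                yield result.get()  # Already finished, so this should not block.
            else:
                still_pending.append(result)
        pending = still_pending
        if pending:
            time.sleep(poll_interval)
```

It plugs into tqdm the same way: for value in tqdm(iter_as_completed(group_result.results), total=len(group_result.results)). The trade-off is a polling loop in the client instead of a single blocking get() per child.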

