[Solved]-How to track the progress of individual tasks inside a group which forms the header to a chord in celery?

7👍

After hours of googling I stumbled upon http://www.manasupo.com/2012/03/chord-progress-in-celery.html . Though the solution there didn’t work for me out of the box, it did inspire me to try something similar.

from celery.utils import uuid
from celery import chord

class ProgressChord(chord):

    def __call__(self, body=None, **kwargs):
        _chord = self.type
        body = (body or self.kwargs['body']).clone()
        kwargs = dict(self.kwargs, body=body, **kwargs)
        if _chord.app.conf.CELERY_ALWAYS_EAGER:
            return self.apply((), kwargs)
        callback_id = body.options.setdefault('task_id', uuid())
        r= _chord(**kwargs)
        return _chord.AsyncResult(callback_id), r

and instead of executing celery.chord I use ProgressChord as follows:

def temptask(n):
    header=list(tempsubtask.si(i) for i in range(n))
    callback=templink.si('printed at last?')
    r = celery.Progresschord(celery.group(header))(callback)
    return r

returned value of r contained a tuple having both, callback’s asyncresult and a group result. So success looked something like this:

In [3]: r
Out[3]: 
(<AsyncResult: bf87507c-14cb-4ac4-8070-d32e4ff326a6>,
 <GroupResult: af69e131-5a93-492d-b985-267484651d95 [4672cbbb-8ec3-4a9e-971a-275807124fae, a236e55f-b312-485c-a816-499d39d7de41, e825a072-b23c-43f2-b920-350413fd5c9e, e3f8378d-fd02-4a34-934b-39a5a735871d, c4f7093b-9f1a-4e5e-b90d-66f83b9c97c4, d5c7dc2c-4e10-4e71-ba2b-055a33e15f02, 07b1c6f7-fe95-4c1f-b0ba-6bc82bceaa4e, 00966cb8-41c2-4e95-b5e7-d8604c000927, e039c78e-6647-4c8d-b59b-e9baf73171a0, 6cfdef0a-25a2-4905-a40e-fea9c7940044]>)

I inherited and overrode [celery.chord][1] instead of [celery.task.chords.Chord][2] because I couldn’t find it’s source anywhere.

6👍

I’ve had a similar question. Most examples on the net are outdated, the docs didn’t help much, but the docs have links to sources, reading which did help me.
My objective was to organize parallel tasks in groups. The groups would have to be executed sequentially in order.
So I decided to generate the task ids before starting any tasks separately and only assigning them. I’m using Celery 4.3.0

Here’s a brief example.

Firstly I needed a dummy task to make execution sequential and to be able to check the state of a certain group. As this is used a callback, it will complete only after all other tasks in the group.

@celery.task(bind=True, name="app.tasks.dummy_task")
def dummy_task( self, results=None, *args, **kwargs ):
    return results

My comments here explain how I assign ids.

from celery.utils import uuid
from celery import group, chord, chain


# Generating task ids, 
# which can be saved to a db, sent to the client and so on
#
# This is done before executing any tasks

task_id_1 = uuid()
task_id_2 = uuid()

chord_callback_id_1 = uuid()
chord_callback_id_2 = uuid()

workflow_id = None


# Generating goups, using signatures
# the group may contain any number of tasks
group_1 = group(
        [
            celery.signature(
                    'app.tasks.real_task', 
                    args=(), 
                    kwargs = { 'email': some_email, 'data':some_data },
                    options = ( {'task_id': task_id_1 } )
                )
        ]
    )

group_2 = group(
        [
            celery.signature(
                    'app.tasks.real_task', 
                    args=(), 
                    kwargs = { 'email': some_email, 'data':some_data },
                    options = ( {'task_id': task_id_2 } )
                )
        ]
    )

# Creating callback task which will simply rely the result
# Using the task id, which has been generated before
# 
# The dummy task start after all tasks in this group are completed
# This way we know that the group is completed

chord_callback = celery.signature( 
        'app.tasks.dummy_task',
        options=( {'task_id': chord_callback_id_1 } )
    ) 

chord_callback_2 = celery.signature( 
        'app.tasks.dummy_task',
        options=( {'task_id': chord_callback_id_2 } )
    ) 


# we can monitor each step status
# by its chord callback id

# the id of the chord callback  
step1 = chord( group_1, body=chord_callback )
# the id of the chord callback  
step2 = chord( group_2, body=chord_callback_2 )

# start the workflow execution
# the steps will execute sequentially 
workflow = chain( step1, step2 )()


# the id of the last cord callback
workflow_id = workflow.id

# return any ids you need
print( workflow_id )

That’s how I can check the status of any task in my app.

# This is a simplified example
# some code is omitted
from celery.result import AsyncResult


def task_status( task_id=None ):

    # PENDING
    # RECEIVED
    # STARTED
    # SUCCESS
    # FAILURE
    # REVOKED
    # RETRY

    task = AsyncResult(task_id)

    response = {
      'state': task.state,
    }

    return jsonify(response), 200
👤sr9yar

1👍

Old problem and I wasted a several days to find a better and modern solution. In my current project I must to track group progress separately and release lock in final callback.

And current solution is much more simple (but harder to guess), subject lines commented at the end:

@celery_app.task(name="_scheduler", track_started=True, ignore_result=False)
def _scheduler():
    lock = cache.lock("test_lock")
    if not lock.acquire(blocking=False):
        return {"Error": "Job already in progress"}

    lock_code = lock.local.token.decode("utf-8")

    tasks = []
    for x in range(100):
        tasks.append(calculator.s())
    
    _group = group(*tasks)
    _chord = chord(_group)(_get_results.s(token=lock_code))
    
    group_results = _chord.parent # This is actual group inside chord
    group_results.save() # I am saving it to usual results backend, and can track progress inside.
    
    return _chord # can return anything, I need only chord.

I am working in Celery 5.1

👤Insspb

Leave a comment