[Solved]-Is it possible to skip delegating a celery task if the params and the task name are already queued in the server?


celery-singleton solves this requirement

Caveat: requires redis broker (for distributed locks)

pip install celery-singleton

Use the Singleton task base class:

from celery_singleton import Singleton

@app.task(base=Singleton)  # app is your Celery application instance
def do_stuff_for_some_time(some_id):
    e = Model.objects.get(id=some_id)

from the docs:

calls to do_stuff.delay() will either queue a new task
or return an AsyncResult for the currently queued/running instance of
the task
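
That behaviour can be sketched without a broker: a registry keyed on the task name and its arguments either records a new run or hands back the existing handle. Note that `delay_once` and the handle strings below are illustrative stand-ins, not celery-singleton's actual API (which uses Redis locks and returns real AsyncResult objects):

```python
# In-memory sketch of the Singleton idea: at most one queued instance
# per (task name, arguments) key.
_pending = {}

def delay_once(task_name, *args):
    key = (task_name, args)
    if key in _pending:
        # Duplicate call: hand back the handle of the already-queued instance.
        return _pending[key]
    handle = f"result-for-{task_name}{args}"  # stands in for an AsyncResult
    _pending[key] = handle
    return handle

first = delay_once("do_stuff_for_some_time", 42)
second = delay_once("do_stuff_for_some_time", 42)
other = delay_once("do_stuff_for_some_time", 43)
print(first == second)  # True: duplicate arguments return the same handle
print(first == other)   # False: different arguments queue a new task
```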


I would try a mix of a cache lock and a task result backend which stores each task’s results:

  • The cache lock will prevent tasks with the same arguments from being added to the queue multiple times. The Celery documentation contains a nice example of a cache-lock implementation here, but if you don’t want to create it yourself, you can use the celery-once module.

  • For a task result backend, we will use the recommended django-celery-results, which creates a TaskResult table that we will query for task results.
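
The cache-lock idea from the first bullet can be sketched without a cache backend. This is only an illustration of the pattern from the Celery docs, using an in-memory dict in place of Django's cache; the names here are not from any library:

```python
import time
from contextlib import contextmanager

LOCK_EXPIRE = 60 * 10  # lock expires after 10 minutes, in case a task hangs
_cache = {}

def cache_add(key, value, timeout):
    """Emulate cache.add(): set the key only if absent or expired."""
    now = time.monotonic()
    if key in _cache and _cache[key][1] > now:
        return False
    _cache[key] = (value, now + timeout)
    return True

@contextmanager
def task_lock(lock_id):
    acquired = cache_add(lock_id, True, LOCK_EXPIRE)
    try:
        yield acquired
    finally:
        if acquired:
            _cache.pop(lock_id, None)

# A caller checks the lock before queueing a task:
with task_lock("do_stuff-42") as got_it:
    print(got_it)  # True: first caller holds the lock
    with task_lock("do_stuff-42") as again:
        print(again)  # False: same key is already locked, skip queueing
```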


  • Install and configure django-celery-results:


    pip install django-celery-results

    CELERY_RESULT_BACKEND = 'django-db'  # You can also use 'django-cache'

    ./manage.py migrate django_celery_results

  • Install and configure the celery-once module:


    pip install celery-once

    from celery import Celery
    from celery_once import QueueOnce

    celery = Celery('tasks', broker='amqp://guest@localhost//')
    celery.conf.ONCE = {
        'backend': 'celery_once.backends.Redis',
        'settings': {
            'url': 'redis://localhost:6379/0',
            'default_timeout': 60 * 60
        }
    }

    @celery.task(base=QueueOnce)
    def do_stuff_for_some_time(some_id):
        e = Model.objects.get(id=some_id)

    At this point, attempting to queue the task while another instance with
    the same arguments is already pending will raise an AlreadyQueued exception.

  • Let’s use the above:

    from celery_once import AlreadyQueued
    from django_celery_results.models import TaskResult

    try:
        result = do_stuff_for_some_time.delay(some_id)
    except AlreadyQueued:
        result = TaskResult.objects.get(task_args=some_id)


  • Mind that at the time an AlreadyQueued exception is raised, the initial task with argument=some_id may not have finished executing yet, and therefore it will have no row in the TaskResult table.

  • Mind everything in your code that can go wrong and hang any of the above processes (because it will!).
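
Because of the first caveat above (the result row may not exist yet when AlreadyQueued is raised), the TaskResult lookup is best wrapped in a retry loop. A generic sketch, where `fetch` stands in for the `TaskResult.objects.get` call:

```python
import time

def wait_for_result(fetch, timeout=30.0, interval=0.5):
    """Poll fetch() until it returns a non-None result or the timeout expires.

    fetch stands in for a task-result lookup that returns None while the
    original task is still running.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        result = fetch()
        if result is not None:
            return result
        time.sleep(interval)
    raise TimeoutError("task result did not appear in time")

# Simulate a result that only appears on the third lookup:
attempts = []
def fake_fetch():
    attempts.append(1)
    return "done" if len(attempts) >= 3 else None

print(wait_for_result(fake_fetch, timeout=5.0, interval=0.01))  # done
```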


I am not really sure if celery has such an option. However, I would like to suggest a work-around.

1) Create a model for all the celery tasks being queued. In that model, save the task_name, the queue_name, as well as the parameters.

2) Use get_or_create on that model for every celery task that is ready to be queued.

3) If created = True in step 2, add the task to the queue; otherwise, do not add it.
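
The three steps above can be sketched with an in-memory stand-in for the suggested model. In Django this would be `QueuedTask.objects.get_or_create(...)`; the `QueuedTask` name, the field names, and `maybe_enqueue` are all hypothetical:

```python
# In-memory stand-in for the suggested "queued tasks" model.
_queued = set()

def get_or_create(task_name, queue_name, params):
    """Emulate Model.objects.get_or_create(): return (row_key, created)."""
    key = (task_name, queue_name, params)
    created = key not in _queued
    _queued.add(key)
    return key, created

def maybe_enqueue(task_name, queue_name, params, enqueue):
    _, created = get_or_create(task_name, queue_name, params)
    if created:
        enqueue()        # first time: add the task to the queue
        return True
    return False         # duplicate: skip it

queued = []
print(maybe_enqueue("do_stuff", "default", (42,), lambda: queued.append(42)))  # True
print(maybe_enqueue("do_stuff", "default", (42,), lambda: queued.append(42)))  # False
print(queued)  # [42] -- the task was only queued once
```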
