[Fixed] Allow a task execution if it's not already scheduled using Celery

12👍

One problem with the accepted answer is that it is slow. Checking whether a task is already running involves making a call over the broker and then iterating through both the scheduled and the active tasks. If you want to queue up the task quickly, this won't work. The accepted solution also has a small race condition: two processes could check whether the task has been queued at the same time, both find that it isn't, and then queue up two tasks.

A better solution is what I call debounced tasks. Basically you increment a counter each time you queue a task, and decrement it when the task starts. Keep the counter in Redis and it's all atomic.

e.g.

Queue up the task:

conn = get_redis()  # your redis-py connection, e.g. redis.Redis()
conn.incr(key)      # atomically count this queue request
task.apply_async(args=args, kwargs=kwargs, countdown=countdown)

Then in the task you have two options: do you want to execute the task 15 seconds after the first one was queued (throttle), or 15 seconds after the last one was queued (debounce)? That is, if we keep trying to run the same task, do we extend the timer, or do we just wait 15 seconds after the first one and ignore the other tasks that were queued?

It's easy to support both. Here is debounce, where we wait until the task stops getting queued:

conn = get_redis()
counter = conn.decr(key)
if counter > 0:
    # more copies of this task were queued after this one; let the last one run
    return
# continue on to rest of task

Throttle version:

conn = get_redis()
# getset returns the old value and resets the counter in one atomic step
counter = conn.getset(key, '0')
if counter == '0':  # note: redis-py returns bytes unless decode_responses=True
    # we already ran, so ignore all the tasks that were queued since
    return
# continue on to rest of task
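Putting the queue-up side and the debounce check together, a minimal self-contained sketch could look like the following. The app setup, the get_redis() helper and the debounce:generate_report key name are assumptions for illustration, not part of the original answer.

import redis
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

DEBOUNCE_KEY = 'debounce:generate_report'


def get_redis():
    # plain redis-py client; swap in your own connection handling
    return redis.Redis()


def queue_generate_report():
    conn = get_redis()
    conn.incr(DEBOUNCE_KEY)                    # count this request atomically
    generate_report.apply_async(countdown=15)  # always queue; the task decides whether to run


@app.task
def generate_report():
    conn = get_redis()
    if conn.decr(DEBOUNCE_KEY) > 0:
        # more requests arrived after this one was queued; let the last one do the work
        return
    # ... actually generate the report here ...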

Another benefit of this solution over the accepted answer is that the key is entirely under your control. So if, for example, you want the same task to run separately for different ids/objects, but only once per id/object, you just incorporate the id into the key.
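For example, a per-object key might be built like this (my_task, the debounce: prefix and obj_id are illustrative names, not from the original answer):

key = 'debounce:my_task:{}'.format(obj_id)  # one counter per object
conn = get_redis()
conn.incr(key)
my_task.apply_async(args=(obj_id,), countdown=15)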

Update

Thinking about this even more, you can do the throttle version even more simply, without having to queue up the extra tasks.

Throttle v2 (when queuing up the task)

conn = get_redis()
counter = conn.incr(key)
if counter == 1:
    # queue up the task only the first time
    task.apply_async(args=args, kwargs=kwargs, countdown=countdown)

Then in the task you set the counter back to 0.
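A minimal sketch of that reset at the top of the task (reusing the same get_redis() and key names as above):

conn = get_redis()
conn.set(key, 0)  # the next incr returns 1 again, so a new task gets queued
# continue on to rest of task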

You don't even have to use a counter; if you use a set instead, you can add the key to the set. If the add returns 1, the key wasn't in the set, so you should queue the task. If it returns 0, the key is already in the set, so don't queue the task.
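Here is a sketch of that set-based variant, assuming the same get_redis() helper and an illustrative set name of 'pending_tasks':

conn = get_redis()
if conn.sadd('pending_tasks', key):
    # sadd returned 1: the key was not in the set, so queue the task
    task.apply_async(args=args, kwargs=kwargs, countdown=countdown)

Then at the start of the task, remove the key so future requests can queue it again:

conn = get_redis()
conn.srem('pending_tasks', key)
# continue on to rest of task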

👤dalore

2👍

Look before you leap! You can check if there are any tasks running/waiting before you queue tasks.

from celery.task.control import inspect  # Celery < 5; in Celery 5+ use app.control.inspect()

def is_running_waiting(task_name):
    """
    Check if a task with the given name is running or waiting on any worker.
    """
    i = inspect()
    # scheduled() nests the task info under a 'request' key
    for worker_tasks in (i.scheduled() or {}).values():
        for task in worker_tasks:
            if task['request']['name'] == task_name:
                return True
    # active() exposes the task info (including 'name') directly
    for worker_tasks in (i.active() or {}).values():
        for task in worker_tasks:
            if task['name'] == task_name:
                return True
    return False

Now if you try to queue three add tasks, only the first one will be queued for execution; the rest won't be.

for i in range(3):
    if not is_running_waiting('add'):
        add.apply_async((2,2), countdown=15)
