[Fixed]-How to purge all tasks of a specific queue with celery in python?

36👍

just to update @Sam Stoelinga answer for celery 3.1, now it can be done like this on a terminal:

celery amqp queue.purge <QUEUE_NAME>

For Django be sure to start it from the manage.py file:

./manage.py celery amqp queue.purge <QUEUE_NAME> 

If not, be sure celery is able to point correctly to the broker by setting the --broker= flag.

👤Hassek

10👍

The original answer does not work for Celery 3.1. Hassek’s update is the correct command if you want to do it from the command line. But if you want to do it programmatically, do this:

Assuming you ran your Celery app as:

celery_app = Celery(...)

Then:

import celery.bin.amqp
amqp = celery.bin.amqp.amqp(app = celery_app)
amqp.run('queue.purge', 'name_of_your_queue')

This is handy for cases where you’ve enqueued a bunch of tasks, and one task encounters a fatal condition that you know will prevent the rest of the tasks from executing.

E.g. you enqueued a bunch of web crawler tasks, and in the middle of your tasks your server’s IP address gets blocked. There’s no point in executing the rest of the tasks. So in that case, your task it self can purge its own queue.

6👍

Lol it’s quite easy, hope somebody can help me still though.

from celery.bin.camqadm import camqadm
camqadm('queue.purge', queue_name_as_string)

The only problem with this I still need to stop the celeryd before purging the que, after purging I need to run the celeryd again to handle tasks for the queue. Will update this question if i succeed.

I succeeded, but please correct me if this is not a good method to stop the celeryd, purge que and start it again. I know I am using term, because I actually want it to be terminated the task.

kill_command =  "ps auxww | grep 'celeryd -n twitter_save' | awk '{print $2}' | xargs kill -9"
subprocess.call(kill_command, shell=True)

camqadm('queue.purge', 'twitter_save')
rerun_command = "/home/samos/Software/virt_env/twittersyncv1/bin/python %s/manage.py celeryd -n twitter_save -l info -Q twitter_save" % settings.PROJECT_ROOT

os.popen(rerun_command+' &')
send_task("socialreport.tasks.twitter_save")

Leave a comment