27👍
If your broker is configured as redis://localhost:6379/1
, and your tasks are submitted to the general celery
queue, then you can get the length by the following means:
import redis
queue_name = "celery"
client = redis.Redis(host="localhost", port=6379, db=1)
length = client.llen(queue_name)
Or, from a shell script (good for monitors and such):
$ redis-cli -n 1 -h localhost -p 6379 llen celery
33👍
Here is how you can get the number of messages in a queue using celery that is broker-agnostic.
By using connection_or_acquire
, you can minimize the number of open connections to your broker by utilizing celery’s internal connection pooling.
celery = Celery(app)
with celery.connection_or_acquire() as conn:
conn.default_channel.queue_declare(
queue='my-queue', passive=True).message_count
You can also extend Celery to provide this functionality:
from celery import Celery as _Celery
class Celery(_Celery)
def get_message_count(self, queue):
'''
Raises: amqp.exceptions.NotFound: if queue does not exist
'''
with self.connection_or_acquire() as conn:
return conn.default_channel.queue_declare(
queue=queue, passive=True).message_count
celery = Celery(app)
num_messages = celery.get_message_count('my-queue')
- [Django]-Why won't Django use IPython?
- [Django]-What does it mean to normalize an email address?
- [Django]-Token Authentication for RESTful API: should the token be periodically changed?
7👍
If you have already configured redis in your app, you can try this:
from celery import Celery
QUEUE_NAME = 'celery'
celery = Celery(app)
client = celery.connection().channel().client
length = client.llen(QUEUE_NAME)
- [Django]-Django Standalone Script
- [Django]-HTML Forms without actions
- [Django]-Control the size TextArea widget look in django admin
6👍
Get a redis client instance used by Celery, then check the queue length. Don’t forget to release the connection every time you use it (use .acquire
):
# Get a configured instance of celery:
from project.celery import app as celery_app
def get_celery_queue_len(queue_name):
with celery_app.pool.acquire(block=True) as conn:
return conn.default_channel.client.llen(queue_name)
Always acquire a connection from the pool, don’t create it manually. Otherwise, your redis server will run out of connection slots and this will kill your other clients.
- [Django]-Cache_page with Class Based Views
- [Django]-Saving image/file through django shell
- [Django]-Update model django through kwargs
3👍
I’ll expand on the answer of @StephenFuhry around the not-found error, because more or less broker-agnostic way of retrieving queue length is beneficial even if Celery suggests to mess with brokers directly. In Celery 4 (with Redis broker) this error looks like:
ChannelError: Channel.queue_declare: (404) NOT_FOUND - no queue 'NAME' in vhost '/'
Observations:
-
ChannelError
is akombu
exception (if fact, it’samqp
‘s andkombu
"re-exports" it). -
On Redis broker Celery/Kombu represent queues as Redis lists
-
Redis collection type keys are removed whenever the collection becomes empty
-
If we look at what
queue_declare
does, it has these lines:if passive and not self._has_queue(queue, **kwargs): raise ChannelError(...)
-
Kombu Redis virtual transport’s
_has_queue
is this:def _has_queue(self, queue, **kwargs): with self.conn_or_acquire() as client: with client.pipeline() as pipe: for pri in self.priority_steps: pipe = pipe.exists(self._q_for_pri(queue, pri)) return any(pipe.execute())
The conclusion is that on a Redis broker ChannelError
raised from queue_declare
is okay (for an existing queue of course), and just means that the queue is empty.
Here’s an example of how to output all active Celery queues’ lengths (normally should be 0, unless your worker can’t cope with the tasks).
from kombu.exceptions import ChannelError
def get_queue_length(name):
with celery_app.connection_or_acquire() as conn:
try:
ok_nt = conn.default_channel.queue_declare(queue=name, passive=True)
except ChannelError:
return 0
else:
return ok_nt.message_count
for queue_info in celery_app.control.inspect().active_queues().values():
print(queue_info[0]['name'], get_queue_length(queue_info[0]['name']))
- [Django]-Django request.GET
- [Django]-Running django tutorial tests fail – No module named polls.tests
- [Django]-Sortable table columns in django