Best practice of testing django-rq (python-rq) in Django

9👍

I just found django-rq, which allows you to spin up a worker in a test environment that executes any tasks on the queue and then quits.

from django.test import TestCase
from django_rq import get_worker

class MyTest(TestCase):
    def test_something_that_creates_jobs(self):
        ...                      # Code that enqueues jobs.
        get_worker().work(burst=True)  # Processes all queued jobs, then stops.
        ...                      # Assert that the jobs did their work.

5👍

None of the answers above really solved how to test without having Redis installed while still using the Django settings. I found that including the following code in the tests does not affect the project itself yet provides everything needed.

The code uses fakeredis to pretend a Redis service is available and sets up the connection before django-rq reads the settings.

By default, fakeredis connections do not share state (the server), so every caller must receive the same connection. The class below is therefore a singleton that creates the connection once and reuses it.

import django_rq
from fakeredis import FakeStrictRedis, FakeRedis

class FakeRedisConn:
    """Singleton FakeRedis connection."""

    def __init__(self):
        self.conn = None

    def __call__(self, _, strict):
        if not self.conn:
            self.conn = FakeStrictRedis() if strict else FakeRedis()
        return self.conn


django_rq.queues.get_redis_connection = FakeRedisConn()

def test_case():
   ...

fakeredis also provides this singleton directly as FakeRedisConnSingleton:

import django_rq
from fakeredis import FakeRedisConnSingleton

django_rq.queues.get_redis_connection = FakeRedisConnSingleton()
👀DanielM

4👍

I separated my rq tests into a few pieces.

  1. Test that I’m correctly adding things to the queue (using mocks).
  2. Assume that if something gets added to the queue, it will eventually be processed (rq’s test suite should cover this).
  3. Test that, given the correct input, my tasks work as expected (normal code tests).

Code being tested:

def handle(self, *args, **options):
    uid = options.get('user_id')

    # @@@ Need to exclude out users who have gotten an email within $window
    # days.
    if uid is None:
        uids = User.objects.filter(is_active=True, userprofile__waitlisted=False).values_list('id', flat=True)
    else:
        uids = [uid]

    q = rq.Queue(connection=redis.Redis())

    for user_id in uids:
        q.enqueue(mail_user, user_id)

My tests:

class DjangoMailUsersTest(DjangoTestCase):
    def setUp(self):
        self.cmd = MailUserCommand()

    @patch('redis.Redis')
    @patch('rq.Queue')
    def test_no_userid_queues_all_userids(self, queue, _):
        u1 = UserF.create(userprofile__waitlisted=False)
        u2 = UserF.create(userprofile__waitlisted=False)
        self.cmd.handle()
        # assertCountEqual is the Python 3 name for assertItemsEqual
        self.assertCountEqual(queue.return_value.enqueue.mock_calls,
                              [call(ANY, u1.pk), call(ANY, u2.pk)])

    @patch('redis.Redis')
    @patch('rq.Queue')
    def test_waitlisted_people_excluded(self, queue, _):
        u1 = UserF.create(userprofile__waitlisted=False)
        UserF.create(userprofile__waitlisted=True)
        self.cmd.handle()
        # assertCountEqual is the Python 3 name for assertItemsEqual
        self.assertCountEqual(queue.return_value.enqueue.mock_calls, [call(ANY, u1.pk)])
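
For anyone who wants to try the first piece (checking what gets enqueued) without Django or Redis, here is a minimal sketch using only unittest.mock. Instead of patching rq.Queue, it passes a mock queue in directly. The names queue_mail and mail_user are hypothetical stand-ins for illustration, not the code from the answer above.

```python
from unittest.mock import ANY, MagicMock, call


def mail_user(user_id):
    """Placeholder task (hypothetical); it must never run in this test."""
    raise AssertionError("task should not run when the queue is mocked")


def queue_mail(queue, user_ids):
    """Hypothetical helper: enqueue one mail job per user id."""
    for user_id in user_ids:
        queue.enqueue(mail_user, user_id)


# The mock records every enqueue() call without touching Redis.
queue = MagicMock()
queue_mail(queue, [1, 2])

# Exact match on the task and its argument...
assert queue.enqueue.mock_calls == [call(mail_user, 1), call(mail_user, 2)]
# ...or ignore which callable was enqueued, as the tests above do with ANY.
assert queue.enqueue.mock_calls == [call(ANY, 1), call(ANY, 2)]
```

Passing the queue in as an argument is a design choice of this sketch; patching rq.Queue as shown above works just as well when the queue is created inside the function.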

1👍

I committed a patch that lets you do:

from django.test import TestCase
from django_rq import get_queue

class MyTest(TestCase):
    def test_something_that_creates_jobs(self):
        # The keyword is is_async in current releases; async is reserved in Python 3.7+
        queue = get_queue(is_async=False)
        queue.enqueue(func) # func will be executed right away
        # Test for job completion

This should make testing RQ jobs easier. Hope that helps!

👀Selwin Ong

1👍

Just in case this is helpful to anyone: I patched django_rq.get_queue with a custom mock object whose enqueue runs the job right away.

# Patch django_rq.get_queue (needs: from unittest.mock import patch)
with patch('django_rq.get_queue', return_value=MockBulkJobGetQueue()) as mock_django_rq_get_queue:
    # Perform the web operation that starts the job. In my case, a POST to a URL.
    ...

Then the mock object just had one method:

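class MockBulkJobGetQueue(object):

    def enqueue(self, f, *args, **kwargs):
        # Call the function right away instead of queueing it.
        # Fall back to an empty dict when no 'kwargs' argument was passed,
        # because unpacking None would raise a TypeError.
        f(**kwargs.pop('kwargs', {}))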
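
To show the whole flow in one runnable piece, here is a rough sketch that needs neither Django nor Redis. The fake_django_rq namespace, ImmediateQueue, record and view_that_starts_job are all made-up stand-ins; in a real test you would patch 'django_rq.get_queue' as shown above.

```python
from types import SimpleNamespace
from unittest.mock import patch


class ImmediateQueue:
    """Stand-in queue (hypothetical) whose enqueue() runs the job at once."""

    def enqueue(self, f, *args, **kwargs):
        return f(*args, **kwargs)


# Hypothetical stand-in for the django_rq module, so the sketch is self-contained.
fake_django_rq = SimpleNamespace(get_queue=lambda name="default": None)

results = []


def record(value):
    """Example job: remember the value it was called with."""
    results.append(value)


def view_that_starts_job():
    """Plays the role of the web operation that enqueues a job."""
    fake_django_rq.get_queue().enqueue(record, 42)


# While the patch is active, get_queue() hands back the immediate queue,
# so the job runs inside the test process.
with patch.object(fake_django_rq, "get_queue", return_value=ImmediateQueue()):
    view_that_starts_job()

assert results == [42]
```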
👀user353255

1👍

What I’ve done for this case is detect whether I’m testing and use fakeredis during tests. Finally, in the test itself, I enqueue the task in synchronous mode.

First, define a function that detects whether you’re testing:

import sys

TESTING = len(sys.argv) > 1 and sys.argv[1] == 'test'

def am_testing():
    return TESTING

Then, in the file that uses Redis to queue up tasks, manage the queue this way.
You could extend get_queue to accept a queue name if needed:

if am_testing():
    from fakeredis import FakeStrictRedis 
    from rq import Queue
    def get_queue():
        return Queue(connection=FakeStrictRedis())

else:
    import django_rq
    def get_queue():
        return django_rq.get_queue()

Then, enqueue your task like so:

queue = get_queue()
queue.enqueue(task_mytask, arg1, arg2)

Finally, in your test program, run the task you are testing in synchronous mode, so that it runs in the same process as your test. As a matter of practice, I first clear the fakeredis queue, but I don’t think it’s necessary since there are no workers:

from rq import Queue
from fakeredis import FakeStrictRedis

FakeStrictRedis().flushall()
# The keyword is is_async in current rq releases; async is reserved in Python 3.7+
queue = Queue(is_async=False, connection=FakeStrictRedis())
queue.enqueue(task_mytask, arg1, arg2)

My settings.py has the usual RQ_QUEUES settings, so django_rq.get_queue() uses them when deployed:

RQ_QUEUES = {
    'default': {
        'HOST': env_var('REDIS_HOST'),
        'PORT': 6379,
        'DB': 0,
        # 'PASSWORD': 'some-password',
        'DEFAULT_TIMEOUT': 360,
    },
    'high': {
        'HOST': env_var('REDIS_HOST'),
        'PORT': 6379,
        'DB': 0,
        'DEFAULT_TIMEOUT': 500,
    },
    'low': {
        'HOST': env_var('REDIS_HOST'),
        'PORT': 6379,
        'DB': 0,
    }
}
👀Comp Guy

0👍

You’ll need your tests to pause while there are still jobs in the queue. To do this, you can check Queue.is_empty(), and suspend execution if there are still jobs in the queue:

import time
from django.test import TestCase  # django.utils.unittest was removed in Django 1.9
import django_rq

class TestQueue(TestCase):

    def test_something(self):
        # simulate some User actions which will queue up some tasks

        # Wait for the queued tasks to run (a worker must be running separately)
        queue = django_rq.get_queue('default')
        while not queue.is_empty():
            time.sleep(5)  # adjust this depending on how long your tasks take to execute

        # queued tasks are done, check state of the DB
        self.assertTrue(...)
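
One caveat with a bare while loop is that it never ends if no worker picks up the jobs. A small polling helper with a timeout avoids a hung test run. This is only a sketch; wait_until and its parameters are names made up for illustration, not part of rq or django-rq.

```python
import time


def wait_until(predicate, timeout=30.0, interval=0.5):
    """Poll predicate() until it is truthy or `timeout` seconds have passed.

    Returns True if the predicate became truthy in time, otherwise False.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    # One last check so a predicate that flips right at the deadline still counts.
    return bool(predicate())
```

In the test above this could be used as `self.assertTrue(wait_until(queue.is_empty, timeout=60))`. Keep in mind that an empty queue only means the jobs have been picked up, so a job may still be running when the check passes.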
👀Chris Lawlor

0👍

I came across the same issue. In addition, my jobs executed some mailing functionality, and I wanted to check the Django test mailbox for any emails. However, since with django-rq the jobs are not executed in the same context as the Django test, the emails that are sent do not end up in the test mailbox.

Therefore I need to execute the jobs in the same context. This can be achieved by:

from django_rq import get_queue
queue = get_queue('default')
queue.enqueue(some_job_callable)

# fetch the jobs currently waiting in the queue
jobs = queue.get_jobs()

# execute in the same context as test
while jobs:
    for job in jobs:
        queue.remove(job)
        job.perform()
    jobs = queue.get_jobs()

# check no jobs left in queue
assert not jobs

Here you just get all the jobs from the queue and execute them directly in the test. One can nicely implement this in a TestCase Class and reuse this functionality.
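
A reusable version could look roughly like the mixin below. It relies only on the queue methods used above (get_jobs, remove) and on job.perform(). RunQueuedJobsMixin, StubJob, StubQueue and MailJobTest are hypothetical names; the stubs exist only so the sketch runs without Redis, and in a real test you would pass the queue returned by get_queue('default').

```python
import unittest


class RunQueuedJobsMixin:
    """Drain a queue inside the test process, so side effects stay visible."""

    def run_queued_jobs(self, queue):
        executed = 0
        jobs = queue.get_jobs()
        while jobs:
            for job in jobs:
                queue.remove(job)
                job.perform()
                executed += 1
            # Jobs may enqueue follow-up jobs, so look again.
            jobs = queue.get_jobs()
        return executed


class StubJob:
    """Stand-in for a job: remembers a callable and its arguments."""

    def __init__(self, func, *args):
        self.func, self.args = func, args

    def perform(self):
        return self.func(*self.args)


class StubQueue:
    """Stand-in for a queue with the three methods the mixin needs."""

    def __init__(self):
        self._jobs = []

    def enqueue(self, func, *args):
        job = StubJob(func, *args)
        self._jobs.append(job)
        return job

    def get_jobs(self):
        return list(self._jobs)

    def remove(self, job):
        self._jobs.remove(job)


class MailJobTest(RunQueuedJobsMixin, unittest.TestCase):
    def test_jobs_run_in_test_process(self):
        outbox = []
        queue = StubQueue()
        queue.enqueue(outbox.append, "welcome")
        self.assertEqual(self.run_queued_jobs(queue), 1)
        self.assertEqual(outbox, ["welcome"])
        self.assertEqual(queue.get_jobs(), [])
```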

👀dice89
