[Solved]-Django, Celery, Redis, RabbitMQ: Chained Tasks for Fanout-On-Writes

The approach described in the video is task “chaining”.

To get your task method running as a chain, add an extra parameter that represents an index into the list of followers. Instead of working on the full list of followers, the task handles one fixed-size batch starting from the index it was handed. On completion, it queues a new copy of itself, passing the next index.

import redis

from celery import shared_task

# `pool` is assumed to be a redis.ConnectionPool configured elsewhere in this module.

INSERT_INTO_HOMEFEED_BATCH = 10000

@shared_task
def insert_into_homefeed(photo_id, user_id, index=0):
    # Grab one batch of follower IDs from Redis for user_id.
    r_server = redis.Redis(connection_pool=pool)

    range_limit = index + INSERT_INTO_HOMEFEED_BATCH - 1  # lrange's end index is inclusive

    followers_list_batch = r_server.lrange("followers:%s" % (user_id), index, range_limit)

    if not followers_list_batch:
        return  # zero followers or no more batches

    # For each follower in this batch, push the photo_id onto that
    # follower's homefeed list in Redis.
    for follower_id in followers_list_batch:
        r_server.lpush("homefeed:%s" % (follower_id), photo_id)

    # Queue the next link in the chain, starting where this batch ended.
    insert_into_homefeed.delay(photo_id, user_id, range_limit + 1)

This works well because Redis lists keep insertion order and the lrange command doesn’t raise an error for out-of-range indexes: once the start index runs past the end of the followers list, it simply returns an empty list, and the chain stops.
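
For completeness, here is a minimal sketch of how the chain could be kicked off after a photo is saved. The Photo model, the view, and the field names are assumptions for illustration and are not part of the original code.

# views.py -- hypothetical upload view; the Photo model, its fields, and the
# "feed" URL name are assumptions made for this example.
from django.shortcuts import redirect

from .models import Photo
from .tasks import insert_into_homefeed

def upload_photo(request):
    photo = Photo.objects.create(owner=request.user, image=request.FILES["image"])

    # Start the chain at index 0 (the default); each task handles one batch
    # of followers and queues the next link itself.
    insert_into_homefeed.delay(photo.id, request.user.id)

    return redirect("feed")

Because each link re-queues itself, the view only pays for a single quick .delay() call no matter how many followers the user has.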
