[Solved]-Django celery worker to send real-time status and result messages to front end

2👍

EDIT: Moved to django-channels now, works well but more complex than solution below.

Previous:

Ok so below is pseudo code for how I’ve solved it for now. Basically I use https://pusher.com/docs/javascript_quick_start and server-side pass the instantiated object into the compute_module. One downside is that the pusher messages are ephermeral so I’m going to have to do some extra work in LogPusher to store them in a db, something for another day…

Also in my real implementation I trigger the task via a $.post() ajax call in $(document).ready() because small tasks completed so fast the user would never see the pusher messages because the connection wasn’t established (back to that historic message problem).

Another alternative route which I hadn’t mentioned above is https://channels.readthedocs.io/en/latest/

[Edit] Another solutions is Server-sent events which has django implementations, havent tested it. But it looks good for uni-directional updates eg from server to client (vs websockets bidirectional). You would need a messaging system like redis pubsub to get updates to the server sse route.

Front-end updates from django server via pusher:

# views.py
from tasks import run_task

def view_task():
    run_task.delay('event')
    return render(request, 'template.html', 'pusher_event':'event')

    
# tasks.py
import pusher
from django.conf import settings
from compute_module import compute_fct

class LogPusher(object):
    def __init__(self, event):
        self.pusher_client = pusher.Pusher(app_id=settings.PUSHER_APP_ID,
                        key=settings.PUSHER_KEY,
                        secret=settings.PUSHER_SECRET,
                        cluster=settings.PUSHER_CLUSTER, ssl=True)
        self.event = event
        
    def send(self, data):
        self.pusher_client.trigger(settings.PUSHER_CHANNEL, self.event, json.dumps(data))

@shared_task
def run_task(pusher_event):
    
    log_pusher = LogPusher(pusher_event)
    result = compute_fct(log_pusher)

    # how to catch status update messages from compute_module while compute_fct is running??

    if result == 'error':
            log_pusher.send('status':'error')
    else:
            log_pusher.send('status':'success')

            
# compute_module.py
import pandas as pd

def compute_fct(log_pusher):
    # send message: status = loading file
    log_pusher.send('status':'loading file')
    df = pd.read_csv('test.csv')
    # send message: status = computing
    log_pusher.send('status':'computing')
    val = df['col'].mean()

    if val is None:
        return {'status':'error'}
    else:
        return {'status':'success','val':val}
        

# context_processors.py
# see https://stackoverflow.com/questions/433162/can-i-access-constants-in-settings-py-from-templates-in-django
from django.conf import settings 

def pusher(request):
    return {'PUSHER_KEY': settings.PUSHER_KEY, 'PUSHER_CLUSTER': settings.PUSHER_CLUSTER , 'PUSHER_CHANNEL': settings.PUSHER_CHANNEL }

        
# template.html
<script>
    
var pusher = new Pusher("{{PUSHER_KEY}}", {
  cluster: "{{PUSHER_CLUSTER}}",
  encrypted: true    
});

var channel = pusher.subscribe("{{PUSHER_CHANNEL}}");
channel.bind("{{pusher_event}}", function(data) {
    // process data
});

</script>

1👍

The only way I’ve managed to get realtime statuses is to simply put some SQL writes/api calls into the task itself. Doing things with the return value of the task is far easier since you can just write a custom task class.

I’m not entirely sure how this works using Django but it should look something like this.

class CustomTask(celery.Task):
    def __call__(self, *args, **kwargs):
        self.start_time = time.time()

    def on_success(self, retval, task_id, args, kwargs):
        do_success_stuff()

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        do_failure_stuff()

@shared_task(base=CustomTask)
def do_stuff():
    return create_widgets()

The full list can be found here:
http://docs.celeryproject.org/en/latest/userguide/tasks.html#handlers

👤lpiner

0👍

There is a library called celery-progress that might be helpful
celery-progress library

also he made a blog post about doing it manually:
blog about celery progress bars

Leave a comment