RuntimeError: Never call result.get() within a task (Celery)

25👍

Use allow_join_result. See the snippet below.

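from celery.result import AsyncResult, allow_join_result

@app.task(ignore_result=True)
def catpure_res(task_id):
    # `app` is your Celery application; `on_msg` is a callback you define yourself.
    task_obj = AsyncResult(task_id)
    with allow_join_result():
        # Blocks this worker process until the other task finishes.
        task_obj.get(on_message=on_msg)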

Note: As mentioned in the other answers, this can cause performance issues and even deadlock, but if your task is well written and doesn't raise unexpected errors, then it should work like a charm.
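To see why waiting inside a task can deadlock, here is a small stand-in that uses only the standard library, not Celery itself. A thread pool plays the role of the worker pool, and the names inner, outer and run are made up for the illustration. With a single worker, the outer job holds the only slot while it waits for a subtask that can never start.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

def inner():
    return "inner done"

def outer(pool, wait_s):
    # Submit a subtask to the same pool, then block on its result:
    # the same pattern as calling result.get() inside a Celery task.
    future = pool.submit(inner)
    try:
        return future.result(timeout=wait_s)
    except FutureTimeout:
        # Without the timeout this wait would never end.
        return "starved"

def run(workers, wait_s=0.5):
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return pool.submit(outer, pool, wait_s).result()

if __name__ == "__main__":
    print(run(workers=1))  # the only worker is busy waiting on its own subtask
    print(run(workers=2))  # a free worker can pick up the subtask
```

Celery workers behave similarly once every pool slot is occupied by a task that is waiting on another task, which is presumably why the check exists in the first place.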

4👍

As your title explains, calling get within a task is bad practice and can lead to deadlock.
Instead, you can check the task's status and fetch its result once it is ready:

result = catpure_res.AsyncResult(task_id, app=app)
if result.ready():
    return result.get()

return result.state

You can wrap the above snippet in a function and call it every x seconds.
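A rough sketch of such a polling wrapper, meant to be called from outside a task (for example from a web view or a script). The function name wait_for_result and its parameters are my own invention. It only assumes the object it receives has ready(), get() and state, which an AsyncResult does.

```python
import time

def wait_for_result(result, interval_s=1.0, max_wait_s=30.0):
    """Poll `result` until it is ready or `max_wait_s` has elapsed.

    Returns (True, value) if the task finished in time,
    otherwise (False, last_seen_state).
    """
    deadline = time.monotonic() + max_wait_s
    while True:
        if result.ready():
            # With Celery's defaults, get() re-raises the task's exception
            # if the task failed.
            return True, result.get()
        if time.monotonic() >= deadline:
            return False, result.state
        time.sleep(interval_s)
```

Usage would look something like finished, value = wait_for_result(result, interval_s=2), where result is the AsyncResult from the snippet above.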

EDIT: regarding your comment:

  • You can check result.state instead, and use the retry mechanism with a countdown until result.state == 'SUCCESS'.

  • You can add Celery beat to run a periodic task that checks whether the primary task has finished.

  • Note that running such a heavy (long-running) task is also bad practice. Consider breaking it into smaller tasks and using canvas to combine them.

👤ItayB

0👍

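from celery.exceptions import TimeoutError
from celery.result import allow_join_result

# `app` is your Celery application instance.
task_obj = app.send_task("do_something", [arg1, arg2, arg3])

def on_msg(*args, **kwargs):
    print(f"on_msg: {args}, {kwargs}")

with allow_join_result():
    try:
        # The keyword is `on_message`; Celery raises its own TimeoutError,
        # not the built-in one.
        result = task_obj.get(on_message=on_msg, timeout=timeout_s)
    except TimeoutError:
        print("Timeout!")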
