14👍
I created a custom management command which modifies and replaces the rqscheduler command included in django_rq. An example is provided here: https://github.com/rq/rq-scheduler/issues/51#issuecomment-362352497
13👍
The best place I’ve found to run it is from your AppConfig in apps.py.
from datetime import datetime

import django_rq

# Inside your AppConfig subclass:
def ready(self):
    scheduler = django_rq.get_scheduler('default')
    # Delete any existing jobs in the scheduler when the app starts up
    for job in scheduler.get_jobs():
        job.delete()
    # Have 'mytask' run every 5 minutes
    scheduler.schedule(datetime.utcnow(), 'mytask', interval=60*5)
9👍
I’ve added scheduling to the __init__ module of one of my project's applications (in Django terms), but wrapped it in a small function that prevents jobs from being queued more than once. The scheduling strategy may depend on your specific needs (e.g. you may need additional checks on a job's arguments).
Code that works for me and fits my needs:
import datetime
from collections import defaultdict

import django_rq

import tasks

scheduler = django_rq.get_scheduler('default')
# Materialize the jobs, since they are iterated more than once below
jobs = list(scheduler.get_jobs())

# Map each scheduled function to the intervals it is currently scheduled with
functions = defaultdict(list)
for job in jobs:
    functions[job.func].append(job.meta.get('interval'))

# rq-scheduler expects scheduled times in UTC
now = datetime.datetime.utcnow()


def schedule_once(func, interval):
    """
    Schedule job once or reschedule when interval changes
    """
    if (func not in functions or interval not in functions[func]
            or len(functions[func]) > 1):
        # clear all scheduled jobs for this function
        for job in jobs:
            if job.func == func:
                scheduler.cancel(job)
        # schedule with new interval
        scheduler.schedule(now + datetime.timedelta(seconds=interval), func,
                           interval=interval)


schedule_once(tasks.some_task_a, interval=60*5)
schedule_once(tasks.some_task_b, interval=120)
Also, I’ve wrapped this snippet in a function to avoid imports at the package level:
def init_scheduler():
    # paste the initialization code here
    pass

init_scheduler()
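The schedule_once logic above can be exercised without Redis. The following is a minimal sketch, assuming a hypothetical in-memory FakeScheduler that mimics only the three calls used here (get_jobs, cancel, schedule). FakeScheduler, FakeJob and some_task are invented names for illustration, not part of rq-scheduler, and the scheduler is passed in as an argument rather than read from a module-level variable.

```python
import datetime
from dataclasses import dataclass, field


@dataclass
class FakeJob:
    """Hypothetical stand-in for an RQ job: just a function and a meta dict."""
    func: object
    meta: dict = field(default_factory=dict)


class FakeScheduler:
    """Hypothetical in-memory scheduler exposing only the calls used above."""

    def __init__(self):
        self._jobs = []

    def get_jobs(self):
        return list(self._jobs)

    def cancel(self, job):
        self._jobs.remove(job)

    def schedule(self, scheduled_time, func, interval=None):
        job = FakeJob(func, {"interval": interval})
        self._jobs.append(job)
        return job


def schedule_once(scheduler, func, interval):
    """Schedule func once; reschedule if the interval changed or duplicates exist."""
    existing = [job for job in scheduler.get_jobs() if job.func == func]
    intervals = [job.meta.get("interval") for job in existing]
    if intervals == [interval]:
        return  # exactly one job with the right interval: nothing to do
    for job in existing:
        scheduler.cancel(job)
    first_run = (datetime.datetime.now(datetime.timezone.utc)
                 + datetime.timedelta(seconds=interval))
    scheduler.schedule(first_run, func, interval=interval)


def some_task():
    """Hypothetical task used only for the demonstration."""


scheduler = FakeScheduler()
schedule_once(scheduler, some_task, interval=300)
schedule_once(scheduler, some_task, interval=300)  # no-op: already scheduled
schedule_once(scheduler, some_task, interval=120)  # interval changed: rescheduled
print([job.meta["interval"] for job in scheduler.get_jobs()])  # prints [120]
```

Calling schedule_once repeatedly with the same arguments leaves a single job in place, which is the property that matters when the initialization code runs on every process start.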
1👍
You should use a Django management command to schedule the jobs (https://docs.djangoproject.com/en/3.2/howto/custom-management-commands/), like this:
# Saved as management/commands/rq_crontab_job.py inside one of your apps
import django_rq
from django.core.management.base import BaseCommand

# gong_an_job is the task function; import it from wherever it is defined.


class Command(BaseCommand):
    def handle(self, *args, **options):
        scheduler = django_rq.get_scheduler('crontab_job')
        # Cancel any existing jobs so they are not duplicated
        for job in scheduler.get_jobs():
            scheduler.cancel(job)

        # Scheduled task example 1
        scheduler.cron(
            "*/3 * * * *",  # Every 3 minutes (handy for testing); "0 0 * * 1" would run every Monday at 00:00
            func=gong_an_job,  # Function to be queued
            kwargs={'msg': 'I am Wang Longfei 1, and I like cultivation', 'number': 1},  # Keyword arguments passed into function when executed
            repeat=None,  # Repeat this number of times (None means repeat forever)
            queue_name='crontab_job',  # In which queue the job should be put in
            use_local_timezone=False  # Interpret hours in the local timezone
        )

        # Scheduled task example 2
        scheduler.cron(
            "*/5 * * * *",  # Every 5 minutes
            func=gong_an_job,  # Function to be queued
            kwargs={'msg': 'I am Wang Longfei 222222, and I like cultivation', 'number': 22222},  # Keyword arguments passed into function when executed
            repeat=None,  # Repeat this number of times (None means repeat forever)
            queue_name='crontab_job',  # In which queue the job should be put in
            use_local_timezone=False  # Interpret hours in the local timezone
        )
# Create the cron jobs (run this once)
python manage.py rq_crontab_job
# Start the scheduler, which checks the cron jobs and puts due jobs on the queue
python manage.py rqscheduler --queue crontab_job
# Start a worker to run the queued jobs
python manage.py rqworker crontab_job
I think the first answer is great, but in a multi-process environment it may cause problems: the cron jobs should be created only once.
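If job creation does have to live in start-up code that several processes execute, one option is an atomic "first one wins" guard. The following is a minimal sketch, assuming a lock file created with O_EXCL as a stand-in; with Redis already in the stack, a key set with the NX option would be the more natural choice. create_jobs_once, register_jobs and the lock path are hypothetical names used only for illustration.

```python
import os
import tempfile


def create_jobs_once(lock_path, register_jobs):
    """Run register_jobs() only in the first caller that creates lock_path.

    O_CREAT | O_EXCL makes file creation atomic: exactly one caller succeeds,
    every other caller gets FileExistsError and skips registration.
    """
    try:
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False  # another process already registered the jobs
    os.close(fd)
    register_jobs()
    return True


# Demonstration: three "processes" (here, plain calls) try to register jobs.
registered = []
lock_path = os.path.join(tempfile.mkdtemp(), "scheduler.lock")
results = [
    create_jobs_once(lock_path, lambda: registered.append("cron jobs created"))
    for _ in range(3)
]
print(results)     # prints [True, False, False]
print(registered)  # prints ['cron jobs created']
```

The lock file has to be removed on each deploy or restart, otherwise no process registers the jobs again. Running the management command once per deploy, as suggested above, avoids the need for a guard at all.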