模块版本
django==4.1
flower==1.2.0
redis==4.3.4
pymysql==1.0.2
django_celery_bea==2.4.0
关于django和celery就不在这里介绍了,网上有很多,这里就记录一下Django配合celery实现动态添加定时任务的配置
一、Celery配置
1、先在django的settings.py文件INSTALLED_APPS添加django_celery_beat
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django_celery_beat'
]
2、表结构迁移
python3 manage.py migrate
3、在django的项目文件中创建一个文件夹(我这里创建的是celery_task)
4、在celery_task目录中创建config.py文件
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# 设置结果存储
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djangoProject.settings")
result_backend = 'redis://10.10.10.151:6379/0'
# 设置代理人broker
broker_url = 'redis://10.10.10.151:6379/1'
# celery 的启动工作数量设置
worker_concurrency = 20
# 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
worker_prefetch_multiplier = 20
# 非常重要,有些情况下可以防止死锁
CELERYD_FORCE_EXECV = True
# celery 的 worker 执行多少个任务后进行重启操作
worker_max_tasks_per_child = 40
# 设置CELERY_RESULT_BACKEND保存在redis中的有效时间
result_expires = 3600
# 禁用所有速度限制,如果网络资源有限,不建议开足马力。
worker_disable_rate_limits = True
#关闭时区
enable_utc = False
timezone = 'Asia/Shanghai'
DJANGO_CELERY_BEAT_TZ_AWARE = False
# 定时任务调度器
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
5、在celery_task目录中创建celery.py文件
# -*- coding: UTF-8 -*-
from celery import Celery
from celery_task import config
project_name = 'mycelery'
# 创建celery app
app = Celery(project_name)
# app.conf.timezone = 'Asia/Shanghai'
# app.conf.enable_utc = False
# 从单独的配置模块中加载配置
app.config_from_object(config)
# app.autodiscover_tasks([
# 'drfapi'
# ])
6、在celery_task目录中创建tasks.py文件
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
from celery_task.celery import app
@app.task
def task1(x,y):
s = 'x+y=%d'%(x+y)
print(s)
return 'task1任务执行成功'
@app.task
def task2():
return 'task2任务执行成功'
二、Django配置
1、url.pys
from django.urls import path,re_path
from drfapi import views
urlpatterns = [
# path('admin/', admin.site.urls),
path('api/task1/<int:s>/', views.task_1),
path('api/task2/', views.task_2),
path('api/task/get/', views.task_get),
path('api/task/on/<slug:name>/', views.task_on),
path('api/task/off/<slug:name>/', views.task_off),
path('api/task/del/<slug:name>/', views.task_del),
]
2、views.py
import json
from django.shortcuts import HttpResponse, redirect
from celery_task.tasks import task1,task2
from django_celery_beat.models import CrontabSchedule,PeriodicTask,IntervalSchedule
def task_1(request,s):
# 创建一个定时任务,s表示多少秒执行一次
schedule, _ = IntervalSchedule.objects.get_or_create(
every=s,
period=IntervalSchedule.SECONDS)
# 创建定时任务
PeriodicTask.objects.create(
interval=schedule,
name='task1' + '_schedule',
task="celery_task.tasks.task1", # 要执行的任务路径
args=json.dumps([5, 6]) # 传递的参数
)
return HttpResponse('ok')
def task_2(request):
# 创建一个定时任务,和crontab配置差不多,格式为分、时、周、月、年
data = json.loads(request.body)
minute = data.get('minute') # 分
hour = data.get('hour') # 时
week = data.get('week') # 周
month = data.get('month') # 月
year = data.get('year') # 年
print(minute,hour,week,month,year)
crontab, _ = CrontabSchedule.objects.get_or_create(
minute=minute,
hour=hour,
day_of_week=week,
day_of_month=month,
month_of_year=year,
)
# 创建定时任务
PeriodicTask.objects.create(
crontab=crontab,
name='task2' + '_crontab',
task='celery_task.tasks.task2', # 要执行的任务路径
args=''
)
return HttpResponse('ok')
def task_get(request):
# 获取全部的定时任务
my_task = PeriodicTask.objects.all()
l = []
for i in my_task:
l.append((i.id,i.name,i.enabled))
return HttpResponse(l)
def task_on(request,name):
# 启动定时任务
my_task = PeriodicTask.objects.get(name=name)
my_task.enabled = True
my_task.save()
return HttpResponse(my_task)
def task_off(request,name):
# 停止定时任务
my_task = PeriodicTask.objects.get(name=name)
my_task.enabled = False
my_task.save()
return HttpResponse(my_task)
def task_del(request,name):
# 删除定时任务
my_task = PeriodicTask.objects.get(name=name).delete()
return HttpResponse(my_task)
三、启动服务
打开终端进入到你django的根目录
1、启动Django
python3 manage.py runserver
2、启动celery_worker
celery_task 根据Celery配置
的第3步,填你创建的文件夹名
celery -A celery_task worker -l info
3、启动celery_beat
celery_task 根据Celery配置
的第3步,填你创建的文件夹名
celery -A celery_task beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler --max-interval=5
4、启动celery_flower
里面redis的链接填Celery配置
第4步中的BROKER_URL,启动后使用浏览器访问http://127.0.0.1:5555/tasks就可以看到定时任务执行的记录了
celery --broker=redis://10.10.10.151:6379/1 flower
至此服务就全部启动了
四、使用接口添加任务
1、添加task1任务,每10秒执行一次
使用浏览器访问http://127.0.0.1:8000/api/task1/10/,等待一段时间人后访问flower查看执行记录
2、添加task2任务,每3分钟执行一次
使用POST请求方式请求http://127.0.0.1:8000/api/task2/,等待一段时间人后访问flower查看执行记录
另外几个启动、停止、删除你们自己试吧,我这边都是请求ok的