模块版本

django==4.1
flower==1.2.0
redis==4.3.4
pymysql==1.0.2
django_celery_bea==2.4.0

关于django和celery就不在这里介绍了,网上有很多,这里就记录一下Django配合celery实现动态添加定时任务的配置

一、Celery配置

1、先在django的settings.py文件INSTALLED_APPS添加django_celery_beat

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'django_celery_beat'
]

2、表结构迁移

python3 manage.py migrate

3、在django的项目文件中创建一个文件夹(我这里创建的是celery_task)

WX20221216-005854@2x.png

4、在celery_task目录中创建config.py文件

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# 设置结果存储
import os

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djangoProject.settings")
result_backend = 'redis://10.10.10.151:6379/0'
# 设置代理人broker
broker_url = 'redis://10.10.10.151:6379/1'
# celery 的启动工作数量设置
worker_concurrency = 20
# 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
worker_prefetch_multiplier = 20
# 非常重要,有些情况下可以防止死锁
CELERYD_FORCE_EXECV = True
# celery 的 worker 执行多少个任务后进行重启操作
worker_max_tasks_per_child = 40
# 设置CELERY_RESULT_BACKEND保存在redis中的有效时间
result_expires = 3600
# 禁用所有速度限制,如果网络资源有限,不建议开足马力。
worker_disable_rate_limits = True

#关闭时区
enable_utc = False
timezone = 'Asia/Shanghai'
DJANGO_CELERY_BEAT_TZ_AWARE = False
# 定时任务调度器
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

5、在celery_task目录中创建celery.py文件

# -*- coding: UTF-8 -*-
from celery import Celery
from celery_task import config

project_name = 'mycelery'

# 创建celery app
app = Celery(project_name)
# app.conf.timezone = 'Asia/Shanghai'
# app.conf.enable_utc = False
# 从单独的配置模块中加载配置
app.config_from_object(config)

# app.autodiscover_tasks([
#     'drfapi'
# ])

6、在celery_task目录中创建tasks.py文件

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
from celery_task.celery import app

@app.task
def task1(x,y):
    s = 'x+y=%d'%(x+y)
    print(s)
    return 'task1任务执行成功'

@app.task
def task2():
    return 'task2任务执行成功'

二、Django配置

1、url.pys

from django.urls import path,re_path
from drfapi import views

urlpatterns = [
    # path('admin/', admin.site.urls),

    path('api/task1/<int:s>/', views.task_1),
    path('api/task2/', views.task_2),
    path('api/task/get/', views.task_get),
    path('api/task/on/<slug:name>/', views.task_on),
    path('api/task/off/<slug:name>/', views.task_off),
    path('api/task/del/<slug:name>/', views.task_del),
]

2、views.py

import json
from django.shortcuts import  HttpResponse, redirect
from celery_task.tasks import task1,task2
from django_celery_beat.models import CrontabSchedule,PeriodicTask,IntervalSchedule

def task_1(request,s):
    # 创建一个定时任务,s表示多少秒执行一次
    schedule, _ = IntervalSchedule.objects.get_or_create(
        every=s,
        period=IntervalSchedule.SECONDS)

    # 创建定时任务
    PeriodicTask.objects.create(
        interval=schedule,
        name='task1' + '_schedule',
        task="celery_task.tasks.task1", # 要执行的任务路径
        args=json.dumps([5, 6])  # 传递的参数
    )
    return HttpResponse('ok')

def task_2(request):
    # 创建一个定时任务,和crontab配置差不多,格式为分、时、周、月、年
    data = json.loads(request.body)
    minute = data.get('minute') # 分
    hour = data.get('hour') # 时
    week = data.get('week') # 周
    month = data.get('month') # 月
    year = data.get('year') # 年

    print(minute,hour,week,month,year)

    crontab, _ = CrontabSchedule.objects.get_or_create(
        minute=minute,
        hour=hour,
        day_of_week=week,
        day_of_month=month,
        month_of_year=year,

    )
    # 创建定时任务
    PeriodicTask.objects.create(
        crontab=crontab,
        name='task2' + '_crontab',
        task='celery_task.tasks.task2', # 要执行的任务路径
        args=''
    )
    return HttpResponse('ok')

def task_get(request):
    # 获取全部的定时任务
    my_task = PeriodicTask.objects.all()
    l = []
    for i in my_task:
        l.append((i.id,i.name,i.enabled))
    return HttpResponse(l)

def task_on(request,name):
    # 启动定时任务
    my_task = PeriodicTask.objects.get(name=name)
    my_task.enabled = True
    my_task.save()
    return HttpResponse(my_task)

def task_off(request,name):
    # 停止定时任务
    my_task = PeriodicTask.objects.get(name=name)
    my_task.enabled = False
    my_task.save()
    return HttpResponse(my_task)

def task_del(request,name):
    # 删除定时任务
    my_task = PeriodicTask.objects.get(name=name).delete()
    return HttpResponse(my_task)

三、启动服务

打开终端进入到你django的根目录

1、启动Django

python3 manage.py runserver

WX20221216-014636@2x.png

2、启动celery_worker

celery_task 根据Celery配置的第3步,填你创建的文件夹名

celery -A celery_task worker -l info 

WX20221216-014727@2x.png

3、启动celery_beat

celery_task 根据Celery配置的第3步,填你创建的文件夹名

celery -A celery_task beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler --max-interval=5

WX20221216-014801@2x.png

4、启动celery_flower

里面redis的链接填Celery配置第4步中的BROKER_URL,启动后使用浏览器访问http://127.0.0.1:5555/tasks就可以看到定时任务执行的记录了

celery --broker=redis://10.10.10.151:6379/1 flower

WX20221216-014840@2x.png
至此服务就全部启动了

四、使用接口添加任务

1、添加task1任务,每10秒执行一次

使用浏览器访问http://127.0.0.1:8000/api/task1/10/,等待一段时间人后访问flower查看执行记录
WX20221216-015138@2x.png

2、添加task2任务,每3分钟执行一次

使用POST请求方式请求http://127.0.0.1:8000/api/task2/,等待一段时间人后访问flower查看执行记录
WX20221216-015611@2x.png
WX20221216-021001@2x.png

另外几个启动、停止、删除你们自己试吧,我这边都是请求ok的

最后修改:2022 年 12 月 21 日
如果觉得我的文章对你有用,请随意赞赏