django celery 定时任务 Celery定时任务组件之Django+C

django celery 定时任务 Celery定时任务组件之Django+C

目录
  • 一、项目初始化
    • 1. 创建虚拟环境并安装依赖
    • 2. 创建 Django 项目和应用
    • 3. 配置项目(task_manager/settings.py)
  • 二、Celery 集成配置
    • 1. 创建 Celery 应用(task_manager/celery.py)
    • 2. 初始化 Celery(task_manager/__init__.py)
  • 三、Model 开发
    • 创建任务模型(tasks/models.py)
    • 迁移数据库
  • 四、接口开发
    • 1. 创建序列化器(tasks/serializers.py)
    • 2. 创建视图集(tasks/views.py)
    • 3. 配置 URL(tasks/urls.py)
    • 4. 项目 URL 配置(task_manager/urls.py)
  • 五、创建示例任务
    • 定义任务函数(tasks/tasks.py)
  • 六、启动服务
    • 1. 启动 Redis
    • 2. 启动 Celery Worker
    • 3. 启动 Celery Beat
    • 4. 启动 Django 开发服务器
  • 七、API 测试
    • 1. 创建周期性任务
    • 2. 查看任务列表
    • 3. 查看执行日志
  • 项目结构
    • 关键特性说明
      • 扩展建议
        • 拓展资料

          一、项目初始化

          1. 创建虚拟环境并安装依赖

          创建虚拟环境python3 -m venv myenvsource myenv/bin/activate 安装依赖pip install django celery redis django-celery-beat

          2. 创建 Django 项目和应用

          创建项目django-admin startproject task_managercd task_manager 创建应用python manage.py startapp tasks

          3. 配置项目(task_manager/settings.py)

          INSTALLED_APPS = [ … ‘django_celery_beat’, ‘django_celery_results’, ‘tasks’,] 数据库配置DATABASES = ‘default’: ‘ENGINE’: ‘django.db.backends.sqlite3’, ‘NAME’: BASE_DIR / ‘db.sqlite3’, }} Celery配置CELERY_BROKER_URL = ‘redis://localhost:6379/0’CELERY_RESULT_BACKEND = ‘django-db’ 使用django-celery-results存储结局CELERY_ACCEPT_CONTENT = [‘json’]CELERY_TASK_SERIALIZER = ‘json’CELERY_RESULT_SERIALIZER = ‘json’CELERY_TIMEZONE = ‘Asia/Shanghai’

          二、Celery 集成配置

          1. 创建 Celery 应用(task_manager/celery.py)

          from __future__ import absolute_import, unicode_literalsimport osfrom celery import Celeryos.environ.setdefault(‘DJANGO_SETTINGS_MODULE’, ‘task_manager.settings’)app = Celery(‘task_manager’)app.config_from_object(‘django.conf:settings’, namespace=’CELERY’)app.autodiscover_tasks()@app.task(bind=True)def debug_task(self): print(f’Request: self.request!r}’)

          2. 初始化 Celery(task_manager/__init__.py)

          from __future__ import absolute_import, unicode_literalsfrom .celery import app as celery_app__all__ = (‘celery_app’,)

          三、Model 开发

          创建任务模型(tasks/models.py)

          from django.db import modelsfrom django.utils import timezoneclass ScheduledTask(models.Model): TASK_TYPES = ( (‘periodic’, ‘周期性任务’), (‘one_time’, ‘一次性任务’), ) name = models.CharField(‘任务名称’, max_length=100) task_type = models.CharField(‘任务类型’, max_length=20, choices=TASK_TYPES) task_function = models.CharField(‘任务函数’, max_length=200) cron_expression = models.CharField(‘Cron表达式’, max_length=100, blank=True, null=True) interval_seconds = models.IntegerField(‘间隔秒数’, blank=True, null=True) next_run_time = models.DateTimeField(‘下次执行时刻’, blank=True, null=True) is_active = models.BooleanField(‘是否激活’, default=True) created_at = models.DateTimeField(‘创建时刻’, auto_now_add=True) updated_at = models.DateTimeField(‘更新时刻’, auto_now=True) def __str__(self): return self.name class Meta: verbose_name = ‘定时任务’ verbose_name_plural = ‘定时任务列表’class TaskExecutionLog(models.Model): task = models.ForeignKey(ScheduledTask, on_delete=models.CASCADE, related_name=’logs’) execution_time = models.DateTimeField(‘执行时刻’, auto_now_add=True) status = models.CharField(‘执行情形’, max_length=20, choices=( (‘success’, ‘成功’), (‘failed’, ‘失败’), )) result = models.TextField(‘执行结局’, blank=True, null=True) error_message = models.TextField(‘错误信息’, blank=True, null=True) def __str__(self): return f”self.task.name} – self.execution_time}” class Meta: verbose_name = ‘任务执行日志’ verbose_name_plural = ‘任务执行日志列表’

          迁移数据库

          python manage.py makemigrationspython manage.py migrate

          四、接口开发

          1. 创建序列化器(tasks/serializers.py)

          from rest_framework import serializersfrom .models import ScheduledTask, TaskExecutionLogclass ScheduledTaskSerializer(serializers.ModelSerializer): class Meta: model = ScheduledTask fields = ‘__all__’class TaskExecutionLogSerializer(serializers.ModelSerializer): class Meta: model = TaskExecutionLog fields = ‘__all__’

          2. 创建视图集(tasks/views.py)

          from rest_framework import viewsets, statusfrom rest_framework.response import Responsefrom .models import ScheduledTask, TaskExecutionLogfrom .serializers import ScheduledTaskSerializer, TaskExecutionLogSerializerfrom celery import current_appfrom django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabScheduleimport jsonclass ScheduledTaskViewSet(viewsets.ModelViewSet): queryset = ScheduledTask.objects.all() serializer_class = ScheduledTaskSerializer def create(self, request, args, kwargs): serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) 创建Celery定时任务 task = serializer.save() self._create_celery_task(task) headers = self.get_success_headers(serializer.data) return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers) def update(self, request, args, kwargs): partial = kwargs.pop(‘partial’, False) instance = self.get_object() serializer = self.get_serializer(instance, data=request.data, partial=partial) serializer.is_valid(raise_exception=True) 更新Celery定时任务 task = serializer.save() self._update_celery_task(task) return Response(serializer.data) def destroy(self, request, args, kwargs): instance = self.get_object() 删除Celery定时任务 self._delete_celery_task(instance) self.perform_destroy(instance) return Response(status=status.HTTP_204_NO_CONTENT) def _create_celery_task(self, task): if task.task_type == ‘periodic’: 创建间隔调度 schedule, _ = IntervalSchedule.objects.get_or_create( every=task.interval_seconds, period=IntervalSchedule.SECONDS, ) PeriodicTask.objects.create( interval=schedule, name=task.name, task=task.task_function, enabled=task.is_active, args=json.dumps([]), kwargs=json.dumps(}), ) elif task.task_type == ‘one_time’: 一次性任务使用ETA pass def _update_celery_task(self, task): try: periodic_task = PeriodicTask.objects.get(name=task.name) if task.task_type == ‘periodic’: schedule, _ = IntervalSchedule.objects.get_or_create( every=task.interval_seconds, period=IntervalSchedule.SECONDS, ) periodic_task.interval = schedule periodic_task.enabled = task.is_active periodic_task.save() except PeriodicTask.DoesNotExist: self._create_celery_task(task) def _delete_celery_task(self, task): try: periodic_task = PeriodicTask.objects.get(name=task.name) periodic_task.delete() except PeriodicTask.DoesNotExist: passclass TaskExecutionLogViewSet(viewsets.ReadOnlyModelViewSet): queryset = TaskExecutionLog.objects.all() serializer_class = TaskExecutionLogSerializer

          3. 配置 URL(tasks/urls.py)

          from django.urls import include, pathfrom rest_framework import routersfrom .views import ScheduledTaskViewSet, TaskExecutionLogViewSetrouter = routers.DefaultRouter()router.register(r’tasks’, ScheduledTaskViewSet)router.register(r’logs’, TaskExecutionLogViewSet)urlpatterns = [ path(”, include(router.urls)),]

          4. 项目 URL 配置(task_manager/urls.py)

          from django.contrib import adminfrom django.urls import path, includeurlpatterns = [ path(‘admin/’, admin.site.urls), path(‘api/’, include(‘tasks.urls’)),]

          五、创建示例任务

          定义任务函数(tasks/tasks.py)

          from celery import shared_taskfrom .models import ScheduledTask, TaskExecutionLogimport logginglogger = logging.getLogger(__name__)@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=3, retry_kwargs=’max_retries’: 3})def sample_task(self, task_id): try: task = ScheduledTask.objects.get(id=task_id) 模拟任务执行 result = f”任务 task.name} 执行成功,时刻:str(self.request.time_start)}” 记录执行日志 TaskExecutionLog.objects.create( task=task, status=’success’, result=result ) logger.info(f”任务执行成功: task.name}”) return result except Exception as e: 记录错误日志 task = ScheduledTask.objects.get(id=task_id) if ScheduledTask.objects.filter(id=task_id).exists() else None if task: TaskExecutionLog.objects.create( task=task, status=’failed’, error_message=str(e) ) logger.error(f”任务执行失败: str(e)}”) raise

          六、启动服务

          1. 启动 Redis

          redis-server

          2. 启动 Celery Worker

          celery -A task_manager worker –loglevel=info –pool=prefork –concurrency=4

          3. 启动 Celery Beat

          celery -A task_manager beat –loglevel=info –scheduler django_celery_beat.schedulers:DatabaseScheduler

          4. 启动 Django 开发服务器

          python manage.py runserver

          七、API 测试

          1. 创建周期性任务

          curl -X POST http://localhost:8000/api/tasks/ -d ‘ “name”: “示例周期性任务”, “task_type”: “periodic”, “task_function”: “tasks.tasks.sample_task”, “interval_seconds”: 60, “is_active”: true}’ -H “Content-Type: application/json”

          2. 查看任务列表

          curl http://localhost:8000/api/tasks/

          3. 查看执行日志

          curl http://localhost:8000/api/logs/

          项目结构

          task_manager/├── task_manager/│ ├── __init__.py│ ├── celery.py│ ├── settings.py│ ├── urls.py│ └── wsgi.py├── tasks/│ ├── migrations/│ ├── __init__.py│ ├── admin.py│ ├── apps.py│ ├── models.py│ ├── serializers.py│ ├── tasks.py│ ├── urls.py│ └── views.py├── manage.py└── db.sqlite3

          关键特性说明

          • 动态任务管理:通过 API 创建 / 更新 / 删除定时任务
          • 任务执行记录:自动记录任务执行结局和情形
          • 失败重试机制:任务失败时自动重试(最多 3 次)
          • 多种调度方式:支持周期性任务和一次性任务
          • 可视化管理:通过 Django Admin 界面管理定时任务

          扩展建议

          • 添加任务参数支持,允许在创建任务时传递参数
          • 实现任务暂停 / 恢复功能
          • 添加任务优先级队列配置
          • 集成监控体系(如 Prometheus+Grafana)
          • 实现任务执行结局的异步通知(邮件、短信等)

          这个实现提供了一个完整的 Django+Celery 定时任务体系,支持动态管理和监控,可直接用于生产环境。

          拓展资料

          以上为个人经验,希望能给大家一个参考,也希望大家多多支持风君子博客。

          无论兄弟们可能感兴趣的文章:

          • djangocelery定时任务实战详解
          • 怎样使用celery进行异步处理和定时任务(django)
          • Django中使用Celery执行定时任务难题
          • django中celery的定时任务使用
          • django-celery-beat搭建定时任务的实现
          • Django初步使用Celery处理耗时任务和定时任务难题
          版权声明

          返回顶部