Django实现crontab远程任务管理系统

2023-06-07 Django 1 444

前面有文章介绍过使用 django-crontab 和 apscheduler 模块管理Django本身应用的定时任务。

注意这里说的是Django本身应用的定时任务,也就是说定时任务本身是Django应用中的对应的函数功能。

这里也大概总结下 django-crontab 和 apscheduler 优缺点供大家参考

1、 Django crontab 的优点:

  • 集成度高:Django crontab 是 Django 框架的一部分,与 Django 应用无缝集成,使用起来非常方便。

  • 简单易用:Django crontab 提供了简洁的语法和接口,可以轻松定义和管理定时任务。

2、Django crontab 的缺点:

  • 依赖 Django:Django crontab 需要依赖 Django 框架,如果你的项目不是基于 Django,就无法使用该库。
  • 单进程模型:Django crontab 默认使用单进程模型,即每个定时任务在单独的进程中执行,可能会对系统资源造成一定压力。

3、APScheduler 的优点:

  • 独立性:APScheduler 是一个独立的任务调度库,可以与任何 Python 项目集成,不依赖于特定的框架。

  • 灵活性:APScheduler 提供了多种调度器和触发器的选择,可以根据不同的需求选择最合适的调度策略。

  • 多进程支持:APScheduler 支持多进程模型,可以通过配置来控制任务的并发执行,提高任务的执行效率。

4、APScheduler 的缺点:

  • 配置复杂:相比 Django crontab,APScheduler 的配置相对复杂一些,需要更多的配置项来定义定时任务。

背景介绍

不同于 django-crontab 和 apscheduler 管理Django应用本身的定时任务。

这里介绍的是类似Java的 xxl-job 管理不同Java项目的Task定时任务。

通过调用ansible api 来实现管理分发crontab任务到Linux指定单台或者多台主机。

简单理解就是 ansible cron 模块的可视化界面版本

技术栈

  • Django及其信号signal机制

  • Django celery异步

  • Django logging 日志

  • Ansible API 这里推荐使用 ansible-runner

实现逻辑

新增crontab模型,然后注册到Django Admin后台,新增、修改、删除(逻辑删除)时触发 post_save 信号,然后在信号中触发celery 任务Task,在Task中调用 ansible-runner的playbook接口推进 crontab 记录到远程主机

1、模型设计

from django.db import models
from django.contrib.auth import get_user_model


User = get_user_model()


# Create your models here.
class CronJob(models.Model):
    name = models.CharField(max_length=128, verbose_name="任务名称")

    # 远程主机的用户,也就是crontab任务配置在那个用户下
    system_user = models.CharField(max_length=16, default="root", verbose_name="系统用户")

    command = models.CharField(max_length=512, verbose_name="命令")

    # 多个主机逗号分隔
    execute_host =  models.CharField(max_length=512, verbose_name="执行主机")

    # 可以为空,为空就是给指定system_user下创建crontab,不为空则在 /etc/cron.d/下新增 cron_file只能名称的crontab
    # Will not manage /etc/crontab via cron_file, see documentation
    # https://docs.ansible.com/ansible/latest/collections/ansible/builtin/cron_module.html
    cron_file = models.CharField(max_length=512, default="job-from-devops", verbose_name="cron文件名")

    # True 代表 present, False 代表 absent
    status = models.BooleanField(default=True, verbose_name="是有有效")

    # crontab任务的时间规则
    weekday = models.CharField(max_length=16, default="*", verbose_name="crontab周")
    month = models.CharField(max_length=16, default="*", verbose_name="crontab月")
    day = models.CharField(max_length=16, default="*", verbose_name="crontab天")
    hour = models.CharField(max_length=16, default="*", verbose_name="crontab小时")
    minute = models.CharField(max_length=16, default="*", verbose_name="crontab分钟")

    # 逻辑删除
    is_deleted = models.BooleanField(default=False, verbose_name="是否删除")

    # 扩展信息
    creator = models.ForeignKey(User, on_delete=models.SET_DEFAULT, default="admin", verbose_name="创建者")
    create_time = models.DateTimeField(auto_now_add=True, verbose_name="创建时间")
    update_time = models.DateTimeField(auto_now=True, verbose_name="更新时间")

    def __str__(self):
        return self.name

    class Meta:
        verbose_name = 'CronJob'
        verbose_name_plural = verbose_name

2、celery task执行ansible-playbook

先给出核心代码

@shared_task
def task_cron(id, update_time):
        # 获取crontab 记录
    cron_obj = Job.objects.get(id=id)


    if cron_obj.status:
        state = 'present'
    else:
        state = 'absent'

    cron_obj.command = mark_safe(cron_obj.command)

        # 渲染 Ansible playbook 配置文件模板
    adhoc_template = 'jobs/adhoc_crontab_template.yml'
    ansible_config = render_to_string(adhoc_template, {'cron_obj': cron_obj, 'state': state})

    # 将 Ansible 配置文件保存到临时文件
    ansible_config_file = '/tmp/ansible/xxx.yml'
    with open(ansible_config_file, 'w') as f:
        f.write(ansible_config)


    # 使用 Ansible Runner 运行 Ansible playbook
    os.environ['ANSIBLE_REMOTE_USER'] = getattr(settings, 'ANSIBLE_REMOTE_USER', 'xxxx')
    os.environ['ANSIBLE_PRIVATE_KEY_FILE'] = getattr(settings, 'ANSIBLE_PRIVATE_KEY_FILE', '/home/xxxx/.ssh/id_rsa')

    private_data_dir = getattr(settings, 'ANSIBLE_PRIVATE_DATA_DIR', '/tmp/ansible')
    inventory_file = getattr(settings, 'ANSIBLE_INVENTORY_FILE', '/etc/ansible/inventory')

    runner = ansible_runner.run_async(
            private_data_dir=private_data_dir,
            inventory=inventory_file,
            playbook=ansible_config_file
        )

    return runner[1].stats

这里重点解读下该断代码

1)、函数参数额多了个 update_time 但是实际未用,目的是每次更新都是新的celery task (函数名+参数),不然会出现更新了模型但是celery task不执行的情况

2)、使用mark_safe 函数是为了确保crontab command 中如果有 重定向符时不被转义

3)、根据模型记录使用render_to_string转化 ansible 模板为具体的 playbook

4)、通过 os.environ 设定 ansible的系统变量

5)、关于 ansible_runner的使用和介绍可以参考

Python使用ansible-runner模块实现ansible调用学习

ansible-runner官网文档

3、配置celery和signal

这里就细讲了,感兴趣的可以阅读之前写的相关文档

Django通过celery实现任务注册和管理

django使用celery执行异步任务时采用信号实现每个任务日志独立存放(after_setup_logger)

Django的信号机制解读

4、配置logging

这里给出配置的logging 模板

# 在日志级别中,有以下几个常见的级别(从低到高):'DEBUG'、'INFO'、'WARNING'、'ERROR' 和 'CRITICAL'。
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,#此选项开启表示禁用部分日志,不建议设置为True
    'formatters': {
        'verbose': {
            'format': '%(levelname)s %(asctime)s %(message)s'#日志格式
        },
        'simple': {
            'format': '%(levelname)s %(message)s'
        },
    },
    'filters': {
        'require_debug_true': {
            '()': 'django.utils.log.RequireDebugTrue',#过滤器,只有当setting的DEBUG = True时生效
        },
    },
    'handlers': {
        'console': {
            'level': 'DEBUG',
            'filters': ['require_debug_true'],
            'class': 'logging.StreamHandler',
            'formatter': 'verbose'
        },
        'file': {#重点配置部分
            'level': 'INFO',
            'class': 'logging.FileHandler',
            'filename': 'logs/job.log',#日志保存文件
            'formatter': 'verbose'#日志格式,与上边的设置对应选择
        }
    },
    'loggers': {
        'jobapp': {#日志记录器
            'handlers': ['file'],
            'level': 'INFO',
            'propagate': True,
        }
    },
}

项目中 signal.py 中使用logging

from django.db.models.signals import post_save
from django.dispatch import receiver
from jobs.models import Job
from jobs.tasks import task_cron
import logging

# logger = logging.getLogger(__name__)
logger = logging.getLogger('jobapp')

# 监听User模型创建
# Todo: 特殊日志接入到表中,做页面结果展示
@receiver(post_save, sender=Job)
def create_user_profile(sender, instance, created, **kwargs):
    if created:
        print("Job created", instance.name, instance.create_time)
        logger.info(f"Job created {instance.name}; {instance.create_time}")
    else:
        print("job update", instance.name, instance.update_time)
        logger.info(f"job update { instance.name} ; {instance.update_time}")

    task_cron.delay(instance.id, instance.update_time)
    print(f"after signal execute task_cron {instance.id}")

这里关于crontab 的更变是记录到日志中的。如果想记录到一个日志表中,可以单独定义一个 日志模型,然后通过信号获取到 sender的信息 写入到日志表中去

这里有个问题,就是如果是使用Django自带的后台管理系统,那么request 中是获取不到当前用户的,这样不管是记录操作的方式,都无法记录“是谁” 进行的操作,无法审计。

关于Django自带Admin后台如何获取当前用户,可以参考之前的文章

Django自带的Admin后台中如何获取当前登录用户

Django自带的Admin后台中如何获取当前登录用户


实际的代码演示就不操作了,把源码分享给大家,然后自己实际跑一遍更有收获,有问题可以微信公众号联系我

源码地址 dj_cronjobs

具体如何使用参考源码中的 Readme.md 即可。


如果觉得文章对你有用,请不吝点赞 和 关注个人公众号(搜索 全栈运维 或者 DailyJobOps


如果觉得文章对你有所帮助,欢迎点赞和赞赏~

文章同步发布个人微信公众号,欢迎关注。第一时间获取最新发布

全栈运维/DailyJobOps

如果阅读过程中有任何问题,欢迎入群交流~

QQ交流群
本文标题: Django实现crontab远程任务管理系统
本文作者: 老鹰
发布时间: 2023-06-07 20
原始链接: http://newblog.colinspace.com/blog/post/51/
许可协议: 署名-非商业性使用 4.0 国际许可协议
转载请保留原文链接及作者