【FastAPI 学习十二】定时任务篇

定时任务是一个通用场景常见的功能,之前我使用django的时候,更习惯使用celery中的定时任务,现在花时间看了看apscheduler感觉不错,就写了demo,并集成到项目代码中了

任务调度主要就是以下几个功能

  • 添加/删除 任务调度
  • 暂停/恢复 任务调度(这条我未实现)
  • 查看定时任务状态

定时任务

添加定时任务

其中添加定时任务方式,有以下三种方式

  • date: 固定的时间执行一次时 用这种
  • interval: 想要在固定的间隔时间循环执行时用这种
  • cron: 这种就是最为灵活的 crontab 表达式定时任务了

    原文 https://apscheduler.readthedocs.io/en/stable/userguide.html#starting-the-scheduler

  • date: use when you want to run the job just once at a certain point of time
  • interval: use when you want to run the job at fixed intervals of time
  • cron: use when you want to run the job periodically at certain time(s) of day

Tip: crontab写法可以参考这个网站 https://crontab.guru/

在FastAPI异步框架中,选择 AsyncIOScheduler调度程序

默认使用sqlite持久化定时任务,不至于重启就失效

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.triggers.cron import CronTrigger

Schedule = AsyncIOScheduler(
    jobstores={
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
    }
)
Schedule.start()

不完善的地方

  • 1 由于时间仓促,比如参数验证的部分,参数数据校验,还可以继续完善。
  • 2 添加调度任务参数封装,我把三种添加任务的三种方式,拆成了三个函数,其中很多参数没有用到。
  • 3 其中文档中还有说明最大woker数量限制之类的,然后按照我使用其他定时任务的常识,应该还会函数最长执行时间限制(比如执行的功能函数特别耗时)
  • 4 ...等等

这些就需要自己查看文档和issues搜索了

其他方案

定时任务有很多种方案,比如可以使用

说到arq, 想起我之前,使用过rq 并且学习的时候稍微翻译了一下文档 rq v1.0 https://codercharm.github.io/Python-rq-doc-cn/#/ 一晃过去一年了。

代码地址

参考地址


文章作者: 王小右
版权声明: 咳咳想白嫖文章?本文章著作权归作者所有,任何形式的转载都请注明出处。 https://www.charmcode.cn !
还能输入 100 字
  目录