golang 第三方定时任务库 github.com/robfig/cron/v3 核心源码解读

定时任务是一个通用场景的功能,在golang中,现在github最为常见的一个第三方定时任务库就是 github.com/robfig/cron/v3 目前(2020年1月9日) 7.2k Star。 我之前使用Python的时候用惯了apscheduler,切换这个是真的不习惯。

感觉github.com/robfig/cron/v3功能太简陋了,

  • 不支持定时任务持久化,我重启一下服务,调度任务信息就没了,需要自己存储调度信息。
  • 再比如不支持一次定时见issue)等,虽然有PR 但是 v3 分支还是依旧不支持,parse文件函数不支持,虽然可以按照作者的说法,调用一次之后再调用移除可以实现。
  • 不支持立即运行,见issue ,作者表示可以调用后手动调用解决,但是我感觉不够优雅,没有指定首次运行时间方便。(我突然有种想提PR的冲动,哈哈哈)

综上,个人感觉这个库封装的不是很完善,作为一个golang新手,读解析一下这个定时任务库,还是很有收获的。如果能力允许,以解决以上问题为目标,自己提PR。 这篇文章算是衔接上一篇文章,goroutine的一个基础应用。

注意

文章内容皆为个人见解,并且只看了核心的实现方式,细节部分没有解析,不保证准确,如果和你的理解有歧义,以你的为准。

前置知识

你需要掌握golang的 goroutine知识,包括channel通信,select多路复用, time.NewTimer等知识,否则解读起来就会很困难。 time.NewTimer的作用

func main(){
    // Calling NewTimer method
    timer := time.NewTimer(5 * time.Second)

    // Notifying the channel
    <-timer.C

    // Printed after 5 seconds 5秒之后输出
    fmt.Println("Timer is inactivated")
}

简单的demo

更多使用demo可以参考 个人Go学习笔记


package _1_demo

import (
    "fmt"
    "github.com/robfig/cron/v3"
    "testing"
    "time"
)

// 定时任务
func jobTask() {
    fmt.Printf( "任务启动: %s \n",time.Now().Format("2006-01-02 15:04:05"))
}

func TestCron(t *testing.T) {
    // 创建一个cron对象
    c := cron.New()

    // 任务调度
    enterId, err := c.AddFunc("@every 3s", jobTask)
    if err!=nil{
        panic(err)
    }
    fmt.Printf("任务id是 %d \n", enterId)

    // 同步执行任务会阻塞当前执行顺序  一般使用Start()
    //c.Run()
    //fmt.Println("当前执行顺序.......")

    // goroutine 协程启动定时任务(看到后面Start函数和run()函数,就会明白启动这一步也可以写在任务调度之前执行)
    c.Start()
    // Start()内部有一个running 布尔值 限制只有一个Cron对象启动 所以这一步多个 c.Start() 也只会有一个运行
    c.Start()
    c.Start()

    // 用于阻塞 后面可以使用 select {} 阻塞
    time.Sleep(time.Second * 9)

    // 关闭定时任务(其实不关闭也可以,主进程直接结束了, 内部的goroutine协程也会自动结束)
    c.Stop()

}

源码解读

核心文件主要就是cron.go文件

首先可以看到 c := cron.New() 创建了这个 Cron结构体对象

type Cron struct {
    entries   []*Entry                         // 用于存放job指针对象的数组
    chain     Chain
    stop      chan struct{}                  // 定制调度任务
    add       chan *Entry                    // 添加一个调度任务
    remove    chan EntryID               // 移除 一个调度任务
    snapshot  chan chan []Entry     // 正在运行中的调度任务 
    running   bool                             // 保证整个Cron对象只启动一次 和启动后其他chan正常
    logger    Logger                          // 记录日志
    runningMu sync.Mutex              // 协程锁,确保执行安全
    location  *time.Location            // 时区
    parser    ScheduleParser           // 解析参数
    nextID    EntryID                         // 下一个调度任务的id
    jobWaiter sync.WaitGroup        // 确保单一的调度任务执行完毕
}

Entry包含那些


// Entry consists of a schedule and the func to execute on that schedule.
type Entry struct {
    // ID is the cron-assigned ID of this entry, which may be used to look up a
    // snapshot or remove it.
    ID EntryID  // 任务调度Id,默认是自增 创建任务时返回

    // Schedule on which this job should be run.
    Schedule Schedule // 调度任务运行

    // Next time the job will run, or the zero time if Cron has not been
    // started or this entry's schedule is unsatisfiable
    Next time.Time       // 下次执行时间

    // Prev is the last time this job was run, or the zero time if never.
    Prev time.Time      // 上次执行时间

    // WrappedJob is the thing to run when the Schedule is activated.
    WrappedJob Job      // 执行的任务

    // Job is the thing that was submitted to cron.
    // It is kept around so that user code that needs to get at the job later,
    // e.g. via Entries() can do so.
    Job Job
}

调度任务enterId, err := c.AddFunc("@every 3s", jobTask) 会使用以下两个文件来解析定时执行的参数,也就是翻译给golang 解析@erery 3s是干什么

启动c.Start()

// Start the cron scheduler in its own goroutine, or no-op if already started.
func (c *Cron) Start() {
    c.runningMu.Lock()
    defer c.runningMu.Unlock()
    if c.running {
        return
    }
    c.running = true
    go c.run()
}

可以看到Start() 执行了三个操作

  • 1 上锁 最后解锁
  • 2 判断此对象的状态是否正在运行,如果运行了直接 return
  • 3 如果没有运行,就修改状态,然后启动协程运行 run()方法

核心逻辑run()方法

这里需要知道的知识

  • time.NewTimer(d Duration)返回一个sleep指定时间的channel
  • select{} 多路复用阻塞
    • 任意一个case满足select{}就回直接执行结束
  • sort.Sort 的用法(我这里不是很熟悉猜测是把最短循环时间的任务排在最前面)
// run the scheduler.. this is private just due to the need to synchronize
// access to the 'running' state variable.
func (c *Cron) run() {
    c.logger.Info("start")

    // Figure out the next activation times for each entry.
    now := c.now()  // 获取现在时间
        // 循环调度任务 计算下一次执行时间
    for _, entry := range c.entries {  
        entry.Next = entry.Schedule.Next(now)
        c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
    }

        // 最外层死循环 这一层会一直存在
    for {
        // Determine the next entry to run.
        sort.Sort(byTime(c.entries)) // 排序确定下一个要运行的目标

        var timer *time.Timer // 声明一个Timer 指针变量
        if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
            // If there are no entries yet, just sleep - it still handles new entries
            // and stop requests.
                        // 如果cron启动后 还没有 调度信息的话 就生成一个sleep10W小时的 chan Time,用于阻塞下面的 select{} ,因为`select`是多路复用,其他channel能返回数据时,select就回执行不会阻塞。
                        // 所以当没有任务时,启动Start()程序 就会被这个阻塞
            timer = time.NewTimer(100000 * time.Hour)
        } else {
                        // 如果有调度信息,就 sleep 调度任务中第一个的 循环时间 
            timer = time.NewTimer(c.entries[0].Next.Sub(now))
        }
                // 第二层死循环  内部使用select{}阻塞
        for {
            select {
                        // 上一步中的 timer sleep时间如果到了就执行
            case now = <-timer.C:
                now = now.In(c.location)
                c.logger.Info("wake", "now", now)

                // Run every entry whose next time was less than now
                for _, e := range c.entries {
                    if e.Next.After(now) || e.Next.IsZero() {
                        break
                    }
                    c.startJob(e.WrappedJob)
                    e.Prev = e.Next
                    e.Next = e.Schedule.Next(now)
                    c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
                }
                        // 向Cron中添加了 一个调度任务就会执行
            case newEntry := <-c.add:
                timer.Stop()
                now = c.now()
                newEntry.Next = newEntry.Schedule.Next(now)
                c.entries = append(c.entries, newEntry)
                c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)

            case replyChan := <-c.snapshot:
                replyChan <- c.entrySnapshot()
                continue
                        // 停止定时任务
            case <-c.stop:
                timer.Stop()
                c.logger.Info("stop")
                return
                        // 移除任务
            case id := <-c.remove:
                timer.Stop()
                now = c.now()
                c.removeEntry(id)
                c.logger.Info("removed", "entry", id)
            }
                        // 当以上任意一个channel满足时,就会结束内层循环 重复上一层步骤
            break
        }
    }
}

自己的总结

这个robfig/cron/v3 这个库实现定时任务的核心逻辑,就是利用以下几个点:

  • 主体for循环
    • 循环配合time.NewTimerchannel sleep确保定时任务能定时执行
  • select多路选择阻塞
    • 确保能在任意时间改变其case中的channel 执行
  • time.NewTimersleep定时时间

疑惑的地方

自己看完还是有很多疑惑的地方,很多函数用法,没怎么用过比如:

  • snapshot chan chan []Entry
    • 定义一个chan 类型是 chan []Entry ?? 没怎么见过这种用法
  • 其他函数用法(慢慢学习吧)

总体来说思路设计的很巧妙,感觉如果只是单纯的写web接口的话,很少直接接触到这样的设计。 最后顺便说一句Golang简单的语法是真的方便。


文章作者: 王小右
版权声明: 咳咳想白嫖文章?本文章著作权归作者所有,任何形式的转载都请注明出处。 https://www.charmcode.cn !
还能输入 100 字
  目录