XTimer源码分析笔记

XTimer源码分析笔记

一、开篇必看

1.本篇文章是源于XTimer源码的笔记记录,适合看过源码的人重头到尾梳理一遍知识巩固记忆,基础小白不用浪费时间

 

二、redis内存储数据介绍

函数位置:xtimer-main\dao\task\cache.go:BatchCreateTasks()

1.task任务通过迁移模块迁移到redis中的存储格式为zset类型:

key: minute_timerid[%minute_maxBuket]                     value: timerid_unix                        socer: uinx                                  expire: 1day;

其中minute为当前task任务的分钟级时间,通过GetTableName()函数进行转换拼接为key,[%minute_maxBuket]为进阶技巧,当前分钟级的最大桶的数量,源码中minute_maxBuket为固定值,如需进阶则创建一个新的表记录每个分钟级的最大桶数量。timerid为定时器的id字段,uinx为执行时间的时间戳数值。

函数位置:xtimer-main\dao\task\cache.go:BatchCreateBucket()

2.每分钟级Bucket桶在redis中的记录存储格式为set类型:

key: buket_cnt_minute                                         value: conf.BuketNum+minute_taskcnt/200                                 expire: end_second;

buket_cnt_为key的前缀,minute为当前任务桶分钟级时间,conf.BuketNum为配置文件的默认桶数量,minute_taskcnt/200为含义为该分钟级每两百个任务增加一个桶数(源码中始终只使用的默认桶数量固定,并没有启用这个功能)。过期时间为创建桶的时间到给定结束时间的秒级时间

函数位置:xtimer-main\service\migrator\worker.go:Start()

3.迁移模块在迁移之前获取string):

key: migrator_lock_hour                                                                            value: processid_coroutineid

expire: conf.MigrateTryLockMinutes                                                          expire_success: conf.MigrateSucessExpireMinutes;

migrator_lock_为前缀,hour为当前迁移时间的小时级时间,processid为运行进程id,coroutineid为运行协程id,申请锁时的过期时间为conf.MigrateTryLockMinutes配置文件为20min,迁移成功则更新过期时间为conf.MigrateSucessExpireMinutes配置文件为120min

4.迁移器轮寻mysql里面的激活的timer时,根据当前小时的范围进行cron解析创建任务(从timerDAO类方法获得),批量存放入mysql(也是timerDAO),然后从mysql中取出当前小时的任务(taskDAO),将任务通过事务的模式,批量转移到redis里面去(taskCache

函数位置:xtimer-main\service\webserver\timer.go:CreateTimer()

5.在创建定时器获取string):

key: create_timer_appName                                value: processid_coroutineid                                   expire: defaultEnableGapSeconds;

appName为定时器app字段的内容,processid为运行进程id,coroutineid为运行协程id,申请锁的过期时间defaultEnableGapSeconds配置文件为3s

6.webserver创建定时器首先抢锁,然后设置3秒过期时间,验证cron表达式的正确性,vo模型转变为po模型,调用内部客户端将定时器保存到mysql,返回对应的timer.id

7.删除定时器timer也要抢锁,锁和创建定时器的keyvalue一样,然后根据id删除对应定时器,说明了创建和删除定时器是互斥的

函数位置:xtimer-main\service\webserver\timer.go:EnableTimer()

8.激活和去激活获取锁:

key: enable_timer_locker_appName                  value: processid_coroutineid                                     expire: defaultEnableGapSeconds;

enable_timer_locker_为前缀,appName为定时器app字段的内容,processid为运行进程id,coroutineid为运行协程id,申请锁的过期时间defaultEnableGapSeconds配置文件为3s

9.激活定时器,通过名字,获取锁成功以后,开启事务,通过id获取对应timer,验证是否是未激活状态,调用time.now(),在此时间基础上加两轮迁移时间120min,然后取小时级时间作为end时间,调用cronParser解析定时器现在到end时间段的所有需要执行的时间executetimes,根据executetimes创建对应tasks然后存入mysql,通过tasksCache将任务迁移到redis创建zset集合,修改定时器状态为激活态2(并且修改数据库中的)

10.取消激活定时器只需要获取锁(锁同激活定时器相容,两者也是互斥的)后查询到对应定时器判断其状态是否是激活态,修改状态为1

11.激活和取消激活会启动事物并加行级锁完成此操作

12.timer两种状态:(1)激活(2)未激活

     Task四种状态:(0)未运行(0)运行中(1)成功(2)失败

函数位置:xtimer-main\service\executor\worker.go:Work()

13.去重用的布隆过滤器(bitmap):

key: task_bloom_day                                           value: timerIDUnixKeytoBit;

task_bloom_为前缀,day为任务task的日级时间,timerIDUnixKeytoBit为timerid和unix的组合的两次hash(murmur3sha1)值的bit位。

函数位置:xtimer-main\service\scheduler\worker.go:asyncHandleSlice()

14.获取桶锁的(string):

key: time_bucket_lock_minute_bucketid                                           value: processid_coroutineid

expire: TryLockSeconds                                                                    expire_success: SuccessExpireSeconds;

time_bucket_lock_为前缀,minute_bucketid为分钟级时间和桶id的拼接,processid为运行进程id,coroutineid为运行协程id;TryLockSeconds配置文件固定为70s,SuccessExpireSeconds配置文件固定为130s

三、分布式定时器服务三大模块执行流程

函数位置:xtimer-main\service\scheduler\worker.go:Start()

1.调度器启动会通过触发器启动执行器里面timerserver开启一个协程(该协程每2min开启一个协程用来获取当前时间以后2min内需要执行的任务task从而获取他们的timer保存在timer.timers的map字段)
启动一个ticker每100豪秒触发一次执行,根据配置的桶数,循环遍历(循环次数为设置的分钟级桶的数量),每次循环,获取当前时间,
给协程池提交任务一:获取上一分钟级时间桶锁,调用触发器的work函数,将上一分钟级时间和桶id用“_”拼接(minute_bucketid)和ack延长锁时间函数一并传给触发器;任务二和任务一一样,但获取的是当前时间分钟级的所有数据

函数位置:xtimer-main\service\trigger\worker.go:Work()

2.触发器的执行流程:获取调度器传递过来的分钟级时间和桶id,根据配置文件开启每秒定时器,在传进来的分钟级时间上加一分钟当结束时间,创建一个safachan类型的notifier,容量为分钟数加一的(time.minute/zrangegapsecond+1的61,因为传来的是分钟级时间,而定时器是秒级定时器,该分钟能处理任务数量就是分钟级时间除以秒级定时器的时间,加一是因为取时间的闭区间执行)大小
定义一个sync.waitGroup类型的w,调用w.add(1),启动一个协程传入对应的minuteBuketkey和当前处理时间和加上一秒作为结束时间,该协程最后会执行w.done(),在协程中会获取该分钟级和桶id获取到指定系列的任务(获取规则先从redis里面找,找不到就走mysql找,注意mysql中获取的task需要通过timerid取余桶数量判断是否为当前桶的任务),遍历tasks从执行器的pool池中获取对应的协程提交任务:将timerid和task执行时间传给执行,如果有错误传递给上面定定义的notifier,然后下次ticker触发会判断notifier是否有消息,如果存在则直接返回错误中断执行,否则将start时间增加一个扫描时间1秒,判断是否是endTime结束循环,没结束则继续重复开启上诉协程执行操作最后再判断一下是否notifier里面有出错,所有协程完成以后,w.wait()等待唤醒后,最后兜底一次检查通道notifier里面是否有数据,无数据后调用调度器传来的ack()函数

函数位置:xtimer-main\service\executor\worker.go:Work()

3.执行器的执行流程,根据传入的timerid_unix进行拆分出timerunix,然后通过布隆过滤器进行判断,是否当前的timerunix存在于布隆过滤器,如果存在则进一步判断是否数据库中的该task的状态是否为未运行状态,如果是则执行task,否则本次任务是重复执行,直接写入日志,并不执行;如果布隆过滤器中不存在则直接执行task(保证了每个任务至少执行一遍)
执行阶段:通过本地存储的timers列表中根据id查到对应timer的vo模型(三级缓存中的第三级),检查该task对应的timer是否处于激活态,激活态才执行,记录当前时间,通过当前的timer的NotifyHTTPParam.Method判断进行对应方法的回调操作(调用xhttp的客户端),启动监控汇报携程,设置布隆过滤器为当前任务的timerid_unix两次哈希的bit位在redis中置为1 ,根据timeridunix从mysql中获取对应task,将回调方法的结果保存进output字段,并根据回调的结果是否出现err更新task.status字段从运行变为成功或失败写入mysql中

四、整体流程:

1.迁移器每小时触发一次,获取当前时间的整小时的时间作为key,尝试获取锁🔒设置20分钟的过期时间,value为当前的进程和携程id,获取后,当前时间加上一小时和加上两小时作为开始和结束时间,相当于将当前后一个小时的task进行迁移,先获取所有被激活的timer进行遍历,通过他的cron表达式和开始和结束时间创建所有这一小时内满足cron表达式的时间切片,根据时间切片和当前timer创建tasks,将tasks保存进mysql,然后睡五秒继续循环,直到循环结束;创建完tasks后根据后一小时的时间范围从mysql中获取所有tasks进行迁移到redis操作(注意因为循环创建tasks记录进mysql出现错误只会记录日志,不会返回,所以可能有任务创建失败,所以后面还是得以mysql查的tasks为准)迁移成功后将锁的时间修改为2小时大于阶段轮训的时间

2.调度器每0.1秒执行一次,遍历每一个桶,获取当前时间,通过携程池获取携程,提交任务包,第一个携程是执行当前时间的上一分钟级的时间,获取时间桶的🔒(分钟加桶id)值为进程加携程id,过期时间为70秒,将ack()锁延期函数和ctx和分钟与桶id组合传递给触发器work函数
触发器根据传入的分钟与桶id提取出分钟当做开始时间,分钟加一当结束时间,开启一个每秒定时器,创建一个容量为61的Safechan,开启一个携程,获取starttime和其加一秒的时间范围内的任务(redis找不到找mysql,mysql无法根据桶id分类,需要获取这个时间段的所有tasks然后用timer和桶id对比符合的就是要执行的task),遍历获取到的tasks,获取携程池提交任务,将当前任务的timerid和runtime时间传给执行器执行操作。根据每秒定时器的触发,将starttime向后移动一秒,重新执行获取任务的操作。调用ack()延时函数将分钟桶🔒延时为130秒

© 版权声明
评论 抢沙发

请登录后发表评论

    暂无评论内容