2025-05-12 18:43:41 +08:00
|
|
|
|
package dtimer
|
|
|
|
|
|
2025-05-15 17:30:33 +08:00
|
|
|
|
import (
|
|
|
|
|
"admin/lib/xlog"
|
|
|
|
|
"fmt"
|
|
|
|
|
"runtime"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
2025-05-12 18:43:41 +08:00
|
|
|
|
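/*
Distributed timer. It addresses the case where several processes are started
but only one of them should fire a given timer event.
The goal is a workable solution that does not add architectural complexity.
The idea is to use MySQL as a distributed lock, with the processes competing
for the single right to execute.
*/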
|
|
|
|
|
|
2025-05-15 17:30:33 +08:00
|
|
|
|
type Timer struct {
|
|
|
|
|
Key string
|
|
|
|
|
Duration time.Duration
|
|
|
|
|
IsInterval bool
|
|
|
|
|
Callback func()
|
|
|
|
|
expireAt time.Time
|
|
|
|
|
timerMgr *DTimerMgr
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// todo 先实现一个本地数据库单例版本的
|
|
|
|
|
type DTimerMgr struct {
|
|
|
|
|
locker *sync.Mutex
|
|
|
|
|
timers map[string]*Timer
|
|
|
|
|
taskQueue chan *Timer
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewDTimerMgr() *DTimerMgr {
|
|
|
|
|
mgr := new(DTimerMgr)
|
|
|
|
|
mgr.locker = new(sync.Mutex)
|
|
|
|
|
mgr.timers = make(map[string]*Timer)
|
|
|
|
|
mgr.taskQueue = make(chan *Timer, 1024)
|
|
|
|
|
go mgr.start()
|
|
|
|
|
return mgr
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (mgr *DTimerMgr) Add(task *Timer) (*Timer, bool) {
|
|
|
|
|
mgr.locker.Lock()
|
|
|
|
|
defer mgr.locker.Unlock()
|
|
|
|
|
|
|
|
|
|
old, find := mgr.timers[task.Key]
|
|
|
|
|
task.expireAt = time.Now().Add(task.Duration)
|
|
|
|
|
mgr.timers[task.Key] = task
|
|
|
|
|
return old, find
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (mgr *DTimerMgr) Del(key string) {
|
|
|
|
|
mgr.locker.Lock()
|
|
|
|
|
defer mgr.locker.Unlock()
|
|
|
|
|
delete(mgr.timers, key)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (mgr *DTimerMgr) start() {
|
|
|
|
|
for i := 0; i < runtime.NumCPU(); i++ {
|
|
|
|
|
go func() {
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case t := <-mgr.taskQueue:
|
|
|
|
|
func() {
|
|
|
|
|
defer xlog.CatchWithInfo(fmt.Sprintf("handle timer %v panic", t.Key))
|
|
|
|
|
t.Callback()
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tk := time.NewTicker(time.Second)
|
|
|
|
|
defer tk.Stop()
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-tk.C:
|
|
|
|
|
mgr.locker.Lock()
|
|
|
|
|
mgr.checkExpire()
|
|
|
|
|
mgr.locker.Unlock()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (mgr *DTimerMgr) checkExpire() {
|
|
|
|
|
timeNow := time.Now()
|
|
|
|
|
for k, v := range mgr.timers {
|
|
|
|
|
if v.expireAt.Before(timeNow) {
|
|
|
|
|
if !v.IsInterval {
|
|
|
|
|
delete(mgr.timers, k)
|
|
|
|
|
} else {
|
|
|
|
|
v.expireAt = timeNow.Add(v.Duration)
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case mgr.taskQueue <- v:
|
|
|
|
|
default:
|
|
|
|
|
xlog.Warnf("timer channel full!!!")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2025-05-12 18:43:41 +08:00
|
|
|
|
}
|