101 lines
1.9 KiB
Go
Raw Permalink Normal View History

2025-05-12 18:43:41 +08:00
package dtimer
2025-05-15 17:30:33 +08:00
import (
"admin/lib/xlog"
"fmt"
"runtime"
"sync"
"time"
)
2025-05-12 18:43:41 +08:00
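/*
A distributed timer solves the case where several processes are started but
only one of them should fire a given timer event.
The goal is a usable implementation that does not add architectural
complexity: the idea is to use MySQL as a distributed lock and let the
processes compete for the right to be the single executor.
*/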
2025-05-15 17:30:33 +08:00
// Timer describes one scheduled callback managed by a DTimerMgr.
type Timer struct {
	// Key identifies the timer; the manager stores timers in a map keyed by it.
	Key string
	// Duration is the delay added to the current time to compute the next expiry.
	Duration time.Duration
	// IsInterval selects repeating behaviour: when true the timer is rescheduled
	// after it expires, otherwise it is removed from the manager.
	IsInterval bool
	// Callback is invoked by a worker goroutine once the timer has expired.
	Callback func()

	// expireAt is the instant after which the timer is considered expired.
	expireAt time.Time
	// timerMgr is a back-reference to a manager.
	// NOTE(review): nothing visible in this file assigns or reads it - confirm usage.
	timerMgr *DTimerMgr
}
// DTimerMgr keeps a set of timers and dispatches the expired ones to workers.
//
// TODO: implement the local-database, single-instance version first.
type DTimerMgr struct {
	// locker guards timers.
	locker *sync.Mutex
	// timers holds the registered timers indexed by Timer.Key.
	timers map[string]*Timer

	// taskQueue carries expired timers to the goroutines that run callbacks.
	taskQueue chan *Timer
}
// NewDTimerMgr builds a manager with an empty timer set and a task queue
// buffered for 1024 entries, then launches its scheduling loop in a new
// goroutine before returning it.
func NewDTimerMgr() *DTimerMgr {
	mgr := &DTimerMgr{
		locker:    &sync.Mutex{},
		timers:    map[string]*Timer{},
		taskQueue: make(chan *Timer, 1024),
	}
	go mgr.start()
	return mgr
}
// Add registers task under task.Key, replacing any timer already stored under
// that key, and sets its expiry to the current time plus task.Duration.
//
// It returns the timer previously stored under the key (nil if none) and
// whether such a timer existed.
func (mgr *DTimerMgr) Add(task *Timer) (*Timer, bool) {
	mgr.locker.Lock()
	defer mgr.locker.Unlock()

	// Look up the previous entry before overwriting it so it can be returned.
	prev, existed := mgr.timers[task.Key]

	task.expireAt = time.Now().Add(task.Duration)
	mgr.timers[task.Key] = task

	return prev, existed
}
// Del removes the timer stored under key, if any. Removing an unknown key is
// a no-op.
func (mgr *DTimerMgr) Del(key string) {
	mgr.locker.Lock()
	delete(mgr.timers, key) // delete cannot panic, so a deferred unlock is not needed
	mgr.locker.Unlock()
}
// start launches one worker goroutine per CPU to run timer callbacks and then
// loops forever, checking for expired timers once per second. It never
// returns, so it is meant to run in its own goroutine.
func (mgr *DTimerMgr) start() {
	// invoke runs a single callback. The deferred xlog.CatchWithInfo call is
	// presumably what recovers and logs a panicking callback - confirm in xlog.
	invoke := func(t *Timer) {
		defer xlog.CatchWithInfo(fmt.Sprintf("handle timer %v panic", t.Key))
		t.Callback()
	}

	// worker drains the task queue, one timer at a time.
	worker := func() {
		for {
			invoke(<-mgr.taskQueue)
		}
	}

	for remaining := runtime.NumCPU(); remaining > 0; remaining-- {
		go worker()
	}

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	// checkExpire does no locking of its own, so hold the lock around each scan.
	for range ticker.C {
		mgr.locker.Lock()
		mgr.checkExpire()
		mgr.locker.Unlock()
	}
}
func (mgr *DTimerMgr) checkExpire() {
timeNow := time.Now()
for k, v := range mgr.timers {
if v.expireAt.Before(timeNow) {
if !v.IsInterval {
delete(mgr.timers, k)
} else {
v.expireAt = timeNow.Add(v.Duration)
}
select {
case mgr.taskQueue <- v:
default:
xlog.Warnf("timer channel full!!!")
}
}
}
2025-05-12 18:43:41 +08:00
}