2025-05-15 17:30:33 +08:00

101 lines
1.9 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package dtimer
import (
"admin/lib/xlog"
"fmt"
"runtime"
"sync"
"time"
)
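/*
Distributed timer. It addresses cases such as several processes being started
where only one of them should fire a given timer event. The goal is a workable
implementation that does not add architectural complexity. The idea is to use
MySQL as a distributed lock that the processes compete for, so that execution
is effectively a singleton.
*/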
// Timer describes one scheduled callback managed by a DTimerMgr.
type Timer struct {
// Key identifies the timer; the manager stores timers in a map keyed by it,
// so adding a second timer with the same Key replaces the first.
Key string
// Duration is the delay added to the current time to compute the next expiry.
Duration time.Duration
// IsInterval selects repeating behaviour: when true the timer is re-armed
// after it expires; when false it is a one-shot timer that is dropped from
// the manager after expiring.
IsInterval bool
// Callback is invoked by one of the manager's worker goroutines when the
// timer expires.
Callback func()
// expireAt is the next instant at which the timer is due; it is set by the
// manager, not by the caller.
expireAt time.Time
// timerMgr is not assigned or read anywhere in the visible code.
// NOTE(review): presumably a back-reference to the owning manager — confirm or remove.
timerMgr *DTimerMgr
}
// DTimerMgr keeps a set of timers keyed by Timer.Key and dispatches expired
// timers to worker goroutines through a queue.
//
// TODO: first implement a single-instance version backed by a local database.
type DTimerMgr struct {
// locker guards timers; Add, Del and the expiry loop all take it before
// touching the map.
locker *sync.Mutex
// timers holds the currently registered timers, indexed by their Key.
timers map[string]*Timer
// taskQueue carries expired timers from the expiry loop to the workers that
// run their callbacks.
taskQueue chan *Timer
}
// NewDTimerMgr builds a manager with an empty timer table and a task queue
// buffered for 1024 pending timers, then starts its background loop in a new
// goroutine before returning it.
func NewDTimerMgr() *DTimerMgr {
	mgr := &DTimerMgr{
		locker:    &sync.Mutex{},
		timers:    map[string]*Timer{},
		taskQueue: make(chan *Timer, 1024),
	}

	// The background loop runs for the lifetime of the manager.
	go mgr.start()

	return mgr
}
// Add registers task under task.Key and arms it to expire task.Duration from
// now. Any timer already stored under the same key is replaced.
//
// It returns the timer that was previously registered under that key and
// whether such a timer existed.
func (mgr *DTimerMgr) Add(task *Timer) (*Timer, bool) {
	mgr.locker.Lock()
	defer mgr.locker.Unlock()

	previous, existed := mgr.timers[task.Key]

	// The deadline is written while holding the lock because the expiry loop
	// reads it under the same lock.
	task.expireAt = time.Now().Add(task.Duration)
	mgr.timers[task.Key] = task

	return previous, existed
}
// Del removes the timer registered under key, if any. Removing a key that is
// not present is a no-op.
func (mgr *DTimerMgr) Del(key string) {
	mgr.locker.Lock()
	delete(mgr.timers, key)
	mgr.locker.Unlock()
}
// start runs the manager's background machinery: it launches one worker
// goroutine per logical CPU to execute timer callbacks, then checks for
// expired timers once per second. It does not return while the ticker is
// running, so it is meant to be run in its own goroutine.
func (mgr *DTimerMgr) start() {
	for i := 0; i < runtime.NumCPU(); i++ {
		go func() {
			// Range over the queue instead of a single-case select; the worker
			// also exits cleanly if the queue is ever closed.
			for t := range mgr.taskQueue {
				// Each callback runs in its own function so the deferred
				// handler fires per task rather than at goroutine exit.
				func() {
					// NOTE(review): xlog.CatchWithInfo presumably recovers and
					// logs a panic from the callback — confirm against xlog.
					defer xlog.CatchWithInfo(fmt.Sprintf("handle timer %v panic", t.Key))
					t.Callback()
				}()
			}
		}()
	}

	tk := time.NewTicker(time.Second)
	defer tk.Stop()

	// Expiry is evaluated on every tick with the lock held, because
	// checkExpire reads and mutates the timer map.
	for range tk.C {
		mgr.locker.Lock()
		mgr.checkExpire()
		mgr.locker.Unlock()
	}
}
// checkExpire scans all registered timers and hands every timer whose expiry
// lies before the current time to the task queue without blocking.
//
// Interval timers are re-armed for now+Duration whether or not the enqueue
// succeeds, so a full queue only skips that one firing. One-shot timers are
// removed from the table only after they have been enqueued successfully; if
// the queue is full they stay registered and are retried on the next scan
// instead of being lost.
//
// The caller must hold mgr.locker.
func (mgr *DTimerMgr) checkExpire() {
	now := time.Now()
	for key, timer := range mgr.timers {
		if !timer.expireAt.Before(now) {
			continue
		}

		if timer.IsInterval {
			timer.expireAt = now.Add(timer.Duration)
		}

		select {
		case mgr.taskQueue <- timer:
			if !timer.IsInterval {
				// Deleting the current key while ranging over a map is safe in Go.
				delete(mgr.timers, key)
			}
		default:
			// Queue is full: never block the expiry loop while holding the lock.
			xlog.Warnf("timer channel full!!!")
		}
	}
}