101 lines
1.9 KiB
Go
101 lines
1.9 KiB
Go
package dtimer
|
||
|
||
import (
|
||
"admin/lib/xlog"
|
||
"fmt"
|
||
"runtime"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
/*
|
||
分布式定时器,用于解决比如多个进程启动后,只有一个进程触发定时器事件执行,
|
||
目标不影响架构复杂性,做个能用的出来。思路是用mysql做分布式锁,多个进程抢占单例执行
|
||
*/
|
||
|
||
type Timer struct {
|
||
Key string
|
||
Duration time.Duration
|
||
IsInterval bool
|
||
Callback func()
|
||
expireAt time.Time
|
||
timerMgr *DTimerMgr
|
||
}
|
||
|
||
// todo 先实现一个本地数据库单例版本的
|
||
type DTimerMgr struct {
|
||
locker *sync.Mutex
|
||
timers map[string]*Timer
|
||
taskQueue chan *Timer
|
||
}
|
||
|
||
func NewDTimerMgr() *DTimerMgr {
|
||
mgr := new(DTimerMgr)
|
||
mgr.locker = new(sync.Mutex)
|
||
mgr.timers = make(map[string]*Timer)
|
||
mgr.taskQueue = make(chan *Timer, 1024)
|
||
go mgr.start()
|
||
return mgr
|
||
}
|
||
|
||
func (mgr *DTimerMgr) Add(task *Timer) (*Timer, bool) {
|
||
mgr.locker.Lock()
|
||
defer mgr.locker.Unlock()
|
||
|
||
old, find := mgr.timers[task.Key]
|
||
task.expireAt = time.Now().Add(task.Duration)
|
||
mgr.timers[task.Key] = task
|
||
return old, find
|
||
}
|
||
|
||
func (mgr *DTimerMgr) Del(key string) {
|
||
mgr.locker.Lock()
|
||
defer mgr.locker.Unlock()
|
||
delete(mgr.timers, key)
|
||
}
|
||
|
||
func (mgr *DTimerMgr) start() {
|
||
for i := 0; i < runtime.NumCPU(); i++ {
|
||
go func() {
|
||
for {
|
||
select {
|
||
case t := <-mgr.taskQueue:
|
||
func() {
|
||
defer xlog.CatchWithInfo(fmt.Sprintf("handle timer %v panic", t.Key))
|
||
t.Callback()
|
||
}()
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
|
||
tk := time.NewTicker(time.Second)
|
||
defer tk.Stop()
|
||
for {
|
||
select {
|
||
case <-tk.C:
|
||
mgr.locker.Lock()
|
||
mgr.checkExpire()
|
||
mgr.locker.Unlock()
|
||
}
|
||
}
|
||
}
|
||
|
||
func (mgr *DTimerMgr) checkExpire() {
|
||
timeNow := time.Now()
|
||
for k, v := range mgr.timers {
|
||
if v.expireAt.Before(timeNow) {
|
||
if !v.IsInterval {
|
||
delete(mgr.timers, k)
|
||
} else {
|
||
v.expireAt = timeNow.Add(v.Duration)
|
||
}
|
||
select {
|
||
case mgr.taskQueue <- v:
|
||
default:
|
||
xlog.Warnf("timer channel full!!!")
|
||
}
|
||
}
|
||
}
|
||
}
|