package dtimer

import (
	"fmt"
	"runtime"
	"sync"
	"time"

	"admin/lib/xlog"
)

/*
Distributed timer. It targets the situation where several processes are
started but only one of them should fire a given timer event. The goal is
something usable that does not add architectural complexity. The idea is to
use MySQL as a distributed lock, with the processes competing to become the
single executor.
*/

// Timer describes one scheduled callback managed by a DTimerMgr.
type Timer struct {
	// Key identifies the timer inside the manager; adding another timer with
	// the same key replaces this one.
	Key string
	// Duration is the delay between Add and the first expiry and, for
	// interval timers, the delay used each time the timer is re-armed.
	Duration time.Duration
	// IsInterval keeps the timer registered and re-arms it after each firing;
	// otherwise the timer is removed once it has been queued for execution.
	IsInterval bool
	// Callback is invoked on one of the manager's worker goroutines.
	Callback func()

	// expireAt is the next due time; it is written by Add and checkExpire.
	expireAt time.Time
	// timerMgr is not assigned anywhere in this file.
	// NOTE(review): presumably a back-reference to the owning manager - confirm.
	timerMgr *DTimerMgr
}

// DTimerMgr keeps a set of timers keyed by Timer.Key and dispatches expired
// ones to a pool of worker goroutines.
//
// TODO: implement a local, single-instance (database) version first.
type DTimerMgr struct {
	locker    *sync.Mutex       // guards timers
	timers    map[string]*Timer // registered timers by key
	taskQueue chan *Timer       // expired timers waiting for a worker
}

// NewDTimerMgr creates a manager and launches its background goroutine
// (start). Nothing in this file stops that goroutine or the workers it
// spawns, so they run for the remaining lifetime of the process.
func NewDTimerMgr() *DTimerMgr {
	mgr := new(DTimerMgr)
	mgr.locker = new(sync.Mutex)
	mgr.timers = make(map[string]*Timer)
	mgr.taskQueue = make(chan *Timer, 1024)
	go mgr.start()
	return mgr
}

// Add registers task under task.Key and arms it to expire task.Duration from
// now. If a timer was already registered under that key it is replaced; the
// previous timer and true are returned, otherwise nil and false.
func (mgr *DTimerMgr) Add(task *Timer) (*Timer, bool) {
	mgr.locker.Lock()
	defer mgr.locker.Unlock()

	old, find := mgr.timers[task.Key]
	task.expireAt = time.Now().Add(task.Duration)
	mgr.timers[task.Key] = task
	return old, find
}

// Del removes the timer registered under key, if any. A timer that has
// already been placed on the task queue is not recalled by this call.
func (mgr *DTimerMgr) Del(key string) {
	mgr.locker.Lock()
	defer mgr.locker.Unlock()
	delete(mgr.timers, key)
}

// start spawns one worker goroutine per CPU to run callbacks and then scans
// for expired timers once per second. It does not return while the ticker is
// delivering ticks.
func (mgr *DTimerMgr) start() {
	workers := runtime.NumCPU()
	for i := 0; i < workers; i++ {
		go func() {
			for t := range mgr.taskQueue {
				// Each callback runs in its own function so the deferred
				// handler fires per task instead of once per worker.
				func() {
					// NOTE(review): xlog.CatchWithInfo presumably recovers and
					// logs a panic from the callback - confirm.
					defer xlog.CatchWithInfo(fmt.Sprintf("handle timer %v panic", t.Key))
					t.Callback()
				}()
			}
		}()
	}

	tk := time.NewTicker(time.Second)
	defer tk.Stop()
	for range tk.C {
		mgr.locker.Lock()
		mgr.checkExpire()
		mgr.locker.Unlock()
	}
}

// checkExpire queues every timer whose due time is before now. The caller
// must hold mgr.locker.
//
// A timer is only removed (one-shot) or re-armed (interval) after it has been
// queued successfully. When the queue is full the timer is left untouched so
// that the next scan retries it; previously a one-shot timer was deleted
// before the send was attempted and was lost for good if the queue was full.
func (mgr *DTimerMgr) checkExpire() {
	now := time.Now()
	for key, t := range mgr.timers {
		if !t.expireAt.Before(now) {
			continue
		}

		// Non-blocking send: the scan must never stall while holding the lock.
		select {
		case mgr.taskQueue <- t:
		default:
			xlog.Warnf("timer channel full!!!")
			continue
		}

		if t.IsInterval {
			t.expireAt = now.Add(t.Duration)
		} else {
			// Deleting the current key while ranging over a map is safe in Go.
			delete(mgr.timers, key)
		}
	}
}