package node import ( "admin/lib/xlog" "fmt" "github.com/gin-gonic/gin" ) // Task 不会永久执行的任务,串行用于启动前初始化或者启动后初始化工作,返回error就停止application type Task func() error // Worker 永久执行的工作协程,一旦停止就停止application type Worker func() error // Job 不会永久执行的任务,且不关心执行结果,不关心执行顺序,例如内存预热等 type Job func() type pair struct { key any value any } // Application 受scheduler调度的最小逻辑单元,有独立的启动参数、各种串行、并行任务 type Application struct { Name string bootFlag interface{} initializeTasks []pair // 启动服务前串行执行初始化任务的job services []pair // rpc服务 servers []pair // web服务 postRunTasks []pair // 启动后串行执行的job postRunWorkers []pair // 启动后后台永久执行的工作协程,一旦推出就停止application parallelJobs []pair // 启动services、servers后并行执行的任务,不关心结果,例如内存数据的预热等 stopTasks []pair } // newApp func newApp(name string, options ...AppOption) *Application { app := new(Application) app.Name = name app.applyOptions(options...) return app } // WithInitializeTask app完成init之后run之前执行的任务,可以用来初始化某些业务或者检查配置等 func (app *Application) WithInitializeTask(desc string, task Task) *Application { if task == nil { return app } app.initializeTasks = append(app.initializeTasks, pair{desc, task}) return app } // WithServer 添加web服务器 func (app *Application) WithServer(desc string, addr string) *gin.Engine { server := gin.Default() app.servers = append(app.servers, pair{desc, pair{addr, server}}) return server } // WithService 添加rpc服务 //func (app *Application) WithService(desc string, service *joyservice.ServicesManager) *Application { // if service == nil { // return app // } // app.services = append(app.services, pair{desc, service}) // return app //} // WithPostTask app run之后执行的任务,一般做临时检查任务,可以用来服务启动后加载数据检查等 func (app *Application) WithPostTask(desc string, task Task) *Application { if task == nil { return nil } app.postRunTasks = append(app.postRunTasks, pair{desc, task}) return app } // WithPostWorker 完成post task之后执行的后台任务,报错退出等app也会退出,一般做永久的关键后台逻辑 func (app *Application) WithPostWorker(desc string, worker Worker) *Application { if worker == nil { return app } app.postRunWorkers = append(app.postRunWorkers, pair{desc, worker}) return app } // WithParallelJob 完成post task之后执行的并行后台任务,一般做永久的不关键后台逻辑,例如内存预热等 func (app *Application) WithParallelJob(desc string, job Job) *Application { if job == nil { return app } app.parallelJobs = append(app.parallelJobs, pair{desc, job}) return app } // WithStopTask 注册停服逻辑(只能处理正常停服,异常例如panic、oom会监听不到) func (app *Application) WithStopTask(desc string, task Task) *Application { if task == nil { return app } app.stopTasks = append(app.stopTasks, pair{desc, task}) return app } func (app *Application) GetBootConfig() any { return app.bootFlag } func (app *Application) applyOptions(options ...AppOption) *Application { for _, option := range options { option.Apply(app) } return app } func (app *Application) run() (err error) { waitChan := make(chan error, 1) // 启动前的初始化任务 for _, j := range app.initializeTasks { curErr := j.value.(Task)() if curErr != nil { err = fmt.Errorf("run initialize task(%s) return error:%v", j.key, curErr) return } } // 启动rpc服务 //for _, pair := range app.services { // go func(desc string, s *joyservice.ServicesManager) { // jlog.Noticef("app %v service %v will listen on %v", app.Name, desc, s.Addr) // curErr := s.Run() // if curErr != nil { // waitChan <- fmt.Errorf("service %s run on %v error:%v", desc, s.Addr, curErr) // } else { // // } // }(pair.desc, pair.item.(*joyservice.ServicesManager)) //} //defer app.stopServices() // 启动web服务 for _, server := range app.servers { go func(desc string, info pair) { addr := info.key.(string) engine := info.value.(*gin.Engine) xlog.Noticef("app %v server %v will listen on %v", app.Name, desc, addr) err := engine.Run(addr) if err != nil { waitChan <- fmt.Errorf("server %s error:%v", desc, err) } else { } }(server.key.(string), server.value.(pair)) } //defer app.stopServers() // 启动后串行执行的job for _, j := range app.postRunTasks { curErr := j.value.(Task)() if curErr != nil { err = fmt.Errorf("run post task %s return error:%v", j.key, curErr) return } } // 启动后串行执行的工作协程 for _, worker := range app.postRunWorkers { go func(desc string, g Worker) { curErr := g() if curErr != nil { waitChan <- fmt.Errorf("run post worker %s return error:%v", desc, curErr) } }(worker.key.(string), worker.value.(Worker)) } // 启动后的并行job for _, j := range app.parallelJobs { go j.value.(Job)() } xlog.Noticef("application[%v] run ok.", app.Name) select { case anyErr := <-waitChan: xlog.Critif("scheduler stop with execute error:%v", anyErr) return anyErr } } func (app *Application) stop() { for _, task := range app.stopTasks { err := task.value.(Task)() if err != nil { xlog.Errorf("app stop, execute %v error:%v", task.key, err) } else { xlog.Infof("app %v stop, execute task:%v", app.Name, task.key) } } //app.stopServices() //app.stopServers() } //func (app *Application) stopServers() { // for _, s := range app.servers { // s.item.(*web.Engine).Stop() // } //} //func (app *Application) stopServices() { // for _, s := range app.services { // s.item.(*joyservice.ServicesManager).Stop() // } //}