uniugm/admin/lib/node/app.go

217 lines
6.0 KiB
Go
Raw Normal View History

2025-04-18 17:17:23 +08:00
package node
import (
"admin/lib/xlog"
"fmt"
"github.com/gin-gonic/gin"
)
// Task 不会永久执行的任务串行用于启动前初始化或者启动后初始化工作返回error就停止application
type Task func() error
// Worker 永久执行的工作协程一旦停止就停止application
type Worker func() error
// Job 不会永久执行的任务,且不关心执行结果,不关心执行顺序,例如内存预热等
type Job func()
type pair struct {
key any
value any
}
// Application 受scheduler调度的最小逻辑单元有独立的启动参数、各种串行、并行任务
type Application struct {
Name string
bootFlag interface{}
initializeTasks []pair // 启动服务前串行执行初始化任务的job
services []pair // rpc服务
servers []pair // web服务
postRunTasks []pair // 启动后串行执行的job
postRunWorkers []pair // 启动后后台永久执行的工作协程一旦推出就停止application
parallelJobs []pair // 启动services、servers后并行执行的任务不关心结果例如内存数据的预热等
stopTasks []pair
}
// newApp
func newApp(name string, options ...AppOption) *Application {
app := new(Application)
app.Name = name
app.applyOptions(options...)
return app
}
// WithInitializeTask app完成init之后run之前执行的任务可以用来初始化某些业务或者检查配置等
func (app *Application) WithInitializeTask(desc string, task Task) *Application {
if task == nil {
return app
}
app.initializeTasks = append(app.initializeTasks, pair{desc, task})
return app
}
// WithServer 添加web服务器
func (app *Application) WithServer(desc string, addr string) *gin.Engine {
server := gin.Default()
app.servers = append(app.servers, pair{desc, pair{addr, server}})
return server
}
// WithService 添加rpc服务
//func (app *Application) WithService(desc string, service *joyservice.ServicesManager) *Application {
// if service == nil {
// return app
// }
// app.services = append(app.services, pair{desc, service})
// return app
//}
// WithPostTask app run之后执行的任务一般做临时检查任务可以用来服务启动后加载数据检查等
func (app *Application) WithPostTask(desc string, task Task) *Application {
if task == nil {
return nil
}
app.postRunTasks = append(app.postRunTasks, pair{desc, task})
return app
}
// WithPostWorker 完成post task之后执行的后台任务报错退出等app也会退出一般做永久的关键后台逻辑
func (app *Application) WithPostWorker(desc string, worker Worker) *Application {
if worker == nil {
return app
}
app.postRunWorkers = append(app.postRunWorkers, pair{desc, worker})
return app
}
// WithParallelJob 完成post task之后执行的并行后台任务一般做永久的不关键后台逻辑例如内存预热等
func (app *Application) WithParallelJob(desc string, job Job) *Application {
if job == nil {
return app
}
app.parallelJobs = append(app.parallelJobs, pair{desc, job})
return app
}
// WithStopTask 注册停服逻辑只能处理正常停服异常例如panic、oom会监听不到
func (app *Application) WithStopTask(desc string, task Task) *Application {
if task == nil {
return app
}
app.stopTasks = append(app.stopTasks, pair{desc, task})
return app
}
func (app *Application) GetBootConfig() any {
return app.bootFlag
}
func (app *Application) applyOptions(options ...AppOption) *Application {
for _, option := range options {
option.Apply(app)
}
return app
}
func (app *Application) run() (err error) {
waitChan := make(chan error, 1)
// 启动前的初始化任务
for _, j := range app.initializeTasks {
curErr := j.value.(Task)()
if curErr != nil {
err = fmt.Errorf("run initialize task(%s) return error:%v", j.key, curErr)
return
}
}
// 启动rpc服务
//for _, pair := range app.services {
// go func(desc string, s *joyservice.ServicesManager) {
// jlog.Noticef("app %v service %v will listen on %v", app.Name, desc, s.Addr)
// curErr := s.Run()
// if curErr != nil {
// waitChan <- fmt.Errorf("service %s run on %v error:%v", desc, s.Addr, curErr)
// } else {
//
// }
// }(pair.desc, pair.item.(*joyservice.ServicesManager))
//}
//defer app.stopServices()
// 启动web服务
for _, server := range app.servers {
go func(desc string, info pair) {
addr := info.key.(string)
engine := info.value.(*gin.Engine)
xlog.Noticef("app %v server %v will listen on %v", app.Name, desc, addr)
err := engine.Run(addr)
if err != nil {
waitChan <- fmt.Errorf("server %s error:%v", desc, err)
} else {
}
}(server.key.(string), server.value.(pair))
}
//defer app.stopServers()
// 启动后串行执行的job
for _, j := range app.postRunTasks {
curErr := j.value.(Task)()
if curErr != nil {
err = fmt.Errorf("run post task %s return error:%v", j.key, curErr)
return
}
}
// 启动后串行执行的工作协程
for _, worker := range app.postRunWorkers {
go func(desc string, g Worker) {
curErr := g()
if curErr != nil {
waitChan <- fmt.Errorf("run post worker %s return error:%v", desc, curErr)
}
}(worker.key.(string), worker.value.(Worker))
}
// 启动后的并行job
for _, j := range app.parallelJobs {
go j.value.(Job)()
}
xlog.Noticef("application[%v] run ok.", app.Name)
select {
case anyErr := <-waitChan:
xlog.Critif("scheduler stop with execute error:%v", anyErr)
return anyErr
}
}
func (app *Application) stop() {
for _, task := range app.stopTasks {
err := task.value.(Task)()
if err != nil {
xlog.Errorf("app stop, execute %v error:%v", task.key, err)
} else {
xlog.Infof("app %v stop, execute task:%v", app.Name, task.key)
}
}
//app.stopServices()
//app.stopServers()
}
//func (app *Application) stopServers() {
// for _, s := range app.servers {
// s.item.(*web.Engine).Stop()
// }
//}
//func (app *Application) stopServices() {
// for _, s := range app.services {
// s.item.(*joyservice.ServicesManager).Stop()
// }
//}