bi/application/init.go

261 lines
6.1 KiB
Go
Raw Normal View History

2022-01-26 16:40:50 +08:00
package application
import (
"fmt"
"github.com/1340691923/xwl_bi/controller"
"github.com/1340691923/xwl_bi/engine/db"
"github.com/1340691923/xwl_bi/engine/logs"
"github.com/1340691923/xwl_bi/model"
"github.com/1340691923/xwl_bi/platform-basic-libs/rbac"
"github.com/1340691923/xwl_bi/platform-basic-libs/service/report"
"github.com/1340691923/xwl_bi/platform-basic-libs/sinker"
"github.com/1340691923/xwl_bi/platform-basic-libs/util"
"github.com/gofiber/websocket/v2"
"github.com/tidwall/gjson"
"go.uber.org/zap"
"io"
"log"
"path/filepath"
"strconv"
)
// 初始化日志
func InitLogs() (fn func(), err error) {
logger := logs.NewLog(
logs.WithLogPath(filepath.Join(model.GlobConfig.Comm.Log.LogDir, model.CmdName)),
logs.WithStorageDays(model.GlobConfig.Comm.Log.StorageDays),
)
logs.Logger, err = logger.InitLog()
if err != nil {
return
}
log.Println(fmt.Sprintf("日志组件初始化成功!日志所在目录:%v保存天数为%v", model.GlobConfig.Comm.Log.LogDir, model.GlobConfig.Comm.Log.StorageDays))
fn = func() {}
return
}
// 初始化mysql连接
func InitMysql() (fn func(), err error) {
config := model.GlobConfig.Comm.Mysql
dbSource := fmt.Sprintf(
"%s:%s@tcp(%s:%s)/%s",
config.Username,
config.Pwd,
config.IP,
config.Port,
config.DbName)
db.Sqlx, err = db.NewSQLX(
"mysql",
dbSource,
config.MaxOpenConns,
config.MaxIdleConns,
)
if err != nil {
return
}
log.Println(fmt.Sprintf("Mysql组件初始化成功连接%v最大打开连接数%v最大等待连接数:%v",
dbSource,
config.MaxOpenConns,
config.MaxIdleConns,
))
fn = func() {}
return
}
// 初始化mysql连接
func InitClickHouse() (fn func(), err error) {
config := model.GlobConfig.Comm.ClickHouse
dbSource := fmt.Sprintf(
"tcp://%s:%s?username=%s&password=%s&database=%s&compress=true",
config.IP,
config.Port,
config.Username,
config.Pwd,
config.DbName,
)
db.ClickHouseSqlx, err = db.NewSQLX(
"clickhouse",
dbSource,
config.MaxOpenConns,
config.MaxIdleConns,
)
if err != nil {
return
}
log.Println(fmt.Sprintf("ClickHouse组件初始化成功连接%v最大打开连接数%v最大等待连接数:%v",
dbSource,
config.MaxOpenConns,
config.MaxIdleConns,
))
fn = func() {}
return
}
// 初始化mysql连接
func InitEsClient() (fn func(), err error) {
config := model.GlobConfig.Comm.ElasticSearch
db.EsClient, err = db.NewEsClient(
config.Addresses,
config.Username,
config.Password,
)
if err != nil {
return
}
log.Println(fmt.Sprintf("ES组件初始化成功连接%v用户名%v密码:%v",
config.Addresses,
config.Username,
config.Password,
))
fn = func() {}
return
}
// 初始化redis
func InitRedisPool() (fn func(), err error) {
config := model.GlobConfig.Comm.Redis
db.RedisPool = db.NewRedisPool(config.Addr, config.Passwd, config.Db, config.MaxIdle, config.MaxActive)
log.Println(fmt.Sprintf("Redis组件初始化成功连接%vDB%v密码:%v MaxIdle:%v MaxActive:%v",
config.Addr,
config.Db,
config.Passwd,
config.MaxIdle,
config.MaxActive,
))
fn = func() {}
return
}
// 初始化项目启动任务
func InitTask() (fn func(), err error) {
fn = func() {}
return
}
// 初始化项目启动任务
func InitRbac() (fn func(), err error) {
config := model.GlobConfig.Comm.Mysql
dbSource := fmt.Sprintf(
"%s:%s@tcp(%s:%s)/%s",
config.Username,
config.Pwd,
config.IP,
config.Port,
config.DbName)
err = rbac.Run("mysql", dbSource)
if err != nil {
return
}
log.Println(fmt.Sprintf("Rbac组件初始化成功连接%v",
dbSource,
))
return
}
// 掉起浏览器
func InitOpenWinBrowser() (fn func(), err error) {
config := model.GlobConfig
if !config.Manager.DeBug {
port := ":" + strconv.Itoa(config.Manager.Port)
uri := fmt.Sprintf("%s%s", "http://127.0.0.1", port)
util.OpenWinBrowser(uri)
log.Println(fmt.Sprintf("将打开浏览器!地址为:%v",
uri,
))
}
fn = func() {}
return
}
//初始化kafka异步生产者
func InitKafkaAsyncProduce() (fn func(), err error) {
config := model.GlobConfig.Comm.Kafka
conn, err := db.NewKafkaAsyncProduce(config.Addresses, config.Username, config.Password)
if err != nil {
return
}
db.KafkaASyncProducer = conn
fn = func() {
log.Println("KafkaASyncProducer 关闭了")
db.KafkaASyncProducer.Close()
}
return
}
//初始化kafka同步步生产者
func InitKafkaSyncProduce() (fn func(), err error) {
config := model.GlobConfig.Comm.Kafka
conn, err := db.NewKafkaSyncProduce(config.Addresses, config.Username, config.Password)
if err != nil {
return
}
db.KafkaSyncProducer = conn
fn = func() {
log.Println("KafkaSyncProducer 关闭了")
db.KafkaSyncProducer.Close()
}
return
}
//
func InitDebugSarama() (fn func(), err error) {
debugSarama := sinker.NewKafkaSarama()
err = debugSarama.Init(model.GlobConfig.Comm.Kafka, model.GlobConfig.Comm.Kafka.DebugDataTopicName, model.GlobConfig.Comm.Kafka.DebugDataGroup, func(msg model.InputMessage, markFn func()) {
distinctId := gjson.GetBytes(msg.Value, "distinct_id").String()
managerMap, ok := controller.ConnUUidMap.Load(distinctId)
if ok {
managerMap.(*controller.ManagerConnMap).Conns.Range(func(key, value interface{}) bool {
if err := value.(*websocket.Conn).WriteJSON(map[string]interface{}{
"code": 1,
"data": util.Bytes2str(msg.Value),
}); err != nil {
if err == io.EOF {
logs.Logger.Error("客户端已经断开WsSocket!", zap.Error(err))
} else if err.Error() == "use of closed network connection" {
logs.Logger.Error("服务端已经断开WsSocket!", zap.Error(err))
} else {
logs.Logger.Error("socket err!", zap.Error(err))
}
managerMap.(*controller.ManagerConnMap).DeleteConn(key.(string))
controller.ConnUUidMap.Store(distinctId, managerMap)
}
return true
})
}
}, func() {
})
if err != nil {
return
}
go debugSarama.Run()
log.Println(fmt.Sprintf("Debug数据消费者启动"))
fn = func() {
debugSarama.Stop()
}
return
}
func RefreshTableId() (fn func(), err error) {
fn = func() {
go report.RefreshTableIdMap()
}
return
}