// bi/cmd/sinker/main.go
package main
import (
"flag"
"fmt"
"github.com/1340691923/xwl_bi/application"
"github.com/1340691923/xwl_bi/cmd/sinker/action"
"github.com/1340691923/xwl_bi/cmd/sinker/geoip"
"github.com/1340691923/xwl_bi/engine/logs"
"github.com/1340691923/xwl_bi/model"
"github.com/1340691923/xwl_bi/platform-basic-libs/service/consumer_data"
"github.com/1340691923/xwl_bi/platform-basic-libs/sinker"
parser "github.com/1340691923/xwl_bi/platform-basic-libs/sinker/parse"
"github.com/1340691923/xwl_bi/platform-basic-libs/util"
_ "github.com/ClickHouse/clickhouse-go"
_ "github.com/go-sql-driver/mysql"
jsoniter "github.com/json-iterator/go"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
"go.uber.org/zap"
"log"
"math"
"net/http"
_ "net/http/pprof"
"runtime"
"strconv"
	"time"
)
// Command-line flag targets locating the configuration file.
// They are registered and parsed in init() below.
var (
	configFileDir  string // directory that holds the config file
	configFileName string // base name of the config file (without extension)
	configFileExt  string // config file extension, e.g. "json"
)
func init() {
flag.StringVar(&configFileDir, "configFileDir", "config", "配置文件夹名")
flag.StringVar(&configFileName, "configFileName", "config", "配置文件名")
flag.StringVar(&configFileExt, "configFileExt", "json", "配置文件后缀")
flag.Parse()
2022-03-02 18:20:08 +08:00
2022-01-26 16:40:50 +08:00
}
//核心逻辑都在sinker这边
/*
1 实时数据入es 也就是RealTimeWarehousing
2 获取数据类型和ck的把没有的字段记录下来 并且入mysql
3 判断数据类型 不正确看能不能转化 ReportAcceptStatus
4 最终数据入 ck
*/
func main() {
defer func() {
if r := recover(); r != nil {
//打印调用栈信息
buf := make([]byte, 2048)
n := runtime.Stack(buf, false)
stackInfo := fmt.Sprintf("%s", buf[:n])
logs.Logger.Sugar().Errorf("panic stack info %s", stackInfo)
logs.Logger.Sugar().Errorf("--->consumer_data Error:", r)
}
}()
app := application.NewApp(
"sinker",
application.WithConfigFileDir(configFileDir),
application.WithConfigFileName(configFileName),
application.WithConfigFileExt(configFileExt),
application.RegisterInitFnObserver(application.InitLogs),
application.RegisterInitFnObserver(application.InitMysql),
application.RegisterInitFnObserver(application.InitClickHouse),
application.RegisterInitFnObserver(application.InitRedisPool),
)
err := app.InitConfig().
NotifyInitFnObservers().
Error()
if err != nil {
logs.Logger.Error("Sinker 初始化失败", zap.Error(err))
panic(err)
}
2022-03-08 14:23:21 +08:00
defer app.Close()
2022-01-26 16:40:50 +08:00
geoip2, err := geoip.NewGeoip(geoip.GeoipMmdbByte)
if err != nil {
logs.Logger.Error("Geoip 初始化失败", zap.Error(err))
panic(err)
}
defer geoip2.Close()
go func() {
if model.GlobConfig.Sinker.PprofHttpPort != 0 {
2022-03-01 09:46:36 +08:00
httpPort := ":" + strconv.Itoa(int(model.GlobConfig.Sinker.PprofHttpPort))
2022-01-26 16:40:50 +08:00
if err := http.ListenAndServe(httpPort, nil); err != nil {
logs.Logger.Info("err", zap.Error(err))
}
}
}()
log.Println(fmt.Sprintf("sinker 服务启动成功,性能检测入口为: http://127.0.0.1:%v", model.GlobConfig.Sinker.PprofHttpPort))
realTimeWarehousing := consumer_data.NewRealTimeWarehousing(model.GlobConfig.Sinker.RealTimeWarehousing.BufferSize, model.GlobConfig.Sinker.RealTimeWarehousing.FlushInterval)
reportAcceptStatus := consumer_data.NewReportAcceptStatus(model.GlobConfig.Sinker.ReportAcceptStatus.BufferSize, model.GlobConfig.Sinker.ReportAcceptStatus.FlushInterval)
reportData2CK := consumer_data.NewReportData2CK(model.GlobConfig.Sinker.ReportData2CK.BufferSize, model.GlobConfig.Sinker.ReportData2CK.FlushInterval)
realTimeDataSarama := sinker.NewKafkaSarama()
reportData2CKSarama := realTimeDataSarama.Clone()
go action.MysqlConsumer()
2022-03-03 19:16:25 +08:00
go sinker.ClearDimsCacheByTime(time.Minute * 30)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
2022-03-04 13:02:20 +08:00
2022-01-26 16:40:50 +08:00
err = realTimeDataSarama.Init(model.GlobConfig.Comm.Kafka, model.GlobConfig.Comm.Kafka.ReportTopicName, model.GlobConfig.Comm.Kafka.RealTimeDataGroup, func(msg model.InputMessage, markFn func()) {
2022-03-08 17:56:07 +08:00
2022-01-26 16:40:50 +08:00
//ETL
var kafkaData model.KafkaData
err = json.Unmarshal(msg.Value, &kafkaData)
if err != nil {
logs.Logger.Error("json.Unmarshal Err", zap.Error(err))
markFn()
return
}
appid,err := strconv.Atoi(kafkaData.TableId)
2022-01-26 16:40:50 +08:00
if err != nil {
logs.Logger.Error("strconv.Atoi(kafkaData.TableId) Err", zap.Error(err))
2022-01-26 16:40:50 +08:00
markFn()
return
}
//添加实时数据
err = realTimeWarehousing.Add(&consumer_data.RealTimeWarehousingData{
Appid: int64(appid),
EventName: kafkaData.EventName,
CreateTime: kafkaData.ReportTime,
Data: kafkaData.ReqData,
})
2022-01-26 16:40:50 +08:00
if err != nil {
logs.Logger.Error("AddRealTimeData err", zap.Error(err))
}
markFn()
}, func() {})
if err != nil {
panic(err)
}
2022-03-07 17:15:08 +08:00
parserPool,err := parser.NewParserPool("fastjson")
if err!=nil{
panic(err)
}
2022-01-26 16:40:50 +08:00
err = reportData2CKSarama.Init(model.GlobConfig.Comm.Kafka, model.GlobConfig.Comm.Kafka.ReportTopicName, model.GlobConfig.Comm.Kafka.ReportData2CKGroup, func(msg model.InputMessage, markFn func()) {
var kafkaData model.KafkaData
err = json.Unmarshal(msg.Value, &kafkaData)
if err != nil {
logs.Logger.Error("json.Unmarshal Err", zap.Error(err))
markFn()
return
}
kafkaData.Offset = msg.Offset
kafkaData.ConsumptionTime = msg.Timestamp.Format(util.TimeFormat)
gjsonArr := gjson.GetManyBytes(kafkaData.ReqData, "xwl_distinct_id", "xwl_client_time")
xwlDistinctId := gjsonArr[0].String()
xwlClientTime := gjsonArr[1].String()
tableId, _ := strconv.Atoi(kafkaData.TableId)
if kafkaData.EventName == ""{
markFn()
return
}
if xwlDistinctId == "" {
logs.Logger.Error("xwl_distinct_id 为空", zap.String("kafkaData.ReqData", util.Bytes2str(kafkaData.ReqData)))
var eventType = ""
switch kafkaData.ReportType {
case model.UserReportType:
eventType = "用户属性类型不合法"
case model.EventReportType:
eventType = "事件属性类型不合法"
}
reportAcceptStatus.Add(&consumer_data.ReportAcceptStatusData{
2022-01-26 16:40:50 +08:00
PartDate: kafkaData.ReportTime,
TableId: tableId,
ReportType: eventType,
DataName: kafkaData.EventName,
ErrorReason: "xwl_distinct_id 不能为空",
ErrorHandling: "丢弃数据",
ReportData: util.Bytes2str(kafkaData.ReqData),
XwlKafkaOffset: kafkaData.Offset,
Status: consumer_data.FailStatus,
})
markFn()
return
}
if kafkaData.Ip != "" {
province, city, err := geoip2.GetAreaFromIP(kafkaData.Ip)
if err != nil {
logs.Logger.Sugar().Errorf("err", err)
}
if province != "" {
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_province", province)
}
if city != "" {
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_city", city)
}
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_ip", kafkaData.Ip)
}
clinetT := util.Str2Time(xwlClientTime, util.TimeFormat)
serverT := util.Str2Time(kafkaData.ReportTime, util.TimeFormat)
if math.Abs(serverT.Sub(clinetT).Minutes()) > 10 {
reportAcceptStatus.Add(&consumer_data.ReportAcceptStatusData{
2022-01-26 16:40:50 +08:00
PartDate: kafkaData.ReportTime,
TableId: tableId,
ReportType: kafkaData.GetReportTypeErr(),
DataName: kafkaData.EventName,
ErrorReason: "客户端上报时间误差大于十分钟",
ErrorHandling: "丢弃数据",
ReportData: util.Bytes2str(kafkaData.ReqData),
XwlKafkaOffset: kafkaData.Offset,
Status: consumer_data.FailStatus,
})
logs.Logger.Sugar().Errorf("客户端上报时间误差大于十分钟", xwlClientTime, kafkaData.ReportTime)
markFn()
return
}
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_part_event", kafkaData.EventName)
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_part_date", xwlClientTime)
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_server_time", kafkaData.ReportTime)
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_offset", msg.Offset)
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_partition", msg.Partition)
2022-03-07 17:15:08 +08:00
pool := parserPool.Get()
defer parserPool.Put(pool)
metric, err := pool.Parse(kafkaData.ReqData)
2022-01-26 16:40:50 +08:00
//解析开发者上报的json数据
if err != nil {
logs.Logger.Error("ParseKafkaData err", zap.Error(err))
markFn()
return
}
2022-03-02 19:31:17 +08:00
2022-01-26 16:40:50 +08:00
//生成表名
tableName := kafkaData.GetTableName()
//新增表结构
if err := action.AddTableColumn(
kafkaData,
func(data consumer_data.ReportAcceptStatusData) { reportAcceptStatus.Add(&data) },
2022-01-26 16:40:50 +08:00
tableName,
metric,
); err != nil {
logs.Logger.Error("addTableColumn err", zap.String("tableName", tableName), zap.Error(err))
markFn()
return
}
2022-03-02 19:31:17 +08:00
2022-01-26 16:40:50 +08:00
//添加元数据
if err := action.AddMetaEvent(kafkaData); err != nil {
logs.Logger.Error("addMetaEvent err", zap.Error(err))
}
//入库成功
if err := reportAcceptStatus.Add(&consumer_data.ReportAcceptStatusData{
2022-01-26 16:40:50 +08:00
PartDate: kafkaData.ReportTime,
TableId: tableId,
DataName: kafkaData.EventName,
XwlKafkaOffset: kafkaData.Offset,
Status: consumer_data.SuccessStatus,
}); err != nil {
logs.Logger.Error("reportAcceptStatus Add SuccessStatus err", zap.Error(err))
}
//添加数据到ck用于后台统计
if err := reportData2CK.Add(map[string]*parser.FastjsonMetric{
tableName: metric,
}); err != nil {
logs.Logger.Error("reportData2CK err", zap.Error(err))
markFn()
return
}
markFn()
//logs.Logger.Info("链路所花时长", zap.String("time", time.Now().Sub(startT).String()))
}, func() {})
if err != nil {
panic(err)
}
go reportData2CKSarama.Run()
go realTimeDataSarama.Run()
app.WaitForExitSign(func() {
if err := reportData2CKSarama.Stop(); err != nil {
logs.Logger.Sugar().Infof("reportData2CKSarama 停止失败", err)
}
if err := realTimeDataSarama.Stop(); err != nil {
logs.Logger.Sugar().Infof("realTimeDataSarama 停止失败", err)
}
}, func() {
if err := reportData2CK.FlushAll(); err != nil {
logs.Logger.Sugar().Infof("清理 reportData2CK FlushAll 失败", err)
} else {
logs.Logger.Sugar().Infof("清理reportData2CK完毕")
}
}, func() {
if err := realTimeWarehousing.FlushAll(); err != nil {
logs.Logger.Sugar().Infof("清理 realTimeWarehousing 失败", err)
} else {
logs.Logger.Sugar().Infof("清理realTimeWarehousing 完毕")
}
}, func() {
if err := reportAcceptStatus.FlushAll(); err != nil {
logs.Logger.Sugar().Infof("清理 reportAcceptStatus 失败", err)
} else {
logs.Logger.Sugar().Infof("清理reportAcceptStatus 完毕")
}
})
}