bi/controller/report_controller.go

189 lines
4.9 KiB
Go
Raw Normal View History

2022-01-26 16:40:50 +08:00
package controller
import (
"errors"
"fmt"
"github.com/1340691923/xwl_bi/engine/db"
"github.com/1340691923/xwl_bi/engine/logs"
"github.com/1340691923/xwl_bi/model"
"github.com/1340691923/xwl_bi/platform-basic-libs/my_error"
"github.com/1340691923/xwl_bi/platform-basic-libs/service/report"
"github.com/1340691923/xwl_bi/platform-basic-libs/sinker"
parser "github.com/1340691923/xwl_bi/platform-basic-libs/sinker/parse"
"github.com/1340691923/xwl_bi/platform-basic-libs/util"
"github.com/tidwall/gjson"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
"math"
"strings"
"time"
)
2022-03-08 17:56:07 +08:00
2022-01-26 16:40:50 +08:00
// ReportController hosts the data-reporting HTTP handler (ReportAction).
// It embeds BaseController; the FastError and Output helpers used by the
// handler are presumably promoted from it (BaseController is defined
// elsewhere — confirm there).
type ReportController struct {
BaseController
}
2022-03-07 17:15:08 +08:00
// parserPool is the package-wide pool of body parsers. It is created once in
// init with the "fastjson" parser, and ReportAction borrows a parser from it
// when validating debug reports.
var parserPool *parser.Pool
2022-03-08 17:56:07 +08:00
func init(){
2022-03-07 17:15:08 +08:00
var err error
2022-03-08 17:56:07 +08:00
parserPool,err = parser.NewParserPool("fastjson")
if err!=nil{
2022-03-07 17:15:08 +08:00
panic(err)
}
}
2022-01-26 16:40:50 +08:00
//上报接口
func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) {
if strings.ToUpper(util.Bytes2str(ctx.Method())) == "OPTIONS" {
return
}
var (
2022-03-08 17:56:07 +08:00
typ = ctx.UserValue("typ").(string)
appid = ctx.UserValue("appid").(string)
appkey = ctx.UserValue("appkey").(string)
debug = ctx.UserValue("debug").(string)
eventName = ctx.UserValue("eventName").(string)
body = ctx.Request.Body()
2022-01-26 16:40:50 +08:00
)
2022-03-08 17:56:07 +08:00
if strings.TrimSpace(eventName) == ""{
2022-01-26 16:40:50 +08:00
this.FastError(ctx, errors.New("事件名 不能为空"))
return
}
2022-03-08 17:56:07 +08:00
if strings.TrimSpace(appid) == ""{
2022-01-26 16:40:50 +08:00
this.FastError(ctx, errors.New("appid 不能为空"))
return
}
reportService := report.ReportService{}
2022-03-08 17:56:07 +08:00
tableId, err := reportService.GetTableid(appid, appkey)
2022-01-26 16:40:50 +08:00
if err != nil {
this.FastError(ctx, err)
return
}
duck, err := report.GetReportDuck(typ)
if err != nil {
this.FastError(ctx, err)
return
}
2022-03-08 17:56:07 +08:00
2022-01-26 16:40:50 +08:00
defer duck.Put()
2022-03-08 17:56:07 +08:00
gjsonArr := gjson.GetManyBytes(body, "xwl_distinct_id", "xwl_ip", "xwl_part_date")
2022-01-26 16:40:50 +08:00
xwlDistinctId := gjsonArr[0].String()
2022-03-08 17:56:07 +08:00
xwlIp := gjsonArr[1].String()
xwlPartDate := gjsonArr[2].String()
2022-01-26 16:40:50 +08:00
if xwlDistinctId == "" {
this.FastError(ctx, errors.New("xwl_distinct_id 不能为空"))
return
}
2022-03-08 17:56:07 +08:00
if xwlIp == "" {
xwlIp = util.CtxClientIP(ctx)
2022-01-26 16:40:50 +08:00
}
2022-03-08 17:56:07 +08:00
if xwlPartDate == "" {
xwlPartDate = time.Now().Format(util.TimeFormat)
2022-01-26 16:40:50 +08:00
}
2022-03-08 17:56:07 +08:00
duck.NewReportType(appid, tableId, debug, xwlPartDate, eventName, xwlIp, ctx.PostBody())
2022-01-26 16:40:50 +08:00
2022-03-08 17:56:07 +08:00
if reportService.IsDebugUser(debug, xwlDistinctId, tableId) {
kafkaData := duck.GetkafkaData()
2022-03-07 17:15:08 +08:00
pool := parserPool.Get()
defer parserPool.Put(pool)
2022-03-08 17:56:07 +08:00
metric, debugErr := pool.Parse(body)
2022-03-07 17:15:08 +08:00
2022-01-26 16:40:50 +08:00
if debugErr != nil {
logs.Logger.Error("parser.ParseKafkaData ", zap.Error(err))
this.FastError(ctx, errors.New("服务异常"))
return
}
2022-03-08 17:56:07 +08:00
dims, err := sinker.GetDims(model.GlobConfig.Comm.ClickHouse.DbName, kafkaData.GetTableName(), []string{}, db.ClickHouseSqlx,true)
2022-01-26 16:40:50 +08:00
if err != nil {
logs.Logger.Error("sinker.GetDims", zap.Error(err))
this.FastError(ctx, errors.New("服务异常"))
return
}
obj := metric.GetParseObject()
m := map[string]interface{}{
"data_name": kafkaData.EventName,
"report_data": util.Bytes2str(ctx.PostBody()),
"report_time": kafkaData.ReportTime,
"appid": kafkaData.TableId,
"distinct_id": xwlDistinctId,
}
haveFailAttr := false
var eventType = kafkaData.GetReportTypeErr()
for _, column := range dims {
if obj.Get(column.Name) != nil {
reportType := parser.FjDetectType(obj.Get(column.Name))
if reportType != column.Type {
if !(reportType == parser.Int && column.Type == parser.Float) && !(reportType == parser.Float && column.Type == parser.Int) {
errorReason := fmt.Sprintf("%s的类型错误正确类型为%v上报类型为%v(%v)", column.Name, parser.TypeRemarkMap[column.Type], parser.TypeRemarkMap[reportType], obj.Get(column.Name).String())
haveFailAttr = true
m["error_reason"] = errorReason
m["data_judge"] = eventType
}
}
}
}
2022-03-08 17:56:07 +08:00
xwlUpdateTime := gjson.GetBytes(body, "xwl_update_time").String()
2022-01-26 16:40:50 +08:00
clinetT := util.Str2Time(xwlUpdateTime, util.TimeFormat)
serverT := util.Str2Time(kafkaData.ReportTime, util.TimeFormat)
if math.Abs(serverT.Sub(clinetT).Minutes()) > 10 {
m["error_reason"] = "客户端上报时间误差大于十分钟"
m["data_judge"] = eventType
}
if !haveFailAttr {
m["data_judge"] = "数据检验通过"
}
2022-03-08 17:56:07 +08:00
err = reportService.InflowOfDebugData(m, eventName)
2022-01-26 16:40:50 +08:00
if err != nil {
logs.Logger.Error("reportService.InflowOfDebugData", zap.Error(err))
this.FastError(ctx, errors.New("服务异常"))
return
}
if haveFailAttr {
logs.Logger.Error("reportService.InflowOfDebugData", zap.String("error_reason", m["error_reason"].(string)))
this.FastError(ctx, my_error.NewError(m["error_reason"].(string), 10006))
return
}
2022-03-08 17:56:07 +08:00
if debug == report.DebugNotToDB {
2022-01-26 16:40:50 +08:00
this.Output(ctx, map[string]interface{}{
"code": 0,
"msg": "上报成功(数据不入库)",
})
return
}
}
2022-03-08 17:56:07 +08:00
err = duck.InflowOfKakfa()
2022-01-26 16:40:50 +08:00
if err != nil {
this.FastError(ctx, err)
return
}
ctx.WriteString(`{"code":0,"msg":"上报成功"}`)
return
}