diff --git a/cmd/sinker/main.go b/cmd/sinker/main.go index 8035341..4570105 100644 --- a/cmd/sinker/main.go +++ b/cmd/sinker/main.go @@ -115,6 +115,7 @@ func main() { var json = jsoniter.ConfigCompatibleWithStandardLibrary err = realTimeDataSarama.Init(model.GlobConfig.Comm.Kafka, model.GlobConfig.Comm.Kafka.ReportTopicName, model.GlobConfig.Comm.Kafka.RealTimeDataGroup, func(msg model.InputMessage, markFn func()) { + //ETL var kafkaData model.KafkaData err = json.Unmarshal(msg.Value, &kafkaData) diff --git a/controller/report_controller.go b/controller/report_controller.go index 1249d59..2813bf3 100644 --- a/controller/report_controller.go +++ b/controller/report_controller.go @@ -11,43 +11,28 @@ import ( "github.com/1340691923/xwl_bi/platform-basic-libs/sinker" parser "github.com/1340691923/xwl_bi/platform-basic-libs/sinker/parse" "github.com/1340691923/xwl_bi/platform-basic-libs/util" - jsoniter "github.com/json-iterator/go" "github.com/tidwall/gjson" "github.com/valyala/fasthttp" "go.uber.org/zap" "math" "strings" - "sync" "time" ) + + type ReportController struct { BaseController } var parserPool *parser.Pool -var reportTypeDataPool *sync.Pool - -var Marshaler func(v interface{}) ([]byte, error) - -func init() { +func init(){ var err error - parserPool, err = parser.NewParserPool("fastjson") - if err != nil { + parserPool,err = parser.NewParserPool("fastjson") + if err!=nil{ panic(err) } - reportTypeDataPool = new(sync.Pool) - var json = jsoniter.ConfigCompatibleWithStandardLibrary - Marshaler = json.Marshal -} - -func GetReportTypeDataPool()*report.ReportTypeData{ - v := reportTypeDataPool.Get() - if reportTypeDataPool.Get() != nil{ - return new(report.ReportTypeData) - } - return v.(*report.ReportTypeData) } //上报接口 @@ -58,42 +43,26 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { } var ( - typ = ctx.UserValue("typ").(string) - appkey = ctx.UserValue("appkey").(string) - err error + typ = ctx.UserValue("typ").(string) + appid = ctx.UserValue("appid").(string) + appkey = ctx.UserValue("appkey").(string) + debug = ctx.UserValue("debug").(string) + eventName = ctx.UserValue("eventName").(string) + body = ctx.Request.Body() ) - - reportTypeData := GetReportTypeDataPool() - - reportTypeData.Appid = ctx.UserValue("appid").(string) - reportTypeData.Debug = ctx.UserValue("debug").(string) - reportTypeData.EventName = ctx.UserValue("eventName").(string) - reportTypeData.Body = ctx.Request.Body() - - defer func() { - reportTypeData.Appid = "" - reportTypeData.TableId = "" - reportTypeData.TimeNow = "" - reportTypeData.Debug = "" - reportTypeData.EventName = "" - reportTypeData.Ip = "" - reportTypeData.Body = nil - reportTypeDataPool.Put(reportTypeData) - }() - - if strings.TrimSpace(reportTypeData.EventName) == "" { + if strings.TrimSpace(eventName) == ""{ this.FastError(ctx, errors.New("事件名 不能为空")) return } - if strings.TrimSpace(reportTypeData.Appid) == "" { + if strings.TrimSpace(appid) == ""{ this.FastError(ctx, errors.New("appid 不能为空")) return } reportService := report.ReportService{} - reportTypeData.TableId, err = reportService.GetTableid(reportTypeData.Appid, appkey) + tableId, err := reportService.GetTableid(appid, appkey) if err != nil { this.FastError(ctx, err) return @@ -106,34 +75,35 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { return } + defer duck.Put() - gjsonArr := gjson.GetManyBytes(reportTypeData.Body, "xwl_distinct_id", "xwl_ip", "xwl_part_date") + gjsonArr := gjson.GetManyBytes(body, "xwl_distinct_id", "xwl_ip", "xwl_part_date") xwlDistinctId := gjsonArr[0].String() - reportTypeData.Ip = gjsonArr[1].String() - reportTypeData.TimeNow = gjsonArr[2].String() + xwlIp := gjsonArr[1].String() + xwlPartDate := gjsonArr[2].String() if xwlDistinctId == "" { this.FastError(ctx, errors.New("xwl_distinct_id 不能为空")) return } - if reportTypeData.Ip == "" { - reportTypeData.Ip = util.CtxClientIP(ctx) + if xwlIp == "" { + xwlIp = util.CtxClientIP(ctx) } - if reportTypeData.TimeNow == "" { - reportTypeData.TimeNow = time.Now().Format(util.TimeFormat) + if xwlPartDate == "" { + xwlPartDate = time.Now().Format(util.TimeFormat) } - duck.NewReportType(reportTypeData) - kafkaData := duck.GetkafkaData() + duck.NewReportType(appid, tableId, debug, xwlPartDate, eventName, xwlIp, ctx.PostBody()) - if reportService.IsDebugUser(reportTypeData.Debug, xwlDistinctId, reportTypeData.TableId) { + if reportService.IsDebugUser(debug, xwlDistinctId, tableId) { + kafkaData := duck.GetkafkaData() pool := parserPool.Get() defer parserPool.Put(pool) - metric, debugErr := pool.Parse(reportTypeData.Body) + metric, debugErr := pool.Parse(body) if debugErr != nil { logs.Logger.Error("parser.ParseKafkaData ", zap.Error(err)) @@ -141,7 +111,7 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { return } - dims, err := sinker.GetDims(model.GlobConfig.Comm.ClickHouse.DbName, kafkaData.GetTableName(), []string{}, db.ClickHouseSqlx, true) + dims, err := sinker.GetDims(model.GlobConfig.Comm.ClickHouse.DbName, kafkaData.GetTableName(), []string{}, db.ClickHouseSqlx,true) if err != nil { logs.Logger.Error("sinker.GetDims", zap.Error(err)) this.FastError(ctx, errors.New("服务异常")) @@ -173,7 +143,7 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { } } - xwlUpdateTime := gjson.GetBytes(reportTypeData.Body, "xwl_update_time").String() + xwlUpdateTime := gjson.GetBytes(body, "xwl_update_time").String() clinetT := util.Str2Time(xwlUpdateTime, util.TimeFormat) serverT := util.Str2Time(kafkaData.ReportTime, util.TimeFormat) if math.Abs(serverT.Sub(clinetT).Minutes()) > 10 { @@ -185,7 +155,7 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { m["data_judge"] = "数据检验通过" } - err = reportService.InflowOfDebugData(m, reportTypeData.EventName) + err = reportService.InflowOfDebugData(m, eventName) if err != nil { logs.Logger.Error("reportService.InflowOfDebugData", zap.Error(err)) @@ -198,7 +168,7 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { this.FastError(ctx, my_error.NewError(m["error_reason"].(string), 10006)) return } - if reportTypeData.Debug == report.DebugNotToDB { + if debug == report.DebugNotToDB { this.Output(ctx, map[string]interface{}{ "code": 0, "msg": "上报成功(数据不入库)", @@ -207,7 +177,7 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { } } - err = duck.InflowOfKakfa(Marshaler) + err = duck.InflowOfKakfa() if err != nil { this.FastError(ctx, err) return diff --git a/platform-basic-libs/service/report/report_interface.go b/platform-basic-libs/service/report/report_interface.go index b0071e9..5afe77b 100644 --- a/platform-basic-libs/service/report/report_interface.go +++ b/platform-basic-libs/service/report/report_interface.go @@ -7,13 +7,14 @@ import ( "sync" "github.com/Shopify/sarama" + jsoniter "github.com/json-iterator/go" "time" ) type ReportInterface interface { - NewReportType(data *ReportTypeData) + NewReportType(appid, tableId, debug, timeNow, eventName, ip string, body []byte) GetkafkaData() model.KafkaData - InflowOfKakfa(marshaler func(v interface{}) ([]byte, error)) (err error) + InflowOfKakfa() (err error) Put() } @@ -33,23 +34,13 @@ var eventPool = sync.Pool{ }, } -type ReportTypeData struct { - Appid string - TableId string - Debug string - TimeNow string - EventName string - Ip string - Body []byte -} - -func (this *UserReport) NewReportType(data *ReportTypeData) { - this.kafkaData.APPID = data.Appid - this.kafkaData.TableId = data.TableId - this.kafkaData.Debug = data.Debug - this.kafkaData.ReqData = data.Body - this.kafkaData.Ip = data.Ip - this.kafkaData.ReportTime = data.TimeNow +func (this *UserReport) NewReportType(appid, tableId, debug, timeNow, eventName, ip string, body []byte) { + this.kafkaData.APPID = appid + this.kafkaData.TableId = tableId + this.kafkaData.Debug = debug + this.kafkaData.ReqData = body + this.kafkaData.Ip = ip + this.kafkaData.ReportTime = timeNow this.kafkaData.ReportType = model.UserReportType this.kafkaData.EventName = "用户属性" } @@ -58,11 +49,12 @@ func (this *UserReport) GetkafkaData() model.KafkaData { return this.kafkaData } -func (this *UserReport) InflowOfKakfa(marshaler func(v interface{}) ([]byte, error)) (err error) { +func (this *UserReport) InflowOfKakfa() (err error) { + var json = jsoniter.ConfigCompatibleWithStandardLibrary msg := &sarama.ProducerMessage{} msg.Topic = model.GlobConfig.Comm.Kafka.ReportTopicName - sendData, _ := marshaler(this.kafkaData) + sendData, _ := json.Marshal(this.kafkaData) msg.Value = sarama.ByteEncoder(sendData) msg.Timestamp = time.Now() @@ -77,23 +69,22 @@ type EventReport struct { kafkaData model.KafkaData } -func (this *EventReport) NewReportType(data *ReportTypeData) { - this.kafkaData.APPID = data.Appid - this.kafkaData.TableId = data.TableId - this.kafkaData.Debug = data.Debug - this.kafkaData.ReqData = data.Body - this.kafkaData.ReportTime = data.TimeNow +func (this *EventReport) NewReportType(appid, tableId, debug, timeNow, eventName, ip string, body []byte) { + this.kafkaData.APPID = appid + this.kafkaData.TableId = tableId + this.kafkaData.Debug = debug + this.kafkaData.ReqData = body + this.kafkaData.ReportTime = timeNow this.kafkaData.ReportType = model.EventReportType - this.kafkaData.EventName = data.EventName - this.kafkaData.Ip = data.Ip + this.kafkaData.EventName = eventName + this.kafkaData.Ip = ip } -func (this *EventReport) InflowOfKakfa(marshaler func(v interface{}) ([]byte, error)) (err error) { - +func (this *EventReport) InflowOfKakfa() (err error) { + var json = jsoniter.ConfigCompatibleWithStandardLibrary msg := &sarama.ProducerMessage{} msg.Topic = model.GlobConfig.Comm.Kafka.ReportTopicName - sendData, _ := marshaler(this.kafkaData) - + sendData, _ := json.Marshal(this.kafkaData) msg.Value = sarama.ByteEncoder(sendData) msg.Timestamp = time.Now()