From 1d3687dee2d89448387f9d54c2aad3d0314bdbde Mon Sep 17 00:00:00 2001 From: "1340691923@qq.com" <1340691923@qq.com> Date: Tue, 8 Mar 2022 15:47:38 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8sync.pool=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- controller/report_controller.go | 92 ++++++++++++------- .../service/report/report_interface.go | 56 ++++++----- .../service/report/report_service.go | 13 ++- platform-basic-libs/sinker/parse/parser.go | 13 --- platform-basic-libs/util/array.go | 3 +- 5 files changed, 100 insertions(+), 77 deletions(-) diff --git a/controller/report_controller.go b/controller/report_controller.go index cf999ea..1249d59 100644 --- a/controller/report_controller.go +++ b/controller/report_controller.go @@ -11,28 +11,43 @@ import ( "github.com/1340691923/xwl_bi/platform-basic-libs/sinker" parser "github.com/1340691923/xwl_bi/platform-basic-libs/sinker/parse" "github.com/1340691923/xwl_bi/platform-basic-libs/util" + jsoniter "github.com/json-iterator/go" "github.com/tidwall/gjson" "github.com/valyala/fasthttp" "go.uber.org/zap" "math" "strings" + "sync" "time" ) - - type ReportController struct { BaseController } var parserPool *parser.Pool -func init(){ +var reportTypeDataPool *sync.Pool + +var Marshaler func(v interface{}) ([]byte, error) + +func init() { var err error - parserPool,err = parser.NewParserPool("fastjson") - if err!=nil{ + parserPool, err = parser.NewParserPool("fastjson") + if err != nil { panic(err) } + reportTypeDataPool = new(sync.Pool) + var json = jsoniter.ConfigCompatibleWithStandardLibrary + Marshaler = json.Marshal +} + +func GetReportTypeDataPool()*report.ReportTypeData{ + v := reportTypeDataPool.Get() + if reportTypeDataPool.Get() != nil{ + return new(report.ReportTypeData) + } + return v.(*report.ReportTypeData) } //上报接口 @@ -43,26 +58,42 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { } var ( - typ = ctx.UserValue("typ").(string) - appid = ctx.UserValue("appid").(string) - appkey = ctx.UserValue("appkey").(string) - debug = ctx.UserValue("debug").(string) - eventName = ctx.UserValue("eventName").(string) - body = ctx.Request.Body() + typ = ctx.UserValue("typ").(string) + appkey = ctx.UserValue("appkey").(string) + err error ) - if strings.TrimSpace(eventName) == ""{ + + reportTypeData := GetReportTypeDataPool() + + reportTypeData.Appid = ctx.UserValue("appid").(string) + reportTypeData.Debug = ctx.UserValue("debug").(string) + reportTypeData.EventName = ctx.UserValue("eventName").(string) + reportTypeData.Body = ctx.Request.Body() + + defer func() { + reportTypeData.Appid = "" + reportTypeData.TableId = "" + reportTypeData.TimeNow = "" + reportTypeData.Debug = "" + reportTypeData.EventName = "" + reportTypeData.Ip = "" + reportTypeData.Body = nil + reportTypeDataPool.Put(reportTypeData) + }() + + if strings.TrimSpace(reportTypeData.EventName) == "" { this.FastError(ctx, errors.New("事件名 不能为空")) return } - if strings.TrimSpace(appid) == ""{ + if strings.TrimSpace(reportTypeData.Appid) == "" { this.FastError(ctx, errors.New("appid 不能为空")) return } reportService := report.ReportService{} - tableId, err := reportService.GetTableid(appid, appkey) + reportTypeData.TableId, err = reportService.GetTableid(reportTypeData.Appid, appkey) if err != nil { this.FastError(ctx, err) return @@ -75,36 +106,34 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { return } - defer duck.Put() - gjsonArr := gjson.GetManyBytes(body, "xwl_distinct_id", "xwl_ip", "xwl_part_date") + gjsonArr := gjson.GetManyBytes(reportTypeData.Body, "xwl_distinct_id", "xwl_ip", "xwl_part_date") xwlDistinctId := gjsonArr[0].String() - xwlIp := gjsonArr[1].String() - xwlPartDate := gjsonArr[2].String() + reportTypeData.Ip = gjsonArr[1].String() + reportTypeData.TimeNow = gjsonArr[2].String() if xwlDistinctId == "" { this.FastError(ctx, errors.New("xwl_distinct_id 不能为空")) return } - if xwlIp == "" { - xwlIp = util.CtxClientIP(ctx) + if reportTypeData.Ip == "" { + reportTypeData.Ip = util.CtxClientIP(ctx) } - if xwlPartDate == "" { - xwlPartDate = time.Now().Format(util.TimeFormat) + if reportTypeData.TimeNow == "" { + reportTypeData.TimeNow = time.Now().Format(util.TimeFormat) } - - duck.NewReportType(appid, tableId, debug, xwlPartDate, eventName, xwlIp, ctx.PostBody()) + duck.NewReportType(reportTypeData) kafkaData := duck.GetkafkaData() - if reportService.IsDebugUser(debug, xwlDistinctId, tableId) { + if reportService.IsDebugUser(reportTypeData.Debug, xwlDistinctId, reportTypeData.TableId) { pool := parserPool.Get() defer parserPool.Put(pool) - metric, debugErr := pool.Parse(body) + metric, debugErr := pool.Parse(reportTypeData.Body) if debugErr != nil { logs.Logger.Error("parser.ParseKafkaData ", zap.Error(err)) @@ -112,7 +141,7 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { return } - dims, err := sinker.GetDims(model.GlobConfig.Comm.ClickHouse.DbName, kafkaData.GetTableName(), []string{}, db.ClickHouseSqlx,true) + dims, err := sinker.GetDims(model.GlobConfig.Comm.ClickHouse.DbName, kafkaData.GetTableName(), []string{}, db.ClickHouseSqlx, true) if err != nil { logs.Logger.Error("sinker.GetDims", zap.Error(err)) this.FastError(ctx, errors.New("服务异常")) @@ -144,7 +173,7 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { } } - xwlUpdateTime := gjson.GetBytes(body, "xwl_update_time").String() + xwlUpdateTime := gjson.GetBytes(reportTypeData.Body, "xwl_update_time").String() clinetT := util.Str2Time(xwlUpdateTime, util.TimeFormat) serverT := util.Str2Time(kafkaData.ReportTime, util.TimeFormat) if math.Abs(serverT.Sub(clinetT).Minutes()) > 10 { @@ -156,7 +185,7 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { m["data_judge"] = "数据检验通过" } - err = reportService.InflowOfDebugData(m, eventName) + err = reportService.InflowOfDebugData(m, reportTypeData.EventName) if err != nil { logs.Logger.Error("reportService.InflowOfDebugData", zap.Error(err)) @@ -169,7 +198,7 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { this.FastError(ctx, my_error.NewError(m["error_reason"].(string), 10006)) return } - if debug == report.DebugNotToDB { + if reportTypeData.Debug == report.DebugNotToDB { this.Output(ctx, map[string]interface{}{ "code": 0, "msg": "上报成功(数据不入库)", @@ -178,8 +207,7 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { } } - - err = duck.InflowOfKakfa() + err = duck.InflowOfKakfa(Marshaler) if err != nil { this.FastError(ctx, err) return diff --git a/platform-basic-libs/service/report/report_interface.go b/platform-basic-libs/service/report/report_interface.go index 4345ff7..b0071e9 100644 --- a/platform-basic-libs/service/report/report_interface.go +++ b/platform-basic-libs/service/report/report_interface.go @@ -7,14 +7,13 @@ import ( "sync" "github.com/Shopify/sarama" - jsoniter "github.com/json-iterator/go" "time" ) type ReportInterface interface { - NewReportType(appid, tableId, debug, timeNow, eventName, ip string, body []byte) + NewReportType(data *ReportTypeData) GetkafkaData() model.KafkaData - InflowOfKakfa() (err error) + InflowOfKakfa(marshaler func(v interface{}) ([]byte, error)) (err error) Put() } @@ -34,13 +33,23 @@ var eventPool = sync.Pool{ }, } -func (this *UserReport) NewReportType(appid, tableId, debug, timeNow, eventName, ip string, body []byte) { - this.kafkaData.APPID = appid - this.kafkaData.TableId = tableId - this.kafkaData.Debug = debug - this.kafkaData.ReqData = body - this.kafkaData.Ip = ip - this.kafkaData.ReportTime = timeNow +type ReportTypeData struct { + Appid string + TableId string + Debug string + TimeNow string + EventName string + Ip string + Body []byte +} + +func (this *UserReport) NewReportType(data *ReportTypeData) { + this.kafkaData.APPID = data.Appid + this.kafkaData.TableId = data.TableId + this.kafkaData.Debug = data.Debug + this.kafkaData.ReqData = data.Body + this.kafkaData.Ip = data.Ip + this.kafkaData.ReportTime = data.TimeNow this.kafkaData.ReportType = model.UserReportType this.kafkaData.EventName = "用户属性" } @@ -49,12 +58,11 @@ func (this *UserReport) GetkafkaData() model.KafkaData { return this.kafkaData } -func (this *UserReport) InflowOfKakfa() (err error) { +func (this *UserReport) InflowOfKakfa(marshaler func(v interface{}) ([]byte, error)) (err error) { - var json = jsoniter.ConfigCompatibleWithStandardLibrary msg := &sarama.ProducerMessage{} msg.Topic = model.GlobConfig.Comm.Kafka.ReportTopicName - sendData, _ := json.Marshal(this.kafkaData) + sendData, _ := marshaler(this.kafkaData) msg.Value = sarama.ByteEncoder(sendData) msg.Timestamp = time.Now() @@ -69,22 +77,22 @@ type EventReport struct { kafkaData model.KafkaData } -func (this *EventReport) NewReportType(appid, tableId, debug, timeNow, eventName, ip string, body []byte) { - this.kafkaData.APPID = appid - this.kafkaData.TableId = tableId - this.kafkaData.Debug = debug - this.kafkaData.ReqData = body - this.kafkaData.ReportTime = timeNow +func (this *EventReport) NewReportType(data *ReportTypeData) { + this.kafkaData.APPID = data.Appid + this.kafkaData.TableId = data.TableId + this.kafkaData.Debug = data.Debug + this.kafkaData.ReqData = data.Body + this.kafkaData.ReportTime = data.TimeNow this.kafkaData.ReportType = model.EventReportType - this.kafkaData.EventName = eventName - this.kafkaData.Ip = ip + this.kafkaData.EventName = data.EventName + this.kafkaData.Ip = data.Ip } -func (this *EventReport) InflowOfKakfa() (err error) { - var json = jsoniter.ConfigCompatibleWithStandardLibrary +func (this *EventReport) InflowOfKakfa(marshaler func(v interface{}) ([]byte, error)) (err error) { + msg := &sarama.ProducerMessage{} msg.Topic = model.GlobConfig.Comm.Kafka.ReportTopicName - sendData, _ := json.Marshal(this.kafkaData) + sendData, _ := marshaler(this.kafkaData) msg.Value = sarama.ByteEncoder(sendData) msg.Timestamp = time.Now() diff --git a/platform-basic-libs/service/report/report_service.go b/platform-basic-libs/service/report/report_service.go index 9b8d6d8..bc3a4f9 100644 --- a/platform-basic-libs/service/report/report_service.go +++ b/platform-basic-libs/service/report/report_service.go @@ -17,7 +17,7 @@ import ( ) type ReportService struct { - buff bytes.Buffer + } var tableIdMap sync.Map @@ -33,12 +33,11 @@ func RefreshTableIdMap(t time.Duration) { } func (this *ReportService) GetTableid(appid, appkey string) (table string, err error) { - - this.buff.Reset() - this.buff.WriteString(appid) - this.buff.WriteString("_xwl_") - this.buff.WriteString(appkey) - key := this.buff.String() + buff := new(bytes.Buffer) + buff.WriteString(appid) + buff.WriteString("_xwl_") + buff.WriteString(appkey) + key := buff.String() if val, found := tableIdMap.Load(key); found { table = val.(string) diff --git a/platform-basic-libs/sinker/parse/parser.go b/platform-basic-libs/sinker/parse/parser.go index 715393a..168fdff 100644 --- a/platform-basic-libs/sinker/parse/parser.go +++ b/platform-basic-libs/sinker/parse/parser.go @@ -28,7 +28,6 @@ type Parser interface { type Pool struct { name string timeZone *time.Location - knownLayouts sync.Map pool sync.Pool } @@ -43,18 +42,6 @@ func NewParserPool(name string) (pp *Pool, err error) { return } -/*func ParseKafkaData(data []byte) (metric *FastjsonMetric, err error) { - pp, err := NewParserPool("fastjson") - if err != nil { - return - } - - jsonParser := pp.Get() - defer pp.Put(jsonParser) - metric, err = jsonParser.Parse(data) - return -}*/ - // Get returns a Parser from pp. // // The Parser must be Put to pp after use. diff --git a/platform-basic-libs/util/array.go b/platform-basic-libs/util/array.go index fa4203c..36676b6 100644 --- a/platform-basic-libs/util/array.go +++ b/platform-basic-libs/util/array.go @@ -22,7 +22,8 @@ func InArr(array []int, column int) bool { func InstrArr(array []string, column string) bool { i := 0 - for i < len(array) { + l := len(array) + for i < l { if array[i] == column { return true }