From 844c0d282904f38bc235facb85413bd9306fd3fe Mon Sep 17 00:00:00 2001 From: "1340691923@qq.com" <1340691923@qq.com> Date: Fri, 4 Mar 2022 13:02:20 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/sinker/main.go | 9 +--- controller/report_controller.go | 9 +--- .../service/consumer_data/reportdata2ck.go | 1 - platform-basic-libs/sinker/parse/parser.go | 48 ++++++++++++++----- 4 files changed, 40 insertions(+), 27 deletions(-) diff --git a/cmd/sinker/main.go b/cmd/sinker/main.go index ae8735c..8e916ff 100644 --- a/cmd/sinker/main.go +++ b/cmd/sinker/main.go @@ -111,10 +111,7 @@ func main() { go action.MysqlConsumer() go sinker.ClearDimsCacheByTime(time.Minute * 30) var json = jsoniter.ConfigCompatibleWithStandardLibrary - pp, err := parser.NewParserPool() - if err != nil { - panic(err) - } + err = realTimeDataSarama.Init(model.GlobConfig.Comm.Kafka, model.GlobConfig.Comm.Kafka.ReportTopicName, model.GlobConfig.Comm.Kafka.RealTimeDataGroup, func(msg model.InputMessage, markFn func()) { //ETL var kafkaData model.KafkaData @@ -239,10 +236,8 @@ func main() { kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_server_time", kafkaData.ReportTime) kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_offset", msg.Offset) kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_partition", msg.Partition) - jsonParser := pp.Get() - defer pp.Put(jsonParser) - metric, err := jsonParser.Parse(kafkaData.ReqData) + metric, err := parser.ParseKafkaData(kafkaData.ReqData) //解析开发者上报的json数据 if err != nil { diff --git a/controller/report_controller.go b/controller/report_controller.go index 9ef9d38..721eed2 100644 --- a/controller/report_controller.go +++ b/controller/report_controller.go @@ -25,12 +25,6 @@ type ReportController struct { BaseController } -var pp *parser.Pool - -func init(){ - pp, _ = parser.NewParserPool() -} - //上报接口 func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { @@ -98,8 +92,7 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { kafkaData := duck.GetkafkaData() if reportService.IsDebugUser(debug, xwlDistinctId, tableId) { - - metric, debugErr := parser.ParseKafkaData(pp,body) + metric, debugErr := parser.ParseKafkaData(body) if debugErr != nil { logs.Logger.Error("parser.ParseKafkaData ", zap.Error(err)) this.FastError(ctx, errors.New("服务异常")) diff --git a/platform-basic-libs/service/consumer_data/reportdata2ck.go b/platform-basic-libs/service/consumer_data/reportdata2ck.go index c9e7824..054d346 100644 --- a/platform-basic-libs/service/consumer_data/reportdata2ck.go +++ b/platform-basic-libs/service/consumer_data/reportdata2ck.go @@ -47,7 +47,6 @@ func (this *ReportData2CK) Flush() (err error) { rowsMap := map[string][][]interface{}{} - for bufferIndex := range this.buffer { for tableName := range this.buffer[bufferIndex] { rowArr := []interface{}{} diff --git a/platform-basic-libs/sinker/parse/parser.go b/platform-basic-libs/sinker/parse/parser.go index 3dfec6c..e6e2749 100644 --- a/platform-basic-libs/sinker/parse/parser.go +++ b/platform-basic-libs/sinker/parse/parser.go @@ -18,6 +18,7 @@ var ( ErrParseDateTime = errors.Errorf("value doesn't contain DateTime") ) + // Parse is the Parser interface type Parser interface { Parse(bs []byte) (metric *FastjsonMetric, err error) @@ -25,24 +26,31 @@ type Parser interface { // Pool may be used for pooling Parsers for similarly typed JSONs. type Pool struct { + name string timeZone *time.Location + knownLayouts sync.Map pool sync.Pool } // NewParserPool creates a parser pool -func NewParserPool() (pp *Pool, err error) { - var tz = time.Local +func NewParserPool(name string) (pp *Pool, err error) { pp = &Pool{ - timeZone: tz, + name: name, + timeZone: time.Local, } return } -func ParseKafkaData(pool *Pool, data []byte) (metric *FastjsonMetric, err error) { - jsonParser := pool.Get() - defer pool.Put(jsonParser) +func ParseKafkaData(data []byte) (metric *FastjsonMetric, err error) { + pp, err := NewParserPool("fastjson") + if err != nil { + return + } + + jsonParser := pp.Get() + defer pp.Put(jsonParser) metric, err = jsonParser.Parse(data) return } @@ -53,7 +61,12 @@ func ParseKafkaData(pool *Pool, data []byte) (metric *FastjsonMetric, err error) func (pp *Pool) Get() Parser { v := pp.pool.Get() if v == nil { - return &FastjsonParser{pp: pp} + switch pp.name { + case "fastjson": + return &FastjsonParser{pp: pp} + default: + return &FastjsonParser{pp: pp} + } } return v.(Parser) } @@ -67,15 +80,28 @@ func (pp *Pool) Put(p Parser) { } func (pp *Pool) ParseDateTime(key string, val string) (t time.Time, err error) { - + var layout string + var lay interface{} + var ok bool var t2 time.Time if val == "" { err = ErrParseDateTime return } - - - if t2, err = time.ParseInLocation("2006-01-02 15:04:05", val, pp.timeZone); err != nil { + if lay, ok = pp.knownLayouts.Load(key); !ok { + t2, layout = parseInLocation(val, pp.timeZone) + if layout == "" { + err = ErrParseDateTime + return + } + t = t2 + return + } + if layout, ok = lay.(string); !ok { + err = ErrParseDateTime + return + } + if t2, err = time.ParseInLocation(layout, val, pp.timeZone); err != nil { err = ErrParseDateTime return }