diff --git a/cmd/sinker/main.go b/cmd/sinker/main.go index 8e916ff..022e545 100644 --- a/cmd/sinker/main.go +++ b/cmd/sinker/main.go @@ -146,6 +146,11 @@ func main() { panic(err) } + parserPool,err := parser.NewParserPool("fastjson") + if err!=nil{ + panic(err) + } + err = reportData2CKSarama.Init(model.GlobConfig.Comm.Kafka, model.GlobConfig.Comm.Kafka.ReportTopicName, model.GlobConfig.Comm.Kafka.ReportData2CKGroup, func(msg model.InputMessage, markFn func()) { var kafkaData model.KafkaData @@ -236,8 +241,9 @@ func main() { kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_server_time", kafkaData.ReportTime) kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_offset", msg.Offset) kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_partition", msg.Partition) - - metric, err := parser.ParseKafkaData(kafkaData.ReqData) + pool := parserPool.Get() + defer parserPool.Put(pool) + metric, err := pool.Parse(kafkaData.ReqData) //解析开发者上报的json数据 if err != nil { diff --git a/controller/report_controller.go b/controller/report_controller.go index 721eed2..cf999ea 100644 --- a/controller/report_controller.go +++ b/controller/report_controller.go @@ -25,6 +25,16 @@ type ReportController struct { BaseController } +var parserPool *parser.Pool + +func init(){ + var err error + parserPool,err = parser.NewParserPool("fastjson") + if err!=nil{ + panic(err) + } +} + //上报接口 func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { @@ -92,7 +102,10 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { kafkaData := duck.GetkafkaData() if reportService.IsDebugUser(debug, xwlDistinctId, tableId) { - metric, debugErr := parser.ParseKafkaData(body) + pool := parserPool.Get() + defer parserPool.Put(pool) + metric, debugErr := pool.Parse(body) + if debugErr != nil { logs.Logger.Error("parser.ParseKafkaData ", zap.Error(err)) this.FastError(ctx, errors.New("服务异常")) diff --git a/platform-basic-libs/sinker/parse/fastjson.go b/platform-basic-libs/sinker/parse/fastjson.go index ea18c00..265f068 100644 --- a/platform-basic-libs/sinker/parse/fastjson.go +++ b/platform-basic-libs/sinker/parse/fastjson.go @@ -12,7 +12,6 @@ import ( ) type FastjsonParser struct { - pp *Pool fjp fastjson.Parser } @@ -22,12 +21,11 @@ func (p *FastjsonParser) Parse(bs []byte) (metric *FastjsonMetric, err error) { err = errors.Wrapf(err, "") return } - metric = &FastjsonMetric{pp: p.pp, value: value} + metric = &FastjsonMetric{ value: value} return } type FastjsonMetric struct { - pp *Pool value *fastjson.Value } @@ -106,7 +104,7 @@ func (c *FastjsonMetric) GetDateTime(key string, nullable bool) (val interface{} val = getDefaultDateTime(nullable) return } - if val, err = c.pp.ParseDateTime(key, util.Bytes2str(b)); err != nil { + if val, err = c.ParseDateTime(util.Bytes2str(b)); err != nil { val = getDefaultDateTime(nullable) } default: @@ -115,6 +113,22 @@ func (c *FastjsonMetric) GetDateTime(key string, nullable bool) (val interface{} return } + +func (c *FastjsonMetric) ParseDateTime(val string) (t time.Time, err error) { + + var t2 time.Time + if val == "" { + err = ErrParseDateTime + return + } + if t2, err = time.ParseInLocation(util.TimeFormat, val, time.Local); err != nil { + err = ErrParseDateTime + return + } + t = t2.UTC() + return +} + func (c *FastjsonMetric) GetElasticDateTime(key string, nullable bool) (val interface{}) { t := c.GetDateTime(key, nullable) if t != nil { @@ -175,7 +189,7 @@ func (c *FastjsonMetric) GetArray(key string, typ int) (val interface{}) { t = Epoch } else { var err error - if t, err = c.pp.ParseDateTime(key, util.Bytes2str(b)); err != nil { + if t, err = c.ParseDateTime(util.Bytes2str(b)); err != nil { t = Epoch } } @@ -292,7 +306,7 @@ func FjDetectType(v *fastjson.Value) (typ int) { case fastjson.TypeString: typ = String if val, err := v.StringBytes(); err == nil { - if _, layout := parseInLocation(util.Bytes2str(val), time.Local); layout != "" { + if _, err := parseInLocation(util.Bytes2str(val), time.Local); err == nil { typ = DateTime } } diff --git a/platform-basic-libs/sinker/parse/parser.go b/platform-basic-libs/sinker/parse/parser.go index e6e2749..715393a 100644 --- a/platform-basic-libs/sinker/parse/parser.go +++ b/platform-basic-libs/sinker/parse/parser.go @@ -3,6 +3,7 @@ package parser import ( "fmt" "github.com/1340691923/xwl_bi/engine/logs" + "github.com/1340691923/xwl_bi/platform-basic-libs/util" "math" "sync" "time" @@ -18,7 +19,6 @@ var ( ErrParseDateTime = errors.Errorf("value doesn't contain DateTime") ) - // Parse is the Parser interface type Parser interface { Parse(bs []byte) (metric *FastjsonMetric, err error) @@ -36,14 +36,14 @@ type Pool struct { func NewParserPool(name string) (pp *Pool, err error) { pp = &Pool{ - name: name, - timeZone: time.Local, + name: name, + timeZone: time.Local, } return } -func ParseKafkaData(data []byte) (metric *FastjsonMetric, err error) { +/*func ParseKafkaData(data []byte) (metric *FastjsonMetric, err error) { pp, err := NewParserPool("fastjson") if err != nil { return @@ -53,7 +53,7 @@ func ParseKafkaData(data []byte) (metric *FastjsonMetric, err error) { defer pp.Put(jsonParser) metric, err = jsonParser.Parse(data) return -} +}*/ // Get returns a Parser from pp. // @@ -63,9 +63,9 @@ func (pp *Pool) Get() Parser { if v == nil { switch pp.name { case "fastjson": - return &FastjsonParser{pp: pp} + return &FastjsonParser{} default: - return &FastjsonParser{pp: pp} + return &FastjsonParser{} } } return v.(Parser) @@ -79,36 +79,6 @@ func (pp *Pool) Put(p Parser) { pp.pool.Put(p) } -func (pp *Pool) ParseDateTime(key string, val string) (t time.Time, err error) { - var layout string - var lay interface{} - var ok bool - var t2 time.Time - if val == "" { - err = ErrParseDateTime - return - } - if lay, ok = pp.knownLayouts.Load(key); !ok { - t2, layout = parseInLocation(val, pp.timeZone) - if layout == "" { - err = ErrParseDateTime - return - } - t = t2 - return - } - if layout, ok = lay.(string); !ok { - err = ErrParseDateTime - return - } - if t2, err = time.ParseInLocation(layout, val, pp.timeZone); err != nil { - err = ErrParseDateTime - return - } - t = t2.UTC() - return -} - func makeArray(typ int) (val interface{}) { switch typ { case Int: @@ -125,17 +95,14 @@ func makeArray(typ int) (val interface{}) { return } -func parseInLocation(val string, loc *time.Location) (t time.Time, layout string) { - var err error - var lay string - for _, lay = range Layouts { - if t, err = time.ParseInLocation(lay, val, loc); err == nil { - t = t.UTC() - layout = lay - return - } +func parseInLocation(val string, loc *time.Location) (t time.Time, err error) { + + if t, err = time.ParseInLocation(util.TimeFormat, val, loc); err == nil { + t = t.UTC() + return } - return + + return t,err } func UnixFloat(sec float64) (t time.Time) {