优化sinker

This commit is contained in:
1340691923@qq.com 2022-03-07 17:15:08 +08:00
parent f51a469ca1
commit a6342403dc
4 changed files with 56 additions and 56 deletions

View File

@ -146,6 +146,11 @@ func main() {
panic(err) panic(err)
} }
parserPool,err := parser.NewParserPool("fastjson")
if err!=nil{
panic(err)
}
err = reportData2CKSarama.Init(model.GlobConfig.Comm.Kafka, model.GlobConfig.Comm.Kafka.ReportTopicName, model.GlobConfig.Comm.Kafka.ReportData2CKGroup, func(msg model.InputMessage, markFn func()) { err = reportData2CKSarama.Init(model.GlobConfig.Comm.Kafka, model.GlobConfig.Comm.Kafka.ReportTopicName, model.GlobConfig.Comm.Kafka.ReportData2CKGroup, func(msg model.InputMessage, markFn func()) {
var kafkaData model.KafkaData var kafkaData model.KafkaData
@ -236,8 +241,9 @@ func main() {
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_server_time", kafkaData.ReportTime) kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_server_time", kafkaData.ReportTime)
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_offset", msg.Offset) kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_offset", msg.Offset)
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_partition", msg.Partition) kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_partition", msg.Partition)
pool := parserPool.Get()
metric, err := parser.ParseKafkaData(kafkaData.ReqData) defer parserPool.Put(pool)
metric, err := pool.Parse(kafkaData.ReqData)
//解析开发者上报的json数据 //解析开发者上报的json数据
if err != nil { if err != nil {

View File

@ -25,6 +25,16 @@ type ReportController struct {
BaseController BaseController
} }
var parserPool *parser.Pool
func init(){
var err error
parserPool,err = parser.NewParserPool("fastjson")
if err!=nil{
panic(err)
}
}
//上报接口 //上报接口
func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) {
@ -92,7 +102,10 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) {
kafkaData := duck.GetkafkaData() kafkaData := duck.GetkafkaData()
if reportService.IsDebugUser(debug, xwlDistinctId, tableId) { if reportService.IsDebugUser(debug, xwlDistinctId, tableId) {
metric, debugErr := parser.ParseKafkaData(body) pool := parserPool.Get()
defer parserPool.Put(pool)
metric, debugErr := pool.Parse(body)
if debugErr != nil { if debugErr != nil {
logs.Logger.Error("parser.ParseKafkaData ", zap.Error(err)) logs.Logger.Error("parser.ParseKafkaData ", zap.Error(err))
this.FastError(ctx, errors.New("服务异常")) this.FastError(ctx, errors.New("服务异常"))

View File

@ -12,7 +12,6 @@ import (
) )
type FastjsonParser struct { type FastjsonParser struct {
pp *Pool
fjp fastjson.Parser fjp fastjson.Parser
} }
@ -22,12 +21,11 @@ func (p *FastjsonParser) Parse(bs []byte) (metric *FastjsonMetric, err error) {
err = errors.Wrapf(err, "") err = errors.Wrapf(err, "")
return return
} }
metric = &FastjsonMetric{pp: p.pp, value: value} metric = &FastjsonMetric{ value: value}
return return
} }
type FastjsonMetric struct { type FastjsonMetric struct {
pp *Pool
value *fastjson.Value value *fastjson.Value
} }
@ -106,7 +104,7 @@ func (c *FastjsonMetric) GetDateTime(key string, nullable bool) (val interface{}
val = getDefaultDateTime(nullable) val = getDefaultDateTime(nullable)
return return
} }
if val, err = c.pp.ParseDateTime(key, util.Bytes2str(b)); err != nil { if val, err = c.ParseDateTime(util.Bytes2str(b)); err != nil {
val = getDefaultDateTime(nullable) val = getDefaultDateTime(nullable)
} }
default: default:
@ -115,6 +113,22 @@ func (c *FastjsonMetric) GetDateTime(key string, nullable bool) (val interface{}
return return
} }
func (c *FastjsonMetric) ParseDateTime(val string) (t time.Time, err error) {
var t2 time.Time
if val == "" {
err = ErrParseDateTime
return
}
if t2, err = time.ParseInLocation(util.TimeFormat, val, time.Local); err != nil {
err = ErrParseDateTime
return
}
t = t2.UTC()
return
}
func (c *FastjsonMetric) GetElasticDateTime(key string, nullable bool) (val interface{}) { func (c *FastjsonMetric) GetElasticDateTime(key string, nullable bool) (val interface{}) {
t := c.GetDateTime(key, nullable) t := c.GetDateTime(key, nullable)
if t != nil { if t != nil {
@ -175,7 +189,7 @@ func (c *FastjsonMetric) GetArray(key string, typ int) (val interface{}) {
t = Epoch t = Epoch
} else { } else {
var err error var err error
if t, err = c.pp.ParseDateTime(key, util.Bytes2str(b)); err != nil { if t, err = c.ParseDateTime(util.Bytes2str(b)); err != nil {
t = Epoch t = Epoch
} }
} }
@ -292,7 +306,7 @@ func FjDetectType(v *fastjson.Value) (typ int) {
case fastjson.TypeString: case fastjson.TypeString:
typ = String typ = String
if val, err := v.StringBytes(); err == nil { if val, err := v.StringBytes(); err == nil {
if _, layout := parseInLocation(util.Bytes2str(val), time.Local); layout != "" { if _, err := parseInLocation(util.Bytes2str(val), time.Local); err == nil {
typ = DateTime typ = DateTime
} }
} }

View File

@ -3,6 +3,7 @@ package parser
import ( import (
"fmt" "fmt"
"github.com/1340691923/xwl_bi/engine/logs" "github.com/1340691923/xwl_bi/engine/logs"
"github.com/1340691923/xwl_bi/platform-basic-libs/util"
"math" "math"
"sync" "sync"
"time" "time"
@ -18,7 +19,6 @@ var (
ErrParseDateTime = errors.Errorf("value doesn't contain DateTime") ErrParseDateTime = errors.Errorf("value doesn't contain DateTime")
) )
// Parse is the Parser interface // Parse is the Parser interface
type Parser interface { type Parser interface {
Parse(bs []byte) (metric *FastjsonMetric, err error) Parse(bs []byte) (metric *FastjsonMetric, err error)
@ -36,14 +36,14 @@ type Pool struct {
func NewParserPool(name string) (pp *Pool, err error) { func NewParserPool(name string) (pp *Pool, err error) {
pp = &Pool{ pp = &Pool{
name: name, name: name,
timeZone: time.Local, timeZone: time.Local,
} }
return return
} }
func ParseKafkaData(data []byte) (metric *FastjsonMetric, err error) { /*func ParseKafkaData(data []byte) (metric *FastjsonMetric, err error) {
pp, err := NewParserPool("fastjson") pp, err := NewParserPool("fastjson")
if err != nil { if err != nil {
return return
@ -53,7 +53,7 @@ func ParseKafkaData(data []byte) (metric *FastjsonMetric, err error) {
defer pp.Put(jsonParser) defer pp.Put(jsonParser)
metric, err = jsonParser.Parse(data) metric, err = jsonParser.Parse(data)
return return
} }*/
// Get returns a Parser from pp. // Get returns a Parser from pp.
// //
@ -63,9 +63,9 @@ func (pp *Pool) Get() Parser {
if v == nil { if v == nil {
switch pp.name { switch pp.name {
case "fastjson": case "fastjson":
return &FastjsonParser{pp: pp} return &FastjsonParser{}
default: default:
return &FastjsonParser{pp: pp} return &FastjsonParser{}
} }
} }
return v.(Parser) return v.(Parser)
@ -79,36 +79,6 @@ func (pp *Pool) Put(p Parser) {
pp.pool.Put(p) pp.pool.Put(p)
} }
func (pp *Pool) ParseDateTime(key string, val string) (t time.Time, err error) {
var layout string
var lay interface{}
var ok bool
var t2 time.Time
if val == "" {
err = ErrParseDateTime
return
}
if lay, ok = pp.knownLayouts.Load(key); !ok {
t2, layout = parseInLocation(val, pp.timeZone)
if layout == "" {
err = ErrParseDateTime
return
}
t = t2
return
}
if layout, ok = lay.(string); !ok {
err = ErrParseDateTime
return
}
if t2, err = time.ParseInLocation(layout, val, pp.timeZone); err != nil {
err = ErrParseDateTime
return
}
t = t2.UTC()
return
}
func makeArray(typ int) (val interface{}) { func makeArray(typ int) (val interface{}) {
switch typ { switch typ {
case Int: case Int:
@ -125,17 +95,14 @@ func makeArray(typ int) (val interface{}) {
return return
} }
func parseInLocation(val string, loc *time.Location) (t time.Time, layout string) { func parseInLocation(val string, loc *time.Location) (t time.Time, err error) {
var err error
var lay string if t, err = time.ParseInLocation(util.TimeFormat, val, loc); err == nil {
for _, lay = range Layouts { t = t.UTC()
if t, err = time.ParseInLocation(lay, val, loc); err == nil { return
t = t.UTC()
layout = lay
return
}
} }
return
return t,err
} }
func UnixFloat(sec float64) (t time.Time) { func UnixFloat(sec float64) (t time.Time) {