优化
This commit is contained in:
parent
079aa5d652
commit
844c0d2829
@ -111,10 +111,7 @@ func main() {
|
|||||||
go action.MysqlConsumer()
|
go action.MysqlConsumer()
|
||||||
go sinker.ClearDimsCacheByTime(time.Minute * 30)
|
go sinker.ClearDimsCacheByTime(time.Minute * 30)
|
||||||
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||||
pp, err := parser.NewParserPool()
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
err = realTimeDataSarama.Init(model.GlobConfig.Comm.Kafka, model.GlobConfig.Comm.Kafka.ReportTopicName, model.GlobConfig.Comm.Kafka.RealTimeDataGroup, func(msg model.InputMessage, markFn func()) {
|
err = realTimeDataSarama.Init(model.GlobConfig.Comm.Kafka, model.GlobConfig.Comm.Kafka.ReportTopicName, model.GlobConfig.Comm.Kafka.RealTimeDataGroup, func(msg model.InputMessage, markFn func()) {
|
||||||
//ETL
|
//ETL
|
||||||
var kafkaData model.KafkaData
|
var kafkaData model.KafkaData
|
||||||
@ -239,10 +236,8 @@ func main() {
|
|||||||
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_server_time", kafkaData.ReportTime)
|
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_server_time", kafkaData.ReportTime)
|
||||||
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_offset", msg.Offset)
|
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_offset", msg.Offset)
|
||||||
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_partition", msg.Partition)
|
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_partition", msg.Partition)
|
||||||
jsonParser := pp.Get()
|
|
||||||
defer pp.Put(jsonParser)
|
|
||||||
|
|
||||||
metric, err := jsonParser.Parse(kafkaData.ReqData)
|
metric, err := parser.ParseKafkaData(kafkaData.ReqData)
|
||||||
|
|
||||||
//解析开发者上报的json数据
|
//解析开发者上报的json数据
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -25,12 +25,6 @@ type ReportController struct {
|
|||||||
BaseController
|
BaseController
|
||||||
}
|
}
|
||||||
|
|
||||||
var pp *parser.Pool
|
|
||||||
|
|
||||||
func init(){
|
|
||||||
pp, _ = parser.NewParserPool()
|
|
||||||
}
|
|
||||||
|
|
||||||
//上报接口
|
//上报接口
|
||||||
func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) {
|
func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) {
|
||||||
|
|
||||||
@ -98,8 +92,7 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) {
|
|||||||
kafkaData := duck.GetkafkaData()
|
kafkaData := duck.GetkafkaData()
|
||||||
|
|
||||||
if reportService.IsDebugUser(debug, xwlDistinctId, tableId) {
|
if reportService.IsDebugUser(debug, xwlDistinctId, tableId) {
|
||||||
|
metric, debugErr := parser.ParseKafkaData(body)
|
||||||
metric, debugErr := parser.ParseKafkaData(pp,body)
|
|
||||||
if debugErr != nil {
|
if debugErr != nil {
|
||||||
logs.Logger.Error("parser.ParseKafkaData ", zap.Error(err))
|
logs.Logger.Error("parser.ParseKafkaData ", zap.Error(err))
|
||||||
this.FastError(ctx, errors.New("服务异常"))
|
this.FastError(ctx, errors.New("服务异常"))
|
||||||
|
@ -47,7 +47,6 @@ func (this *ReportData2CK) Flush() (err error) {
|
|||||||
|
|
||||||
rowsMap := map[string][][]interface{}{}
|
rowsMap := map[string][][]interface{}{}
|
||||||
|
|
||||||
|
|
||||||
for bufferIndex := range this.buffer {
|
for bufferIndex := range this.buffer {
|
||||||
for tableName := range this.buffer[bufferIndex] {
|
for tableName := range this.buffer[bufferIndex] {
|
||||||
rowArr := []interface{}{}
|
rowArr := []interface{}{}
|
||||||
|
@ -18,6 +18,7 @@ var (
|
|||||||
ErrParseDateTime = errors.Errorf("value doesn't contain DateTime")
|
ErrParseDateTime = errors.Errorf("value doesn't contain DateTime")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
// Parse is the Parser interface
|
// Parse is the Parser interface
|
||||||
type Parser interface {
|
type Parser interface {
|
||||||
Parse(bs []byte) (metric *FastjsonMetric, err error)
|
Parse(bs []byte) (metric *FastjsonMetric, err error)
|
||||||
@ -25,24 +26,31 @@ type Parser interface {
|
|||||||
|
|
||||||
// Pool may be used for pooling Parsers for similarly typed JSONs.
|
// Pool may be used for pooling Parsers for similarly typed JSONs.
|
||||||
type Pool struct {
|
type Pool struct {
|
||||||
|
name string
|
||||||
timeZone *time.Location
|
timeZone *time.Location
|
||||||
|
knownLayouts sync.Map
|
||||||
pool sync.Pool
|
pool sync.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewParserPool creates a parser pool
|
// NewParserPool creates a parser pool
|
||||||
func NewParserPool() (pp *Pool, err error) {
|
func NewParserPool(name string) (pp *Pool, err error) {
|
||||||
var tz = time.Local
|
|
||||||
|
|
||||||
pp = &Pool{
|
pp = &Pool{
|
||||||
timeZone: tz,
|
name: name,
|
||||||
|
timeZone: time.Local,
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func ParseKafkaData(pool *Pool, data []byte) (metric *FastjsonMetric, err error) {
|
func ParseKafkaData(data []byte) (metric *FastjsonMetric, err error) {
|
||||||
jsonParser := pool.Get()
|
pp, err := NewParserPool("fastjson")
|
||||||
defer pool.Put(jsonParser)
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonParser := pp.Get()
|
||||||
|
defer pp.Put(jsonParser)
|
||||||
metric, err = jsonParser.Parse(data)
|
metric, err = jsonParser.Parse(data)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -53,7 +61,12 @@ func ParseKafkaData(pool *Pool, data []byte) (metric *FastjsonMetric, err error)
|
|||||||
func (pp *Pool) Get() Parser {
|
func (pp *Pool) Get() Parser {
|
||||||
v := pp.pool.Get()
|
v := pp.pool.Get()
|
||||||
if v == nil {
|
if v == nil {
|
||||||
|
switch pp.name {
|
||||||
|
case "fastjson":
|
||||||
return &FastjsonParser{pp: pp}
|
return &FastjsonParser{pp: pp}
|
||||||
|
default:
|
||||||
|
return &FastjsonParser{pp: pp}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return v.(Parser)
|
return v.(Parser)
|
||||||
}
|
}
|
||||||
@ -67,15 +80,28 @@ func (pp *Pool) Put(p Parser) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (pp *Pool) ParseDateTime(key string, val string) (t time.Time, err error) {
|
func (pp *Pool) ParseDateTime(key string, val string) (t time.Time, err error) {
|
||||||
|
var layout string
|
||||||
|
var lay interface{}
|
||||||
|
var ok bool
|
||||||
var t2 time.Time
|
var t2 time.Time
|
||||||
if val == "" {
|
if val == "" {
|
||||||
err = ErrParseDateTime
|
err = ErrParseDateTime
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if lay, ok = pp.knownLayouts.Load(key); !ok {
|
||||||
|
t2, layout = parseInLocation(val, pp.timeZone)
|
||||||
if t2, err = time.ParseInLocation("2006-01-02 15:04:05", val, pp.timeZone); err != nil {
|
if layout == "" {
|
||||||
|
err = ErrParseDateTime
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t = t2
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if layout, ok = lay.(string); !ok {
|
||||||
|
err = ErrParseDateTime
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if t2, err = time.ParseInLocation(layout, val, pp.timeZone); err != nil {
|
||||||
err = ErrParseDateTime
|
err = ErrParseDateTime
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user