diff --git a/cmd/sinker/main.go b/cmd/sinker/main.go index f0aa851..7892f0a 100644 --- a/cmd/sinker/main.go +++ b/cmd/sinker/main.go @@ -154,10 +154,7 @@ func main() { panic(err) } - parserPool, err := parser.NewParserPool("fastjson") - if err != nil { - panic(err) - } + err = reportData2CKSarama.Init( model.GlobConfig.Comm.Kafka, @@ -254,9 +251,9 @@ func main() { kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_server_time", kafkaData.ReportTime) kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_offset", msg.Offset) kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_partition", msg.Partition) - pool := parserPool.Get() - defer parserPool.Put(pool) - metric, err := pool.Parse(kafkaData.ReqData) + pp := parser.FastjsonParser{} + + metric, err := pp.Parse(kafkaData.ReqData) //解析开发者上报的json数据 if err != nil { @@ -295,10 +292,10 @@ func main() { }); err != nil { logs.Logger.Error("reportAcceptStatus Add SuccessStatus err", zap.Error(err)) } - //添加数据到ck用于后台统计 - if err := reportData2CK.Add(map[string]*parser.FastjsonMetric{ - tableName: metric, + if err := reportData2CK.Add(consumer_data.FastjsonMetricData{ + TableName:tableName, + FastjsonMetric: metric, }); err != nil { logs.Logger.Error("reportData2CK err", zap.Error(err)) markFn() diff --git a/platform-basic-libs/service/consumer_data/reportdata2ck.go b/platform-basic-libs/service/consumer_data/reportdata2ck.go index 1e3baea..08d86b4 100644 --- a/platform-basic-libs/service/consumer_data/reportdata2ck.go +++ b/platform-basic-libs/service/consumer_data/reportdata2ck.go @@ -16,16 +16,21 @@ import ( var TableColumnMap sync.Map type ReportData2CK struct { - buffer []map[string]*parser.FastjsonMetric + buffer []FastjsonMetricData bufferMutex *sync.RWMutex batchSize int flushInterval int } +type FastjsonMetricData struct { + FastjsonMetric *parser.FastjsonMetric + TableName string +} + func NewReportData2CK(config model.BatchConfig) *ReportData2CK { logs.Logger.Info("NewReportData2CK", zap.Int("batchSize", config.BufferSize), zap.Int("flushInterval", config.FlushInterval)) reportData2CK := &ReportData2CK{ - buffer: make([]map[string]*parser.FastjsonMetric, 0, config.BufferSize), + buffer: make([]FastjsonMetricData, 0, config.BufferSize), bufferMutex: new(sync.RWMutex), batchSize: config.BufferSize, flushInterval: config.FlushInterval, @@ -37,10 +42,9 @@ func NewReportData2CK(config model.BatchConfig) *ReportData2CK { return reportData2CK } - func (this *ReportData2CK) Flush() (err error) { this.bufferMutex.Lock() - if len(this.buffer)==0{ + if len(this.buffer) == 0 { this.bufferMutex.Unlock() return nil } @@ -48,28 +52,28 @@ func (this *ReportData2CK) Flush() (err error) { rowsMap := map[string][][]interface{}{} - for _,obj := range this.buffer { - for tableName,data := range obj { - rowArr := []interface{}{} - rows := [][]interface{}{} - if _, haveKey := rowsMap[tableName]; haveKey { - rows = rowsMap[tableName] - } else { - rowsMap[tableName] = rows - } - dims, _ := TableColumnMap.Load(tableName) - for _, dim := range dims.([]*model2.ColumnWithType) { - val := parser.GetValueByType(data, dim) - rowArr = append(rowArr, val) - } + for _, obj := range this.buffer { - rows = append(rows, rowArr) - rowsMap[tableName] = rows + rowArr := []interface{}{} + rows := [][]interface{}{} + if _, haveKey := rowsMap[obj.TableName]; haveKey { + rows = rowsMap[obj.TableName] + } else { + rowsMap[obj.TableName] = rows } + dims, _ := TableColumnMap.Load(obj.TableName) + for _, dim := range dims.([]*model2.ColumnWithType) { + + val := parser.GetValueByType(obj.FastjsonMetric, dim) + rowArr = append(rowArr, val) + } + + rows = append(rows, rowArr) + rowsMap[obj.TableName] = rows + } - - bytesbuffer:=bytes.Buffer{} + bytesbuffer := bytes.Buffer{} TableColumnMap.Range(func(tableName, value interface{}) bool { @@ -80,7 +84,7 @@ func (this *ReportData2CK) Flush() (err error) { params := make([]string, len(seriesDims)) for i, serDim := range seriesDims { - serDimsQuoted[i] ="`"+serDim.Name+"`" + serDimsQuoted[i] = "`" + serDim.Name + "`" params[i] = "?" } @@ -131,13 +135,14 @@ func (this *ReportData2CK) Flush() (err error) { return true }) - this.buffer = make([]map[string]*parser.FastjsonMetric, 0, this.batchSize) + this.buffer = make([]FastjsonMetricData, 0, this.batchSize) this.bufferMutex.Unlock() return nil } -func (this *ReportData2CK) Add(data map[string]*parser.FastjsonMetric) (err error) { +func (this *ReportData2CK) Add(data FastjsonMetricData) (err error) { this.bufferMutex.Lock() + this.buffer = append(this.buffer, data) this.bufferMutex.Unlock()