diff --git a/cmd/sinker/action/action.go b/cmd/sinker/action/action.go
index b41e232..9a54084 100644
--- a/cmd/sinker/action/action.go
+++ b/cmd/sinker/action/action.go
@@ -29,9 +29,9 @@ var MetaAttrRelationSet sync.Map
 var AttributeMap sync.Map
 var MetaEventMap sync.Map

-var metaAttrRelationChan = make(chan map[string]interface{}, 1000)
-var attributeChan = make(chan map[string]interface{}, 1000)
-var metaEventChan = make(chan map[string]interface{}, 1000)
+var metaAttrRelationChan = make(chan map[string]interface{}, 10000)
+var attributeChan = make(chan map[string]interface{}, 10000)
+var metaEventChan = make(chan map[string]interface{}, 10000)

 func MysqlConsumer() {
 	for {
@@ -55,11 +55,6 @@ func MysqlConsumer() {
 	}
 }

-func AddRealTimeData(realTimeWarehousingData *consumer_data.RealTimeWarehousingData, realTimeWarehousing *consumer_data.RealTimeWarehousing) (err error) {
-
-	err = realTimeWarehousing.Add(realTimeWarehousingData)
-	return err
-}

 func AddMetaEvent(kafkaData model.KafkaData) (err error) {
 	if kafkaData.ReportType == model.EventReportType {
@@ -86,6 +81,8 @@ func AddMetaEvent(kafkaData model.KafkaData) (err error) {
 	return nil
 }

+
+
 func AddTableColumn(kafkaData model.KafkaData, failFunc func(data consumer_data.ReportAcceptStatusData), tableName string, ReqDataObject *parser.FastjsonMetric) (err error) {

 	dims, err := sinker.GetDims(model.GlobConfig.Comm.ClickHouse.DbName, tableName, nil, db.ClickHouseSqlx)
diff --git a/cmd/sinker/main.go b/cmd/sinker/main.go
index e813d6f..a1b5b21 100644
--- a/cmd/sinker/main.go
+++ b/cmd/sinker/main.go
@@ -37,6 +37,7 @@ func init() {
 	flag.StringVar(&configFileName, "configFileName", "config", "配置文件名")
 	flag.StringVar(&configFileExt, "configFileExt", "json", "配置文件后缀")
 	flag.Parse()
+
 }

 // The core logic all lives on the sinker side
@@ -108,6 +109,10 @@ func main() {
 	reportData2CKSarama := realTimeDataSarama.Clone()
 	go action.MysqlConsumer()
 	var json = jsoniter.ConfigCompatibleWithStandardLibrary
+	pp, err := parser.NewParserPool("fastjson", nil, "", "")
+	if err != nil {
+		panic(err)
+	}
 	err = realTimeDataSarama.Init(model.GlobConfig.Comm.Kafka, model.GlobConfig.Comm.Kafka.ReportTopicName, model.GlobConfig.Comm.Kafka.RealTimeDataGroup, func(msg model.InputMessage, markFn func()) {
 		//ETL
 		var kafkaData model.KafkaData
@@ -234,13 +239,12 @@ func main() {
 		kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_partition", msg.Partition)

 		// Parse the JSON data reported by the developer
-		metric, err := parser.ParseKafkaData(kafkaData.ReqData)
+		metric, err := parser.ParseKafkaData(pp, kafkaData.ReqData)
 		if err != nil {
 			logs.Logger.Error("ParseKafkaData err", zap.Error(err))
 			markFn()
 			return
 		}
-		log.Println(metric.GetParseObject().String())

 		// Generate the table name
 		tableName := kafkaData.GetTableName()
diff --git a/controller/report_controller.go b/controller/report_controller.go
index 0eefe95..53e4d5b 100644
--- a/controller/report_controller.go
+++ b/controller/report_controller.go
@@ -25,6 +25,12 @@ type ReportController struct {
 	BaseController
 }

+var pp *parser.Pool
+
+func init() {
+	pp, _ = parser.NewParserPool("fastjson", nil, "", "")
+}
+
 // Report endpoint
 func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) {

@@ -93,7 +99,7 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) {

 	if reportService.IsDebugUser(debug, xwlDistinctId, tableId) {

-		metric, debugErr := parser.ParseKafkaData(body)
+		metric, debugErr := parser.ParseKafkaData(pp, body)
 		if debugErr != nil {
 			logs.Logger.Error("parser.ParseKafkaData ", zap.Error(err))
 			this.FastError(ctx, errors.New("服务异常"))
diff --git a/platform-basic-libs/sinker/parse/parser.go b/platform-basic-libs/sinker/parse/parser.go
index df989c6..75f8483 100644
--- a/platform-basic-libs/sinker/parse/parser.go
+++ b/platform-basic-libs/sinker/parse/parser.go
@@ -60,14 +60,9 @@ func NewParserPool(name string, csvFormat []string, delimiter string, timezone s
 	return
 }

-func ParseKafkaData(data []byte) (metric *FastjsonMetric, err error) {
-	pp, err := NewParserPool("fastjson", nil, "", "")
-	if err != nil {
-		return
-	}
-
-	jsonParser := pp.Get()
-	defer pp.Put(jsonParser)
+func ParseKafkaData(pool *Pool, data []byte) (metric *FastjsonMetric, err error) {
+	jsonParser := pool.Get()
+	defer pool.Put(jsonParser)
 	metric, err = jsonParser.Parse(data)
 	return
 }
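
Before this change, ParseKafkaData built a brand-new parser pool on every Kafka message; the new signature takes a shared *parser.Pool, so the pool is created once (in the sinker's main and in the report controller's init) and reused on the hot path. A minimal sketch of that usage pattern, assuming the package is imported under the alias parser; the module path and sample payload below are placeholders, not code from the repository:

package main

import (
	"log"

	parser "github.com/example/project/platform-basic-libs/sinker/parse" // placeholder module path
)

func main() {
	// Build the fastjson parser pool once at startup; csvFormat, delimiter
	// and timezone are not needed for fastjson, so they stay empty.
	pp, err := parser.NewParserPool("fastjson", nil, "", "")
	if err != nil {
		panic(err)
	}

	body := []byte(`{"xwl_distinct_id":"demo","event":"login"}`) // hypothetical payload

	// ParseKafkaData borrows a parser from the shared pool (pool.Get /
	// defer pool.Put) and parses the payload into a FastjsonMetric.
	metric, err := parser.ParseKafkaData(pp, body)
	if err != nil {
		log.Println("ParseKafkaData err:", err)
		return
	}
	log.Println(metric.GetParseObject().String())
}

Sharing one long-lived pool avoids re-allocating pool state per message and matches how the report controller and the sinker consumer now both hold a single pool for the lifetime of the process.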