Optimize code (优化代码)
This commit is contained in:
parent d26f484b58
commit ffbcd7e790
@@ -29,9 +29,9 @@ var MetaAttrRelationSet sync.Map
 var AttributeMap sync.Map
 var MetaEventMap sync.Map
 
-var metaAttrRelationChan = make(chan map[string]interface{}, 1000)
-var attributeChan = make(chan map[string]interface{}, 1000)
-var metaEventChan = make(chan map[string]interface{}, 1000)
+var metaAttrRelationChan = make(chan map[string]interface{}, 10000)
+var attributeChan = make(chan map[string]interface{}, 10000)
+var metaEventChan = make(chan map[string]interface{}, 10000)
 
 func MysqlConsumer() {
 	for {
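The only change in this hunk is the channel capacity, raised from 1000 to 10000 so report producers are less likely to block during bursts before MysqlConsumer drains the channels. A minimal, self-contained sketch of that producer/consumer shape, using hypothetical names rather than the project's real types:

// Illustrative only: a buffered channel absorbing bursts from many producers
// while a single consumer drains it, mirroring metaEventChan's role.
package main

import (
	"fmt"
	"sync"
)

func main() {
	events := make(chan map[string]interface{}, 10000) // larger buffer = more burst headroom

	// Consumer: drains the channel, like MysqlConsumer's for-loop.
	done := make(chan struct{})
	go func() {
		count := 0
		for range events {
			count++
		}
		fmt.Println("consumed", count, "events")
		close(done)
	}()

	// Producers: enqueue without blocking as long as the buffer has room.
	var wg sync.WaitGroup
	for p := 0; p < 10; p++ {
		wg.Add(1)
		go func(p int) {
			defer wg.Done()
			for i := 0; i < 100; i++ {
				events <- map[string]interface{}{"producer": p, "seq": i}
			}
		}(p)
	}

	wg.Wait()
	close(events)
	<-done
}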
@@ -55,11 +55,6 @@ func MysqlConsumer() {
 	}
 }
 
-func AddRealTimeData(realTimeWarehousingData *consumer_data.RealTimeWarehousingData, realTimeWarehousing *consumer_data.RealTimeWarehousing) (err error) {
-
-	err = realTimeWarehousing.Add(realTimeWarehousingData)
-	return err
-}
 
 func AddMetaEvent(kafkaData model.KafkaData) (err error) {
 	if kafkaData.ReportType == model.EventReportType {
@@ -86,6 +81,8 @@ func AddMetaEvent(kafkaData model.KafkaData) (err error) {
 	return nil
 }
 
+
+
 func AddTableColumn(kafkaData model.KafkaData, failFunc func(data consumer_data.ReportAcceptStatusData), tableName string, ReqDataObject *parser.FastjsonMetric) (err error) {
 
 	dims, err := sinker.GetDims(model.GlobConfig.Comm.ClickHouse.DbName, tableName, nil, db.ClickHouseSqlx)
@@ -37,6 +37,7 @@ func init() {
 	flag.StringVar(&configFileName, "configFileName", "config", "配置文件名")
 	flag.StringVar(&configFileExt, "configFileExt", "json", "配置文件后缀")
 	flag.Parse()
 
 }
 
 // The core logic lives on the sinker side
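These two flags select the configuration file by name and extension. A standalone sketch of the same flag pattern follows; the path assembly at the end is an illustrative assumption, not the project's actual config loader:

package main

import (
	"flag"
	"fmt"
)

var (
	configFileName string
	configFileExt  string
)

func main() {
	// Same flag names and defaults as registered in the commit.
	flag.StringVar(&configFileName, "configFileName", "config", "config file name")
	flag.StringVar(&configFileExt, "configFileExt", "json", "config file extension")
	flag.Parse()

	// Hypothetical: how the two values might be combined into a file path.
	fmt.Printf("loading %s.%s\n", configFileName, configFileExt)
}

With the defaults above, running the binary without arguments would resolve to config.json.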
@@ -108,6 +109,10 @@ func main() {
 	reportData2CKSarama := realTimeDataSarama.Clone()
 	go action.MysqlConsumer()
 	var json = jsoniter.ConfigCompatibleWithStandardLibrary
+	pp, err := parser.NewParserPool("fastjson", nil, "", "")
+	if err != nil {
+		panic(err)
+	}
 	err = realTimeDataSarama.Init(model.GlobConfig.Comm.Kafka, model.GlobConfig.Comm.Kafka.ReportTopicName, model.GlobConfig.Comm.Kafka.RealTimeDataGroup, func(msg model.InputMessage, markFn func()) {
 		//ETL
 		var kafkaData model.KafkaData
@@ -234,13 +239,12 @@ func main() {
 		kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_partition", msg.Partition)
 
 		// Parse the JSON data reported by the developer
-		metric, err := parser.ParseKafkaData(kafkaData.ReqData)
+		metric, err := parser.ParseKafkaData(pp,kafkaData.ReqData)
 		if err != nil {
 			logs.Logger.Error("ParseKafkaData err", zap.Error(err))
 			markFn()
 			return
 		}
 		log.Println(metric.GetParseObject().String())
 		// Generate the table name
 		tableName := kafkaData.GetTableName()
 
@@ -25,6 +25,12 @@ type ReportController struct {
 	BaseController
 }
 
+var pp *parser.Pool
+
+func init(){
+	pp, _ = parser.NewParserPool("fastjson", nil, "", "")
+}
+
 // Reporting endpoint
 func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) {
 
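The controller now builds a single package-level parser pool when the package initializes, so every request handled by ReportAction reuses it instead of constructing a pool per request. The construction error is discarded here; a minimal variant that surfaces it is sketched below, assuming the same imports as the controller file (illustrative only, not the project's code):

// Illustrative alternative: fail fast if the pool cannot be built,
// instead of silently leaving pp nil.
var pp *parser.Pool

func init() {
	var err error
	pp, err = parser.NewParserPool("fastjson", nil, "", "")
	if err != nil {
		panic(err) // surface misconfiguration at startup rather than on the first request
	}
}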
@@ -93,7 +99,7 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) {
 
 	if reportService.IsDebugUser(debug, xwlDistinctId, tableId) {
 
-		metric, debugErr := parser.ParseKafkaData(body)
+		metric, debugErr := parser.ParseKafkaData(pp,body)
 		if debugErr != nil {
 			logs.Logger.Error("parser.ParseKafkaData ", zap.Error(err))
 			this.FastError(ctx, errors.New("服务异常"))
@@ -60,14 +60,9 @@ func NewParserPool(name string, csvFormat []string, delimiter string, timezone s
 	return
 }
 
-func ParseKafkaData(data []byte) (metric *FastjsonMetric, err error) {
-	pp, err := NewParserPool("fastjson", nil, "", "")
-	if err != nil {
-		return
-	}
-
-	jsonParser := pp.Get()
-	defer pp.Put(jsonParser)
+func ParseKafkaData(pool *Pool, data []byte) (metric *FastjsonMetric, err error) {
+	jsonParser := pool.Get()
+	defer pool.Put(jsonParser)
 	metric, err = jsonParser.Parse(data)
 	return
 }
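This refactor moves pool construction out of the hot path: callers build one Pool up front (as main and the report controller now do) and ParseKafkaData only borrows a parser from it and returns it when done, instead of allocating a new pool for every payload. A rough caller-side sketch under those assumptions, with the sample payloads and loop purely illustrative and the project's parser package assumed to be imported:

// Illustrative caller: create the pool once, reuse it for every message.
pp, err := parser.NewParserPool("fastjson", nil, "", "")
if err != nil {
	panic(err)
}

for _, raw := range [][]byte{[]byte(`{"event":"login"}`), []byte(`{"event":"pay"}`)} {
	metric, err := parser.ParseKafkaData(pp, raw) // Get/Put on the pool happens inside
	if err != nil {
		continue // skip malformed payloads in this sketch
	}
	_ = metric // hand off to table-name generation, column checks, etc.
}

Passing the pool explicitly also makes the dependency visible at call sites rather than hiding an allocation inside the parse helper.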