From 39ae332a5b10ca18b953682a62a39fcf8e5c1e30 Mon Sep 17 00:00:00 2001 From: "1340691923@qq.com" <1340691923@qq.com> Date: Thu, 10 Mar 2022 17:10:33 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/sinker/main.go | 322 +++++++++--------- .../service/consumer_data/reportdata2ck.go | 8 +- 2 files changed, 166 insertions(+), 164 deletions(-) diff --git a/cmd/sinker/main.go b/cmd/sinker/main.go index 751e6ce..f0aa851 100644 --- a/cmd/sinker/main.go +++ b/cmd/sinker/main.go @@ -121,41 +121,41 @@ func main() { model.GlobConfig.Comm.Kafka.ReportTopicName, model.GlobConfig.Comm.Kafka.RealTimeDataGroup, func(msg model.InputMessage, markFn func()) { - //ETL - var kafkaData model.KafkaData - err = json.Unmarshal(msg.Value, &kafkaData) - if err != nil { - logs.Logger.Error("json.Unmarshal Err", zap.Error(err)) - markFn() - return - } - appid,err := strconv.Atoi(kafkaData.TableId) - if err != nil { - logs.Logger.Error("strconv.Atoi(kafkaData.TableId) Err", zap.Error(err)) - markFn() - return - } - //添加实时数据 - err = realTimeWarehousing.Add(&consumer_data.RealTimeWarehousingData{ - Appid: int64(appid), - EventName: kafkaData.EventName, - CreateTime: kafkaData.ReportTime, - Data: kafkaData.ReqData, - }) + //ETL + var kafkaData model.KafkaData + err = json.Unmarshal(msg.Value, &kafkaData) + if err != nil { + logs.Logger.Error("json.Unmarshal Err", zap.Error(err)) + markFn() + return + } + appid, err := strconv.Atoi(kafkaData.TableId) + if err != nil { + logs.Logger.Error("strconv.Atoi(kafkaData.TableId) Err", zap.Error(err)) + markFn() + return + } + //添加实时数据 + err = realTimeWarehousing.Add(&consumer_data.RealTimeWarehousingData{ + Appid: int64(appid), + EventName: kafkaData.EventName, + CreateTime: kafkaData.ReportTime, + Data: kafkaData.ReqData, + }) - if err != nil { - logs.Logger.Error("AddRealTimeData err", zap.Error(err)) - } - markFn() + if err != nil { + logs.Logger.Error("AddRealTimeData err", zap.Error(err)) + } + markFn() - }, func() {}) + }, func() {}) if err != nil { panic(err) } - parserPool,err := parser.NewParserPool("fastjson") - if err!=nil{ + parserPool, err := parser.NewParserPool("fastjson") + if err != nil { panic(err) } @@ -165,150 +165,150 @@ func main() { model.GlobConfig.Comm.Kafka.ReportData2CKGroup, func(msg model.InputMessage, markFn func()) { - var kafkaData model.KafkaData - err = json.Unmarshal(msg.Value, &kafkaData) - if err != nil { - logs.Logger.Error("json.Unmarshal Err", zap.Error(err)) - markFn() - return - } + var kafkaData model.KafkaData - kafkaData.Offset = msg.Offset - kafkaData.ConsumptionTime = msg.Timestamp.Format(util.TimeFormat) - - gjsonArr := gjson.GetManyBytes(kafkaData.ReqData, "xwl_distinct_id", "xwl_client_time") - - xwlDistinctId := gjsonArr[0].String() - - xwlClientTime := gjsonArr[1].String() - - tableId, _ := strconv.Atoi(kafkaData.TableId) - - if kafkaData.EventName == ""{ - markFn() - return - } - - if xwlDistinctId == "" { - logs.Logger.Error("xwl_distinct_id 为空", zap.String("kafkaData.ReqData", util.Bytes2str(kafkaData.ReqData))) - - var eventType = "" - - switch kafkaData.ReportType { - case model.UserReportType: - eventType = "用户属性类型不合法" - case model.EventReportType: - eventType = "事件属性类型不合法" - } - reportAcceptStatus.Add(&consumer_data.ReportAcceptStatusData{ - PartDate: kafkaData.ReportTime, - TableId: tableId, - ReportType: eventType, - DataName: kafkaData.EventName, - ErrorReason: "xwl_distinct_id 不能为空", - ErrorHandling: "丢弃数据", - ReportData: util.Bytes2str(kafkaData.ReqData), - XwlKafkaOffset: kafkaData.Offset, - Status: consumer_data.FailStatus, - }) - markFn() - return - } - - if kafkaData.Ip != "" { - province, city, err := geoip2.GetAreaFromIP(kafkaData.Ip) + err = json.Unmarshal(msg.Value, &kafkaData) if err != nil { - logs.Logger.Sugar().Errorf("err", err) + logs.Logger.Error("json.Unmarshal Err", zap.Error(err)) + markFn() + return } - if province != "" { - kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_province", province) - } - if city != "" { - kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_city", city) - } - kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_ip", kafkaData.Ip) - } - clinetT := util.Str2Time(xwlClientTime, util.TimeFormat) - serverT := util.Str2Time(kafkaData.ReportTime, util.TimeFormat) - if math.Abs(serverT.Sub(clinetT).Minutes()) > 10 { - reportAcceptStatus.Add(&consumer_data.ReportAcceptStatusData{ + kafkaData.Offset = msg.Offset + kafkaData.ConsumptionTime = msg.Timestamp.Format(util.TimeFormat) + + gjsonArr := gjson.GetManyBytes(kafkaData.ReqData, "xwl_distinct_id", "xwl_client_time") + + xwlDistinctId := gjsonArr[0].String() + + xwlClientTime := gjsonArr[1].String() + + tableId, _ := strconv.Atoi(kafkaData.TableId) + + if kafkaData.EventName == "" { + markFn() + return + } + + if xwlDistinctId == "" { + logs.Logger.Error("xwl_distinct_id 为空", zap.String("kafkaData.ReqData", util.Bytes2str(kafkaData.ReqData))) + + var eventType = "" + + switch kafkaData.ReportType { + case model.UserReportType: + eventType = "用户属性类型不合法" + case model.EventReportType: + eventType = "事件属性类型不合法" + } + reportAcceptStatus.Add(&consumer_data.ReportAcceptStatusData{ + PartDate: kafkaData.ReportTime, + TableId: tableId, + ReportType: eventType, + DataName: kafkaData.EventName, + ErrorReason: "xwl_distinct_id 不能为空", + ErrorHandling: "丢弃数据", + ReportData: util.Bytes2str(kafkaData.ReqData), + XwlKafkaOffset: kafkaData.Offset, + Status: consumer_data.FailStatus, + }) + markFn() + return + } + + if kafkaData.Ip != "" { + province, city, err := geoip2.GetAreaFromIP(kafkaData.Ip) + if err != nil { + logs.Logger.Sugar().Errorf("err", err) + } + if province != "" { + kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_province", province) + } + if city != "" { + kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_city", city) + } + kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_ip", kafkaData.Ip) + } + clinetT := util.Str2Time(xwlClientTime, util.TimeFormat) + serverT := util.Str2Time(kafkaData.ReportTime, util.TimeFormat) + + if math.Abs(serverT.Sub(clinetT).Minutes()) > 10 { + reportAcceptStatus.Add(&consumer_data.ReportAcceptStatusData{ + PartDate: kafkaData.ReportTime, + TableId: tableId, + ReportType: kafkaData.GetReportTypeErr(), + DataName: kafkaData.EventName, + ErrorReason: "客户端上报时间误差大于十分钟", + ErrorHandling: "丢弃数据", + ReportData: util.Bytes2str(kafkaData.ReqData), + XwlKafkaOffset: kafkaData.Offset, + Status: consumer_data.FailStatus, + }) + logs.Logger.Sugar().Errorf("客户端上报时间误差大于十分钟", xwlClientTime, kafkaData.ReportTime) + markFn() + return + } + + kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_part_event", kafkaData.EventName) + kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_part_date", xwlClientTime) + kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_server_time", kafkaData.ReportTime) + kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_offset", msg.Offset) + kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_partition", msg.Partition) + pool := parserPool.Get() + defer parserPool.Put(pool) + metric, err := pool.Parse(kafkaData.ReqData) + + //解析开发者上报的json数据 + if err != nil { + logs.Logger.Error("ParseKafkaData err", zap.Error(err)) + markFn() + return + } + + //生成表名 + tableName := kafkaData.GetTableName() + + //新增表结构 + if err := action.AddTableColumn( + kafkaData, + func(data consumer_data.ReportAcceptStatusData) { reportAcceptStatus.Add(&data) }, + tableName, + metric, + ); err != nil { + logs.Logger.Error("addTableColumn err", zap.String("tableName", tableName), zap.Error(err)) + markFn() + return + } + + //添加元数据 + if err := action.AddMetaEvent(kafkaData); err != nil { + logs.Logger.Error("addMetaEvent err", zap.Error(err)) + } + + //入库成功 + if err := reportAcceptStatus.Add(&consumer_data.ReportAcceptStatusData{ PartDate: kafkaData.ReportTime, TableId: tableId, - ReportType: kafkaData.GetReportTypeErr(), DataName: kafkaData.EventName, - ErrorReason: "客户端上报时间误差大于十分钟", - ErrorHandling: "丢弃数据", - ReportData: util.Bytes2str(kafkaData.ReqData), XwlKafkaOffset: kafkaData.Offset, - Status: consumer_data.FailStatus, - }) - logs.Logger.Sugar().Errorf("客户端上报时间误差大于十分钟", xwlClientTime, kafkaData.ReportTime) + Status: consumer_data.SuccessStatus, + }); err != nil { + logs.Logger.Error("reportAcceptStatus Add SuccessStatus err", zap.Error(err)) + } + + //添加数据到ck用于后台统计 + if err := reportData2CK.Add(map[string]*parser.FastjsonMetric{ + tableName: metric, + }); err != nil { + logs.Logger.Error("reportData2CK err", zap.Error(err)) + markFn() + return + } markFn() - return - } - kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_part_event", kafkaData.EventName) - kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_part_date", xwlClientTime) - kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_server_time", kafkaData.ReportTime) - kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_offset", msg.Offset) - kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_partition", msg.Partition) - pool := parserPool.Get() - defer parserPool.Put(pool) - metric, err := pool.Parse(kafkaData.ReqData) + //logs.Logger.Info("链路所花时长", zap.String("time", time.Now().Sub(startT).String())) - //解析开发者上报的json数据 - if err != nil { - logs.Logger.Error("ParseKafkaData err", zap.Error(err)) - markFn() - return - } - - //生成表名 - tableName := kafkaData.GetTableName() - - //新增表结构 - if err := action.AddTableColumn( - kafkaData, - func(data consumer_data.ReportAcceptStatusData) { reportAcceptStatus.Add(&data) }, - tableName, - metric, - ); err != nil { - logs.Logger.Error("addTableColumn err", zap.String("tableName", tableName), zap.Error(err)) - markFn() - return - } - - - //添加元数据 - if err := action.AddMetaEvent(kafkaData); err != nil { - logs.Logger.Error("addMetaEvent err", zap.Error(err)) - } - - //入库成功 - if err := reportAcceptStatus.Add(&consumer_data.ReportAcceptStatusData{ - PartDate: kafkaData.ReportTime, - TableId: tableId, - DataName: kafkaData.EventName, - XwlKafkaOffset: kafkaData.Offset, - Status: consumer_data.SuccessStatus, - }); err != nil { - logs.Logger.Error("reportAcceptStatus Add SuccessStatus err", zap.Error(err)) - } - - //添加数据到ck用于后台统计 - if err := reportData2CK.Add(map[string]*parser.FastjsonMetric{ - tableName: metric, - }); err != nil { - logs.Logger.Error("reportData2CK err", zap.Error(err)) - markFn() - return - } - markFn() - - //logs.Logger.Info("链路所花时长", zap.String("time", time.Now().Sub(startT).String())) - - }, func() {}) + }, func() {}) if err != nil { panic(err) diff --git a/platform-basic-libs/service/consumer_data/reportdata2ck.go b/platform-basic-libs/service/consumer_data/reportdata2ck.go index f8d1b2e..1e3baea 100644 --- a/platform-basic-libs/service/consumer_data/reportdata2ck.go +++ b/platform-basic-libs/service/consumer_data/reportdata2ck.go @@ -48,8 +48,8 @@ func (this *ReportData2CK) Flush() (err error) { rowsMap := map[string][][]interface{}{} - for bufferIndex := range this.buffer { - for tableName := range this.buffer[bufferIndex] { + for _,obj := range this.buffer { + for tableName,data := range obj { rowArr := []interface{}{} rows := [][]interface{}{} if _, haveKey := rowsMap[tableName]; haveKey { @@ -59,7 +59,7 @@ func (this *ReportData2CK) Flush() (err error) { } dims, _ := TableColumnMap.Load(tableName) for _, dim := range dims.([]*model2.ColumnWithType) { - val := parser.GetValueByType(this.buffer[bufferIndex][tableName], dim) + val := parser.GetValueByType(data, dim) rowArr = append(rowArr, val) } @@ -68,6 +68,7 @@ func (this *ReportData2CK) Flush() (err error) { } } + bytesbuffer:=bytes.Buffer{} TableColumnMap.Range(func(tableName, value interface{}) bool { @@ -107,6 +108,7 @@ func (this *ReportData2CK) Flush() (err error) { } defer stmt.Close() haveFail := false + for _, row := range rowsMap[tableName.(string)] { if _, err := stmt.Exec(row...); err != nil {