This commit is contained in:
1340691923@qq.com 2022-03-10 17:10:33 +08:00
parent cf77900bc4
commit 39ae332a5b
2 changed files with 166 additions and 164 deletions

View File

@ -121,41 +121,41 @@ func main() {
model.GlobConfig.Comm.Kafka.ReportTopicName, model.GlobConfig.Comm.Kafka.ReportTopicName,
model.GlobConfig.Comm.Kafka.RealTimeDataGroup, model.GlobConfig.Comm.Kafka.RealTimeDataGroup,
func(msg model.InputMessage, markFn func()) { func(msg model.InputMessage, markFn func()) {
//ETL //ETL
var kafkaData model.KafkaData var kafkaData model.KafkaData
err = json.Unmarshal(msg.Value, &kafkaData) err = json.Unmarshal(msg.Value, &kafkaData)
if err != nil { if err != nil {
logs.Logger.Error("json.Unmarshal Err", zap.Error(err)) logs.Logger.Error("json.Unmarshal Err", zap.Error(err))
markFn() markFn()
return return
} }
appid,err := strconv.Atoi(kafkaData.TableId) appid, err := strconv.Atoi(kafkaData.TableId)
if err != nil { if err != nil {
logs.Logger.Error("strconv.Atoi(kafkaData.TableId) Err", zap.Error(err)) logs.Logger.Error("strconv.Atoi(kafkaData.TableId) Err", zap.Error(err))
markFn() markFn()
return return
} }
//添加实时数据 //添加实时数据
err = realTimeWarehousing.Add(&consumer_data.RealTimeWarehousingData{ err = realTimeWarehousing.Add(&consumer_data.RealTimeWarehousingData{
Appid: int64(appid), Appid: int64(appid),
EventName: kafkaData.EventName, EventName: kafkaData.EventName,
CreateTime: kafkaData.ReportTime, CreateTime: kafkaData.ReportTime,
Data: kafkaData.ReqData, Data: kafkaData.ReqData,
}) })
if err != nil { if err != nil {
logs.Logger.Error("AddRealTimeData err", zap.Error(err)) logs.Logger.Error("AddRealTimeData err", zap.Error(err))
} }
markFn() markFn()
}, func() {}) }, func() {})
if err != nil { if err != nil {
panic(err) panic(err)
} }
parserPool,err := parser.NewParserPool("fastjson") parserPool, err := parser.NewParserPool("fastjson")
if err!=nil{ if err != nil {
panic(err) panic(err)
} }
@ -165,150 +165,150 @@ func main() {
model.GlobConfig.Comm.Kafka.ReportData2CKGroup, model.GlobConfig.Comm.Kafka.ReportData2CKGroup,
func(msg model.InputMessage, markFn func()) { func(msg model.InputMessage, markFn func()) {
var kafkaData model.KafkaData var kafkaData model.KafkaData
err = json.Unmarshal(msg.Value, &kafkaData)
if err != nil {
logs.Logger.Error("json.Unmarshal Err", zap.Error(err))
markFn()
return
}
kafkaData.Offset = msg.Offset err = json.Unmarshal(msg.Value, &kafkaData)
kafkaData.ConsumptionTime = msg.Timestamp.Format(util.TimeFormat)
gjsonArr := gjson.GetManyBytes(kafkaData.ReqData, "xwl_distinct_id", "xwl_client_time")
xwlDistinctId := gjsonArr[0].String()
xwlClientTime := gjsonArr[1].String()
tableId, _ := strconv.Atoi(kafkaData.TableId)
if kafkaData.EventName == ""{
markFn()
return
}
if xwlDistinctId == "" {
logs.Logger.Error("xwl_distinct_id 为空", zap.String("kafkaData.ReqData", util.Bytes2str(kafkaData.ReqData)))
var eventType = ""
switch kafkaData.ReportType {
case model.UserReportType:
eventType = "用户属性类型不合法"
case model.EventReportType:
eventType = "事件属性类型不合法"
}
reportAcceptStatus.Add(&consumer_data.ReportAcceptStatusData{
PartDate: kafkaData.ReportTime,
TableId: tableId,
ReportType: eventType,
DataName: kafkaData.EventName,
ErrorReason: "xwl_distinct_id 不能为空",
ErrorHandling: "丢弃数据",
ReportData: util.Bytes2str(kafkaData.ReqData),
XwlKafkaOffset: kafkaData.Offset,
Status: consumer_data.FailStatus,
})
markFn()
return
}
if kafkaData.Ip != "" {
province, city, err := geoip2.GetAreaFromIP(kafkaData.Ip)
if err != nil { if err != nil {
logs.Logger.Sugar().Errorf("err", err) logs.Logger.Error("json.Unmarshal Err", zap.Error(err))
markFn()
return
} }
if province != "" {
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_province", province)
}
if city != "" {
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_city", city)
}
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_ip", kafkaData.Ip)
}
clinetT := util.Str2Time(xwlClientTime, util.TimeFormat)
serverT := util.Str2Time(kafkaData.ReportTime, util.TimeFormat)
if math.Abs(serverT.Sub(clinetT).Minutes()) > 10 { kafkaData.Offset = msg.Offset
reportAcceptStatus.Add(&consumer_data.ReportAcceptStatusData{ kafkaData.ConsumptionTime = msg.Timestamp.Format(util.TimeFormat)
gjsonArr := gjson.GetManyBytes(kafkaData.ReqData, "xwl_distinct_id", "xwl_client_time")
xwlDistinctId := gjsonArr[0].String()
xwlClientTime := gjsonArr[1].String()
tableId, _ := strconv.Atoi(kafkaData.TableId)
if kafkaData.EventName == "" {
markFn()
return
}
if xwlDistinctId == "" {
logs.Logger.Error("xwl_distinct_id 为空", zap.String("kafkaData.ReqData", util.Bytes2str(kafkaData.ReqData)))
var eventType = ""
switch kafkaData.ReportType {
case model.UserReportType:
eventType = "用户属性类型不合法"
case model.EventReportType:
eventType = "事件属性类型不合法"
}
reportAcceptStatus.Add(&consumer_data.ReportAcceptStatusData{
PartDate: kafkaData.ReportTime,
TableId: tableId,
ReportType: eventType,
DataName: kafkaData.EventName,
ErrorReason: "xwl_distinct_id 不能为空",
ErrorHandling: "丢弃数据",
ReportData: util.Bytes2str(kafkaData.ReqData),
XwlKafkaOffset: kafkaData.Offset,
Status: consumer_data.FailStatus,
})
markFn()
return
}
if kafkaData.Ip != "" {
province, city, err := geoip2.GetAreaFromIP(kafkaData.Ip)
if err != nil {
logs.Logger.Sugar().Errorf("err", err)
}
if province != "" {
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_province", province)
}
if city != "" {
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_city", city)
}
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_ip", kafkaData.Ip)
}
clinetT := util.Str2Time(xwlClientTime, util.TimeFormat)
serverT := util.Str2Time(kafkaData.ReportTime, util.TimeFormat)
if math.Abs(serverT.Sub(clinetT).Minutes()) > 10 {
reportAcceptStatus.Add(&consumer_data.ReportAcceptStatusData{
PartDate: kafkaData.ReportTime,
TableId: tableId,
ReportType: kafkaData.GetReportTypeErr(),
DataName: kafkaData.EventName,
ErrorReason: "客户端上报时间误差大于十分钟",
ErrorHandling: "丢弃数据",
ReportData: util.Bytes2str(kafkaData.ReqData),
XwlKafkaOffset: kafkaData.Offset,
Status: consumer_data.FailStatus,
})
logs.Logger.Sugar().Errorf("客户端上报时间误差大于十分钟", xwlClientTime, kafkaData.ReportTime)
markFn()
return
}
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_part_event", kafkaData.EventName)
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_part_date", xwlClientTime)
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_server_time", kafkaData.ReportTime)
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_offset", msg.Offset)
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_partition", msg.Partition)
pool := parserPool.Get()
defer parserPool.Put(pool)
metric, err := pool.Parse(kafkaData.ReqData)
//解析开发者上报的json数据
if err != nil {
logs.Logger.Error("ParseKafkaData err", zap.Error(err))
markFn()
return
}
//生成表名
tableName := kafkaData.GetTableName()
//新增表结构
if err := action.AddTableColumn(
kafkaData,
func(data consumer_data.ReportAcceptStatusData) { reportAcceptStatus.Add(&data) },
tableName,
metric,
); err != nil {
logs.Logger.Error("addTableColumn err", zap.String("tableName", tableName), zap.Error(err))
markFn()
return
}
//添加元数据
if err := action.AddMetaEvent(kafkaData); err != nil {
logs.Logger.Error("addMetaEvent err", zap.Error(err))
}
//入库成功
if err := reportAcceptStatus.Add(&consumer_data.ReportAcceptStatusData{
PartDate: kafkaData.ReportTime, PartDate: kafkaData.ReportTime,
TableId: tableId, TableId: tableId,
ReportType: kafkaData.GetReportTypeErr(),
DataName: kafkaData.EventName, DataName: kafkaData.EventName,
ErrorReason: "客户端上报时间误差大于十分钟",
ErrorHandling: "丢弃数据",
ReportData: util.Bytes2str(kafkaData.ReqData),
XwlKafkaOffset: kafkaData.Offset, XwlKafkaOffset: kafkaData.Offset,
Status: consumer_data.FailStatus, Status: consumer_data.SuccessStatus,
}) }); err != nil {
logs.Logger.Sugar().Errorf("客户端上报时间误差大于十分钟", xwlClientTime, kafkaData.ReportTime) logs.Logger.Error("reportAcceptStatus Add SuccessStatus err", zap.Error(err))
}
//添加数据到ck用于后台统计
if err := reportData2CK.Add(map[string]*parser.FastjsonMetric{
tableName: metric,
}); err != nil {
logs.Logger.Error("reportData2CK err", zap.Error(err))
markFn()
return
}
markFn() markFn()
return
}
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_part_event", kafkaData.EventName) //logs.Logger.Info("链路所花时长", zap.String("time", time.Now().Sub(startT).String()))
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_part_date", xwlClientTime)
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_server_time", kafkaData.ReportTime)
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_offset", msg.Offset)
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_partition", msg.Partition)
pool := parserPool.Get()
defer parserPool.Put(pool)
metric, err := pool.Parse(kafkaData.ReqData)
//解析开发者上报的json数据 }, func() {})
if err != nil {
logs.Logger.Error("ParseKafkaData err", zap.Error(err))
markFn()
return
}
//生成表名
tableName := kafkaData.GetTableName()
//新增表结构
if err := action.AddTableColumn(
kafkaData,
func(data consumer_data.ReportAcceptStatusData) { reportAcceptStatus.Add(&data) },
tableName,
metric,
); err != nil {
logs.Logger.Error("addTableColumn err", zap.String("tableName", tableName), zap.Error(err))
markFn()
return
}
//添加元数据
if err := action.AddMetaEvent(kafkaData); err != nil {
logs.Logger.Error("addMetaEvent err", zap.Error(err))
}
//入库成功
if err := reportAcceptStatus.Add(&consumer_data.ReportAcceptStatusData{
PartDate: kafkaData.ReportTime,
TableId: tableId,
DataName: kafkaData.EventName,
XwlKafkaOffset: kafkaData.Offset,
Status: consumer_data.SuccessStatus,
}); err != nil {
logs.Logger.Error("reportAcceptStatus Add SuccessStatus err", zap.Error(err))
}
//添加数据到ck用于后台统计
if err := reportData2CK.Add(map[string]*parser.FastjsonMetric{
tableName: metric,
}); err != nil {
logs.Logger.Error("reportData2CK err", zap.Error(err))
markFn()
return
}
markFn()
//logs.Logger.Info("链路所花时长", zap.String("time", time.Now().Sub(startT).String()))
}, func() {})
if err != nil { if err != nil {
panic(err) panic(err)

View File

@ -48,8 +48,8 @@ func (this *ReportData2CK) Flush() (err error) {
rowsMap := map[string][][]interface{}{} rowsMap := map[string][][]interface{}{}
for bufferIndex := range this.buffer { for _,obj := range this.buffer {
for tableName := range this.buffer[bufferIndex] { for tableName,data := range obj {
rowArr := []interface{}{} rowArr := []interface{}{}
rows := [][]interface{}{} rows := [][]interface{}{}
if _, haveKey := rowsMap[tableName]; haveKey { if _, haveKey := rowsMap[tableName]; haveKey {
@ -59,7 +59,7 @@ func (this *ReportData2CK) Flush() (err error) {
} }
dims, _ := TableColumnMap.Load(tableName) dims, _ := TableColumnMap.Load(tableName)
for _, dim := range dims.([]*model2.ColumnWithType) { for _, dim := range dims.([]*model2.ColumnWithType) {
val := parser.GetValueByType(this.buffer[bufferIndex][tableName], dim) val := parser.GetValueByType(data, dim)
rowArr = append(rowArr, val) rowArr = append(rowArr, val)
} }
@ -68,6 +68,7 @@ func (this *ReportData2CK) Flush() (err error) {
} }
} }
bytesbuffer:=bytes.Buffer{} bytesbuffer:=bytes.Buffer{}
TableColumnMap.Range(func(tableName, value interface{}) bool { TableColumnMap.Range(func(tableName, value interface{}) bool {
@ -107,6 +108,7 @@ func (this *ReportData2CK) Flush() (err error) {
} }
defer stmt.Close() defer stmt.Close()
haveFail := false haveFail := false
for _, row := range rowsMap[tableName.(string)] { for _, row := range rowsMap[tableName.(string)] {
if _, err := stmt.Exec(row...); err != nil { if _, err := stmt.Exec(row...); err != nil {