bi/cmd/sinker/action/action.go

246 lines
6.6 KiB
Go
Raw Normal View History

2022-01-26 16:40:50 +08:00
package action
import (
"bytes"
"errors"
"fmt"
"github.com/1340691923/xwl_bi/engine/db"
"github.com/1340691923/xwl_bi/engine/logs"
"github.com/1340691923/xwl_bi/model"
"github.com/1340691923/xwl_bi/platform-basic-libs/service/consumer_data"
"github.com/1340691923/xwl_bi/platform-basic-libs/sinker"
parser "github.com/1340691923/xwl_bi/platform-basic-libs/sinker/parse"
"github.com/1340691923/xwl_bi/platform-basic-libs/util"
"github.com/valyala/fastjson"
"go.uber.org/zap"
2022-01-26 16:40:50 +08:00
"strconv"
"strings"
"sync"
)
const (
PresetAttribute = 1
CustomAttribute = 2
IsUserAttribute = 1
IsEventAttribute = 2
)
var MetaAttrRelationSet sync.Map
var AttributeMap sync.Map
var MetaEventMap sync.Map
2022-03-03 19:16:25 +08:00
var metaAttrRelationChan = make(chan metaAttrRelationModel, 10000)
var attributeChan = make(chan attributeModel, 10000)
var metaEventChan = make(chan metaEventModel, 10000)
type metaAttrRelationModel struct {
EventName string
EventAttr string
AppId string
}
type attributeModel struct {
AttributeName string
DataType int
AttributeType int
attribute_source int
App_id string
}
type metaEventModel struct {
EventName string
AppId string
}
2022-01-26 16:40:50 +08:00
func MysqlConsumer() {
for {
select {
case m := <-metaAttrRelationChan:
2022-03-03 19:16:25 +08:00
if _, err := db.Sqlx.Exec(`insert into meta_attr_relation(app_id,event_name,event_attr) values (?,?,?);`,
m.AppId,m.EventName,m.EventAttr); err != nil && !strings.Contains(err.Error(), "1062") {
logs.Logger.Sugar().Errorf("meta_attr_relation insert",m, err)
2022-01-26 16:40:50 +08:00
}
case m := <-attributeChan:
2022-03-03 19:16:25 +08:00
if _, err := db.Sqlx.Exec(`insert into attribute(app_id,attribute_source,attribute_type,data_type,attribute_name) values (?,?,?,?,?);`,
m.App_id,m.attribute_source,m.AttributeType,m.DataType,m.AttributeName); err != nil && !strings.Contains(err.Error(), "1062") {
logs.Logger.Sugar().Errorf("attribute insert",m, err)
2022-01-26 16:40:50 +08:00
}
case m := <-metaEventChan:
2022-03-03 19:16:25 +08:00
_, err := db.Sqlx.Exec(`insert into meta_event(appid,event_name) values (?,?);`,m.AppId,m.EventName)
2022-01-26 16:40:50 +08:00
if err != nil && !strings.Contains(err.Error(), "1062") {
2022-03-03 19:16:25 +08:00
logs.Logger.Sugar().Errorf("metaEvent insert",m, err)
2022-01-26 16:40:50 +08:00
}
default:
}
}
}
func AddMetaEvent(kafkaData model.KafkaData) (err error) {
if kafkaData.ReportType == model.EventReportType {
b := bytes.Buffer{}
b.WriteString(kafkaData.TableId)
b.WriteString("_")
b.WriteString(kafkaData.EventName)
bStr := b.String()
_, found := MetaEventMap.Load(bStr)
if !found {
2022-03-03 19:16:25 +08:00
metaEventChan <- metaEventModel{
EventName: kafkaData.EventName,
AppId: kafkaData.TableId,
2022-01-26 16:40:50 +08:00
}
MetaEventMap.Store(bStr, struct{}{})
}
}
return nil
}
func AddTableColumn(kafkaData model.KafkaData, failFunc func(data consumer_data.ReportAcceptStatusData), tableName string, ReqDataObject *parser.FastjsonMetric) (err error) {
2022-03-04 09:52:18 +08:00
dims, err := sinker.GetDims(model.GlobConfig.Comm.ClickHouse.DbName, tableName, nil, db.ClickHouseSqlx,false)
2022-01-26 16:40:50 +08:00
if err != nil {
logs.Logger.Error("sinker.GetDims", zap.Error(err))
return
}
obj, err := ReqDataObject.GetParseObject().Object()
if err != nil {
logs.Logger.Error("ReqDataObject.GetParseObject().Object()", zap.Error(err))
return
}
knownKeys := []string{}
newKeys := new(sync.Map)
var foundNewKey bool
tableId, _ := strconv.Atoi(kafkaData.TableId)
GetReportTypeErr := kafkaData.GetReportTypeErr()
for _, column := range dims {
knownKeys = append(knownKeys, column.Name)
if obj.Get(column.Name) != nil {
reportType := parser.FjDetectType(obj.Get(column.Name))
if reportType != column.Type {
if !(reportType == parser.Int && column.Type == parser.Float) && !(reportType == parser.Float && column.Type == parser.Int) {
errorReason := fmt.Sprintf("%s的类型错误正确类型为%v上报类型为%v(%v)", column.Name, parser.TypeRemarkMap[column.Type], parser.TypeRemarkMap[reportType], obj.Get(column.Name).String())
failFunc(consumer_data.ReportAcceptStatusData{
PartDate: kafkaData.ReportTime,
TableId: tableId,
ReportType: GetReportTypeErr,
DataName: kafkaData.EventName,
ErrorReason: errorReason,
ErrorHandling: "丢弃数据",
ReportData: util.Bytes2str(kafkaData.ReqData),
XwlKafkaOffset: kafkaData.Offset,
Status: consumer_data.FailStatus,
})
return errors.New(errorReason)
}
}
}
}
b := bytes.Buffer{}
obj.Visit(func(key []byte, v *fastjson.Value) {
2022-03-03 19:16:25 +08:00
columnName := string(key)
2022-01-26 16:40:50 +08:00
func() {
b.Reset()
b.WriteString(kafkaData.TableId)
b.WriteString("_")
b.WriteString(kafkaData.EventName)
b.WriteString("_")
b.WriteString(columnName)
bStr := b.String()
_, found := MetaAttrRelationSet.Load(bStr)
if !found {
2022-03-03 19:16:25 +08:00
metaAttrRelationChan <- metaAttrRelationModel{
EventName: kafkaData.EventName,
EventAttr: columnName,
AppId: kafkaData.TableId,
2022-01-26 16:40:50 +08:00
}
MetaAttrRelationSet.Store(bStr, struct{}{})
}
}()
if !util.InstrArr(knownKeys, columnName) {
foundNewKey = true
newKeys.Store(columnName, parser.FjDetectType(obj.Get(columnName)))
}
func() {
b.Reset()
b.WriteString(kafkaData.TableId)
b.WriteString("_xwl_")
b.WriteString(strconv.Itoa(kafkaData.ReportType))
b.WriteString("_")
b.WriteString(columnName)
AttributeMapkey := b.String()
_, found := AttributeMap.Load(AttributeMapkey)
if !found {
var (
attributeType, attributeSource int
)
if _, ok := parser.SysColumn[columnName]; ok {
attributeType = PresetAttribute
} else {
attributeType = CustomAttribute
}
switch kafkaData.ReportType {
case model.UserReportType:
attributeSource = IsUserAttribute
case model.EventReportType:
attributeSource = IsEventAttribute
}
2022-03-03 19:16:25 +08:00
attributeChan <- attributeModel{
AttributeName:columnName,
DataType: parser.FjDetectType(obj.Get(columnName)),
AttributeType: attributeType,
attribute_source: attributeSource,
App_id: kafkaData.TableId,
2022-01-26 16:40:50 +08:00
}
AttributeMap.Store(AttributeMapkey, struct{}{})
}
}()
})
if foundNewKey {
dims, err = sinker.ChangeSchema(newKeys, model.GlobConfig.Comm.ClickHouse.DbName, tableName, dims)
if err != nil {
logs.Logger.Error("err", zap.Error(err))
}
func() {
redisConn := db.RedisPool.Get()
defer redisConn.Close()
dimsCachekey := sinker.GetDimsCachekey(model.GlobConfig.Comm.ClickHouse.DbName, tableName)
2022-03-03 15:50:32 +08:00
_, err = redisConn.Do("unlink", dimsCachekey)
2022-01-26 16:40:50 +08:00
if err != nil {
2022-03-03 19:16:25 +08:00
_,err = redisConn.Do("del", dimsCachekey)
if err != nil {
logs.Logger.Error("err", zap.Error(err))
}
2022-01-26 16:40:50 +08:00
}
2022-03-03 15:50:32 +08:00
sinker.ClearDimsCacheByKey(dimsCachekey)
2022-01-26 16:40:50 +08:00
}()
}
consumer_data.TableColumnMap.Store(tableName, dims)
return
}