This commit is contained in:
1340691923@qq.com 2022-03-03 19:16:25 +08:00
parent e45788427e
commit 51b3e423b0
7 changed files with 71 additions and 57 deletions

View File

@ -73,7 +73,7 @@ func main() {
} }
} }
}() }()
go sinker.ClearDimsCacheByTime(time.Minute * 2) go sinker.ClearDimsCacheByTime(time.Minute * 30)
router := fasthttprouter.New() router := fasthttprouter.New()

View File

@ -29,25 +29,46 @@ var MetaAttrRelationSet sync.Map
var AttributeMap sync.Map var AttributeMap sync.Map
var MetaEventMap sync.Map var MetaEventMap sync.Map
var metaAttrRelationChan = make(chan map[string]interface{}, 10000) var metaAttrRelationChan = make(chan metaAttrRelationModel, 10000)
var attributeChan = make(chan map[string]interface{}, 10000) var attributeChan = make(chan attributeModel, 10000)
var metaEventChan = make(chan map[string]interface{}, 10000) var metaEventChan = make(chan metaEventModel, 10000)
type metaAttrRelationModel struct {
EventName string
EventAttr string
AppId string
}
type attributeModel struct {
AttributeName string
DataType int
AttributeType int
attribute_source int
App_id string
}
type metaEventModel struct {
EventName string
AppId string
}
func MysqlConsumer() { func MysqlConsumer() {
for { for {
select { select {
case m := <-metaAttrRelationChan: case m := <-metaAttrRelationChan:
if _, err := db.SqlBuilder.Insert("meta_attr_relation").SetMap(m).RunWith(db.Sqlx).Exec(); err != nil && !strings.Contains(err.Error(), "1062") { if _, err := db.Sqlx.Exec(`insert into meta_attr_relation(app_id,event_name,event_attr) values (?,?,?);`,
logs.Logger.Error("meta_attr_relation insert", zap.Error(err)) m.AppId,m.EventName,m.EventAttr); err != nil && !strings.Contains(err.Error(), "1062") {
logs.Logger.Sugar().Errorf("meta_attr_relation insert",m, err)
} }
case m := <-attributeChan: case m := <-attributeChan:
if _, err := db.SqlBuilder.Insert("attribute").SetMap(m).RunWith(db.Sqlx).Exec(); err != nil && !strings.Contains(err.Error(), "1062") { if _, err := db.Sqlx.Exec(`insert into attribute(app_id,attribute_source,attribute_type,data_type,attribute_name) values (?,?,?,?,?);`,
logs.Logger.Error("attribute insert", zap.Error(err)) m.App_id,m.attribute_source,m.AttributeType,m.DataType,m.AttributeName); err != nil && !strings.Contains(err.Error(), "1062") {
logs.Logger.Sugar().Errorf("attribute insert",m, err)
} }
case m := <-metaEventChan: case m := <-metaEventChan:
_, err := db.SqlBuilder.Insert("meta_event").SetMap(m).RunWith(db.Sqlx).Exec() _, err := db.Sqlx.Exec(`insert into meta_event(appid,event_name) values (?,?);`,m.AppId,m.EventName)
if err != nil && !strings.Contains(err.Error(), "1062") { if err != nil && !strings.Contains(err.Error(), "1062") {
logs.Logger.Error("metaEvent insert", zap.Error(err)) logs.Logger.Sugar().Errorf("metaEvent insert",m, err)
} }
default: default:
@ -66,11 +87,10 @@ func AddMetaEvent(kafkaData model.KafkaData) (err error) {
_, found := MetaEventMap.Load(bStr) _, found := MetaEventMap.Load(bStr)
if !found { if !found {
m := map[string]interface{}{ metaEventChan <- metaEventModel{
"appid": kafkaData.TableId, EventName: kafkaData.EventName,
"event_name": kafkaData.EventName, AppId: kafkaData.TableId,
} }
metaEventChan <- m
MetaEventMap.Store(bStr, struct{}{}) MetaEventMap.Store(bStr, struct{}{})
} }
} }
@ -85,7 +105,6 @@ func AddTableColumn(kafkaData model.KafkaData, failFunc func(data consumer_data.
return return
} }
obj, err := ReqDataObject.GetParseObject().Object() obj, err := ReqDataObject.GetParseObject().Object()
if err != nil { if err != nil {
logs.Logger.Error("ReqDataObject.GetParseObject().Object()", zap.Error(err)) logs.Logger.Error("ReqDataObject.GetParseObject().Object()", zap.Error(err))
@ -129,7 +148,7 @@ func AddTableColumn(kafkaData model.KafkaData, failFunc func(data consumer_data.
obj.Visit(func(key []byte, v *fastjson.Value) { obj.Visit(func(key []byte, v *fastjson.Value) {
columnName := util.Bytes2str(key) columnName := string(key)
func() { func() {
@ -142,17 +161,14 @@ func AddTableColumn(kafkaData model.KafkaData, failFunc func(data consumer_data.
bStr := b.String() bStr := b.String()
_, found := MetaAttrRelationSet.Load(bStr) _, found := MetaAttrRelationSet.Load(bStr)
if !found { if !found {
m := map[string]interface{}{ metaAttrRelationChan <- metaAttrRelationModel{
"event_name": kafkaData.EventName, EventName: kafkaData.EventName,
"event_attr": columnName, EventAttr: columnName,
"app_id": kafkaData.TableId, AppId: kafkaData.TableId,
} }
metaAttrRelationChan <- m
MetaAttrRelationSet.Store(bStr, struct{}{}) MetaAttrRelationSet.Store(bStr, struct{}{})
} }
}() }()
if !util.InstrArr(knownKeys, columnName) { if !util.InstrArr(knownKeys, columnName) {
@ -188,14 +204,14 @@ func AddTableColumn(kafkaData model.KafkaData, failFunc func(data consumer_data.
case model.EventReportType: case model.EventReportType:
attributeSource = IsEventAttribute attributeSource = IsEventAttribute
} }
m := map[string]interface{}{
"attribute_name": columnName, attributeChan <- attributeModel{
"data_type": parser.FjDetectType(obj.Get(columnName)), AttributeName:columnName,
"attribute_type": attributeType, DataType: parser.FjDetectType(obj.Get(columnName)),
"attribute_source": attributeSource, AttributeType: attributeType,
"app_id": kafkaData.TableId, attribute_source: attributeSource,
App_id: kafkaData.TableId,
} }
attributeChan <- m
AttributeMap.Store(AttributeMapkey, struct{}{}) AttributeMap.Store(AttributeMapkey, struct{}{})
} }
@ -214,9 +230,11 @@ func AddTableColumn(kafkaData model.KafkaData, failFunc func(data consumer_data.
dimsCachekey := sinker.GetDimsCachekey(model.GlobConfig.Comm.ClickHouse.DbName, tableName) dimsCachekey := sinker.GetDimsCachekey(model.GlobConfig.Comm.ClickHouse.DbName, tableName)
_, err = redisConn.Do("unlink", dimsCachekey) _, err = redisConn.Do("unlink", dimsCachekey)
if err != nil { if err != nil {
redisConn.Do("del", dimsCachekey) _,err = redisConn.Do("del", dimsCachekey)
if err != nil {
logs.Logger.Error("err", zap.Error(err)) logs.Logger.Error("err", zap.Error(err))
} }
}
sinker.ClearDimsCacheByKey(dimsCachekey) sinker.ClearDimsCacheByKey(dimsCachekey)
}() }()

View File

@ -109,7 +109,7 @@ func main() {
realTimeDataSarama := sinker.NewKafkaSarama() realTimeDataSarama := sinker.NewKafkaSarama()
reportData2CKSarama := realTimeDataSarama.Clone() reportData2CKSarama := realTimeDataSarama.Clone()
go action.MysqlConsumer() go action.MysqlConsumer()
go sinker.ClearDimsCacheByTime(time.Minute * 2) go sinker.ClearDimsCacheByTime(time.Minute * 30)
var json = jsoniter.ConfigCompatibleWithStandardLibrary var json = jsoniter.ConfigCompatibleWithStandardLibrary
pp, err := parser.NewParserPool() pp, err := parser.NewParserPool()
if err != nil { if err != nil {
@ -239,9 +239,12 @@ func main() {
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_server_time", kafkaData.ReportTime) kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_server_time", kafkaData.ReportTime)
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_offset", msg.Offset) kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_offset", msg.Offset)
kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_partition", msg.Partition) kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_partition", msg.Partition)
jsonParser := pp.Get()
defer pp.Put(jsonParser)
metric, err := jsonParser.Parse(kafkaData.ReqData)
//解析开发者上报的json数据 //解析开发者上报的json数据
metric, err := parser.ParseKafkaData(pp,kafkaData.ReqData)
if err != nil { if err != nil {
logs.Logger.Error("ParseKafkaData err", zap.Error(err)) logs.Logger.Error("ParseKafkaData err", zap.Error(err))
markFn() markFn()

View File

@ -82,6 +82,9 @@ func (this ManagerUserController) ModifyPwd(ctx *Ctx) error {
if err != nil { if err != nil {
return this.Error(ctx, err) return this.Error(ctx, err)
} }
util.TokenBucket.LoadOrStore(token, claims.ExpiresAt)
return this.Success(ctx, response.OperateSuccess, nil) return this.Success(ctx, response.OperateSuccess, nil)
} }

View File

@ -3,8 +3,10 @@ package analysis
import ( import (
"fmt" "fmt"
"github.com/1340691923/xwl_bi/engine/db" "github.com/1340691923/xwl_bi/engine/db"
"github.com/1340691923/xwl_bi/engine/logs"
"github.com/1340691923/xwl_bi/platform-basic-libs/util" "github.com/1340691923/xwl_bi/platform-basic-libs/util"
"github.com/garyburd/redigo/redis" "github.com/garyburd/redigo/redis"
"go.uber.org/zap"
"time" "time"
) )
@ -19,7 +21,10 @@ func ClearCacheByAppid(key string) (err error) {
defer conn.Close() defer conn.Close()
_, err = conn.Do("unlink", key) _, err = conn.Do("unlink", key)
if err != nil { if err != nil {
conn.Do("del", key) _,err = conn.Do("del", key)
if err != nil {
logs.Logger.Error("err", zap.Error(err))
}
} }
return return
} }

View File

@ -19,7 +19,6 @@ type ReportData2CK struct {
bufferMutex *sync.RWMutex bufferMutex *sync.RWMutex
batchSize int batchSize int
flushInterval int flushInterval int
pool sync.Pool
} }
func NewReportData2CK(batchSize int, flushInterval int) *ReportData2CK { func NewReportData2CK(batchSize int, flushInterval int) *ReportData2CK {
@ -37,18 +36,6 @@ func NewReportData2CK(batchSize int, flushInterval int) *ReportData2CK {
return reportData2CK return reportData2CK
} }
func(this *ReportData2CK)GetBuffer()*bytes.Buffer{
v := this.pool.Get()
if v == nil {
return new(bytes.Buffer)
}
return v.(*bytes.Buffer)
}
func(this *ReportData2CK)PutBuffer(buff *bytes.Buffer){
buff.Reset()
this.pool.Put(buff)
}
func (this *ReportData2CK) Flush() (err error) { func (this *ReportData2CK) Flush() (err error) {
this.bufferMutex.Lock() this.bufferMutex.Lock()
@ -79,8 +66,7 @@ func (this *ReportData2CK) Flush() (err error) {
} }
} }
bytesbuffer:=this.GetBuffer() bytesbuffer:=bytes.Buffer{}
defer this.PutBuffer(bytesbuffer)
TableColumnMap.Range(func(key, value interface{}) bool { TableColumnMap.Range(func(key, value interface{}) bool {
@ -93,11 +79,7 @@ func (this *ReportData2CK) Flush() (err error) {
params := make([]string, len(seriesDims)) params := make([]string, len(seriesDims))
for i, serDim := range seriesDims { for i, serDim := range seriesDims {
bytesbuffer.WriteString("`") serDimsQuoted[i] ="`"+serDim.Name+"`"
bytesbuffer.WriteString(serDim.Name)
bytesbuffer.WriteString("`")
serDimsQuoted[i] = bytesbuffer.String()
bytesbuffer.Reset()
params[i] = "?" params[i] = "?"
} }
@ -125,6 +107,7 @@ func (this *ReportData2CK) Flush() (err error) {
defer stmt.Close() defer stmt.Close()
haveFail := false haveFail := false
for _, row := range rowsMap[tableName] { for _, row := range rowsMap[tableName] {
logs.Logger.Sugar().Infof("insertSQL", insertSql,row)
if _, err := stmt.Exec(row...); err != nil { if _, err := stmt.Exec(row...); err != nil {
logs.Logger.Error("CK入库失败", zap.Error(err)) logs.Logger.Error("CK入库失败", zap.Error(err))
haveFail = true haveFail = true

View File

@ -60,10 +60,12 @@ func ClearDimsCacheByRedis(key string){
_, err := redisConn.Do("unlink", key) _, err := redisConn.Do("unlink", key)
if err != nil { if err != nil {
redisConn.Do("del", key) _,err = redisConn.Do("del", key)
if err!=nil{
logs.Logger.Error("err", zap.Error(err)) logs.Logger.Error("err", zap.Error(err))
} }
} }
}
func ClearDimsCacheByKey(key string){ func ClearDimsCacheByKey(key string){
dimsCacheMap.Delete(key) dimsCacheMap.Delete(key)