diff --git a/cmd/report_server/main.go b/cmd/report_server/main.go index 1d76ad1..da5a4a2 100644 --- a/cmd/report_server/main.go +++ b/cmd/report_server/main.go @@ -73,7 +73,7 @@ func main() { } } }() - go sinker.ClearDimsCacheByTime(time.Minute * 2) + go sinker.ClearDimsCacheByTime(time.Minute * 30) router := fasthttprouter.New() diff --git a/cmd/sinker/action/action.go b/cmd/sinker/action/action.go index b6e491a..30a9f58 100644 --- a/cmd/sinker/action/action.go +++ b/cmd/sinker/action/action.go @@ -29,25 +29,46 @@ var MetaAttrRelationSet sync.Map var AttributeMap sync.Map var MetaEventMap sync.Map -var metaAttrRelationChan = make(chan map[string]interface{}, 10000) -var attributeChan = make(chan map[string]interface{}, 10000) -var metaEventChan = make(chan map[string]interface{}, 10000) +var metaAttrRelationChan = make(chan metaAttrRelationModel, 10000) +var attributeChan = make(chan attributeModel, 10000) +var metaEventChan = make(chan metaEventModel, 10000) + +type metaAttrRelationModel struct { + EventName string + EventAttr string + AppId string +} + +type attributeModel struct { + AttributeName string + DataType int + AttributeType int + attribute_source int + App_id string +} + +type metaEventModel struct { + EventName string + AppId string +} func MysqlConsumer() { for { select { case m := <-metaAttrRelationChan: - if _, err := db.SqlBuilder.Insert("meta_attr_relation").SetMap(m).RunWith(db.Sqlx).Exec(); err != nil && !strings.Contains(err.Error(), "1062") { - logs.Logger.Error("meta_attr_relation insert", zap.Error(err)) + if _, err := db.Sqlx.Exec(`insert into meta_attr_relation(app_id,event_name,event_attr) values (?,?,?);`, + m.AppId,m.EventName,m.EventAttr); err != nil && !strings.Contains(err.Error(), "1062") { + logs.Logger.Sugar().Errorf("meta_attr_relation insert",m, err) } case m := <-attributeChan: - if _, err := db.SqlBuilder.Insert("attribute").SetMap(m).RunWith(db.Sqlx).Exec(); err != nil && !strings.Contains(err.Error(), "1062") { - logs.Logger.Error("attribute insert", zap.Error(err)) + if _, err := db.Sqlx.Exec(`insert into attribute(app_id,attribute_source,attribute_type,data_type,attribute_name) values (?,?,?,?,?);`, + m.App_id,m.attribute_source,m.AttributeType,m.DataType,m.AttributeName); err != nil && !strings.Contains(err.Error(), "1062") { + logs.Logger.Sugar().Errorf("attribute insert",m, err) } case m := <-metaEventChan: - _, err := db.SqlBuilder.Insert("meta_event").SetMap(m).RunWith(db.Sqlx).Exec() + _, err := db.Sqlx.Exec(`insert into meta_event(appid,event_name) values (?,?);`,m.AppId,m.EventName) if err != nil && !strings.Contains(err.Error(), "1062") { - logs.Logger.Error("metaEvent insert", zap.Error(err)) + logs.Logger.Sugar().Errorf("metaEvent insert",m, err) } default: @@ -66,11 +87,10 @@ func AddMetaEvent(kafkaData model.KafkaData) (err error) { _, found := MetaEventMap.Load(bStr) if !found { - m := map[string]interface{}{ - "appid": kafkaData.TableId, - "event_name": kafkaData.EventName, + metaEventChan <- metaEventModel{ + EventName: kafkaData.EventName, + AppId: kafkaData.TableId, } - metaEventChan <- m MetaEventMap.Store(bStr, struct{}{}) } } @@ -85,7 +105,6 @@ func AddTableColumn(kafkaData model.KafkaData, failFunc func(data consumer_data. return } - obj, err := ReqDataObject.GetParseObject().Object() if err != nil { logs.Logger.Error("ReqDataObject.GetParseObject().Object()", zap.Error(err)) @@ -129,7 +148,7 @@ func AddTableColumn(kafkaData model.KafkaData, failFunc func(data consumer_data. obj.Visit(func(key []byte, v *fastjson.Value) { - columnName := util.Bytes2str(key) + columnName := string(key) func() { @@ -142,17 +161,14 @@ func AddTableColumn(kafkaData model.KafkaData, failFunc func(data consumer_data. bStr := b.String() _, found := MetaAttrRelationSet.Load(bStr) - if !found { - m := map[string]interface{}{ - "event_name": kafkaData.EventName, - "event_attr": columnName, - "app_id": kafkaData.TableId, + metaAttrRelationChan <- metaAttrRelationModel{ + EventName: kafkaData.EventName, + EventAttr: columnName, + AppId: kafkaData.TableId, } - metaAttrRelationChan <- m MetaAttrRelationSet.Store(bStr, struct{}{}) } - }() if !util.InstrArr(knownKeys, columnName) { @@ -188,14 +204,14 @@ func AddTableColumn(kafkaData model.KafkaData, failFunc func(data consumer_data. case model.EventReportType: attributeSource = IsEventAttribute } - m := map[string]interface{}{ - "attribute_name": columnName, - "data_type": parser.FjDetectType(obj.Get(columnName)), - "attribute_type": attributeType, - "attribute_source": attributeSource, - "app_id": kafkaData.TableId, + + attributeChan <- attributeModel{ + AttributeName:columnName, + DataType: parser.FjDetectType(obj.Get(columnName)), + AttributeType: attributeType, + attribute_source: attributeSource, + App_id: kafkaData.TableId, } - attributeChan <- m AttributeMap.Store(AttributeMapkey, struct{}{}) } @@ -214,8 +230,10 @@ func AddTableColumn(kafkaData model.KafkaData, failFunc func(data consumer_data. dimsCachekey := sinker.GetDimsCachekey(model.GlobConfig.Comm.ClickHouse.DbName, tableName) _, err = redisConn.Do("unlink", dimsCachekey) if err != nil { - redisConn.Do("del", dimsCachekey) - logs.Logger.Error("err", zap.Error(err)) + _,err = redisConn.Do("del", dimsCachekey) + if err != nil { + logs.Logger.Error("err", zap.Error(err)) + } } sinker.ClearDimsCacheByKey(dimsCachekey) diff --git a/cmd/sinker/main.go b/cmd/sinker/main.go index 0ced460..ae8735c 100644 --- a/cmd/sinker/main.go +++ b/cmd/sinker/main.go @@ -109,7 +109,7 @@ func main() { realTimeDataSarama := sinker.NewKafkaSarama() reportData2CKSarama := realTimeDataSarama.Clone() go action.MysqlConsumer() - go sinker.ClearDimsCacheByTime(time.Minute * 2) + go sinker.ClearDimsCacheByTime(time.Minute * 30) var json = jsoniter.ConfigCompatibleWithStandardLibrary pp, err := parser.NewParserPool() if err != nil { @@ -239,9 +239,12 @@ func main() { kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_server_time", kafkaData.ReportTime) kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_offset", msg.Offset) kafkaData.ReqData, _ = sjson.SetBytes(kafkaData.ReqData, "xwl_kafka_partition", msg.Partition) + jsonParser := pp.Get() + defer pp.Put(jsonParser) + + metric, err := jsonParser.Parse(kafkaData.ReqData) //解析开发者上报的json数据 - metric, err := parser.ParseKafkaData(pp,kafkaData.ReqData) if err != nil { logs.Logger.Error("ParseKafkaData err", zap.Error(err)) markFn() diff --git a/controller/manager_user_controller.go b/controller/manager_user_controller.go index 31278a1..e46aac4 100644 --- a/controller/manager_user_controller.go +++ b/controller/manager_user_controller.go @@ -82,6 +82,9 @@ func (this ManagerUserController) ModifyPwd(ctx *Ctx) error { if err != nil { return this.Error(ctx, err) } + + util.TokenBucket.LoadOrStore(token, claims.ExpiresAt) + return this.Success(ctx, response.OperateSuccess, nil) } diff --git a/platform-basic-libs/service/analysis/cache.go b/platform-basic-libs/service/analysis/cache.go index ed0ac9c..93ee9ee 100644 --- a/platform-basic-libs/service/analysis/cache.go +++ b/platform-basic-libs/service/analysis/cache.go @@ -3,8 +3,10 @@ package analysis import ( "fmt" "github.com/1340691923/xwl_bi/engine/db" + "github.com/1340691923/xwl_bi/engine/logs" "github.com/1340691923/xwl_bi/platform-basic-libs/util" "github.com/garyburd/redigo/redis" + "go.uber.org/zap" "time" ) @@ -19,7 +21,10 @@ func ClearCacheByAppid(key string) (err error) { defer conn.Close() _, err = conn.Do("unlink", key) if err != nil { - conn.Do("del", key) + _,err = conn.Do("del", key) + if err != nil { + logs.Logger.Error("err", zap.Error(err)) + } } return } diff --git a/platform-basic-libs/service/consumer_data/reportdata2ck.go b/platform-basic-libs/service/consumer_data/reportdata2ck.go index 1683034..71c9e16 100644 --- a/platform-basic-libs/service/consumer_data/reportdata2ck.go +++ b/platform-basic-libs/service/consumer_data/reportdata2ck.go @@ -19,7 +19,6 @@ type ReportData2CK struct { bufferMutex *sync.RWMutex batchSize int flushInterval int - pool sync.Pool } func NewReportData2CK(batchSize int, flushInterval int) *ReportData2CK { @@ -37,18 +36,6 @@ func NewReportData2CK(batchSize int, flushInterval int) *ReportData2CK { return reportData2CK } -func(this *ReportData2CK)GetBuffer()*bytes.Buffer{ - v := this.pool.Get() - if v == nil { - return new(bytes.Buffer) - } - return v.(*bytes.Buffer) -} - -func(this *ReportData2CK)PutBuffer(buff *bytes.Buffer){ - buff.Reset() - this.pool.Put(buff) -} func (this *ReportData2CK) Flush() (err error) { this.bufferMutex.Lock() @@ -79,8 +66,7 @@ func (this *ReportData2CK) Flush() (err error) { } } - bytesbuffer:=this.GetBuffer() - defer this.PutBuffer(bytesbuffer) + bytesbuffer:=bytes.Buffer{} TableColumnMap.Range(func(key, value interface{}) bool { @@ -93,11 +79,7 @@ func (this *ReportData2CK) Flush() (err error) { params := make([]string, len(seriesDims)) for i, serDim := range seriesDims { - bytesbuffer.WriteString("`") - bytesbuffer.WriteString(serDim.Name) - bytesbuffer.WriteString("`") - serDimsQuoted[i] = bytesbuffer.String() - bytesbuffer.Reset() + serDimsQuoted[i] ="`"+serDim.Name+"`" params[i] = "?" } @@ -125,6 +107,7 @@ func (this *ReportData2CK) Flush() (err error) { defer stmt.Close() haveFail := false for _, row := range rowsMap[tableName] { + logs.Logger.Sugar().Infof("insertSQL", insertSql,row) if _, err := stmt.Exec(row...); err != nil { logs.Logger.Error("CK入库失败", zap.Error(err)) haveFail = true diff --git a/platform-basic-libs/sinker/clickhouse.go b/platform-basic-libs/sinker/clickhouse.go index 4f2fdb6..0a727f1 100644 --- a/platform-basic-libs/sinker/clickhouse.go +++ b/platform-basic-libs/sinker/clickhouse.go @@ -60,8 +60,10 @@ func ClearDimsCacheByRedis(key string){ _, err := redisConn.Do("unlink", key) if err != nil { - redisConn.Do("del", key) - logs.Logger.Error("err", zap.Error(err)) + _,err = redisConn.Do("del", key) + if err!=nil{ + logs.Logger.Error("err", zap.Error(err)) + } } }