From 9e7b2a1673dfeee3647362686e1542655c7a0339 Mon Sep 17 00:00:00 2001 From: "1340691923@qq.com" <1340691923@qq.com> Date: Fri, 4 Mar 2022 09:52:18 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/sinker/action/action.go | 2 +- controller/report_controller.go | 2 +- .../consumer_data/real_time_warehousing.go | 1 + .../consumer_data/report_accpet_status.go | 4 +-- .../service/consumer_data/reportdata2ck.go | 33 +++++++++---------- platform-basic-libs/sinker/clickhouse.go | 12 ++++--- 6 files changed, 28 insertions(+), 26 deletions(-) diff --git a/cmd/sinker/action/action.go b/cmd/sinker/action/action.go index 30a9f58..bdcbd41 100644 --- a/cmd/sinker/action/action.go +++ b/cmd/sinker/action/action.go @@ -99,7 +99,7 @@ func AddMetaEvent(kafkaData model.KafkaData) (err error) { func AddTableColumn(kafkaData model.KafkaData, failFunc func(data consumer_data.ReportAcceptStatusData), tableName string, ReqDataObject *parser.FastjsonMetric) (err error) { - dims, err := sinker.GetDims(model.GlobConfig.Comm.ClickHouse.DbName, tableName, nil, db.ClickHouseSqlx) + dims, err := sinker.GetDims(model.GlobConfig.Comm.ClickHouse.DbName, tableName, nil, db.ClickHouseSqlx,false) if err != nil { logs.Logger.Error("sinker.GetDims", zap.Error(err)) return diff --git a/controller/report_controller.go b/controller/report_controller.go index abf33e4..9ef9d38 100644 --- a/controller/report_controller.go +++ b/controller/report_controller.go @@ -106,7 +106,7 @@ func (this ReportController) ReportAction(ctx *fasthttp.RequestCtx) { return } - dims, err := sinker.GetDims(model.GlobConfig.Comm.ClickHouse.DbName, kafkaData.GetTableName(), []string{}, db.ClickHouseSqlx) + dims, err := sinker.GetDims(model.GlobConfig.Comm.ClickHouse.DbName, kafkaData.GetTableName(), []string{}, db.ClickHouseSqlx,true) if err != nil { logs.Logger.Error("sinker.GetDims", zap.Error(err)) this.FastError(ctx, errors.New("服务异常")) diff --git a/platform-basic-libs/service/consumer_data/real_time_warehousing.go b/platform-basic-libs/service/consumer_data/real_time_warehousing.go index 2d65a7e..0da35e3 100644 --- a/platform-basic-libs/service/consumer_data/real_time_warehousing.go +++ b/platform-basic-libs/service/consumer_data/real_time_warehousing.go @@ -24,6 +24,7 @@ type RealTimeWarehousing struct { } func NewRealTimeWarehousing(batchSize, flushInterval int) *RealTimeWarehousing { + logs.Logger.Info("NewRealTimeWarehousing", zap.Int("batchSize", batchSize), zap.Int("flushInterval", flushInterval)) realTimeWarehousing := &RealTimeWarehousing{ buffer: make([]*RealTimeWarehousingData, 0, batchSize), bufferMutex: new(sync.RWMutex), diff --git a/platform-basic-libs/service/consumer_data/report_accpet_status.go b/platform-basic-libs/service/consumer_data/report_accpet_status.go index d7276dd..8cf8fc5 100644 --- a/platform-basic-libs/service/consumer_data/report_accpet_status.go +++ b/platform-basic-libs/service/consumer_data/report_accpet_status.go @@ -31,6 +31,7 @@ const FailStatus = 0 const SuccessStatus = 1 func NewReportAcceptStatus(batchSize int, flushInterval int) *ReportAcceptStatus { + logs.Logger.Info("NewReportAcceptStatus", zap.Int("batchSize", batchSize), zap.Int("flushInterval", flushInterval)) reportAcceptStatus := &ReportAcceptStatus{ buffer: make([]*ReportAcceptStatusData, 0, batchSize), bufferMutex: new(sync.RWMutex), @@ -85,10 +86,9 @@ func (this *ReportAcceptStatus) Flush() (err error) { if err := tx.Commit(); err != nil { logs.Logger.Error("入库数据状态出现错误", zap.Error(err)) } else { - lostTime := time.Now().Sub(startNow).String() len := len(this.buffer) if len > 0 { - logs.Logger.Info("入库数据状态成功", zap.String("所花时间", lostTime), zap.Int("数据长度为", len)) + logs.Logger.Info("入库数据状态成功", zap.String("所花时间", time.Now().Sub(startNow).String()), zap.Int("数据长度为", len)) } } stmt.Close() diff --git a/platform-basic-libs/service/consumer_data/reportdata2ck.go b/platform-basic-libs/service/consumer_data/reportdata2ck.go index 71c9e16..f046ef8 100644 --- a/platform-basic-libs/service/consumer_data/reportdata2ck.go +++ b/platform-basic-libs/service/consumer_data/reportdata2ck.go @@ -46,18 +46,19 @@ func (this *ReportData2CK) Flush() (err error) { startNow := time.Now() rowsMap := map[string][][]interface{}{} + rowArr := []interface{}{} + rows := [][]interface{}{} for bufferIndex := range this.buffer { for tableName := range this.buffer[bufferIndex] { - rows := [][]interface{}{} + rows := rows[0:0] if _, haveKey := rowsMap[tableName]; haveKey { rows = rowsMap[tableName] } else { rowsMap[tableName] = rows } - v, _ := TableColumnMap.Load(tableName) - dims := v.([]*model2.ColumnWithType) - var rowArr []interface{} - for _, dim := range dims { + dims, _ := TableColumnMap.Load(tableName) + rowArr = rowArr[0:0] + for _, dim := range dims.([]*model2.ColumnWithType) { val := parser.GetValueByType(this.buffer[bufferIndex][tableName], dim) rowArr = append(rowArr, val) } @@ -68,11 +69,9 @@ func (this *ReportData2CK) Flush() (err error) { bytesbuffer:=bytes.Buffer{} - TableColumnMap.Range(func(key, value interface{}) bool { + TableColumnMap.Range(func(tableName, value interface{}) bool { - tableName := key.(string) - - if _, haveKey := rowsMap[tableName]; haveKey { + if _, haveKey := rowsMap[tableName.(string)]; haveKey { seriesDims := value.([]*model2.ColumnWithType) serDimsQuoted := make([]string, len(seriesDims)) @@ -84,7 +83,7 @@ func (this *ReportData2CK) Flush() (err error) { } bytesbuffer.WriteString("INSERT INTO ") - bytesbuffer.WriteString(tableName) + bytesbuffer.WriteString(tableName.(string)) bytesbuffer.WriteString(" (") bytesbuffer.WriteString(strings.Join(serDimsQuoted, ",")) bytesbuffer.WriteString(") ") @@ -92,22 +91,23 @@ func (this *ReportData2CK) Flush() (err error) { bytesbuffer.WriteString(strings.Join(params, ",")) bytesbuffer.WriteString(")") - insertSql := bytesbuffer.String() - bytesbuffer.Reset() + defer func() { + bytesbuffer.Reset() + }() + tx, err := db.ClickHouseSqlx.Begin() if err != nil { logs.Logger.Error("CK入库失败", zap.Error(err)) return false } - stmt, err := tx.Prepare(insertSql) + stmt, err := tx.Prepare(bytesbuffer.String()) if err != nil { logs.Logger.Error("CK入库失败", zap.Error(err)) return false } defer stmt.Close() haveFail := false - for _, row := range rowsMap[tableName] { - logs.Logger.Sugar().Infof("insertSQL", insertSql,row) + for _, row := range rowsMap[tableName.(string)] { if _, err := stmt.Exec(row...); err != nil { logs.Logger.Error("CK入库失败", zap.Error(err)) haveFail = true @@ -118,9 +118,8 @@ func (this *ReportData2CK) Flush() (err error) { logs.Logger.Error("CK入库失败", zap.Error(err)) return false } else { - lostTime := time.Now().Sub(startNow).String() len := len(this.buffer) - logs.Logger.Info("CK入库成功,", zap.String("所花时间", lostTime), zap.Int("数据长度为", len)) + logs.Logger.Info("CK入库成功,", zap.String("所花时间", time.Now().Sub(startNow).String()), zap.Int("数据长度为", len)) } } } diff --git a/platform-basic-libs/sinker/clickhouse.go b/platform-basic-libs/sinker/clickhouse.go index 52a3e83..eebb9fc 100644 --- a/platform-basic-libs/sinker/clickhouse.go +++ b/platform-basic-libs/sinker/clickhouse.go @@ -84,12 +84,14 @@ func ClearDimsCacheByKey(key string){ dimsCacheMap.Delete(key) } -func GetDims(database, table string, excludedColumns []string, conn *sqlx.DB) (dims []*model2.ColumnWithType, err error) { - dimsCachekey := GetDimsCachekey(database, table) - cache,load := dimsCacheMap.Load(dimsCachekey) +func GetDims(database, table string, excludedColumns []string, conn *sqlx.DB,onlyRedis bool) (dims []*model2.ColumnWithType, err error) { - if load { - return cache.([]*model2.ColumnWithType),nil + dimsCachekey := GetDimsCachekey(database, table) + if !onlyRedis{ + cache,load := dimsCacheMap.Load(dimsCachekey) + if load { + return cache.([]*model2.ColumnWithType),nil + } } var json = jsoniter.ConfigCompatibleWithStandardLibrary