diff --git a/cmd/report_server/main.go b/cmd/report_server/main.go index f8ef80d..1d76ad1 100644 --- a/cmd/report_server/main.go +++ b/cmd/report_server/main.go @@ -12,6 +12,7 @@ import ( "github.com/1340691923/xwl_bi/engine/logs" "github.com/1340691923/xwl_bi/middleware" "github.com/1340691923/xwl_bi/model" + "github.com/1340691923/xwl_bi/platform-basic-libs/sinker" _ "github.com/ClickHouse/clickhouse-go" "github.com/buaazp/fasthttprouter" _ "github.com/go-sql-driver/mysql" @@ -72,6 +73,7 @@ func main() { } } }() + go sinker.ClearDimsCacheByTime(time.Minute * 2) router := fasthttprouter.New() diff --git a/cmd/sinker/action/action.go b/cmd/sinker/action/action.go index 899ba2c..b6e491a 100644 --- a/cmd/sinker/action/action.go +++ b/cmd/sinker/action/action.go @@ -212,10 +212,13 @@ func AddTableColumn(kafkaData model.KafkaData, failFunc func(data consumer_data. redisConn := db.RedisPool.Get() defer redisConn.Close() dimsCachekey := sinker.GetDimsCachekey(model.GlobConfig.Comm.ClickHouse.DbName, tableName) - _, err = redisConn.Do("del", dimsCachekey) + _, err = redisConn.Do("unlink", dimsCachekey) if err != nil { + redisConn.Do("del", dimsCachekey) logs.Logger.Error("err", zap.Error(err)) } + sinker.ClearDimsCacheByKey(dimsCachekey) + }() } diff --git a/cmd/sinker/main.go b/cmd/sinker/main.go index b10ef0f..0ced460 100644 --- a/cmd/sinker/main.go +++ b/cmd/sinker/main.go @@ -24,6 +24,7 @@ import ( _ "net/http/pprof" "runtime" "strconv" + "time" ) var ( @@ -108,6 +109,7 @@ func main() { realTimeDataSarama := sinker.NewKafkaSarama() reportData2CKSarama := realTimeDataSarama.Clone() go action.MysqlConsumer() + go sinker.ClearDimsCacheByTime(time.Minute * 2) var json = jsoniter.ConfigCompatibleWithStandardLibrary pp, err := parser.NewParserPool() if err != nil { diff --git a/platform-basic-libs/service/analysis/cache.go b/platform-basic-libs/service/analysis/cache.go index ee4d3d6..ed0ac9c 100644 --- a/platform-basic-libs/service/analysis/cache.go +++ 
b/platform-basic-libs/service/analysis/cache.go @@ -19,7 +19,7 @@ func ClearCacheByAppid(key string) (err error) { defer conn.Close() _, err = conn.Do("unlink", key) if err != nil { - _, err = conn.Do("del", key) + _, err = conn.Do("del", key) } return } diff --git a/platform-basic-libs/service/consumer_data/reportdata2ck.go b/platform-basic-libs/service/consumer_data/reportdata2ck.go index 18b7fb0..1683034 100644 --- a/platform-basic-libs/service/consumer_data/reportdata2ck.go +++ b/platform-basic-libs/service/consumer_data/reportdata2ck.go @@ -7,7 +7,6 @@ import ( model2 "github.com/1340691923/xwl_bi/platform-basic-libs/sinker/model" parser "github.com/1340691923/xwl_bi/platform-basic-libs/sinker/parse" "go.uber.org/zap" - "log" "strings" "sync" "time" @@ -20,6 +19,7 @@ type ReportData2CK struct { bufferMutex *sync.RWMutex batchSize int flushInterval int + pool sync.Pool } func NewReportData2CK(batchSize int, flushInterval int) *ReportData2CK { @@ -37,6 +37,19 @@ func NewReportData2CK(batchSize int, flushInterval int) *ReportData2CK { return reportData2CK } +func(this *ReportData2CK)GetBuffer()*bytes.Buffer{ + v := this.pool.Get() + if v == nil { + return new(bytes.Buffer) + } + return v.(*bytes.Buffer) +} + +func(this *ReportData2CK)PutBuffer(buff *bytes.Buffer){ + buff.Reset() + this.pool.Put(buff) +} + func (this *ReportData2CK) Flush() (err error) { this.bufferMutex.Lock() if len(this.buffer)==0{ @@ -46,8 +59,8 @@ func (this *ReportData2CK) Flush() (err error) { startNow := time.Now() rowsMap := map[string][][]interface{}{} - for _, data := range this.buffer { - for tableName, metric := range data { + for bufferIndex := range this.buffer { + for tableName := range this.buffer[bufferIndex] { rows := [][]interface{}{} if _, haveKey := rowsMap[tableName]; haveKey { rows = rowsMap[tableName] @@ -58,7 +71,7 @@ func (this *ReportData2CK) Flush() (err error) { dims := v.([]*model2.ColumnWithType) var rowArr []interface{} for _, dim := range dims { - val =
parser.GetValueByType(metric, dim) + val := parser.GetValueByType(this.buffer[bufferIndex][tableName], dim) rowArr = append(rowArr, val) } rows = append(rows, rowArr) @@ -66,7 +79,8 @@ func (this *ReportData2CK) Flush() (err error) { } } - buffer := bytes.Buffer{} + bytesbuffer:=this.GetBuffer() + defer this.PutBuffer(bytesbuffer) TableColumnMap.Range(func(key, value interface{}) bool { @@ -77,22 +91,23 @@ func (this *ReportData2CK) Flush() (err error) { seriesDims := value.([]*model2.ColumnWithType) serDimsQuoted := make([]string, len(seriesDims)) params := make([]string, len(seriesDims)) + for i, serDim := range seriesDims { serDimsQuoted[i] = "`" + serDim.Name + "`" params[i] = "?" } - buffer.WriteString("INSERT INTO ") - buffer.WriteString(tableName) - buffer.WriteString(" (") - buffer.WriteString(strings.Join(serDimsQuoted, ",")) - buffer.WriteString(") ") - buffer.WriteString("VALUES (") - buffer.WriteString(strings.Join(params, ",")) - buffer.WriteString(")") + bytesbuffer.WriteString("INSERT INTO ") + bytesbuffer.WriteString(tableName) + bytesbuffer.WriteString(" (") + bytesbuffer.WriteString(strings.Join(serDimsQuoted, ",")) + bytesbuffer.WriteString(") ") + bytesbuffer.WriteString("VALUES (") + bytesbuffer.WriteString(strings.Join(params, ",")) + bytesbuffer.WriteString(")") - insertSql := buffer.String() - buffer.Reset() + insertSql := bytesbuffer.String() + bytesbuffer.Reset() tx, err := db.ClickHouseSqlx.Begin() if err != nil { logs.Logger.Error("CK入库失败", zap.Error(err)) @@ -106,7 +121,6 @@ func (this *ReportData2CK) Flush() (err error) { defer stmt.Close() haveFail := false for _, row := range rowsMap[tableName] { - log.Println("row",row) if _, err := stmt.Exec(row...); err != nil { logs.Logger.Error("CK入库失败", zap.Error(err)) haveFail = true diff --git 
a/platform-basic-libs/sinker/clickhouse.go b/platform-basic-libs/sinker/clickhouse.go index 6f23de9..4f2fdb6 100644 --- a/platform-basic-libs/sinker/clickhouse.go +++ b/platform-basic-libs/sinker/clickhouse.go @@ -18,6 +18,7 @@ import ( "regexp" "strings" "sync" + "time" ) var ( @@ -38,27 +39,64 @@ func GetDimsCachekey(database, table string) string { return dimsCachekey } +var dimsCacheMap sync.Map +func ClearDimsCacheByTime(clearTime time.Duration){ -func init() { + for{ + time.Sleep(clearTime) + dimsCacheMap.Range(func(key, value interface{}) bool { + ClearDimsCacheByRedis(key.(string)) + dimsCacheMap.Delete(key) + return true + }) + } +} + +func ClearDimsCacheByRedis(key string){ + redisConn := db.RedisPool.Get() + defer redisConn.Close() + + _, err := redisConn.Do("unlink", key) + if err != nil { + redisConn.Do("del", key) + logs.Logger.Error("err", zap.Error(err)) + } +} + +func ClearDimsCacheByKey(key string){ + dimsCacheMap.Delete(key) } func GetDims(database, table string, excludedColumns []string, conn *sqlx.DB) (dims []*model2.ColumnWithType, err error) { - var json = jsoniter.ConfigCompatibleWithStandardLibrary dimsCachekey := GetDimsCachekey(database, table) + cache,load := dimsCacheMap.Load(dimsCachekey) + + if load { + return cache.([]*model2.ColumnWithType),nil + } + + var json = jsoniter.ConfigCompatibleWithStandardLibrary redisConn := db.RedisPool.Get() defer redisConn.Close() dimsBytes, redisErr := redis.Bytes(redisConn.Do("get", dimsCachekey)) if redisErr == nil && len(dimsBytes) != 0 { - jsonErr := json.Unmarshal(dimsBytes, &dims) - if jsonErr == nil { - return - } else { - logs.Logger.Error("jsonErr", zap.Error(jsonErr)) + dimsCache,err:=util.GzipUnCompressByte(dimsBytes) + if err==nil{ + jsonErr := json.Unmarshal(dimsCache, &dims) + if jsonErr == nil { + dimsCacheMap.Store(dimsCachekey,dims) + return dims,err + } else { + logs.Logger.Error("jsonErr", zap.Error(jsonErr)) + } + }else{ + logs.Logger.Error("GzipUnCompressByte Err", 
zap.Error(err)) } + } else { logs.Logger.Error("redisErr", zap.Error(redisErr)) } @@ -66,7 +104,7 @@ func GetDims(database, table string, excludedColumns []string, conn *sqlx.DB) (d var rs *sql.Rows if rs, err = conn.Query(fmt.Sprintf(selectSQLTemplate, database, table)); err != nil { err = errors.Wrapf(err, "") - return + return dims,err } defer rs.Close() @@ -74,7 +112,7 @@ func GetDims(database, table string, excludedColumns []string, conn *sqlx.DB) (d for rs.Next() { if err = rs.Scan(&name, &typ, &defaultKind); err != nil { err = errors.Wrapf(err, "") - return + return dims,err } typ = lowCardinalityRegexp.ReplaceAllString(typ, "$1") if !util.InstrArr(excludedColumns, name) && defaultKind != "MATERIALIZED" { @@ -84,14 +122,18 @@ func GetDims(database, table string, excludedColumns []string, conn *sqlx.DB) (d } if len(dims) == 0 { err = errors.Wrapf(ErrTblNotExist, "%s.%s", database, table) - return + return dims,err } + dimsCacheMap.Store(dimsCachekey,dims) res, _ := json.Marshal(dims) + s,err:=util.GzipCompressByte(res) + if err!=nil{ + return dims,err + } + _, err = redisConn.Do("SETEX", dimsCachekey, 60*60*6, s) - _, err = redisConn.Do("SETEX", dimsCachekey, 60*60*6, res) - - return + return dims,err } func GetSourceName(name string) (sourcename string) { diff --git a/platform-basic-libs/util/gzip.go b/platform-basic-libs/util/gzip.go index eb59862..22742e3 100644 --- a/platform-basic-libs/util/gzip.go +++ b/platform-basic-libs/util/gzip.go @@ -6,6 +6,17 @@ import ( "io/ioutil" ) +func GzipCompressByte(data []byte) ([]byte, error) { + buf := bytes.NewBuffer(nil) + gzW := gzip.NewWriter(buf) + _, err := gzW.Write(data) + if err != nil { + return nil, err + } + err = gzW.Close() + return buf.Bytes(), err +} + func GzipCompress(data string) ([]byte, error) { buf := bytes.NewBuffer(nil) gzW := gzip.NewWriter(buf) @@ -25,3 +36,12 @@ func GzipCompress(data string) ([]byte, error) { buf := bytes.NewBuffer(nil) gzR, err := gzip.NewReader(bytes.NewReader(data)) if err != nil { return nil, err } b, err := ioutil.ReadAll(gzR) return Bytes2str(b), err } + +func 
GzipUnCompressByte(data []byte) ([]byte, error) { + gzR, err := gzip.NewReader(bytes.NewReader(data)) + if err != nil { + return nil, err + } + b, err := ioutil.ReadAll(gzR) + return b, err +}