diff --git a/cmd/sinker/action/action.go b/cmd/sinker/action/action.go index a113133..899ba2c 100644 --- a/cmd/sinker/action/action.go +++ b/cmd/sinker/action/action.go @@ -57,9 +57,6 @@ func MysqlConsumer() { func AddMetaEvent(kafkaData model.KafkaData) (err error) { if kafkaData.ReportType == model.EventReportType { - redisConn := db.RedisPool.Get() - defer redisConn.Close() - b := bytes.Buffer{} b.WriteString(kafkaData.TableId) b.WriteString("_") @@ -128,7 +125,6 @@ func AddTableColumn(kafkaData model.KafkaData, failFunc func(data consumer_data. } } - b := bytes.Buffer{} obj.Visit(func(key []byte, v *fastjson.Value) { @@ -207,7 +203,6 @@ func AddTableColumn(kafkaData model.KafkaData, failFunc func(data consumer_data. }() }) - if foundNewKey { dims, err = sinker.ChangeSchema(newKeys, model.GlobConfig.Comm.ClickHouse.DbName, tableName, dims) if err != nil { diff --git a/cmd/sinker/main.go b/cmd/sinker/main.go index b791600..b10ef0f 100644 --- a/cmd/sinker/main.go +++ b/cmd/sinker/main.go @@ -245,6 +245,7 @@ func main() { markFn() return } + //生成表名 tableName := kafkaData.GetTableName() @@ -260,6 +261,7 @@ func main() { return } + //添加元数据 if err := action.AddMetaEvent(kafkaData); err != nil { logs.Logger.Error("addMetaEvent err", zap.Error(err)) diff --git a/platform-basic-libs/sinker/clickhouse.go b/platform-basic-libs/sinker/clickhouse.go index c38380e..6f23de9 100644 --- a/platform-basic-libs/sinker/clickhouse.go +++ b/platform-basic-libs/sinker/clickhouse.go @@ -50,7 +50,6 @@ func GetDims(database, table string, excludedColumns []string, conn *sqlx.DB) (d redisConn := db.RedisPool.Get() defer redisConn.Close() - dimsBytes, redisErr := redis.Bytes(redisConn.Do("get", dimsCachekey)) if redisErr == nil && len(dimsBytes) != 0 {