From ba035974b5f1cb520774dee2b98faa3cccf8c590 Mon Sep 17 00:00:00 2001 From: "1340691923@qq.com" <1340691923@qq.com> Date: Wed, 2 Mar 2022 19:31:17 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/sinker/action/action.go | 5 ----- cmd/sinker/main.go | 2 ++ platform-basic-libs/sinker/clickhouse.go | 1 - 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/cmd/sinker/action/action.go b/cmd/sinker/action/action.go index a113133..899ba2c 100644 --- a/cmd/sinker/action/action.go +++ b/cmd/sinker/action/action.go @@ -57,9 +57,6 @@ func MysqlConsumer() { func AddMetaEvent(kafkaData model.KafkaData) (err error) { if kafkaData.ReportType == model.EventReportType { - redisConn := db.RedisPool.Get() - defer redisConn.Close() - b := bytes.Buffer{} b.WriteString(kafkaData.TableId) b.WriteString("_") @@ -128,7 +125,6 @@ func AddTableColumn(kafkaData model.KafkaData, failFunc func(data consumer_data. } } - b := bytes.Buffer{} obj.Visit(func(key []byte, v *fastjson.Value) { @@ -207,7 +203,6 @@ func AddTableColumn(kafkaData model.KafkaData, failFunc func(data consumer_data. }() }) - if foundNewKey { dims, err = sinker.ChangeSchema(newKeys, model.GlobConfig.Comm.ClickHouse.DbName, tableName, dims) if err != nil { diff --git a/cmd/sinker/main.go b/cmd/sinker/main.go index b791600..b10ef0f 100644 --- a/cmd/sinker/main.go +++ b/cmd/sinker/main.go @@ -245,6 +245,7 @@ func main() { markFn() return } + //生成表名 tableName := kafkaData.GetTableName() @@ -260,6 +261,7 @@ func main() { return } + //添加元数据 if err := action.AddMetaEvent(kafkaData); err != nil { logs.Logger.Error("addMetaEvent err", zap.Error(err)) diff --git a/platform-basic-libs/sinker/clickhouse.go b/platform-basic-libs/sinker/clickhouse.go index c38380e..6f23de9 100644 --- a/platform-basic-libs/sinker/clickhouse.go +++ b/platform-basic-libs/sinker/clickhouse.go @@ -50,7 +50,6 @@ func GetDims(database, table string, excludedColumns []string, conn *sqlx.DB) (d redisConn := db.RedisPool.Get() defer redisConn.Close() - dimsBytes, redisErr := redis.Bytes(redisConn.Do("get", dimsCachekey)) if redisErr == nil && len(dimsBytes) != 0 {