diff --git a/application/init.go b/application/init.go index a66c011..760be25 100644 --- a/application/init.go +++ b/application/init.go @@ -93,28 +93,6 @@ func InitClickHouse() (fn func(), err error) { return } -// 初始化mysql连接 -func InitEsClient() (fn func(), err error) { - config := model.GlobConfig.Comm.ElasticSearch - - db.EsClient, err = db.NewEsClient( - config.Addresses, - config.Username, - config.Password, - ) - - if err != nil { - return - } - log.Println(fmt.Sprintf("ES组件初始化成功!连接:%v,用户名:%v,密码:%v", - config.Addresses, - config.Username, - config.Password, - )) - fn = func() {} - return -} - // 初始化redis func InitRedisPool() (fn func(), err error) { config := model.GlobConfig.Comm.Redis diff --git a/cmd/init_app/ck/init.go b/cmd/init_app/ck/init.go index 143d541..14acbd9 100644 --- a/cmd/init_app/ck/init.go +++ b/cmd/init_app/ck/init.go @@ -61,7 +61,7 @@ func Init() { error_handling, report_type, status) - TTL part_date + toIntervalMonth(1) + TTL part_date + toIntervalMonth(3) SETTINGS index_granularity = 8192; `) if err != nil { @@ -69,5 +69,38 @@ func Init() { panic(err) } + _, err = db.ClickHouseSqlx.Exec(`DROP TABLE IF EXISTS xwl_real_time_warehousing` + sinker.GetClusterSql() + `;`) + + if err != nil { + log.Println(fmt.Sprintf("clickhouse 删除表 xwl_real_time_warehousing 失败:%s", err.Error())) + panic(err) + } + + _, err = db.ClickHouseSqlx.Exec(` + + CREATE TABLE xwl_real_time_warehousing ` + sinker.GetClusterSql() + ` + ( + + table_id Int64, + + create_time DateTime DEFAULT now(), + + event_name String, + + report_data String + ) + ENGINE = ` + sinker.GetMergeTree("xwl_real_time_warehousing") + ` + PARTITION BY (toYYYYMMDD(create_time)) + ORDER BY (toYYYYMMDD(create_time), + table_id, + event_name) + TTL create_time + toIntervalMonth(3) + SETTINGS index_granularity = 8192; +`) + if err != nil { + log.Println(fmt.Sprintf("clickhouse 建表 xwl_real_time_warehousing 失败:%s", err.Error())) + panic(err) + } + log.Println("初始化CK数据完成!") } diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 22c2dfe..226e653 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -37,7 +37,6 @@ func main() { application.RegisterInitFnObserver(application.InitRbac), application.RegisterInitFnObserver(application.InitOpenWinBrowser), application.RegisterInitFnObserver(application.InitClickHouse), - application.RegisterInitFnObserver(application.InitEsClient), application.RegisterInitFnObserver(application.InitRedisPool), application.RegisterInitFnObserver(application.InitDebugSarama), ) diff --git a/cmd/sinker/action/action.go b/cmd/sinker/action/action.go index 8ea700d..b41e232 100644 --- a/cmd/sinker/action/action.go +++ b/cmd/sinker/action/action.go @@ -4,8 +4,6 @@ import ( "bytes" "errors" "fmt" - "go.uber.org/zap" - "github.com/1340691923/xwl_bi/engine/db" "github.com/1340691923/xwl_bi/engine/logs" "github.com/1340691923/xwl_bi/model" @@ -14,6 +12,7 @@ import ( parser "github.com/1340691923/xwl_bi/platform-basic-libs/sinker/parse" "github.com/1340691923/xwl_bi/platform-basic-libs/util" "github.com/valyala/fastjson" + "go.uber.org/zap" "strconv" "strings" "sync" @@ -56,19 +55,9 @@ func MysqlConsumer() { } } -func AddRealTimeData(kafkaData model.KafkaData, data string, realTimeWarehousing *consumer_data.RealTimeWarehousing) (err error) { +func AddRealTimeData(realTimeWarehousingData *consumer_data.RealTimeWarehousingData, realTimeWarehousing *consumer_data.RealTimeWarehousing) (err error) { - clientReportData := consumer_data.ClientReportData{ - Data: data, - TableId: kafkaData.TableId, - Date: util.Str2Time(kafkaData.ReportTime, util.TimeFormat).Format(util.TimeFormatDay4), - } - err = clientReportData.CreateIndex() - if err != nil { - logs.Logger.Error(" clientReportData.CreateIndex", zap.Error(err)) - } - bulkIndexRequest := clientReportData.GetReportData() - err = realTimeWarehousing.Add(bulkIndexRequest) + err = realTimeWarehousing.Add(realTimeWarehousingData) return err } diff --git a/cmd/sinker/main.go b/cmd/sinker/main.go index ab6ffe6..e813d6f 100644 --- a/cmd/sinker/main.go +++ b/cmd/sinker/main.go @@ -1,7 +1,6 @@ package main import ( - "bytes" "flag" "fmt" "github.com/1340691923/xwl_bi/application" @@ -69,7 +68,6 @@ func main() { application.RegisterInitFnObserver(application.InitLogs), application.RegisterInitFnObserver(application.InitMysql), application.RegisterInitFnObserver(application.InitClickHouse), - application.RegisterInitFnObserver(application.InitEsClient), application.RegisterInitFnObserver(application.InitRedisPool), ) @@ -109,11 +107,9 @@ func main() { realTimeDataSarama := sinker.NewKafkaSarama() reportData2CKSarama := realTimeDataSarama.Clone() go action.MysqlConsumer() - + var json = jsoniter.ConfigCompatibleWithStandardLibrary err = realTimeDataSarama.Init(model.GlobConfig.Comm.Kafka, model.GlobConfig.Comm.Kafka.ReportTopicName, model.GlobConfig.Comm.Kafka.RealTimeDataGroup, func(msg model.InputMessage, markFn func()) { - //ETL - var json = jsoniter.ConfigCompatibleWithStandardLibrary var kafkaData model.KafkaData err = json.Unmarshal(msg.Value, &kafkaData) if err != nil { @@ -121,34 +117,20 @@ func main() { markFn() return } - reportDataTmp := kafkaData.ReqData - reqData, err := json.Marshal(util.Bytes2str(reportDataTmp)) + appid,err := strconv.Atoi(kafkaData.TableId) if err != nil { - logs.Logger.Error("json.Marshal Err", zap.Error(err)) + logs.Logger.Error("strconv.Atoi(kafkaData.TableId) Err", zap.Error(err)) markFn() return } - xwlDistinctId := gjson.GetBytes(kafkaData.ReqData, "xwl_distinct_id") - - if xwlDistinctId.String() == "" { - logs.Logger.Sugar().Errorf("xwl_distinct_id 为空", util.Bytes2str(kafkaData.ReqData)) - markFn() - return - } - kafkaData.Offset = msg.Offset - - buff := bytes.Buffer{} - buff.WriteString(`{"event_name":"`) - buff.WriteString(kafkaData.EventName) - buff.WriteString(`","create_time":"`) - buff.WriteString(kafkaData.ReportTime) - buff.WriteString(`","data":`) - buff.WriteString(util.Bytes2str(reqData)) - buff.WriteString(`}`) - addRealTimeData := buff.String() - //添加实时数据 - err = action.AddRealTimeData(kafkaData, addRealTimeData, realTimeWarehousing) + err = realTimeWarehousing.Add(&consumer_data.RealTimeWarehousingData{ + Appid: int64(appid), + EventName: kafkaData.EventName, + CreateTime: kafkaData.ReportTime, + Data: kafkaData.ReqData, + }) + if err != nil { logs.Logger.Error("AddRealTimeData err", zap.Error(err)) } @@ -162,7 +144,6 @@ func main() { err = reportData2CKSarama.Init(model.GlobConfig.Comm.Kafka, model.GlobConfig.Comm.Kafka.ReportTopicName, model.GlobConfig.Comm.Kafka.ReportData2CKGroup, func(msg model.InputMessage, markFn func()) { - var json = jsoniter.ConfigCompatibleWithStandardLibrary var kafkaData model.KafkaData err = json.Unmarshal(msg.Value, &kafkaData) if err != nil { @@ -198,7 +179,7 @@ func main() { case model.EventReportType: eventType = "事件属性类型不合法" } - reportAcceptStatus.Add(consumer_data.ReportAcceptStatusData{ + reportAcceptStatus.Add(&consumer_data.ReportAcceptStatusData{ PartDate: kafkaData.ReportTime, TableId: tableId, ReportType: eventType, @@ -230,7 +211,7 @@ func main() { serverT := util.Str2Time(kafkaData.ReportTime, util.TimeFormat) if math.Abs(serverT.Sub(clinetT).Minutes()) > 10 { - reportAcceptStatus.Add(consumer_data.ReportAcceptStatusData{ + reportAcceptStatus.Add(&consumer_data.ReportAcceptStatusData{ PartDate: kafkaData.ReportTime, TableId: tableId, ReportType: kafkaData.GetReportTypeErr(), @@ -266,7 +247,7 @@ func main() { //新增表结构 if err := action.AddTableColumn( kafkaData, - func(data consumer_data.ReportAcceptStatusData) { reportAcceptStatus.Add(data) }, + func(data consumer_data.ReportAcceptStatusData) { reportAcceptStatus.Add(&data) }, tableName, metric, ); err != nil { @@ -281,7 +262,7 @@ func main() { } //入库成功 - if err := reportAcceptStatus.Add(consumer_data.ReportAcceptStatusData{ + if err := reportAcceptStatus.Add(&consumer_data.ReportAcceptStatusData{ PartDate: kafkaData.ReportTime, TableId: tableId, DataName: kafkaData.EventName, diff --git a/config/config.json b/config/config.json index 582d146..c2a331f 100644 --- a/config/config.json +++ b/config/config.json @@ -65,11 +65,6 @@ "reportData2CKGroup": "reportData2CKGroup2", "realTimeDataGroup": "realTimeDataGroup2" }, - "elasticSearch": { - "Addresses":["http://192.168.1.236:9200"], - "Username":"", - "Password":"" - }, "redis": { "addr":"192.168.1.236:6379", "passwd":"", diff --git a/controller/realdata_controller.go b/controller/realdata_controller.go index 8b39544..904c678 100644 --- a/controller/realdata_controller.go +++ b/controller/realdata_controller.go @@ -2,14 +2,18 @@ package controller import ( "errors" + "github.com/1340691923/xwl_bi/engine/db" + "github.com/1340691923/xwl_bi/engine/logs" "github.com/1340691923/xwl_bi/platform-basic-libs/jwt" "github.com/1340691923/xwl_bi/platform-basic-libs/request" "github.com/1340691923/xwl_bi/platform-basic-libs/response" - "github.com/1340691923/xwl_bi/platform-basic-libs/service/consumer_data" "github.com/1340691923/xwl_bi/platform-basic-libs/service/debug_data" "github.com/1340691923/xwl_bi/platform-basic-libs/service/realdata" + "github.com/1340691923/xwl_bi/platform-basic-libs/util" "github.com/gofiber/fiber/v2" "strconv" + "strings" + "time" ) type RealDataController struct { @@ -21,7 +25,7 @@ func (this RealDataController) List(ctx *fiber.Ctx) error { type ReqData struct { Appid int `json:"appid"` - SearchKw string `json:"search_kw"` + SearchKw string `json:"searchKw"` Date string `json:"date"` } @@ -32,18 +36,42 @@ func (this RealDataController) List(ctx *fiber.Ctx) error { } appid := strconv.Itoa(reqData.Appid) - searchKw := reqData.SearchKw - date := reqData.Date - clientReportData := consumer_data.ClientReportData{ - TableId: appid, + type Res struct { + CreateTime string `json:"create_time" db:"-"` + CreateTimeDb time.Time `json:"-" db:"create_time"` + EventName string `json:"event_name" db:"event_name"` + ReportData string `json:"report_data" db:"report_data"` } - res, err := clientReportData.GetList(ctx.Context(), searchKw, date) - if err != nil { + + filterSql := "" + + date := strings.Split(reqData.Date,",") + + args := []interface{}{appid} + + if len(date) == 2{ + filterSql = filterSql+ ` and create_time >= toDateTime(?) and create_time <=toDateTime(?) ` + args = append(args, date[0],date[1]) + } + logs.Logger.Sugar().Infof("reqData.SearchKw",reqData.SearchKw) + if strings.TrimSpace(reqData.SearchKw)!="" { + filterSql = filterSql+ ` and event_name like '%`+reqData.SearchKw+`%' ` + } + sql := `select report_data,event_name,create_time as create_time from xwl_real_time_warehousing where table_id = ? `+filterSql+` order by create_time desc limit 0,1000;` + logs.Logger.Sugar().Infof("sql",sql,args) + var res []Res + err := db.ClickHouseSqlx.Select(&res,sql, + args..., + ) + if err != nil { return this.Error(ctx, err) } + for index:= range res{ + res[index].CreateTime = res[index].CreateTimeDb.Format(util.TimeFormat) + } - return this.Success(ctx, response.SearchSuccess, map[string]interface{}{"list": res.Hits.Hits}) + return this.Success(ctx, response.SearchSuccess, map[string]interface{}{"list": res}) } //错误数据列表 diff --git a/engine/db/es.go b/engine/db/es.go deleted file mode 100644 index 8f32e96..0000000 --- a/engine/db/es.go +++ /dev/null @@ -1,19 +0,0 @@ -package db - -import "github.com/olivere/elastic" - -var EsClient *elastic.Client - -func NewEsClient(address []string, username, password string) (esClient *elastic.Client, err error) { - optList := []elastic.ClientOptionFunc{elastic.SetSniff(false)} - - optList = append(optList, elastic.SetURL(address...)) - - if username != "" || password != "" { - optList = append(optList, elastic.SetBasicAuth(username, password)) - } - - esClient, err = elastic.NewSimpleClient(optList...) - - return -} diff --git a/model/config.go b/model/config.go index 541a597..83128a7 100644 --- a/model/config.go +++ b/model/config.go @@ -22,7 +22,6 @@ type Config struct { Mysql MysqlConfig `json:"mysql"` ClickHouse ClickHouseConfig `json:"clickhouse"` Kafka KafkaCfg `json:"kafka"` - ElasticSearch EsConfig `json:"elasticSearch"` Redis RedisConfig `json:"redis"` } `json:"comm"` } @@ -50,12 +49,6 @@ type RedisConfig struct { MaxActive int `json:"maxActive"` } -type EsConfig struct { - Addresses []string `json:"addresses"` - Username string `json:"username"` - Password string `json:"password"` -} - type ClickHouseConfig struct { Username string `json:"username"` Pwd string `json:"pwd"` diff --git a/platform-basic-libs/service/consumer_data/client_report_data.go b/platform-basic-libs/service/consumer_data/client_report_data.go deleted file mode 100644 index 7491067..0000000 --- a/platform-basic-libs/service/consumer_data/client_report_data.go +++ /dev/null @@ -1,118 +0,0 @@ -package consumer_data - -import ( - "context" - "fmt" - "github.com/1340691923/xwl_bi/engine/db" - "github.com/1340691923/xwl_bi/engine/logs" - "github.com/olivere/elastic" - "go.uber.org/zap" - - "strings" -) - -type ClientReportData struct { - Data string - TableId string - Date string -} - -func (this *ClientReportData) Name() string { - return "client_report_data" + this.TableId -} - -func (this *ClientReportData) GetReportData() *elastic.BulkIndexRequest { - return elastic.NewBulkIndexRequest().Index(this.CreateReportName()).Type(this.getTyp()).Doc(this.Data) -} - -func (this *ClientReportData) CreateIndex() (err error) { - - indexName := this.CreateReportName() - indexExists, err := db.EsClient.IndexExists(indexName).Do(context.Background()) - - if err != nil { - return - } - if !indexExists { - db.EsClient.CreateIndex(indexName).Body(this.createIndexStr()).Do(context.Background()) - - _, err = db.EsClient.Alias().Add(indexName, this.GetAliasName()).Do(context.Background()) - if err != nil { - logs.Logger.Error("别名创建失败", zap.Error(err)) - } - } - return -} - -//type -func (this *ClientReportData) getTyp() string { - return "_doc" -} - -//index -func (this *ClientReportData) GetAliasName() string { - return fmt.Sprintf("%s%s", this.Name(), "_index") -} - -func (this *ClientReportData) CreateReportName() string { - return fmt.Sprintf("%v_%v", this.Name(), this.Date) -} - -func (this *ClientReportData) GetList(ctx context.Context, searchKw, date string) (*elastic.SearchResult, error) { - - search := db.EsClient.Search(this.GetAliasName()) - - dateArr := strings.Split(date, ",") - - q := elastic.NewBoolQuery() - - if len(dateArr) == 2 { - q = q.Must(elastic.NewRangeQuery("create_time"). - Gte(dateArr[0]). - Lte(dateArr[1]).IncludeLower(false).IncludeUpper(false)) - } - - if searchKw != "" { - q = q.Must(elastic.NewMatchQuery("data", searchKw)) - highlight := elastic.NewHighlight().Field("data").PreTags("").PostTags("").NumOfFragments(0) - search = search.Highlight(highlight) - } - - return search.Query(q).Sort("create_time", false).From(0).Size(1000).Do(ctx) -} - -//创建索引字符串 -func (this *ClientReportData) createIndexStr() string { - s := ` - { - "settings": { - "number_of_replicas": 0, - "number_of_shards": 1 - }, - "mappings" : { - "_doc" : { - "dynamic" : "false", - "properties": { - "event_name": { - "type": "keyword" - }, - "create_time": { - "format": "yyyy-MM-dd HH:mm:ss", - "type": "date" - }, - "data": { - "type": "text", - "analyzer":"english", - "fields": { - "keyword": { - "type": "keyword" - } - } - } - } - } - } - } - ` - return s -} diff --git a/platform-basic-libs/service/consumer_data/real_time_warehousing.go b/platform-basic-libs/service/consumer_data/real_time_warehousing.go index e276d3f..2d65a7e 100644 --- a/platform-basic-libs/service/consumer_data/real_time_warehousing.go +++ b/platform-basic-libs/service/consumer_data/real_time_warehousing.go @@ -1,18 +1,23 @@ package consumer_data import ( - "context" "github.com/1340691923/xwl_bi/engine/db" "github.com/1340691923/xwl_bi/engine/logs" - jsoniter "github.com/json-iterator/go" - "github.com/olivere/elastic" + "github.com/1340691923/xwl_bi/platform-basic-libs/util" "go.uber.org/zap" "sync" "time" ) +type RealTimeWarehousingData struct { + Appid int64 + EventName string + CreateTime string + Data []byte +} + type RealTimeWarehousing struct { - buffer []*elastic.BulkIndexRequest + buffer []*RealTimeWarehousingData bufferMutex *sync.RWMutex batchSize int flushInterval int @@ -20,7 +25,7 @@ type RealTimeWarehousing struct { func NewRealTimeWarehousing(batchSize, flushInterval int) *RealTimeWarehousing { realTimeWarehousing := &RealTimeWarehousing{ - buffer: make([]*elastic.BulkIndexRequest, 0, batchSize), + buffer: make([]*RealTimeWarehousingData, 0, batchSize), bufferMutex: new(sync.RWMutex), batchSize: batchSize, flushInterval: flushInterval, @@ -37,39 +42,47 @@ func (this *RealTimeWarehousing) Flush() (err error) { this.bufferMutex.Lock() if len(this.buffer) > 0 { startNow := time.Now() - var json = jsoniter.ConfigCompatibleWithStandardLibrary - bulkRequest := db.EsClient.Bulk() + + tx, err := db.ClickHouseSqlx.Begin() + if err != nil { + return err + } + + stmt, err := tx.Prepare("INSERT INTO xwl_real_time_warehousing (table_id,event_name,create_time, report_data) VALUES (?,?,?)") + if err != nil { + return err + } for _, buffer := range this.buffer { - bulkRequest.Add(buffer) - } - res, err := bulkRequest.Do(context.Background()) - - if err != nil { - logs.Logger.Error("ES出现错误,休息10秒钟继续", zap.Error(err)) - time.Sleep(time.Second * 10) - this.Flush() - } else { - if res.Errors { - resStr, _ := json.MarshalToString(res) - logs.Logger.Error("ES出现错误", zap.String("res", resStr)) - } else { - lostTime := time.Now().Sub(startNow).String() - len := len(this.buffer) - if len > 0 { - logs.Logger.Info("ES入库成功", zap.String("所花时间", lostTime), zap.Int("数据长度为", len)) - } - + if _, err := stmt.Exec( + buffer.Appid, + buffer.EventName, + buffer.CreateTime, + util.Bytes2str(buffer.Data), + ); err != nil { + stmt.Close() + return err } } - this.buffer = make([]*elastic.BulkIndexRequest, 0, this.batchSize) + if err := tx.Commit(); err != nil { + logs.Logger.Error("入库数据状态出现错误", zap.Error(err)) + } else { + lostTime := time.Now().Sub(startNow).String() + len := len(this.buffer) + if len > 0 { + logs.Logger.Info("入库数据状态成功", zap.String("所花时间", lostTime), zap.Int("数据长度为", len)) + } + } + stmt.Close() + + this.buffer = make([]*RealTimeWarehousingData, 0, this.batchSize) } this.bufferMutex.Unlock() return nil } -func (this *RealTimeWarehousing) Add(data *elastic.BulkIndexRequest) (err error) { +func (this *RealTimeWarehousing) Add(data *RealTimeWarehousingData) (err error) { this.bufferMutex.Lock() this.buffer = append(this.buffer, data) this.bufferMutex.Unlock() diff --git a/platform-basic-libs/service/consumer_data/report_accpet_status.go b/platform-basic-libs/service/consumer_data/report_accpet_status.go index b038743..d7276dd 100644 --- a/platform-basic-libs/service/consumer_data/report_accpet_status.go +++ b/platform-basic-libs/service/consumer_data/report_accpet_status.go @@ -4,7 +4,6 @@ import ( "github.com/1340691923/xwl_bi/engine/db" "github.com/1340691923/xwl_bi/engine/logs" "go.uber.org/zap" - "log" "sync" "time" ) @@ -22,7 +21,7 @@ type ReportAcceptStatusData struct { } type ReportAcceptStatus struct { - buffer []ReportAcceptStatusData + buffer []*ReportAcceptStatusData bufferMutex *sync.RWMutex batchSize int flushInterval int @@ -33,7 +32,7 @@ const SuccessStatus = 1 func NewReportAcceptStatus(batchSize int, flushInterval int) *ReportAcceptStatus { reportAcceptStatus := &ReportAcceptStatus{ - buffer: make([]ReportAcceptStatusData, 0, batchSize), + buffer: make([]*ReportAcceptStatusData, 0, batchSize), bufferMutex: new(sync.RWMutex), batchSize: batchSize, flushInterval: flushInterval, @@ -58,16 +57,14 @@ func (this *ReportAcceptStatus) Flush() (err error) { tx, err := db.ClickHouseSqlx.Begin() if err != nil { - return + return err } stmt, err := tx.Prepare("INSERT INTO xwl_acceptance_status (status,part_date, table_id,report_type, data_name, error_reason, error_handling, report_data, xwl_kafka_offset) VALUES (?,?,?, ?, ?, ?, ?, ?, ?, ?, ?)") if err != nil { - return + return err } - defer stmt.Close() - for _, buffer := range this.buffer { if _, err := stmt.Exec( buffer.Status, @@ -80,27 +77,27 @@ func (this *ReportAcceptStatus) Flush() (err error) { buffer.ReportData, buffer.XwlKafkaOffset, ); err != nil { - log.Fatal(err) + stmt.Close() + return err } } if err := tx.Commit(); err != nil { logs.Logger.Error("入库数据状态出现错误", zap.Error(err)) } else { - lostTime := time.Now().Sub(startNow).String() len := len(this.buffer) if len > 0 { logs.Logger.Info("入库数据状态成功", zap.String("所花时间", lostTime), zap.Int("数据长度为", len)) } } - - this.buffer = make([]ReportAcceptStatusData, 0, this.batchSize) + stmt.Close() + this.buffer = make([]*ReportAcceptStatusData, 0, this.batchSize) this.bufferMutex.Unlock() return nil } -func (this *ReportAcceptStatus) Add(data ReportAcceptStatusData) (err error) { +func (this *ReportAcceptStatus) Add(data *ReportAcceptStatusData) (err error) { this.bufferMutex.Lock() this.buffer = append(this.buffer, data) this.bufferMutex.Unlock() diff --git a/platform-basic-libs/service/report/report_interface.go b/platform-basic-libs/service/report/report_interface.go index 7fff9a6..4345ff7 100644 --- a/platform-basic-libs/service/report/report_interface.go +++ b/platform-basic-libs/service/report/report_interface.go @@ -59,8 +59,6 @@ func (this *UserReport) InflowOfKakfa() (err error) { msg.Timestamp = time.Now() return sendMsg(msg) - - return } func (this *UserReport) Put() { diff --git a/vue/src/views/manager/components/realData2Es.vue b/vue/src/views/manager/components/realData2Es.vue index 7c095bb..6392a52 100644 --- a/vue/src/views/manager/components/realData2Es.vue +++ b/vue/src/views/manager/components/realData2Es.vue @@ -238,22 +238,19 @@ export default { let list = [] let index = 0 for (const v of res.data.list) { - const _source = v['_source'] - _source['dataFormat'] = JSON.stringify(JSON.parse(v['_source']['data']), null, '\t') + const _source = {} - if (v.hasOwnProperty('highlight')) { - _source['data'] = v['highlight']['data'][0] - } else { - _source['data'] = v['_source']['data'] - } + _source['dataFormat'] = JSON.stringify(JSON.parse(v["report_data"]), null, '\t') + _source['event_name'] = v["event_name"] + _source['create_time'] = v["create_time"] + _source['data'] = v["report_data"] _source['isFormatData'] = false _source['index'] = index - list.push(_source) index++ } list = filterData(list, this.input.trim()) - + console.log("list",list) this.total = list.length this.list = list this.trueList = list