2022-01-26 16:40:50 +08:00
|
|
|
|
package consumer_data
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"bytes"
|
|
|
|
|
"github.com/1340691923/xwl_bi/engine/db"
|
|
|
|
|
"github.com/1340691923/xwl_bi/engine/logs"
|
|
|
|
|
model2 "github.com/1340691923/xwl_bi/platform-basic-libs/sinker/model"
|
|
|
|
|
parser "github.com/1340691923/xwl_bi/platform-basic-libs/sinker/parse"
|
|
|
|
|
"go.uber.org/zap"
|
|
|
|
|
"strings"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var TableColumnMap sync.Map
|
|
|
|
|
|
|
|
|
|
type ReportData2CK struct {
|
|
|
|
|
buffer []map[string]*parser.FastjsonMetric
|
|
|
|
|
bufferMutex *sync.RWMutex
|
|
|
|
|
batchSize int
|
|
|
|
|
flushInterval int
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func NewReportData2CK(batchSize int, flushInterval int) *ReportData2CK {
|
|
|
|
|
logs.Logger.Info("NewReportData2CK", zap.Int("batchSize", batchSize), zap.Int("flushInterval", flushInterval))
|
|
|
|
|
reportData2CK := &ReportData2CK{
|
|
|
|
|
buffer: make([]map[string]*parser.FastjsonMetric, 0, batchSize),
|
|
|
|
|
bufferMutex: new(sync.RWMutex),
|
|
|
|
|
batchSize: batchSize,
|
|
|
|
|
flushInterval: flushInterval,
|
|
|
|
|
}
|
|
|
|
|
if flushInterval > 0 {
|
|
|
|
|
reportData2CK.RegularFlushing()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return reportData2CK
|
|
|
|
|
}
|
|
|
|
|
|
2022-03-03 15:50:32 +08:00
|
|
|
|
|
2022-01-26 16:40:50 +08:00
|
|
|
|
func (this *ReportData2CK) Flush() (err error) {
|
|
|
|
|
this.bufferMutex.Lock()
|
2022-03-01 11:44:45 +08:00
|
|
|
|
if len(this.buffer)==0{
|
|
|
|
|
this.bufferMutex.Unlock()
|
|
|
|
|
return nil
|
|
|
|
|
}
|
2022-01-26 16:40:50 +08:00
|
|
|
|
startNow := time.Now()
|
|
|
|
|
|
|
|
|
|
rowsMap := map[string][][]interface{}{}
|
2022-03-04 09:52:18 +08:00
|
|
|
|
rowArr := []interface{}{}
|
|
|
|
|
rows := [][]interface{}{}
|
2022-03-03 15:50:32 +08:00
|
|
|
|
for bufferIndex := range this.buffer {
|
|
|
|
|
for tableName := range this.buffer[bufferIndex] {
|
2022-03-04 09:52:18 +08:00
|
|
|
|
rows := rows[0:0]
|
2022-01-26 16:40:50 +08:00
|
|
|
|
if _, haveKey := rowsMap[tableName]; haveKey {
|
|
|
|
|
rows = rowsMap[tableName]
|
|
|
|
|
} else {
|
|
|
|
|
rowsMap[tableName] = rows
|
|
|
|
|
}
|
2022-03-04 09:52:18 +08:00
|
|
|
|
dims, _ := TableColumnMap.Load(tableName)
|
|
|
|
|
rowArr = rowArr[0:0]
|
|
|
|
|
for _, dim := range dims.([]*model2.ColumnWithType) {
|
2022-03-03 15:50:32 +08:00
|
|
|
|
val := parser.GetValueByType(this.buffer[bufferIndex][tableName], dim)
|
2022-01-26 16:40:50 +08:00
|
|
|
|
rowArr = append(rowArr, val)
|
|
|
|
|
}
|
|
|
|
|
rows = append(rows, rowArr)
|
|
|
|
|
rowsMap[tableName] = rows
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2022-03-03 19:16:25 +08:00
|
|
|
|
bytesbuffer:=bytes.Buffer{}
|
2022-01-26 16:40:50 +08:00
|
|
|
|
|
2022-03-04 09:52:18 +08:00
|
|
|
|
TableColumnMap.Range(func(tableName, value interface{}) bool {
|
2022-01-26 16:40:50 +08:00
|
|
|
|
|
2022-03-04 09:52:18 +08:00
|
|
|
|
if _, haveKey := rowsMap[tableName.(string)]; haveKey {
|
2022-01-26 16:40:50 +08:00
|
|
|
|
|
|
|
|
|
seriesDims := value.([]*model2.ColumnWithType)
|
|
|
|
|
serDimsQuoted := make([]string, len(seriesDims))
|
|
|
|
|
params := make([]string, len(seriesDims))
|
2022-03-03 15:50:32 +08:00
|
|
|
|
|
2022-01-26 16:40:50 +08:00
|
|
|
|
for i, serDim := range seriesDims {
|
2022-03-03 19:16:25 +08:00
|
|
|
|
serDimsQuoted[i] ="`"+serDim.Name+"`"
|
2022-01-26 16:40:50 +08:00
|
|
|
|
params[i] = "?"
|
|
|
|
|
}
|
|
|
|
|
|
2022-03-03 15:50:32 +08:00
|
|
|
|
bytesbuffer.WriteString("INSERT INTO ")
|
2022-03-04 09:52:18 +08:00
|
|
|
|
bytesbuffer.WriteString(tableName.(string))
|
2022-03-03 15:50:32 +08:00
|
|
|
|
bytesbuffer.WriteString(" (")
|
|
|
|
|
bytesbuffer.WriteString(strings.Join(serDimsQuoted, ","))
|
|
|
|
|
bytesbuffer.WriteString(") ")
|
|
|
|
|
bytesbuffer.WriteString("VALUES (")
|
|
|
|
|
bytesbuffer.WriteString(strings.Join(params, ","))
|
|
|
|
|
bytesbuffer.WriteString(")")
|
|
|
|
|
|
2022-03-04 09:52:18 +08:00
|
|
|
|
defer func() {
|
|
|
|
|
bytesbuffer.Reset()
|
|
|
|
|
}()
|
|
|
|
|
|
2022-01-26 16:40:50 +08:00
|
|
|
|
tx, err := db.ClickHouseSqlx.Begin()
|
|
|
|
|
if err != nil {
|
|
|
|
|
logs.Logger.Error("CK入库失败", zap.Error(err))
|
|
|
|
|
return false
|
|
|
|
|
}
|
2022-03-04 09:52:18 +08:00
|
|
|
|
stmt, err := tx.Prepare(bytesbuffer.String())
|
2022-01-26 16:40:50 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
logs.Logger.Error("CK入库失败", zap.Error(err))
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
defer stmt.Close()
|
|
|
|
|
haveFail := false
|
2022-03-04 09:52:18 +08:00
|
|
|
|
for _, row := range rowsMap[tableName.(string)] {
|
2022-01-26 16:40:50 +08:00
|
|
|
|
if _, err := stmt.Exec(row...); err != nil {
|
|
|
|
|
logs.Logger.Error("CK入库失败", zap.Error(err))
|
|
|
|
|
haveFail = true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if !haveFail {
|
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
|
|
|
logs.Logger.Error("CK入库失败", zap.Error(err))
|
|
|
|
|
return false
|
|
|
|
|
} else {
|
|
|
|
|
len := len(this.buffer)
|
2022-03-04 09:52:18 +08:00
|
|
|
|
logs.Logger.Info("CK入库成功,", zap.String("所花时间", time.Now().Sub(startNow).String()), zap.Int("数据长度为", len))
|
2022-01-26 16:40:50 +08:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return true
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
this.buffer = make([]map[string]*parser.FastjsonMetric, 0, this.batchSize)
|
|
|
|
|
this.bufferMutex.Unlock()
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (this *ReportData2CK) Add(data map[string]*parser.FastjsonMetric) (err error) {
|
|
|
|
|
this.bufferMutex.Lock()
|
|
|
|
|
this.buffer = append(this.buffer, data)
|
|
|
|
|
this.bufferMutex.Unlock()
|
|
|
|
|
|
|
|
|
|
if this.getBufferLength() >= this.batchSize {
|
|
|
|
|
err := this.Flush()
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (this *ReportData2CK) getBufferLength() int {
|
|
|
|
|
this.bufferMutex.RLock()
|
|
|
|
|
defer this.bufferMutex.RUnlock()
|
|
|
|
|
return len(this.buffer)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (this *ReportData2CK) FlushAll() error {
|
|
|
|
|
for this.getBufferLength() > 0 {
|
|
|
|
|
if err := this.Flush(); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (this *ReportData2CK) RegularFlushing() {
|
|
|
|
|
go func() {
|
|
|
|
|
ticker := time.NewTicker(time.Duration(this.flushInterval) * time.Second)
|
|
|
|
|
defer ticker.Stop()
|
|
|
|
|
for {
|
|
|
|
|
<-ticker.C
|
|
|
|
|
this.Flush()
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}
|