This commit is contained in:
1340691923@qq.com 2022-03-04 12:09:14 +08:00
parent 324bea4b23
commit b4650a18b7

View File

@ -46,22 +46,24 @@ func (this *ReportData2CK) Flush() (err error) {
startNow := time.Now() startNow := time.Now()
rowsMap := map[string][][]interface{}{} rowsMap := map[string][][]interface{}{}
rowArr := []interface{}{}
rows := [][]interface{}{}
for bufferIndex := range this.buffer { for bufferIndex := range this.buffer {
for tableName := range this.buffer[bufferIndex] { for tableName := range this.buffer[bufferIndex] {
rows := rows[0:0] rowArr := []interface{}{}
rows := [][]interface{}{}
if _, haveKey := rowsMap[tableName]; haveKey { if _, haveKey := rowsMap[tableName]; haveKey {
rows = rowsMap[tableName] rows = rowsMap[tableName]
} else { } else {
rowsMap[tableName] = rows rowsMap[tableName] = rows
} }
dims, _ := TableColumnMap.Load(tableName) dims, _ := TableColumnMap.Load(tableName)
rowArr = rowArr[0:0]
for _, dim := range dims.([]*model2.ColumnWithType) { for _, dim := range dims.([]*model2.ColumnWithType) {
val := parser.GetValueByType(this.buffer[bufferIndex][tableName], dim) val := parser.GetValueByType(this.buffer[bufferIndex][tableName], dim)
logs.Logger.Sugar().Errorf("dim.SourceName",dim.SourceName,val)
rowArr = append(rowArr, val) rowArr = append(rowArr, val)
} }
rows = append(rows, rowArr) rows = append(rows, rowArr)
rowsMap[tableName] = rows rowsMap[tableName] = rows
} }
@ -90,17 +92,16 @@ func (this *ReportData2CK) Flush() (err error) {
bytesbuffer.WriteString("VALUES (") bytesbuffer.WriteString("VALUES (")
bytesbuffer.WriteString(strings.Join(params, ",")) bytesbuffer.WriteString(strings.Join(params, ","))
bytesbuffer.WriteString(")") bytesbuffer.WriteString(")")
insertSql := bytesbuffer.String()
defer func() { bytesbuffer.Reset()
bytesbuffer.Reset()
}()
tx, err := db.ClickHouseSqlx.Begin() tx, err := db.ClickHouseSqlx.Begin()
if err != nil { if err != nil {
logs.Logger.Error("CK入库失败", zap.Error(err)) logs.Logger.Error("CK入库失败", zap.Error(err))
return false return false
} }
stmt, err := tx.Prepare(bytesbuffer.String())
stmt, err := tx.Prepare(insertSql)
if err != nil { if err != nil {
logs.Logger.Error("CK入库失败", zap.Error(err)) logs.Logger.Error("CK入库失败", zap.Error(err))
return false return false
@ -108,13 +109,17 @@ func (this *ReportData2CK) Flush() (err error) {
defer stmt.Close() defer stmt.Close()
haveFail := false haveFail := false
for _, row := range rowsMap[tableName.(string)] { for _, row := range rowsMap[tableName.(string)] {
logs.Logger.Sugar().Errorf("bytesbuffer.String()",insertSql)
if _, err := stmt.Exec(row...); err != nil { if _, err := stmt.Exec(row...); err != nil {
logs.Logger.Error("CK入库失败", zap.Error(err)) logs.Logger.Error("CK入库失败", zap.Error(err))
haveFail = true haveFail = true
} }
logs.Logger.Sugar().Errorf("bytesbuffer.String() args",row...)
} }
if !haveFail { if !haveFail {
if err := tx.Commit(); err != nil { if err := tx.Commit(); err != nil {
logs.Logger.Error("CK入库失败", zap.Error(err)) logs.Logger.Error("CK入库失败", zap.Error(err))
return false return false
} else { } else {