package repo import ( "admin/internal/errcode" "admin/internal/model/dto" "admin/lib/xlog" "context" "encoding/json" "fmt" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/jmoiron/sqlx" "strconv" "strings" "time" ) type EventListQuerier struct { db driver.Conn clickHouseSqlx *sqlx.DB tableName string eventName []string attrList []*AttrInfo serverId int roleId string account string } // CondRoleId 添加角色信息查询条件 func (querier *EventListQuerier) CondRoleId(serverId int, roleId string) *EventListQuerier { querier.serverId = serverId querier.roleId = roleId return querier } func (querier *EventListQuerier) CondAccount(serverId int, account string) *EventListQuerier { querier.serverId = serverId querier.account = account return querier } const ( TypeUnknown = iota Int Float String DateTime ElasticDateTime IntArray FloatArray StringArray DateTimeArray ) func (querier *EventListQuerier) Go(ctx context.Context, pageNo int, pageLen int, dateStart, dateEnd time.Time) ( totalCount int, fieldsDescInfo []*dto.GameLogFieldInfo, rows [][]any, err error) { // 生成sql sql, countSql, sqlArgs := querier.genSql(querier.tableName, pageNo, pageLen, dateStart, dateEnd) // 查询总数量和数据 totalCount, fieldsDescInfo, rows, err = querier.query(sql, countSql, sqlArgs) if err != nil { return } if xlog.GetLogLevel() <= xlog.LogLevelDebug { argsBin, _ := json.Marshal(&sqlArgs) rowsBin, _ := json.Marshal(&rows) xlog.Debugf("query sql:%v with args:%v, rows result:%v", sql, string(argsBin), string(rowsBin)) } return } func (querier *EventListQuerier) genSql(tableName string, pageNo int, pageLen int, dateStart, dateEnd time.Time) ( string, string, []any) { querier.tableName = tableName sql := "select " countSql := "select count(1) " cols := make([]string, 0, len(querier.attrList)) for _, attr := range querier.attrList { cols = append(cols, attr.Name) } sql += strings.Join(cols, ",") sql += " from " + querier.tableName + " where " countSql += " from " + querier.tableName + " where " whereList := make([]string, 0) whereArgs := make([]any, 0) if len(querier.eventName) != 0 { if len(querier.eventName) == 1 { whereList = append(whereList, "`xwl_part_event`=?") whereArgs = append(whereArgs, querier.eventName[0]) } else { eventNameWhereSql := make([]string, 0, len(querier.eventName)) for _, v := range querier.eventName { eventNameWhereSql = append(eventNameWhereSql, "`xwl_part_event`=?") whereArgs = append(whereArgs, v) } whereList = append(whereList, "("+strings.Join(eventNameWhereSql, " or ")+")") } } if querier.serverId != 0 { whereList = append(whereList, "`pub_serverid`=?") whereArgs = append(whereArgs, querier.serverId) } if querier.roleId != "" { whereList = append(whereList, "`xwl_distinct_id`=?") whereArgs = append(whereArgs, strconv.Itoa(querier.serverId)+"-"+querier.roleId) } if querier.account != "" { whereList = append(whereList, "`pub_userid`=?") whereArgs = append(whereArgs, querier.account) } whereList = append(whereList, "xwl_part_date >= toDateTime(?)") whereList = append(whereList, "xwl_part_date <= toDateTime(?)") whereArgs = append(whereArgs, dateStart.Format("2006-01-02 15:04:05")) whereArgs = append(whereArgs, dateEnd.Format("2006-01-02 15:04:05")) sql += strings.Join(whereList, " and ") countSql += strings.Join(whereList, " and ") limitStart := (pageNo - 1) * pageLen limitLen := pageLen sql += fmt.Sprintf(" order by xwl_part_date desc limit %v,%v", limitStart, limitLen) return sql, countSql, whereArgs } func (querier *EventListQuerier) query(sql string, countSql string, args []any) ( totalCount int, fieldsDescInfo []*dto.GameLogFieldInfo, rows [][]any, err error) { rawRows, err := querier.clickHouseSqlx.Query(sql, args...) if err != nil { argsBin, _ := json.Marshal(&args) return 0, nil, nil, errcode.New(errcode.DBError, "query sql:%v args:%v, error:%v", sql, string(argsBin), err) } defer rawRows.Close() columns, err := rawRows.Columns() if err != nil { return 0, nil, nil, errcode.New(errcode.DBError, "query sql:%v read rows columns error:%v", sql, err) } columnLength := len(columns) readCacheRow := make([]any, 0, columnLength) if columnLength != len(querier.attrList) { colsBin, _ := json.Marshal(&columns) selectColsBin, _ := json.Marshal(&querier.attrList) xlog.Warnf("queyr table %v with sql:%v, args:%+v, result cols len:%v not equal to select:%v", querier.tableName, sql, args, string(colsBin), string(selectColsBin)) // 修正一下查询结果,以数据库实际读取为准 for _, field := range columns { for _, selectField := range querier.attrList { if field == selectField.Name { fieldDesc := &dto.GameLogFieldInfo{ Name: selectField.Name, Alias: selectField.Name, IsPublicField: false, } if selectField.Alias != "" { fieldDesc.Alias = selectField.Alias } fieldsDescInfo = append(fieldsDescInfo, fieldDesc) var v any readCacheRow = append(readCacheRow, &v) break } } } } else { for _, field := range querier.attrList { fieldDesc := &dto.GameLogFieldInfo{ Name: field.Name, Alias: field.Name, IsPublicField: false, FieldType: field.DataType, } if field.Alias != "" { fieldDesc.Alias = field.Alias } fieldsDescInfo = append(fieldsDescInfo, fieldDesc) var v any readCacheRow = append(readCacheRow, &v) } } for rawRows.Next() { err = rawRows.Scan(readCacheRow...) if err != nil { return 0, nil, nil, errcode.New(errcode.DBError, "sql:%v result scan row error:%v", sql, err) } if xlog.GetLogLevel() <= xlog.LogLevelTrace { argsBin, _ := json.Marshal(&args) cacheRowBin, _ := json.Marshal(&readCacheRow) xlog.Tracef("query sql:%v with args:%+v result row:%v", sql, string(argsBin), string(cacheRowBin)) } parsedRow := make([]any, len(readCacheRow)) for i := range readCacheRow { parsedRow[i] = *(readCacheRow[i].(*interface{})) } rows = append(rows, parsedRow) } count := 0 err = querier.clickHouseSqlx.QueryRow(countSql, args...).Scan(&count) if err != nil { return 0, nil, nil, errcode.New(errcode.DBError, "query count sql:%v error:%v", countSql, err) } return count, fieldsDescInfo, rows, nil }