uniugm/admin/apps/game/domain/repo/gamelog_querier.go
2025-07-22 09:37:37 +08:00

222 lines
6.3 KiB
Go

package repo
import (
"admin/internal/errcode"
"admin/internal/model/dto"
"admin/lib/xlog"
"context"
"encoding/json"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/jmoiron/sqlx"
"strconv"
"strings"
"time"
)
// EventListQuerier collects the filter conditions for an event-log listing and
// runs the resulting list and count statements.
type EventListQuerier struct {
// db is a native ClickHouse driver connection; it is not referenced by the
// methods visible in this part of the file.
db driver.Conn
// clickHouseSqlx is the handle used by query to execute the list and count statements.
clickHouseSqlx *sqlx.DB
// tableName is the table selected from; genSql overwrites it with its tableName argument.
tableName string
// eventName lists the values matched against `xwl_part_event`; when empty no event filter is added.
eventName []string
// attrList names the columns to select, in order, and supplies alias/data-type metadata.
attrList []*AttrInfo
// serverId is matched against `pub_serverid` when non-zero, and is also the prefix of the role filter value.
serverId int
// roleId, when non-empty, is matched against `xwl_distinct_id` as "<serverId>-<roleId>".
roleId string
// account, when non-empty, is matched against `pub_userid`.
account string
}
// CondRoleId adds a role filter to the query.
//
// It stores serverId and roleId on the querier (replacing any serverId set by an
// earlier Cond* call) and returns the same querier so calls can be chained.
func (querier *EventListQuerier) CondRoleId(serverId int, roleId string) *EventListQuerier {
	querier.serverId, querier.roleId = serverId, roleId
	return querier
}
// CondAccount adds an account filter to the query.
//
// It stores serverId and account on the querier (replacing any serverId set by an
// earlier Cond* call) and returns the same querier so calls can be chained.
func (querier *EventListQuerier) CondAccount(serverId int, account string) *EventListQuerier {
	querier.serverId, querier.account = serverId, account
	return querier
}
// Type codes numbered consecutively from 0 via iota (TypeUnknown = 0 ... DateTimeArray = 9).
// NOTE(review): presumably these describe attribute/column data types (compare the
// DataType field read from AttrInfo in query) — confirm against the users of these constants.
// The numeric values follow declaration order, so inserting or reordering entries changes them.
const (
// TypeUnknown is the zero value (0).
TypeUnknown = iota
// Int is 1.
Int
// Float is 2.
Float
// String is 3.
String
// DateTime is 4.
DateTime
// ElasticDateTime is 5.
ElasticDateTime
// IntArray is 6.
IntArray
// FloatArray is 7.
FloatArray
// StringArray is 8.
StringArray
// DateTimeArray is 9.
DateTimeArray
)
// Go executes the event-list query for one page.
//
// The list and count statements are produced by genSql from the conditions stored on
// the querier and then executed by query. pageNo, pageLen, dateStart and dateEnd are
// forwarded to genSql unchanged. ctx is accepted but is not referenced in this method.
//
// Returns the total number of matching rows, the descriptors of the returned columns,
// and the row values. When query fails, its error is returned together with the other
// values query produced.
//
// On success, if xlog.GetLogLevel() is at or below xlog.LogLevelDebug, the list
// statement, its bind arguments and the resulting rows are written to the debug log.
func (querier *EventListQuerier) Go(ctx context.Context, pageNo int, pageLen int, dateStart, dateEnd time.Time) (
	totalCount int, fieldsDescInfo []*dto.GameLogFieldInfo, rows [][]any, err error) {
	// Build the statements and their shared bind arguments.
	listSQL, countSQL, bindArgs := querier.genSql(querier.tableName, pageNo, pageLen, dateStart, dateEnd)

	// Fetch the page of rows and the total count.
	totalCount, fieldsDescInfo, rows, err = querier.query(listSQL, countSQL, bindArgs)
	if err != nil {
		return totalCount, fieldsDescInfo, rows, err
	}

	// Skip the JSON encoding entirely unless debug logging is enabled.
	if xlog.GetLogLevel() > xlog.LogLevelDebug {
		return totalCount, fieldsDescInfo, rows, nil
	}

	// Marshal errors are deliberately ignored: the output is only used for logging.
	encodedArgs, _ := json.Marshal(&bindArgs)
	encodedRows, _ := json.Marshal(&rows)
	xlog.Debugf("query sql:%v with args:%v, rows result:%v", listSQL, string(encodedArgs), string(encodedRows))
	return totalCount, fieldsDescInfo, rows, nil
}
// genSql builds the paged list statement, the matching count statement, and the bind
// arguments shared by both.
//
// Side effect: querier.tableName is overwritten with tableName.
//
// The selected columns are the Name of every entry in querier.attrList, in order.
// The where clause is assembled from, in this order:
//   - `xwl_part_event` equal to any entry of querier.eventName (omitted when empty);
//   - `pub_serverid` equal to querier.serverId (omitted when it is 0);
//   - `xwl_distinct_id` equal to "<serverId>-<roleId>" (omitted when roleId is empty);
//   - `pub_userid` equal to querier.account (omitted when empty);
//   - xwl_part_date between dateStart and dateEnd inclusive, both bound as
//     "2006-01-02 15:04:05" strings.
//
// pageNo is 1-based. A pageNo below 1 is treated as page 1 and a negative pageLen as 0,
// so the generated "limit offset,count" clause never contains a negative number.
//
// Returns the list statement (ordered by xwl_part_date descending and limited to the
// requested page), the count statement, and the bind arguments in placeholder order.
//
// NOTE(review): table and column names are concatenated into the statement as-is and
// are not quoted or escaped here — confirm they only ever come from trusted metadata.
func (querier *EventListQuerier) genSql(tableName string, pageNo int, pageLen int, dateStart, dateEnd time.Time) (
	string, string, []any) {
	querier.tableName = tableName

	// Clamp paging input: (pageNo-1)*pageLen would otherwise go negative and yield an
	// invalid limit clause.
	if pageNo < 1 {
		pageNo = 1
	}
	if pageLen < 0 {
		pageLen = 0
	}

	cols := make([]string, 0, len(querier.attrList))
	for _, attr := range querier.attrList {
		cols = append(cols, attr.Name)
	}

	// At most five non-event conditions/arguments are added below.
	whereList := make([]string, 0, 6)
	whereArgs := make([]any, 0, len(querier.eventName)+5)

	switch len(querier.eventName) {
	case 0:
		// No event filter.
	case 1:
		whereList = append(whereList, "`xwl_part_event`=?")
		whereArgs = append(whereArgs, querier.eventName[0])
	default:
		// Several events are OR-ed together inside one parenthesised group.
		eventNameWhereSql := make([]string, 0, len(querier.eventName))
		for _, v := range querier.eventName {
			eventNameWhereSql = append(eventNameWhereSql, "`xwl_part_event`=?")
			whereArgs = append(whereArgs, v)
		}
		whereList = append(whereList, "("+strings.Join(eventNameWhereSql, " or ")+")")
	}

	if querier.serverId != 0 {
		whereList = append(whereList, "`pub_serverid`=?")
		whereArgs = append(whereArgs, querier.serverId)
	}
	if querier.roleId != "" {
		whereList = append(whereList, "`xwl_distinct_id`=?")
		whereArgs = append(whereArgs, strconv.Itoa(querier.serverId)+"-"+querier.roleId)
	}
	if querier.account != "" {
		whereList = append(whereList, "`pub_userid`=?")
		whereArgs = append(whereArgs, querier.account)
	}

	// The date range is always present, so the where clause is never empty.
	whereList = append(whereList, "xwl_part_date >= toDateTime(?)", "xwl_part_date <= toDateTime(?)")
	whereArgs = append(whereArgs,
		dateStart.Format("2006-01-02 15:04:05"),
		dateEnd.Format("2006-01-02 15:04:05"))

	// Both statements share the same from/where tail and therefore the same arguments.
	fromWhere := " from " + querier.tableName + " where " + strings.Join(whereList, " and ")

	limitStart := (pageNo - 1) * pageLen
	sql := "select " + strings.Join(cols, ",") + fromWhere +
		fmt.Sprintf(" order by xwl_part_date desc limit %v,%v", limitStart, pageLen)
	countSql := "select count(1) " + fromWhere
	return sql, countSql, whereArgs
}
// query executes the list statement and then the count statement with the same
// bind arguments.
//
// Parameters:
//   - sql: the list statement; its result columns are expected to correspond to querier.attrList.
//   - countSql: a statement returning a single integer, the total number of matching rows.
//   - args: bind arguments used for both statements.
//
// Returns the total count, one descriptor per result column, and the rows as slices of
// scanned values in column order. rows is nil when the list statement yields no rows.
//
// When the number of result columns differs from len(querier.attrList) a warning is
// logged and the descriptors follow the columns actually returned: a column whose name
// matches an attribute takes that attribute's metadata, any other column is described by
// its own name. Every column always gets a scan destination, so Scan is never called
// with fewer destinations than columns.
//
// Any failure (running either statement, reading the columns, scanning a row, or an
// error reported by the row iterator) is returned as an errcode.DBError and all other
// results are zero.
func (querier *EventListQuerier) query(sql string, countSql string, args []any) (
	totalCount int, fieldsDescInfo []*dto.GameLogFieldInfo, rows [][]any, err error) {
	rawRows, err := querier.clickHouseSqlx.Query(sql, args...)
	if err != nil {
		argsBin, _ := json.Marshal(&args)
		return 0, nil, nil, errcode.New(errcode.DBError, "query sql:%v args:%v, error:%v",
			sql, string(argsBin), err)
	}
	defer rawRows.Close()

	columns, err := rawRows.Columns()
	if err != nil {
		return 0, nil, nil, errcode.New(errcode.DBError, "query sql:%v read rows columns error:%v", sql, err)
	}
	columnLength := len(columns)
	readCacheRow := make([]any, 0, columnLength)

	// describe converts one selected attribute into its result-column descriptor;
	// the alias falls back to the name when none is configured.
	describe := func(attr *AttrInfo) *dto.GameLogFieldInfo {
		desc := &dto.GameLogFieldInfo{
			Name:          attr.Name,
			Alias:         attr.Name,
			IsPublicField: false,
			FieldType:     attr.DataType,
		}
		if attr.Alias != "" {
			desc.Alias = attr.Alias
		}
		return desc
	}
	// addField registers one result column: its descriptor plus a reusable scan destination.
	addField := func(desc *dto.GameLogFieldInfo) {
		fieldsDescInfo = append(fieldsDescInfo, desc)
		readCacheRow = append(readCacheRow, new(any))
	}

	if columnLength != len(querier.attrList) {
		colsBin, _ := json.Marshal(&columns)
		selectColsBin, _ := json.Marshal(&querier.attrList)
		xlog.Warnf("queyr table %v with sql:%v, args:%+v, result cols len:%v not equal to select:%v",
			querier.tableName, sql, args, string(colsBin), string(selectColsBin))
		// Correct the result layout: the columns actually returned by the database win.
		for _, column := range columns {
			// Default for a column that matches no selected attribute.
			desc := &dto.GameLogFieldInfo{Name: column, Alias: column, IsPublicField: false}
			for _, attr := range querier.attrList {
				if attr.Name == column {
					desc = describe(attr)
					break
				}
			}
			addField(desc)
		}
	} else {
		for _, attr := range querier.attrList {
			addField(describe(attr))
		}
	}

	for rawRows.Next() {
		if err = rawRows.Scan(readCacheRow...); err != nil {
			return 0, nil, nil, errcode.New(errcode.DBError, "sql:%v result scan row error:%v", sql, err)
		}
		if xlog.GetLogLevel() <= xlog.LogLevelTrace {
			argsBin, _ := json.Marshal(&args)
			cacheRowBin, _ := json.Marshal(&readCacheRow)
			xlog.Tracef("query sql:%v with args:%+v result row:%v", sql, string(argsBin), string(cacheRowBin))
		}
		// The scan destinations are reused for every row, so copy the values out.
		parsedRow := make([]any, len(readCacheRow))
		for i, cell := range readCacheRow {
			parsedRow[i] = *(cell.(*any))
		}
		rows = append(rows, parsedRow)
	}
	// Next returns false both at end of data and on failure; only Err tells them apart.
	if err = rawRows.Err(); err != nil {
		return 0, nil, nil, errcode.New(errcode.DBError, "sql:%v result iterate rows error:%v", sql, err)
	}

	count := 0
	err = querier.clickHouseSqlx.QueryRow(countSql, args...).Scan(&count)
	if err != nil {
		return 0, nil, nil, errcode.New(errcode.DBError, "query count sql:%v error:%v", countSql, err)
	}
	return count, fieldsDescInfo, rows, nil
}