diff --git a/cmd/sinker/main.go b/cmd/sinker/main.go
index 4570105..751e6ce 100644
--- a/cmd/sinker/main.go
+++ b/cmd/sinker/main.go
@@ -104,9 +104,11 @@ func main() {
 	log.Println(fmt.Sprintf("sinker 服务启动成功,性能检测入口为: http://127.0.0.1:%v", model.GlobConfig.Sinker.PprofHttpPort))
 
-	realTimeWarehousing := consumer_data.NewRealTimeWarehousing(model.GlobConfig.Sinker.RealTimeWarehousing.BufferSize, model.GlobConfig.Sinker.RealTimeWarehousing.FlushInterval)
-	reportAcceptStatus := consumer_data.NewReportAcceptStatus(model.GlobConfig.Sinker.ReportAcceptStatus.BufferSize, model.GlobConfig.Sinker.ReportAcceptStatus.FlushInterval)
-	reportData2CK := consumer_data.NewReportData2CK(model.GlobConfig.Sinker.ReportData2CK.BufferSize, model.GlobConfig.Sinker.ReportData2CK.FlushInterval)
+	sinkerC := model.GlobConfig.Sinker
+
+	realTimeWarehousing := consumer_data.NewRealTimeWarehousing(sinkerC.RealTimeWarehousing)
+	reportAcceptStatus := consumer_data.NewReportAcceptStatus(sinkerC.ReportAcceptStatus)
+	reportData2CK := consumer_data.NewReportData2CK(sinkerC.ReportData2CK)
 
 	realTimeDataSarama := sinker.NewKafkaSarama()
 	reportData2CKSarama := realTimeDataSarama.Clone()
 
@@ -114,8 +116,11 @@ func main() {
 	go sinker.ClearDimsCacheByTime(time.Minute * 30)
 
 	var json = jsoniter.ConfigCompatibleWithStandardLibrary
-	err = realTimeDataSarama.Init(model.GlobConfig.Comm.Kafka, model.GlobConfig.Comm.Kafka.ReportTopicName, model.GlobConfig.Comm.Kafka.RealTimeDataGroup, func(msg model.InputMessage, markFn func()) {
-
+	err = realTimeDataSarama.Init(
+		model.GlobConfig.Comm.Kafka,
+		model.GlobConfig.Comm.Kafka.ReportTopicName,
+		model.GlobConfig.Comm.Kafka.RealTimeDataGroup,
+		func(msg model.InputMessage, markFn func()) {
 		//ETL
 		var kafkaData model.KafkaData
 		err = json.Unmarshal(msg.Value, &kafkaData)
@@ -154,7 +159,11 @@ func main() {
 		panic(err)
 	}
 
-	err = reportData2CKSarama.Init(model.GlobConfig.Comm.Kafka, model.GlobConfig.Comm.Kafka.ReportTopicName, model.GlobConfig.Comm.Kafka.ReportData2CKGroup, func(msg model.InputMessage, markFn func()) {
+	err = reportData2CKSarama.Init(
+		model.GlobConfig.Comm.Kafka,
+		model.GlobConfig.Comm.Kafka.ReportTopicName,
+		model.GlobConfig.Comm.Kafka.ReportData2CKGroup,
+		func(msg model.InputMessage, markFn func()) {
 
 		var kafkaData model.KafkaData
 		err = json.Unmarshal(msg.Value, &kafkaData)
diff --git a/platform-basic-libs/service/consumer_data/real_time_warehousing.go b/platform-basic-libs/service/consumer_data/real_time_warehousing.go
index 0da35e3..b6fb3af 100644
--- a/platform-basic-libs/service/consumer_data/real_time_warehousing.go
+++ b/platform-basic-libs/service/consumer_data/real_time_warehousing.go
@@ -3,6 +3,7 @@ package consumer_data
 import (
 	"github.com/1340691923/xwl_bi/engine/db"
 	"github.com/1340691923/xwl_bi/engine/logs"
+	"github.com/1340691923/xwl_bi/model"
 	"github.com/1340691923/xwl_bi/platform-basic-libs/util"
 	"go.uber.org/zap"
 	"sync"
@@ -23,16 +24,16 @@ type RealTimeWarehousing struct {
 	flushInterval int
 }
 
-func NewRealTimeWarehousing(batchSize, flushInterval int) *RealTimeWarehousing {
-	logs.Logger.Info("NewRealTimeWarehousing", zap.Int("batchSize", batchSize), zap.Int("flushInterval", flushInterval))
+func NewRealTimeWarehousing(config model.BatchConfig) *RealTimeWarehousing {
+	logs.Logger.Info("NewRealTimeWarehousing", zap.Int("batchSize", config.BufferSize), zap.Int("flushInterval", config.FlushInterval))
 	realTimeWarehousing := &RealTimeWarehousing{
-		buffer:        make([]*RealTimeWarehousingData, 0, batchSize),
+		buffer:        make([]*RealTimeWarehousingData, 0, config.BufferSize),
 		bufferMutex:   new(sync.RWMutex),
-		batchSize:     batchSize,
-		flushInterval: flushInterval,
+		batchSize:     config.BufferSize,
+		flushInterval: config.FlushInterval,
 	}
 
-	if flushInterval > 0 {
+	if config.FlushInterval > 0 {
 		realTimeWarehousing.RegularFlushing()
 	}
 
diff --git a/platform-basic-libs/service/consumer_data/report_accpet_status.go b/platform-basic-libs/service/consumer_data/report_accpet_status.go
index 8f6995d..37ec8a9 100644
--- a/platform-basic-libs/service/consumer_data/report_accpet_status.go
+++ b/platform-basic-libs/service/consumer_data/report_accpet_status.go
@@ -3,6 +3,7 @@ package consumer_data
 import (
 	"github.com/1340691923/xwl_bi/engine/db"
 	"github.com/1340691923/xwl_bi/engine/logs"
+	"github.com/1340691923/xwl_bi/model"
 	"go.uber.org/zap"
 	"sync"
 	"time"
@@ -32,16 +33,16 @@ const (
 	SuccessStatus = 1
 )
 
-func NewReportAcceptStatus(batchSize int, flushInterval int) *ReportAcceptStatus {
-	logs.Logger.Info("NewReportAcceptStatus", zap.Int("batchSize", batchSize), zap.Int("flushInterval", flushInterval))
+func NewReportAcceptStatus(config model.BatchConfig) *ReportAcceptStatus {
+	logs.Logger.Info("NewReportAcceptStatus", zap.Int("batchSize", config.BufferSize), zap.Int("flushInterval", config.FlushInterval))
 	reportAcceptStatus := &ReportAcceptStatus{
-		buffer:        make([]*ReportAcceptStatusData, 0, batchSize),
+		buffer:        make([]*ReportAcceptStatusData, 0, config.BufferSize),
 		bufferMutex:   new(sync.RWMutex),
-		batchSize:     batchSize,
-		flushInterval: flushInterval,
+		batchSize:     config.BufferSize,
+		flushInterval: config.FlushInterval,
 	}
 
-	if flushInterval > 0 {
+	if config.FlushInterval > 0 {
 		reportAcceptStatus.RegularFlushing()
 	}
 
diff --git a/platform-basic-libs/service/consumer_data/reportdata2ck.go b/platform-basic-libs/service/consumer_data/reportdata2ck.go
index 054d346..f8d1b2e 100644
--- a/platform-basic-libs/service/consumer_data/reportdata2ck.go
+++ b/platform-basic-libs/service/consumer_data/reportdata2ck.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"github.com/1340691923/xwl_bi/engine/db"
 	"github.com/1340691923/xwl_bi/engine/logs"
+	"github.com/1340691923/xwl_bi/model"
 	model2 "github.com/1340691923/xwl_bi/platform-basic-libs/sinker/model"
 	parser "github.com/1340691923/xwl_bi/platform-basic-libs/sinker/parse"
 	"go.uber.org/zap"
@@ -21,15 +22,15 @@ type ReportData2CK struct {
 	flushInterval int
 }
 
-func NewReportData2CK(batchSize int, flushInterval int) *ReportData2CK {
-	logs.Logger.Info("NewReportData2CK", zap.Int("batchSize", batchSize), zap.Int("flushInterval", flushInterval))
+func NewReportData2CK(config model.BatchConfig) *ReportData2CK {
+	logs.Logger.Info("NewReportData2CK", zap.Int("batchSize", config.BufferSize), zap.Int("flushInterval", config.FlushInterval))
 	reportData2CK := &ReportData2CK{
-		buffer:        make([]map[string]*parser.FastjsonMetric, 0, batchSize),
+		buffer:        make([]map[string]*parser.FastjsonMetric, 0, config.BufferSize),
 		bufferMutex:   new(sync.RWMutex),
-		batchSize:     batchSize,
-		flushInterval: flushInterval,
+		batchSize:     config.BufferSize,
+		flushInterval: config.FlushInterval,
 	}
 
-	if flushInterval > 0 {
+	if config.FlushInterval > 0 {
 		reportData2CK.RegularFlushing()
 	}
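Note: the diff above assumes a shared batch-settings type in the model package. Below is a minimal sketch of what that type and the sinker config wiring might look like; the BatchConfig field names are taken from the diff, while the surrounding SinkerConfig layout is an assumption for illustration, not the project's actual definition.

package model

// BatchConfig groups the buffering knobs that every consumer_data
// constructor now takes as a single value (field names from the diff).
type BatchConfig struct {
	BufferSize    int // max rows buffered in memory before a flush
	FlushInterval int // flush ticker period (unit as consumed by RegularFlushing); values <= 0 disable time-based flushing
}

// SinkerConfig sketches the per-consumer settings referenced as
// model.GlobConfig.Sinker in cmd/sinker/main.go. Layout assumed;
// unrelated fields (e.g. PprofHttpPort) are omitted here.
type SinkerConfig struct {
	RealTimeWarehousing BatchConfig
	ReportAcceptStatus  BatchConfig
	ReportData2CK       BatchConfig
}

With this shape, each constructor receives one value, e.g. consumer_data.NewRealTimeWarehousing(sinkerC.RealTimeWarehousing), instead of two loose ints, so additional batching options can later be added to BatchConfig without touching every call site.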