53 lines
1.5 KiB
Go
53 lines
1.5 KiB
Go
package kafka
|
||
|
||
import (
|
||
"fmt"
|
||
"github.com/1340691923/xwl_bi/model"
|
||
"github.com/Shopify/sarama"
|
||
"log"
|
||
"time"
|
||
)
|
||
|
||
//初始化kafka数据
|
||
func Init() {
|
||
config := sarama.NewConfig()
|
||
|
||
config.Version = sarama.V2_0_0_0
|
||
if model.GlobConfig.Comm.Kafka.Username != "" {
|
||
config.Net.SASL.Enable = true
|
||
config.Net.SASL.User = model.GlobConfig.Comm.Kafka.Username
|
||
config.Net.SASL.Password = model.GlobConfig.Comm.Kafka.Password
|
||
config.Net.SASL.Handshake = true
|
||
}
|
||
|
||
config.Consumer.Group.Session.Timeout = 15 * time.Second
|
||
config.Consumer.Group.Heartbeat.Interval = 5 * time.Second
|
||
|
||
conn, err := sarama.NewClusterAdmin(model.GlobConfig.Comm.Kafka.Addresses, config)
|
||
if err != nil {
|
||
log.Println(fmt.Sprintf("kafka 链接初始化失败:%s", err.Error()))
|
||
panic(err)
|
||
}
|
||
s, err := conn.ListTopics()
|
||
for topic := range s {
|
||
log.Println("您所拥有的TOPIC为:", topic)
|
||
}
|
||
|
||
if _, ok := s[model.GlobConfig.Comm.Kafka.ReportTopicName]; !ok {
|
||
detail := sarama.TopicDetail{NumPartitions: model.GlobConfig.Comm.Kafka.NumPartitions, ReplicationFactor: 1}
|
||
err = conn.CreateTopic(model.GlobConfig.Comm.Kafka.ReportTopicName, &detail, false)
|
||
if err != nil {
|
||
log.Println("创建TOPIC失败!", model.GlobConfig.Comm.Kafka.ReportTopicName)
|
||
panic(err)
|
||
}
|
||
|
||
err = conn.Close()
|
||
if err != nil {
|
||
panic(err)
|
||
}
|
||
log.Println("初始化TOPIC完成!", model.GlobConfig.Comm.Kafka.ReportTopicName)
|
||
} else {
|
||
log.Println("您已拥有该TOPIC:", model.GlobConfig.Comm.Kafka.ReportTopicName)
|
||
}
|
||
}
|