bi/platform-basic-libs/sinker/kafka_sarama.go
1340691923@qq.com ebbf4120bf 第一次提交
2022-01-26 16:40:50 +08:00

148 lines
3.5 KiB
Go

package sinker
import (
"context"
"github.com/1340691923/xwl_bi/engine/logs"
"github.com/1340691923/xwl_bi/model"
"sync"
"time"
"github.com/Shopify/sarama"
"github.com/pkg/errors"
"go.uber.org/zap"
)
type KafkaSarama struct {
topic string
cfg model.KafkaCfg
cg sarama.ConsumerGroup
sess sarama.ConsumerGroupSession
ctx context.Context
cancel context.CancelFunc
wgRun sync.WaitGroup
putFn func(msg model.InputMessage, markFn func())
cleanupFn func()
}
func NewKafkaSarama() *KafkaSarama {
return &KafkaSarama{}
}
func (k *KafkaSarama) Clone() *KafkaSarama {
return &KafkaSarama{}
}
type MyConsumerGroupHandler struct {
k *KafkaSarama
}
func (h MyConsumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error {
h.k.sess = sess
return nil
}
func (h MyConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
begin := time.Now()
h.k.cleanupFn()
logs.Logger.Info("consumer group cleanup",
zap.Int32("generation id", h.k.sess.GenerationID()),
zap.Duration("cost", time.Since(begin)))
return nil
}
func (h MyConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
h.k.putFn(model.InputMessage{
Topic: msg.Topic,
Partition: int(msg.Partition),
Key: msg.Key,
Value: msg.Value,
Offset: msg.Offset,
Timestamp: &msg.Timestamp,
}, func() {
sess.MarkMessage(msg, "")
})
}
return nil
}
func (k *KafkaSarama) Init(cfg model.KafkaCfg, topicName, consumerGroup string, putFn func(msg model.InputMessage, markFn func()), cleanupFn func()) (err error) {
k.cfg = cfg
k.ctx, k.cancel = context.WithCancel(context.Background())
k.putFn = putFn
k.cleanupFn = cleanupFn
k.topic = topicName
sarCfg, err := GetSaramaConfig(cfg)
if err != nil {
return err
}
sarCfg.Consumer.Offsets.Initial = sarama.OffsetOldest
cg, err := sarama.NewConsumerGroup(cfg.Addresses, consumerGroup, sarCfg)
if err != nil {
return err
}
k.cg = cg
return nil
}
func GetSaramaConfig(kfkCfg model.KafkaCfg) (sarCfg *sarama.Config, err error) {
sarCfg = sarama.NewConfig()
sarCfg.Version = sarama.V2_0_0_0
sarCfg.Consumer.Return.Errors = false
// check for authentication
if kfkCfg.Username != "" && kfkCfg.Password != "" {
sarCfg.Net.SASL.Enable = true
sarCfg.Net.SASL.User = kfkCfg.Username
sarCfg.Net.SASL.Password = kfkCfg.Password
}
sarCfg.ChannelBufferSize = 1024
return
}
func (k *KafkaSarama) Run() {
k.wgRun.Add(1)
defer k.wgRun.Done()
LOOP_SARAMA:
for {
handler := MyConsumerGroupHandler{k}
if k.ctx.Err() != nil {
return
}
if err := k.cg.Consume(k.ctx, []string{k.topic}, handler); err != nil {
if errors.Is(err, context.Canceled) {
logs.Logger.Info("KafkaSarama.Run quit due to context has been canceled", zap.String("task", k.topic))
break LOOP_SARAMA
} else if errors.Is(err, sarama.ErrClosedConsumerGroup) {
logs.Logger.Info("KafkaSarama.Run quit due to consumer group has been closed", zap.String("task", k.topic))
break LOOP_SARAMA
} else {
logs.Logger.Error("sarama.ConsumerGroup.Consume failed", zap.String("task", k.topic), zap.Error(err))
continue
}
}
}
}
func (k *KafkaSarama) CommitMessages(msg *model.InputMessage) error {
k.sess.MarkOffset(msg.Topic, int32(msg.Partition), msg.Offset+1, "")
return nil
}
func (k *KafkaSarama) Stop() error {
k.cancel()
k.cg.Close()
k.wgRun.Wait()
return nil
}
func (k *KafkaSarama) Description() string {
return "kafka consumer of topic " + k.topic
}