作者 Your Name

更新

... ... @@ -2,7 +2,6 @@ package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
... ... @@ -23,16 +22,21 @@ func main() {
logs.Info("应用启动")
beego.Run()
}()
closeConsumer, err := consumer.StartConsumer(ctx)
if err != nil {
fmt.Printf("启动kafka消息消费者失败 err%s \n", err)
consumerRun := consumer.NewRuner()
if err := consumerRun.InitConsumer(); err != nil {
logs.Error("启动kafka消息消费者失败:%s", err)
}
go func() {
consumerRun.Start(ctx)
}()
go func() {
<-consumerRun.IsReady()
logs.Info("Sarama consumer up and running!...")
}()
for {
select {
case <-sigs:
cancel()
closeConsumer()
return
default:
}
... ...
... ... @@ -3,7 +3,6 @@ package consumer
import (
"context"
"errors"
"sync"
"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"
... ... @@ -13,26 +12,26 @@ import (
//MessageConsumer 消息消费者
type MessageConsumer struct {
ready chan bool
ready chan struct{}
kafkaHosts []string
groupId string
topics []string
topicsHandles map[string]TopicHandle
}
func NewMessageConsumer() *MessageConsumer {
topics := []string{}
for key := range TopicHandleRouters {
topics = append(topics, key)
}
return &MessageConsumer{
ready: make(chan bool),
kafkaHosts: configs.Cfg.Servers,
groupId: configs.Cfg.ConsumerId,
topicsHandles: TopicHandleRouters,
topics: topics,
}
}
// func NewMessageConsumer() *MessageConsumer {
// topics := []string{}
// for key := range TopicHandleRouters {
// topics = append(topics, key)
// }
// return &MessageConsumer{
// ready: make(chan bool),
// kafkaHosts: configs.Cfg.Servers,
// groupId: configs.Cfg.ConsumerId,
// topicsHandles: TopicHandleRouters,
// topics: topics,
// }
// }
//实现对应的接口
var _ sarama.ConsumerGroupHandler = (*MessageConsumer)(nil)
... ... @@ -72,41 +71,95 @@ func (c *MessageConsumer) FindTopichandle(topic string) (TopicHandle, error) {
return nil, errors.New("TopicHandle not found")
}
//StartConsumer 启动
//返回 Consumer关闭方法 和 error
func StartConsumer(ctx context.Context) (func(), error) {
consumer := NewMessageConsumer()
type Runer struct {
msgConsumer *MessageConsumer
consumerGroup sarama.ConsumerGroup
}
func NewRuner() *Runer {
topics := []string{}
for key := range TopicHandleRouters {
topics = append(topics, key)
}
return &Runer{
msgConsumer: &MessageConsumer{
ready: make(chan struct{}),
kafkaHosts: configs.Cfg.Servers,
groupId: configs.Cfg.ConsumerId,
topicsHandles: TopicHandleRouters,
topics: topics,
},
}
}
func (r *Runer) InitConsumer() error {
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetNewest
config.Version = sarama.V0_11_0_2
consumerGroup, err := sarama.NewConsumerGroup(consumer.kafkaHosts, consumer.groupId, config)
consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config)
if err != nil {
return func() {}, err
return err
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
if err := ctx.Err(); err != nil {
logs.Error("ctx err:%s \n", err)
return
}
if err := consumerGroup.Consume(ctx, consumer.topics, consumer); err != nil {
r.consumerGroup = consumerGroup
return nil
}
func (r *Runer) Start(ctx context.Context) {
for {
select {
case <-ctx.Done():
r.consumerGroup.Close()
logs.Warning("ctx cancel;consumerGroup.Close()")
return
default:
if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {
logs.Error("consumerGroup err:%s \n", err)
}
consumer.ready = make(chan bool)
}
}()
//等待 consumerGroup 设置完成
<-consumer.ready
logs.Info("Sarama consumer up and running!...")
return func() {
wg.Wait()
if err := consumerGroup.Close(); err != nil {
logs.Error("consumerGroup.Close err %s", err)
r.msgConsumer.ready = make(chan struct{})
}
logs.Info("consumerGroup.Close")
}, nil
}
}
func (r *Runer) IsReady() <-chan struct{} {
return r.msgConsumer.ready
}
//StartConsumer 启动
//返回 Consumer关闭方法 和 error
// func StartConsumer(ctx context.Context) (func(), error) {
// consumer := NewMessageConsumer()
// config := sarama.NewConfig()
// config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
// config.Consumer.Offsets.Initial = sarama.OffsetNewest
// config.Version = sarama.V0_11_0_2
// consumerGroup, err := sarama.NewConsumerGroup(consumer.kafkaHosts, consumer.groupId, config)
// if err != nil {
// return func() {}, err
// }
// wg := &sync.WaitGroup{}
// wg.Add(1)
// go func() {
// defer wg.Done()
// for {
// if err := ctx.Err(); err != nil {
// logs.Error("ctx err:%s \n", err)
// return
// }
// if err := consumerGroup.Consume(ctx, consumer.topics, consumer); err != nil {
// logs.Error("consumerGroup err:%s \n", err)
// }
// consumer.ready = make(chan bool)
// }
// }()
// //等待 consumerGroup 设置完成
// <-consumer.ready
// logs.Info("Sarama consumer up and running!...")
// return func() {
// wg.Wait()
// if err := consumerGroup.Close(); err != nil {
// logs.Error("consumerGroup.Close err %s", err)
// }
// logs.Info("consumerGroup.Close")
// }, nil
// }
... ...