作者 唐旭辉

更新

... ... @@ -24,12 +24,12 @@ func main() {
beego.Run()
}()
consumerRun := consumer.NewRuner()
if err := consumerRun.InitConsumer(); err != nil {
logs.Error("启动kafka消息消费者失败:%s", err)
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
if err := consumerRun.InitConsumer(); err != nil {
logs.Error("启动kafka消息消费者失败:%s", err)
}
consumerRun.Start(ctx)
wg.Done()
}()
... ...
... ... @@ -14,7 +14,7 @@ var KafkaCfg KafkaConfig
func init() {
KafkaCfg = KafkaConfig{
Servers: []string{"106.52.15.41:9092"},
Servers: []string{"192.168.190.136:9092"},
ConsumerId: "partnermg_local",
}
if os.Getenv("KAFKA_HOST") != "" {
... ...
... ... @@ -95,6 +95,9 @@ func (good OrderGoodBestShop) CopyToOrderGood(g *OrderGood) {
g.Price = good.Price
g.PlanGoodNumber = good.Nums
g.GoodCompute.PlanAmount = good.Amount
g.BonusStatus = OrderGoodWaitPay
g.CurrentBonusStatus = OrderGoodBonusWaitPay{}
g.CurrentBonusStatus.WartPayPartnerBonus(g)
return
}
... ...
... ... @@ -22,20 +22,6 @@ type MessageConsumer struct {
afterHandles []TopicHandle
}
// func NewMessageConsumer() *MessageConsumer {
// topics := []string{}
// for key := range TopicHandleRouters {
// topics = append(topics, key)
// }
// return &MessageConsumer{
// ready: make(chan bool),
// kafkaHosts: configs.Cfg.Servers,
// groupId: configs.Cfg.ConsumerId,
// topicsHandles: TopicHandleRouters,
// topics: topics,
// }
// }
//实现对应的接口
var _ sarama.ConsumerGroupHandler = (*MessageConsumer)(nil)
... ... @@ -128,59 +114,21 @@ func (r *Runer) Start(ctx context.Context) {
for {
select {
case <-ctx.Done():
err := r.consumerGroup.Close()
logs.Warning("ctx cancel;consumerGroup.Close();err:%s", err)
logs.Warning("ctx cancel;consumerGroup.Close()")
r.consumerGroup.Close()
return
default:
if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {
logs.Error("consumerGroup err:%s \n", err)
//等待重试
timer := time.NewTimer(5 * time.Second)
<-timer.C
}
r.msgConsumer.ready = make(chan struct{})
}
if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {
logs.Error("consumerGroup err:%s \n", err)
//等待重试
timer := time.NewTimer(5 * time.Second)
<-timer.C
}
r.msgConsumer.ready = make(chan struct{})
}
}
func (r *Runer) IsReady() <-chan struct{} {
return r.msgConsumer.ready
}
//StartConsumer 启动
//返回 Consumer关闭方法 和 error
// func StartConsumer(ctx context.Context) (func(), error) {
// consumer := NewMessageConsumer()
// config := sarama.NewConfig()
// config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
// config.Consumer.Offsets.Initial = sarama.OffsetNewest
// config.Version = sarama.V0_11_0_2
// consumerGroup, err := sarama.NewConsumerGroup(consumer.kafkaHosts, consumer.groupId, config)
// if err != nil {
// return func() {}, err
// }
// wg := &sync.WaitGroup{}
// wg.Add(1)
// go func() {
// defer wg.Done()
// for {
// if err := ctx.Err(); err != nil {
// logs.Error("ctx err:%s \n", err)
// return
// }
// if err := consumerGroup.Consume(ctx, consumer.topics, consumer); err != nil {
// logs.Error("consumerGroup err:%s \n", err)
// }
// consumer.ready = make(chan bool)
// }
// }()
// //等待 consumerGroup 设置完成
// <-consumer.ready
// logs.Info("Sarama consumer up and running!...")
// return func() {
// wg.Wait()
// if err := consumerGroup.Close(); err != nil {
// logs.Error("consumerGroup.Close err %s", err)
// }
// logs.Info("consumerGroup.Close")
// }, nil
// }
... ...