package consumer

import (
	"context"
	"errors"
	"time"

	"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"

	"github.com/Shopify/sarama"
	"github.com/astaxie/beego/logs"
)

// MessageConsumer is the Kafka message consumer. It implements
// sarama.ConsumerGroupHandler and dispatches every claimed message to the
// handler registered for the claim's topic, surrounded by the configured
// before/after hooks.
type MessageConsumer struct {
	// ready is closed by Setup; Runer.Start replaces it with a fresh channel
	// each time Consume returns.
	ready      chan struct{}
	kafkaHosts []string
	groupId    string
	topics     []string
	// topicsHandles maps a topic name to the handler that processes it.
	topicsHandles map[string]TopicHandle
	// beforeHandles run for every message before the topic handler.
	beforeHandles []TopicHandle
	// afterHandles run for every message after the topic handler.
	afterHandles []TopicHandle
}

// Compile-time check that MessageConsumer implements the handler interface.
var _ sarama.ConsumerGroupHandler = (*MessageConsumer)(nil)

// Setup closes the ready channel, which unblocks anyone waiting on the
// channel returned by Runer.IsReady. It always returns nil.
func (c *MessageConsumer) Setup(groupSession sarama.ConsumerGroupSession) error {
	close(c.ready)
	return nil
}

// Cleanup is a no-op and always returns nil.
func (c *MessageConsumer) Cleanup(groupSession sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim drains groupClaim.Messages(). For each message it logs the
// message, runs every before hook, marks the message on the session, looks up
// the handler for the claim's topic and invokes it, then runs every after
// hook. Handler errors are logged and do not stop consumption. It returns nil
// once the message channel is closed.
func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession, groupClaim sarama.ConsumerGroupClaim) error {
	for message := range groupClaim.Messages() {
		logs.Debug("Done Message claimed: timestamp = %v, topic = %s offset = %v value = %v \n",
			message.Timestamp, message.Topic, message.Offset, string(message.Value))
		for _, before := range c.beforeHandles {
			before(message)
		}
		// The message is marked before the topic handler runs, so a handler
		// failure below does not prevent the message from being marked.
		groupSession.MarkMessage(message, "")
		topicHandle, err := c.FindTopichandle(groupClaim.Topic())
		if err != nil {
			logs.Error("FindTopichandle err:%s \n", err)
			// Without a handler the after hooks are skipped as well.
			continue
		}
		if err = topicHandle(message); err != nil {
			logs.Error("Message claimed: kafka消息处理错误 topic =", message.Topic, message.Offset, err)
		}
		// Iterate afterHandles directly: bounding this loop by
		// len(beforeHandles) would panic or skip hooks whenever the two
		// slices differ in length.
		for _, after := range c.afterHandles {
			after(message)
		}
	}
	return nil
}

// FindTopichandle returns the handler registered for topic, or an error when
// no handler is registered for it.
func (c *MessageConsumer) FindTopichandle(topic string) (TopicHandle, error) {
	if v, ok := c.topicsHandles[topic]; ok {
		return v, nil
	}
	return nil, errors.New("TopicHandle not found")
}

// Runer owns a MessageConsumer together with the sarama consumer group that
// feeds it.
type Runer struct {
	msgConsumer   *MessageConsumer
	consumerGroup sarama.ConsumerGroup
}

// NewRuner builds a Runer whose consumer subscribes to every topic that has
// an entry in TopicHandleRouters, using the hosts and consumer id found in
// configs.Cfg. InitConsumer must be called before Start, because the consumer
// group is not created here.
func NewRuner() *Runer {
	topics := make([]string, 0, len(TopicHandleRouters))
	for key := range TopicHandleRouters {
		topics = append(topics, key)
	}
	return &Runer{
		msgConsumer: &MessageConsumer{
			ready:         make(chan struct{}),
			kafkaHosts:    configs.Cfg.Servers,
			groupId:       configs.Cfg.ConsumerId,
			topicsHandles: TopicHandleRouters,
			topics:        topics,
			beforeHandles: BeforeHandles,
			afterHandles:  AfterHandles,
		},
	}
}

// InitConsumer creates the sarama consumer group from the consumer's hosts
// and group id and stores it on the Runer. It returns the error reported by
// sarama.NewConsumerGroup, if any.
func (r *Runer) InitConsumer() error {
	config := sarama.NewConfig()
	//config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Version = sarama.V0_10_2_1
	// config.Version = sarama.KafkaVersion{
	// 	version: [4]int{},
	// }
	consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config)
	if err != nil {
		return err
	}
	r.consumerGroup = consumerGroup
	return nil
}

// Start runs the consume loop until ctx is cancelled, then closes the
// consumer group and returns. A Consume error is logged and followed by a
// wait of up to 5 seconds before the next attempt. Panics raised inside the
// loop are recovered and logged, which also ends the loop.
func (r *Runer) Start(ctx context.Context) {
	defer func() {
		if e := recover(); e != nil {
			logs.Error(e)
		}
	}()
	for {
		select {
		case <-ctx.Done():
			logs.Warning("ctx cancel;consumerGroup.Close()")
			if err := r.consumerGroup.Close(); err != nil {
				logs.Error("consumerGroup close err:%s \n", err)
			}
			return
		default:
			if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {
				logs.Error("consumerGroup err:%s \n", err)
				// Wait before retrying, but wake up early on cancellation so
				// shutdown is not delayed by the full back-off.
				timer := time.NewTimer(5 * time.Second)
				select {
				case <-timer.C:
				case <-ctx.Done():
					timer.Stop()
				}
			}
			// Fresh channel for the next Setup call to close. This write is
			// not guarded by any lock in this file.
			r.msgConsumer.ready = make(chan struct{})
		}
	}
}

// IsReady returns the consumer's current ready channel, which Setup closes.
func (r *Runer) IsReady() <-chan struct{} {
	return r.msgConsumer.ready
}