package consumer

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/Shopify/sarama"
	"github.com/astaxie/beego/logs"
	cluster "github.com/bsm/sarama-cluster"
	"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"
)

// MessageConsumer is the Kafka message consumer. It implements
// sarama.ConsumerGroupHandler and dispatches every claimed message to the
// TopicHandle registered for the claim's topic.
type MessageConsumer struct {
	// ready is closed by Setup; Runer.IsReady exposes it to callers.
	ready chan struct{}
	// kafkaHosts are the broker addresses passed to sarama.NewConsumerGroup.
	kafkaHosts []string
	// groupId is the consumer group id passed to sarama.NewConsumerGroup.
	groupId string
	// topics is the list of topics handed to ConsumerGroup.Consume.
	topics []string
	// topicsHandles maps a topic name to the handler invoked for its messages.
	topicsHandles map[string]TopicHandle
	// beforeHandles  []TopicHandle
	// afterHandles   []TopicHandle
}

// Compile-time check that *MessageConsumer implements sarama.ConsumerGroupHandler.
var _ sarama.ConsumerGroupHandler = (*MessageConsumer)(nil)

// Setup is part of sarama.ConsumerGroupHandler. It closes the ready channel so
// that receivers obtained through Runer.IsReady are released.
func (c *MessageConsumer) Setup(groupSession sarama.ConsumerGroupSession) error {
	close(c.ready)
	return nil
}

// Cleanup is part of sarama.ConsumerGroupHandler. It does nothing.
func (c *MessageConsumer) Cleanup(groupSession sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim is part of sarama.ConsumerGroupHandler. It ranges over the
// claim's message channel until it is closed and always returns nil.
//
// For each message the handler registered for the claim's topic is invoked.
// A handler error is logged and the message is still marked as consumed.
// When no handler is registered, the error is logged and the message is
// skipped without being marked.
func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession, groupClaim sarama.ConsumerGroupClaim) error {
	for message := range groupClaim.Messages() {
		logs.Debug("Done Message claimed: timestamp = %v, topic = %s offset = %v value = %v \n",
			message.Timestamp, message.Topic, message.Offset, string(message.Value))

		topicHandle, err := c.FindTopichandle(groupClaim.Topic())
		if err != nil {
			logs.Error("FindTopichandle err:%s \n", err)
			continue
		}
		if err := topicHandle(message); err != nil {
			logs.Error("Message claimed: kafka消息处理错误 topic =", message.Topic, message.Offset, err)
		}
		// Marked even when the handler failed: a failing message is logged,
		// not retried.
		groupSession.MarkMessage(message, "")
	}
	return nil
}

// FindTopichandle returns the handler registered for topic, or an error when
// no handler is registered for it.
func (c *MessageConsumer) FindTopichandle(topic string) (TopicHandle, error) {
	if v, ok := c.topicsHandles[topic]; ok {
		return v, nil
	}
	return nil, errors.New("TopicHandle not found")
}

// Runer owns a MessageConsumer and the sarama consumer group that feeds it.
type Runer struct {
	msgConsumer   *MessageConsumer
	consumerGroup sarama.ConsumerGroup
	// Consumer is only referenced by the commented-out sarama-cluster
	// implementation kept below; the active code never assigns it.
	Consumer *cluster.Consumer
}

// NewRuner builds a Runer whose topic list is the key set of
// TopicHandleRouters and whose broker hosts and group id come from
// configs.Cfg. InitConsumer must be called before Start.
func NewRuner() *Runer {
	topics := make([]string, 0, len(TopicHandleRouters))
	for key := range TopicHandleRouters {
		topics = append(topics, key)
	}
	r := &Runer{
		msgConsumer: &MessageConsumer{
			ready:         make(chan struct{}),
			kafkaHosts:    configs.Cfg.Servers,
			groupId:       configs.Cfg.ConsumerId,
			topicsHandles: TopicHandleRouters,
			topics:        topics,
		},
	}
	logs.Debug("kafka_host=%v; topic=%v;groupid=%s ", r.msgConsumer.kafkaHosts, r.msgConsumer.topics, r.msgConsumer.groupId)
	return r
}

// InitConsumer creates the sarama consumer group used by Start.
//
// The sarama config starts from sarama.NewConfig with the initial offset set
// to sarama.OffsetNewest and the protocol version set to V0_10_2_1. It returns
// an error when the config fails validation or the consumer group cannot be
// created.
func (r *Runer) InitConsumer() error {
	config := sarama.NewConfig()
	//config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	config.Version = sarama.V0_10_2_1
	if err := config.Validate(); err != nil {
		// Returned rather than panicking: this function already reports
		// failures through its error result.
		return fmt.Errorf("kafka consumer config invalid: config: %v: err: %v", configs.Cfg, err)
	}

	consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config)
	if err != nil {
		return err
	}
	r.consumerGroup = consumerGroup
	return nil
}

// Legacy sarama-cluster based implementation, kept for reference.
//
// func (r *Runer) InitConsumer() error {
// 	clusterCfg := cluster.NewConfig()
// 	clusterCfg.Consumer.Return.Errors = true
// 	clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest
// 	clusterCfg.Group.Return.Notifications = true
// 	clusterCfg.Version = sarama.V0_10_2_1
// 	// khosts := []string{"192.168.0.252:9092", "192.168.0.251:9092", "192.168.0.250:9092"}
// 	// groupid := "partnermg_dev"
// 	// topic := []string{"topic_test"}
// 	logs.Debug(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, r.msgConsumer.topics)
// 	consumer, err := cluster.NewConsumer(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, r.msgConsumer.topics, clusterCfg)
// 	// consumer, err := cluster.NewConsumer(khosts, groupid, topic, clusterCfg)
// 	if err != nil {
// 		msg := fmt.Sprintf("Create kafka consumer error: %v. config: %v", err, clusterCfg)
// 		logs.Error(msg)
// 		panic(msg)
// 	}
// 	r.Consumer = consumer
// 	return nil
// }

// Start runs the consume loop until ctx is cancelled, then closes the
// consumer group and returns. It returns immediately (after logging) when no
// topics are configured or when InitConsumer has not set up the consumer
// group. A panic raised inside the loop is recovered and logged, which also
// ends the loop.
func (r *Runer) Start(ctx context.Context) {
	// Delay between a failed Consume call and the next attempt.
	const retryInterval = 5 * time.Second

	defer func() {
		if e := recover(); e != nil {
			logs.Error(e)
		}
	}()
	if len(r.msgConsumer.topics) == 0 {
		logs.Error("there has no topics")
		return
	}
	if r.consumerGroup == nil {
		logs.Error("consumerGroup is nil; InitConsumer must succeed before Start")
		return
	}
	for {
		select {
		case <-ctx.Done():
			logs.Warning("ctx cancel;consumerGroup.Close()")
			if err := r.consumerGroup.Close(); err != nil {
				logs.Error("consumerGroup close err:%s \n", err)
			}
			return
		default:
		}

		if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {
			logs.Error("consumerGroup err:%s \n", err)
			// Wait before retrying, but give up the wait as soon as ctx is
			// cancelled so shutdown is not held back by the retry delay.
			timer := time.NewTimer(retryInterval)
			select {
			case <-ctx.Done():
				timer.Stop()
			case <-timer.C:
			}
		}

		// Re-arm the ready signal only when Setup has already closed it.
		// If Consume failed before Setup ran, the channel is still open and
		// must be kept: replacing it would orphan the channel that earlier
		// IsReady callers are waiting on, and it would never be closed.
		select {
		case <-r.msgConsumer.ready:
			r.msgConsumer.ready = make(chan struct{})
		default:
		}
	}
}

// Legacy sarama-cluster based implementation, kept for reference.
//
// func (r *Runer) Start(ctx context.Context) {
// 	for {
// 		select {
// 		case msg, more := <-r.Consumer.Messages():
// 			if more {
// 				logs.Info("Partition:%d, Offset:%d, Key:%s, Value:%s Timestamp:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp)
// 				r.Consumer.MarkOffset(msg, "") // mark message as processed
// 			}
// 		case err, more := <-r.Consumer.Errors():
// 			if more {
// 				logs.Info("Kafka consumer error: %v", err.Error())
// 			}
// 		case ntf, more := <-r.Consumer.Notifications():
// 			if more {
// 				logs.Info("Kafka consumer rebalance: %v", ntf)
// 			}
// 		case <-ctx.Done():
// 			logs.Info("Stop consumer server...")
// 			r.Consumer.Close()
// 			return
// 		}
// 	}
// }

// IsReady returns the consumer's current ready channel, which Setup closes.
// Start replaces the channel after a Consume round in which Setup ran, so a
// later call may return a different, still-open channel.
func (r *Runer) IsReady() <-chan struct{} {
	return r.msgConsumer.ready
}