consumer.go 2.8 KB
package consumer

import (
	"context"
	"errors"
	"sync"

	"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"

	"github.com/Shopify/sarama"
	"github.com/astaxie/beego/logs"
)

//MessageConsumer 消息消费者
type MessageConsumer struct {
	ready         chan bool
	kafkaHosts    []string
	groupId       string
	topics        []string
	topicsHandles map[string]TopicHandle
}

func NewMessageConsumer() *MessageConsumer {
	topics := []string{}
	for key := range TopicHandleRouters {
		topics = append(topics, key)
	}
	return &MessageConsumer{
		ready:         make(chan bool),
		kafkaHosts:    configs.Cfg.Servers,
		groupId:       configs.Cfg.ConsumerId,
		topicsHandles: TopicHandleRouters,
		topics:        topics,
	}
}

//实现对应的接口
var _ sarama.ConsumerGroupHandler = (*MessageConsumer)(nil)

func (c *MessageConsumer) Setup(groupSession sarama.ConsumerGroupSession) error {
	close(c.ready)
	return nil
}

func (c *MessageConsumer) Cleanup(groupSession sarama.ConsumerGroupSession) error {
	return nil
}

func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession,
	groupClaim sarama.ConsumerGroupClaim) error {
	var (
		topicHandle TopicHandle
		err         error
	)
	for message := range groupClaim.Messages() {
		if topicHandle, err = c.FindTopichandle(groupClaim.Topic()); err != nil {
			logs.Error("FindTopichandle err:%s \n", err)
			continue
		}
		if err = topicHandle(message); err != nil {
			logs.Error("Message claimed: kafka消息处理错误 topic =", message.Topic, message.Offset, err)
		}
		groupSession.MarkMessage(message, "")
	}
	return nil
}

func (c *MessageConsumer) FindTopichandle(topic string) (TopicHandle, error) {
	if v, ok := c.topicsHandles[topic]; ok {
		return v, nil
	}
	return nil, errors.New("TopicHandle not found")
}

//StartConsumer 启动
//返回 Consumer关闭方法 和 error
func StartConsumer(ctx context.Context) (func(), error) {
	consumer := NewMessageConsumer()
	config := sarama.NewConfig()
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	config.Version = sarama.V0_11_0_2
	consumerGroup, err := sarama.NewConsumerGroup(consumer.kafkaHosts, consumer.groupId, config)
	if err != nil {
		return nil, err
	}
	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			if err := ctx.Err(); err != nil {
				logs.Error("ctx err:%s \n", err)
				return
			}
			if err := consumerGroup.Consume(ctx, consumer.topics, consumer); err != nil {
				logs.Error("consumerGroup err:%s \n", err)
			}
		}
	}()
	//等待 consumerGroup 设置完成
	<-consumer.ready
	logs.Info("Sarama consumer up and running!...")
	return func() {
		wg.Wait()
		if err := consumerGroup.Close(); err != nil {
			logs.Error("consumerGroup.Close err %s", err)
		}
		logs.Info("consumerGroup.Close")
	}, nil
}