consumer.go 4.6 KB
package consumer

import (
	"context"
	"errors"
	"time"

	"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"

	"github.com/Shopify/sarama"
	"github.com/astaxie/beego/logs"
)

//MessageConsumer 消息消费者
type MessageConsumer struct {
	ready         chan struct{}
	kafkaHosts    []string
	groupId       string
	topics        []string
	topicsHandles map[string]TopicHandle
}

// func NewMessageConsumer() *MessageConsumer {
// 	topics := []string{}
// 	for key := range TopicHandleRouters {
// 		topics = append(topics, key)
// 	}
// 	return &MessageConsumer{
// 		ready:         make(chan bool),
// 		kafkaHosts:    configs.Cfg.Servers,
// 		groupId:       configs.Cfg.ConsumerId,
// 		topicsHandles: TopicHandleRouters,
// 		topics:        topics,
// 	}
// }

//实现对应的接口
var _ sarama.ConsumerGroupHandler = (*MessageConsumer)(nil)

func (c *MessageConsumer) Setup(groupSession sarama.ConsumerGroupSession) error {
	close(c.ready)
	return nil
}

func (c *MessageConsumer) Cleanup(groupSession sarama.ConsumerGroupSession) error {
	return nil
}

func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession,
	groupClaim sarama.ConsumerGroupClaim) error {
	var (
		topicHandle TopicHandle
		err         error
	)
	for message := range groupClaim.Messages() {
		logs.Debug("Done Message claimed:  timestamp = %v, topic = %s offset = %v value = %v \n",
			message.Timestamp, message.Topic, message.Offset, string(message.Value))
		if topicHandle, err = c.FindTopichandle(groupClaim.Topic()); err != nil {
			logs.Error("FindTopichandle err:%s \n", err)
			continue
		}
		if err = topicHandle(message); err != nil {
			logs.Error("Message claimed: kafka消息处理错误 topic =", message.Topic, message.Offset, err)
		} else {
			groupSession.MarkMessage(message, "")
		}
	}
	return nil
}

func (c *MessageConsumer) FindTopichandle(topic string) (TopicHandle, error) {
	if v, ok := c.topicsHandles[topic]; ok {
		return v, nil
	}
	return nil, errors.New("TopicHandle not found")
}

type Runer struct {
	msgConsumer   *MessageConsumer
	consumerGroup sarama.ConsumerGroup
}

func NewRuner() *Runer {
	topics := []string{}
	for key := range TopicHandleRouters {
		topics = append(topics, key)
	}

	return &Runer{
		msgConsumer: &MessageConsumer{
			ready:         make(chan struct{}),
			kafkaHosts:    configs.Cfg.Servers,
			groupId:       configs.Cfg.ConsumerId,
			topicsHandles: TopicHandleRouters,
			topics:        topics,
		},
	}
}

func (r *Runer) InitConsumer() error {
	config := sarama.NewConfig()
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	config.Version = sarama.V0_11_0_2
	consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config)
	if err != nil {
		return err
	}
	r.consumerGroup = consumerGroup
	return nil
}

func (r *Runer) Start(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			err := r.consumerGroup.Close()
			logs.Warning("ctx cancel;consumerGroup.Close();err:%s", err)
			return
		default:
			if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {
				logs.Error("consumerGroup err:%s \n", err)
				//等待重试
				timer := time.NewTimer(5 * time.Second)
				<-timer.C
			}
			r.msgConsumer.ready = make(chan struct{})
		}
	}
}
func (r *Runer) IsReady() <-chan struct{} {
	return r.msgConsumer.ready
}

//StartConsumer 启动
//返回 Consumer关闭方法 和 error
// func StartConsumer(ctx context.Context) (func(), error) {
// 	consumer := NewMessageConsumer()
// 	config := sarama.NewConfig()
// 	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
// 	config.Consumer.Offsets.Initial = sarama.OffsetNewest
// 	config.Version = sarama.V0_11_0_2
// 	consumerGroup, err := sarama.NewConsumerGroup(consumer.kafkaHosts, consumer.groupId, config)
// 	if err != nil {
// 		return func() {}, err
// 	}
// 	wg := &sync.WaitGroup{}
// 	wg.Add(1)
// 	go func() {
// 		defer wg.Done()
// 		for {
// 			if err := ctx.Err(); err != nil {
// 				logs.Error("ctx err:%s \n", err)
// 				return
// 			}
// 			if err := consumerGroup.Consume(ctx, consumer.topics, consumer); err != nil {
// 				logs.Error("consumerGroup err:%s \n", err)
// 			}
// 			consumer.ready = make(chan bool)
// 		}
// 	}()
// 	//等待 consumerGroup 设置完成
// 	<-consumer.ready
// 	logs.Info("Sarama consumer up and running!...")
// 	return func() {
// 		wg.Wait()
// 		if err := consumerGroup.Close(); err != nil {
// 			logs.Error("consumerGroup.Close err %s", err)
// 		}
// 		logs.Info("consumerGroup.Close")
// 	}, nil
// }