consumer.go 3.3 KB
package consumer

import (
	"context"
	"errors"
	"time"

	"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"

	"github.com/Shopify/sarama"
	"github.com/astaxie/beego/logs"
)

//MessageConsumer 消息消费者
type MessageConsumer struct {
	ready         chan struct{}
	kafkaHosts    []string
	groupId       string
	topics        []string
	topicsHandles map[string]TopicHandle
	// beforeHandles []TopicHandle
	// afterHandles  []TopicHandle
}

//实现对应的接口
var _ sarama.ConsumerGroupHandler = (*MessageConsumer)(nil)

func (c *MessageConsumer) Setup(groupSession sarama.ConsumerGroupSession) error {
	close(c.ready)
	return nil
}

func (c *MessageConsumer) Cleanup(groupSession sarama.ConsumerGroupSession) error {
	return nil
}

func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession,
	groupClaim sarama.ConsumerGroupClaim) error {
	var (
		topicHandle TopicHandle
		err         error
	)
	for message := range groupClaim.Messages() {
		logs.Debug("Done Message claimed:  timestamp = %v, topic = %s offset = %v value = %v \n",
			message.Timestamp, message.Topic, message.Offset, string(message.Value))
		if topicHandle, err = c.FindTopichandle(groupClaim.Topic()); err != nil {
			logs.Error("FindTopichandle err:%s \n", err)
			continue
		}
		if err = topicHandle(message); err != nil {
			logs.Error("Message claimed: kafka消息处理错误 topic =", message.Topic, message.Offset, err)
		}
		groupSession.MarkMessage(message, "")
	}
	return nil
}

func (c *MessageConsumer) FindTopichandle(topic string) (TopicHandle, error) {
	if v, ok := c.topicsHandles[topic]; ok {
		return v, nil
	}
	return nil, errors.New("TopicHandle not found")
}

type Runer struct {
	msgConsumer   *MessageConsumer
	consumerGroup sarama.ConsumerGroup
}

func NewRuner() *Runer {
	topics := []string{}
	for key := range TopicHandleRouters {
		topics = append(topics, key)
	}
	r := &Runer{
		msgConsumer: &MessageConsumer{
			ready:         make(chan struct{}),
			kafkaHosts:    configs.Cfg.Servers,
			groupId:       configs.Cfg.ConsumerId,
			topicsHandles: TopicHandleRouters,
			topics:        topics,
			// beforeHandles: BeforeHandles,
			// afterHandles:  AfterHandles,
		},
	}
	logs.Debug("kafka_host=%v; topic=%v;groupid=%s ", r.msgConsumer.kafkaHosts,
		r.msgConsumer.topics, r.msgConsumer.groupId)
	return r
}

func (r *Runer) InitConsumer() error {
	config := sarama.NewConfig()
	//config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Version = sarama.V0_10_2_1
	consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config)
	if err != nil {
		return err
	}
	r.consumerGroup = consumerGroup
	return nil
}

func (r *Runer) Start(ctx context.Context) {
	defer func() {
		if e := recover(); e != nil {
			logs.Error(e)
		}
	}()
	for {
		select {
		case <-ctx.Done():
			logs.Warning("ctx cancel;consumerGroup.Close()")
			r.consumerGroup.Close()
			return
		default:
			if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {
				logs.Error("consumerGroup err:%s \n", err)
				//等待重试
				timer := time.NewTimer(5 * time.Second)
				<-timer.C
			}
			r.msgConsumer.ready = make(chan struct{})
		}

	}
}
func (r *Runer) IsReady() <-chan struct{} {
	return r.msgConsumer.ready
}