审查视图

pkg/port/consumer/consumer.go 3.4 KB
唐旭辉 authored
1 2 3 4 5
package consumer

import (
	"context"
	"errors"
唐旭辉 authored
6
	"time"
唐旭辉 authored
7 8 9 10 11 12 13 14 15

	"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"

	"github.com/Shopify/sarama"
	"github.com/astaxie/beego/logs"
)

//MessageConsumer 消息消费者
type MessageConsumer struct {
Your Name authored
16
	ready         chan struct{}
唐旭辉 authored
17 18 19 20
	kafkaHosts    []string
	groupId       string
	topics        []string
	topicsHandles map[string]TopicHandle
唐旭辉 authored
21 22
	beforeHandles []TopicHandle
	afterHandles  []TopicHandle
唐旭辉 authored
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
}

//实现对应的接口
var _ sarama.ConsumerGroupHandler = (*MessageConsumer)(nil)

func (c *MessageConsumer) Setup(groupSession sarama.ConsumerGroupSession) error {
	close(c.ready)
	return nil
}

func (c *MessageConsumer) Cleanup(groupSession sarama.ConsumerGroupSession) error {
	return nil
}

func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession,
	groupClaim sarama.ConsumerGroupClaim) error {
	var (
		topicHandle TopicHandle
		err         error
	)
	for message := range groupClaim.Messages() {
唐旭辉 authored
44
唐旭辉 authored
45 46
		logs.Debug("Done Message claimed:  timestamp = %v, topic = %s offset = %v value = %v \n",
			message.Timestamp, message.Topic, message.Offset, string(message.Value))
唐旭辉 authored
47 48 49
		for i := range c.beforeHandles {
			c.beforeHandles[i](message)
		}
唐旭辉 authored
50
		groupSession.MarkMessage(message, "")
唐旭辉 authored
51 52 53 54 55 56 57
		if topicHandle, err = c.FindTopichandle(groupClaim.Topic()); err != nil {
			logs.Error("FindTopichandle err:%s \n", err)
			continue
		}
		if err = topicHandle(message); err != nil {
			logs.Error("Message claimed: kafka消息处理错误 topic =", message.Topic, message.Offset, err)
		}
唐旭辉 authored
58 59 60
		for i := range c.beforeHandles {
			c.afterHandles[i](message)
		}
唐旭辉 authored
61 62 63 64 65 66 67 68 69 70 71
	}
	return nil
}

func (c *MessageConsumer) FindTopichandle(topic string) (TopicHandle, error) {
	if v, ok := c.topicsHandles[topic]; ok {
		return v, nil
	}
	return nil, errors.New("TopicHandle not found")
}
Your Name authored
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
type Runer struct {
	msgConsumer   *MessageConsumer
	consumerGroup sarama.ConsumerGroup
}

func NewRuner() *Runer {
	topics := []string{}
	for key := range TopicHandleRouters {
		topics = append(topics, key)
	}

	return &Runer{
		msgConsumer: &MessageConsumer{
			ready:         make(chan struct{}),
			kafkaHosts:    configs.Cfg.Servers,
			groupId:       configs.Cfg.ConsumerId,
			topicsHandles: TopicHandleRouters,
			topics:        topics,
唐旭辉 authored
90 91
			beforeHandles: BeforeHandles,
			afterHandles:  AfterHandles,
Your Name authored
92 93 94 95 96
		},
	}
}

func (r *Runer) InitConsumer() error {
唐旭辉 authored
97
	config := sarama.NewConfig()
唐旭辉 authored
98
	//config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
唐旭辉 authored
99
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
唐旭辉 authored
100 101 102 103
	config.Version = sarama.V0_10_2_1
	// config.Version = sarama.KafkaVersion{
	// 	version: [4]int{},
	// }
Your Name authored
104
	consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config)
唐旭辉 authored
105
	if err != nil {
Your Name authored
106
		return err
唐旭辉 authored
107
	}
Your Name authored
108 109 110 111 112
	r.consumerGroup = consumerGroup
	return nil
}

func (r *Runer) Start(ctx context.Context) {
唐旭辉 authored
113 114 115 116 117
	defer func() {
		if e := recover(); e != nil {
			logs.Error(e)
		}
	}()
Your Name authored
118 119 120
	for {
		select {
		case <-ctx.Done():
唐旭辉 authored
121 122
			logs.Warning("ctx cancel;consumerGroup.Close()")
			r.consumerGroup.Close()
Your Name authored
123 124
			return
		default:
唐旭辉 authored
125 126 127 128 129 130 131
			if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {
				logs.Error("consumerGroup err:%s \n", err)
				//等待重试
				timer := time.NewTimer(5 * time.Second)
				<-timer.C
			}
			r.msgConsumer.ready = make(chan struct{})
唐旭辉 authored
132
		}
唐旭辉 authored
133
Your Name authored
134 135 136 137
	}
}
func (r *Runer) IsReady() <-chan struct{} {
	return r.msgConsumer.ready
唐旭辉 authored
138
}