consumer.go
2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package consumer
import (
"context"
"errors"
"sync"
"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"
"github.com/Shopify/sarama"
"github.com/astaxie/beego/logs"
)
// MessageConsumer is the Kafka consumer-group handler used by this package.
// It carries the connection settings and the per-topic handler table.
type MessageConsumer struct {
	// ready is closed by Setup; StartConsumer blocks on it until then.
	ready chan bool

	// Connection settings handed to sarama.NewConsumerGroup.
	kafkaHosts []string
	groupId    string

	// topicsHandles maps a topic name to the function that processes its
	// messages; topics holds the keys of that map and is the subscription
	// list passed to Consume.
	topicsHandles map[string]TopicHandle
	topics        []string
}
// NewMessageConsumer builds a MessageConsumer from the package configuration
// (configs.Cfg) and the registered TopicHandleRouters. The subscription list
// is the set of topics that have a registered handler.
func NewMessageConsumer() *MessageConsumer {
	consumer := &MessageConsumer{
		ready:         make(chan bool),
		kafkaHosts:    configs.Cfg.Servers,
		groupId:       configs.Cfg.ConsumerId,
		topicsHandles: TopicHandleRouters,
		topics:        []string{},
	}
	// Subscribe to every topic that has a handler registered.
	for topic := range TopicHandleRouters {
		consumer.topics = append(consumer.topics, topic)
	}
	return consumer
}
// Compile-time check that *MessageConsumer implements sarama.ConsumerGroupHandler.
var _ sarama.ConsumerGroupHandler = (*MessageConsumer)(nil)
// Setup implements sarama.ConsumerGroupHandler. It signals readiness by
// closing c.ready so that StartConsumer can stop waiting.
//
// The same handler is passed to Consume on every iteration of the consume
// loop, so Setup runs once per session (again after each rebalance). Closing
// an already-closed channel panics, so the channel is closed only the first
// time. Always returns nil.
func (c *MessageConsumer) Setup(groupSession sarama.ConsumerGroupSession) error {
	select {
	case <-c.ready:
		// Already closed by an earlier session; nothing to do.
	default:
		close(c.ready)
	}
	return nil
}
// Cleanup implements sarama.ConsumerGroupHandler. This handler holds nothing
// that needs releasing at the end of a session, so it always returns nil.
func (c *MessageConsumer) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim implements sarama.ConsumerGroupHandler. It drains the claim's
// message channel, dispatching each message to the handler registered for the
// claim's topic, and returns nil once the channel is closed.
//
// A message whose topic has no handler is logged and skipped without being
// marked. A message whose handler returns an error is logged and still
// marked, so it is not redelivered.
func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession,
	groupClaim sarama.ConsumerGroupClaim) error {
	for msg := range groupClaim.Messages() {
		handle, lookupErr := c.FindTopichandle(groupClaim.Topic())
		if lookupErr != nil {
			logs.Error("FindTopichandle err:%s \n", lookupErr)
			continue
		}
		if handleErr := handle(msg); handleErr != nil {
			logs.Error("Message claimed: kafka消息处理错误 topic =", msg.Topic, msg.Offset, handleErr)
		}
		// Mark regardless of the handler outcome.
		groupSession.MarkMessage(msg, "")
	}
	return nil
}
// FindTopichandle returns the handler registered for topic, or an error if
// no handler is registered under that name.
func (c *MessageConsumer) FindTopichandle(topic string) (TopicHandle, error) {
	handle, found := c.topicsHandles[topic]
	if !found {
		return nil, errors.New("TopicHandle not found")
	}
	return handle, nil
}
// StartConsumer creates a sarama consumer group from the package
// configuration, starts the consume loop in a background goroutine and blocks
// until the first session has been set up.
//
// It returns a shutdown function and an error. The shutdown function stops
// the consume loop, waits for it to exit and closes the consumer group; it
// may be called whether or not ctx has already been cancelled. A non-nil
// error is returned when the consumer group cannot be created, or when ctx is
// cancelled before the consumer becomes ready; in both cases the returned
// function is nil and nothing is left running.
func StartConsumer(ctx context.Context) (func(), error) {
	consumer := NewMessageConsumer()
	config := sarama.NewConfig()
	config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	config.Version = sarama.V0_11_0_2
	consumerGroup, err := sarama.NewConsumerGroup(consumer.kafkaHosts, consumer.groupId, config)
	if err != nil {
		return nil, err
	}

	// Derive a cancellable context so the shutdown function can stop the loop
	// itself instead of relying on the caller having cancelled ctx first.
	runCtx, cancel := context.WithCancel(ctx)

	wg := &sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			if err := runCtx.Err(); err != nil {
				logs.Error("ctx err:%s \n", err)
				return
			}
			// Consume returns when the session ends (e.g. on a rebalance),
			// so it is called again in a loop to join the next session.
			if err := consumerGroup.Consume(runCtx, consumer.topics, consumer); err != nil {
				logs.Error("consumerGroup err:%s \n", err)
			}
		}
	}()

	shutdown := func() {
		cancel()
		wg.Wait()
		if err := consumerGroup.Close(); err != nil {
			logs.Error("consumerGroup.Close err %s", err)
		}
		logs.Info("consumerGroup.Close")
	}

	// Wait for the consumer group to finish its setup, but do not block
	// forever if the context is cancelled before that happens.
	select {
	case <-consumer.ready:
	case <-runCtx.Done():
		shutdown()
		return nil, runCtx.Err()
	}
	logs.Info("Sarama consumer up and running!...")
	return shutdown, nil
}