consumer.go
3.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package consumer
import (
"context"
"errors"
"time"
"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"
"github.com/Shopify/sarama"
"github.com/astaxie/beego/logs"
)
// MessageConsumer is the Kafka message consumer. It holds the connection
// settings for a consumer group together with the per-topic handlers that
// ConsumeClaim dispatches to.
type MessageConsumer struct {
	// ready is closed by Setup to signal that a session has been set up.
	ready chan struct{}
	// kafkaHosts is the broker address list handed to sarama.NewConsumerGroup.
	kafkaHosts []string
	// groupId is the consumer group id handed to sarama.NewConsumerGroup.
	groupId string
	// topics is the list of topic names passed to ConsumerGroup.Consume.
	topics []string
	// topicsHandles maps a topic name to the handler invoked for its messages.
	topicsHandles map[string]TopicHandle
	// beforeHandles []TopicHandle
	// afterHandles []TopicHandle
}
// Compile-time check that *MessageConsumer implements sarama.ConsumerGroupHandler.
var _ sarama.ConsumerGroupHandler = (*MessageConsumer)(nil)
// Setup is part of sarama.ConsumerGroupHandler. It closes the ready channel,
// which releases every goroutine currently receiving from it, and always
// returns nil. The session argument is not used.
func (c *MessageConsumer) Setup(groupSession sarama.ConsumerGroupSession) error {
	// Closing (rather than sending) wakes all waiters at once.
	close(c.ready)

	return nil
}
// Cleanup is part of sarama.ConsumerGroupHandler. This consumer holds no
// per-session resources, so it does nothing and always returns nil.
func (c *MessageConsumer) Cleanup(groupSession sarama.ConsumerGroupSession) error {
	// Nothing to release.
	return nil
}
// ConsumeClaim is part of sarama.ConsumerGroupHandler. It receives the
// messages of one claim and dispatches each of them to the handler registered
// for the claim's topic.
//
// The loop ends, returning nil, when either the claim's message channel is
// closed or the session context is done. Watching the session context lets the
// handler exit promptly on rebalance or shutdown instead of blocking on the
// message channel.
//
// Per message:
//   - if no handler is registered for the topic, the lookup error is logged and
//     the message is skipped without being marked;
//   - otherwise the handler is called; a handler error is only logged, and the
//     message is marked as consumed either way.
func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession,
	groupClaim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message, ok := <-groupClaim.Messages():
			if !ok {
				// Channel closed: the claim is finished.
				return nil
			}
			logs.Debug("Done Message claimed: timestamp = %v, topic = %s offset = %v value = %v \n",
				message.Timestamp, message.Topic, message.Offset, string(message.Value))
			topicHandle, err := c.FindTopichandle(groupClaim.Topic())
			if err != nil {
				logs.Error("FindTopichandle err:%s \n", err)
				continue
			}
			if err = topicHandle(message); err != nil {
				logs.Error("Message claimed: kafka消息处理错误 topic =", message.Topic, message.Offset, err)
			}
			// Marked even when the handler failed, so the message is not redelivered.
			groupSession.MarkMessage(message, "")
		case <-groupSession.Context().Done():
			// Session is ending (rebalance or shutdown); stop consuming now.
			return nil
		}
	}
}
// FindTopichandle looks up the handler registered for topic in topicsHandles.
// It returns the handler and a nil error when the topic has an entry, and a
// nil handler with a "TopicHandle not found" error otherwise.
func (c *MessageConsumer) FindTopichandle(topic string) (TopicHandle, error) {
	handle, found := c.topicsHandles[topic]
	if !found {
		return nil, errors.New("TopicHandle not found")
	}
	return handle, nil
}
// Runer ties a MessageConsumer to the sarama consumer group it is driven by.
type Runer struct {
	// msgConsumer is the handler passed to consumerGroup.Consume.
	msgConsumer *MessageConsumer
	// consumerGroup is assigned by InitConsumer and is nil before that call.
	consumerGroup sarama.ConsumerGroup
}
// NewRuner builds a Runer whose MessageConsumer subscribes to every topic that
// has an entry in TopicHandleRouters. Broker addresses and the group id are
// read from configs.Cfg. The resulting settings are written to the debug log.
//
// The sarama consumer group is not created here; that is done by InitConsumer.
func NewRuner() *Runer {
	// One subscribed topic per registered handler.
	topicNames := make([]string, 0, len(TopicHandleRouters))
	for name := range TopicHandleRouters {
		topicNames = append(topicNames, name)
	}

	mc := &MessageConsumer{
		ready:         make(chan struct{}),
		kafkaHosts:    configs.Cfg.Servers,
		groupId:       configs.Cfg.ConsumerId,
		topicsHandles: TopicHandleRouters,
		topics:        topicNames,
		// beforeHandles: BeforeHandles,
		// afterHandles: AfterHandles,
	}
	logs.Debug("kafka_host=%v; topic=%v;groupid=%s ", mc.kafkaHosts,
		mc.topics, mc.groupId)

	return &Runer{msgConsumer: mc}
}
// InitConsumer creates the sarama consumer group for the configured brokers
// and group id and stores it on the Runer. The client is configured for
// protocol version V0_10_2_1 and starts from the oldest offset when the group
// has no committed offset. On failure the error from sarama.NewConsumerGroup
// is returned and the Runer is left unchanged.
func (r *Runer) InitConsumer() error {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V0_10_2_1
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	// cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin

	mc := r.msgConsumer
	group, err := sarama.NewConsumerGroup(mc.kafkaHosts, mc.groupId, cfg)
	if err == nil {
		r.consumerGroup = group
	}
	return err
}
// Start runs the consume loop until ctx is cancelled, then closes the consumer
// group and returns. It blocks, so callers normally run it in its own
// goroutine. A panic raised inside the loop is recovered and logged, which
// also ends the loop.
//
// Each iteration calls Consume once. When Consume returns an error the error
// is logged and the loop waits 5 seconds before retrying; the wait is cut
// short if ctx is cancelled.
func (r *Runer) Start(ctx context.Context) {
	defer func() {
		if e := recover(); e != nil {
			logs.Error(e)
		}
	}()
	for {
		select {
		case <-ctx.Done():
			logs.Warning("ctx cancel;consumerGroup.Close()")
			if err := r.consumerGroup.Close(); err != nil {
				logs.Error("consumerGroup close err:%s \n", err)
			}
			return
		default:
		}

		if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {
			logs.Error("consumerGroup err:%s \n", err)
			// Wait before retrying, but do not delay shutdown.
			timer := time.NewTimer(5 * time.Second)
			select {
			case <-timer.C:
			case <-ctx.Done():
				timer.Stop()
			}
		}

		// Re-arm the ready channel only if Setup closed it during this round.
		// If Consume failed before Setup ran, the channel is still open and
		// must be kept: goroutines already waiting on it would otherwise never
		// be released.
		select {
		case <-r.msgConsumer.ready:
			r.msgConsumer.ready = make(chan struct{})
		default:
		}
	}
}
// IsReady returns the consumer's current ready channel as receive-only. The
// channel is closed by MessageConsumer.Setup, so a receive on it unblocks once
// a session has been set up. Start may install a fresh channel between
// Consume calls, so call IsReady again instead of caching the result.
func (r *Runer) IsReady() <-chan struct{} {
	ready := r.msgConsumer.ready
	return ready
}