...
|
...
|
@@ -18,8 +18,8 @@ type MessageConsumer struct { |
|
|
groupId string
|
|
|
topics []string
|
|
|
topicsHandles map[string]TopicHandle
|
|
|
beforeHandles []TopicHandle
|
|
|
afterHandles []TopicHandle
|
|
|
// beforeHandles []TopicHandle
|
|
|
// afterHandles []TopicHandle
|
|
|
}
|
|
|
|
|
|
//实现对应的接口
|
...
|
...
|
@@ -41,13 +41,8 @@ func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession, |
|
|
err error
|
|
|
)
|
|
|
for message := range groupClaim.Messages() {
|
|
|
|
|
|
logs.Debug("Done Message claimed: timestamp = %v, topic = %s offset = %v value = %v \n",
|
|
|
message.Timestamp, message.Topic, message.Offset, string(message.Value))
|
|
|
for i := range c.beforeHandles {
|
|
|
c.beforeHandles[i](message)
|
|
|
}
|
|
|
groupSession.MarkMessage(message, "")
|
|
|
if topicHandle, err = c.FindTopichandle(groupClaim.Topic()); err != nil {
|
|
|
logs.Error("FindTopichandle err:%s \n", err)
|
|
|
continue
|
...
|
...
|
@@ -55,9 +50,7 @@ func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession, |
|
|
if err = topicHandle(message); err != nil {
|
|
|
logs.Error("Message claimed: kafka消息处理错误 topic =", message.Topic, message.Offset, err)
|
|
|
}
|
|
|
for i := range c.beforeHandles {
|
|
|
c.afterHandles[i](message)
|
|
|
}
|
|
|
groupSession.MarkMessage(message, "")
|
|
|
}
|
|
|
return nil
|
|
|
}
|
...
|
...
|
@@ -79,18 +72,20 @@ func NewRuner() *Runer { |
|
|
for key := range TopicHandleRouters {
|
|
|
topics = append(topics, key)
|
|
|
}
|
|
|
|
|
|
return &Runer{
|
|
|
r := &Runer{
|
|
|
msgConsumer: &MessageConsumer{
|
|
|
ready: make(chan struct{}),
|
|
|
kafkaHosts: configs.Cfg.Servers,
|
|
|
groupId: configs.Cfg.ConsumerId,
|
|
|
topicsHandles: TopicHandleRouters,
|
|
|
topics: topics,
|
|
|
beforeHandles: BeforeHandles,
|
|
|
afterHandles: AfterHandles,
|
|
|
// beforeHandles: BeforeHandles,
|
|
|
// afterHandles: AfterHandles,
|
|
|
},
|
|
|
}
|
|
|
logs.Debug("kafka_host=%v; topic=%v;groupid=%s ", r.msgConsumer.kafkaHosts,
|
|
|
r.msgConsumer.topics, r.msgConsumer.groupId)
|
|
|
return r
|
|
|
}
|
|
|
|
|
|
func (r *Runer) InitConsumer() error {
|
...
|
...
|
|