|
@@ -57,6 +57,9 @@ func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession, |
|
@@ -57,6 +57,9 @@ func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession, |
57
|
for message := range groupClaim.Messages() {
|
57
|
for message := range groupClaim.Messages() {
|
58
|
logs.Debug("Done Message claimed: timestamp = %v, topic = %s offset = %v value = %v \n",
|
58
|
logs.Debug("Done Message claimed: timestamp = %v, topic = %s offset = %v value = %v \n",
|
59
|
message.Timestamp, message.Topic, message.Offset, string(message.Value))
|
59
|
message.Timestamp, message.Topic, message.Offset, string(message.Value))
|
|
|
60
|
+ for i := range c.beforeHandles {
|
|
|
61
|
+ c.beforeHandles[i](message)
|
|
|
62
|
+ }
|
60
|
groupSession.MarkMessage(message, "")
|
63
|
groupSession.MarkMessage(message, "")
|
61
|
if topicHandle, err = c.FindTopichandle(groupClaim.Topic()); err != nil {
|
64
|
if topicHandle, err = c.FindTopichandle(groupClaim.Topic()); err != nil {
|
62
|
logs.Error("FindTopichandle err:%s \n", err)
|
65
|
logs.Error("FindTopichandle err:%s \n", err)
|
|
@@ -65,6 +68,9 @@ func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession, |
|
@@ -65,6 +68,9 @@ func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession, |
65
|
if err = topicHandle(message); err != nil {
|
68
|
if err = topicHandle(message); err != nil {
|
66
|
logs.Error("Message claimed: kafka消息处理错误 topic =", message.Topic, message.Offset, err)
|
69
|
logs.Error("Message claimed: kafka消息处理错误 topic =", message.Topic, message.Offset, err)
|
67
|
}
|
70
|
}
|
|
|
71
|
+ for i := range c.beforeHandles {
|
|
|
72
|
+ c.afterHandles[i](message)
|
|
|
73
|
+ }
|
68
|
}
|
74
|
}
|
69
|
return nil
|
75
|
return nil
|
70
|
}
|
76
|
}
|