...
|
...
|
@@ -18,6 +18,8 @@ type MessageConsumer struct { |
|
|
groupId string
|
|
|
topics []string
|
|
|
topicsHandles map[string]TopicHandle
|
|
|
beforeHandles []TopicHandle
|
|
|
afterHandles []TopicHandle
|
|
|
}
|
|
|
|
|
|
// func NewMessageConsumer() *MessageConsumer {
|
...
|
...
|
@@ -58,7 +60,6 @@ func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession, |
|
|
groupSession.MarkMessage(message, "")
|
|
|
if topicHandle, err = c.FindTopichandle(groupClaim.Topic()); err != nil {
|
|
|
logs.Error("FindTopichandle err:%s \n", err)
|
|
|
|
|
|
continue
|
|
|
}
|
|
|
if err = topicHandle(message); err != nil {
|
...
|
...
|
@@ -93,6 +94,8 @@ func NewRuner() *Runer { |
|
|
groupId: configs.Cfg.ConsumerId,
|
|
|
topicsHandles: TopicHandleRouters,
|
|
|
topics: topics,
|
|
|
beforeHandles: BeforeHandles,
|
|
|
afterHandles: AfterHandles,
|
|
|
},
|
|
|
}
|
|
|
}
|
...
|
...
|
@@ -111,6 +114,11 @@ func (r *Runer) InitConsumer() error { |
|
|
}
|
|
|
|
|
|
func (r *Runer) Start(ctx context.Context) {
|
|
|
defer func() {
|
|
|
if e := recover(); e != nil {
|
|
|
logs.Error(e)
|
|
|
}
|
|
|
}()
|
|
|
for {
|
|
|
select {
|
|
|
case <-ctx.Done():
|
...
|
...
|
|