作者 唐旭辉

更新

... ... @@ -135,6 +135,10 @@ func (r *Runer) Start(ctx context.Context) {
logs.Error(e)
}
}()
if len(r.msgConsumer.topics) == 0 {
logs.Error("there has no topics")
return
}
for {
select {
case <-ctx.Done():
... ...
... ... @@ -4,7 +4,6 @@ import (
"os"
"github.com/Shopify/sarama"
"github.com/astaxie/beego/logs"
"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/handles"
)
... ... @@ -13,11 +12,11 @@ type TopicHandle func(*sarama.ConsumerMessage) error
//TopicHandleRouters 根据topic区分消息并进行处理
var TopicHandleRouters = map[string]TopicHandle{
"topic_test": func(message *sarama.ConsumerMessage) error {
logs.Info("Done Message claimed: timestamp = %v, topic = %s offset = %v value = %v \n",
message.Timestamp, message.Topic, message.Offset, string(message.Value))
return nil
},
// "topic_test": func(message *sarama.ConsumerMessage) error {
// logs.Info("Done Message claimed: timestamp = %v, topic = %s offset = %v value = %v \n",
// message.Timestamp, message.Topic, message.Offset, string(message.Value))
// return nil
// },
}
func init() {
... ...