package consumer import ( "github.com/Shopify/sarama" "github.com/astaxie/beego/logs" ) //TopicHandle 处理kafka中得消息 type TopicHandle func(*sarama.ConsumerMessage) error var BeforeHandles = []TopicHandle{} var AfterHandles = []TopicHandle{} //TopicHandleRouters 根据topic区分消息并进行处理 var TopicHandleRouters = map[string]TopicHandle{ "topic_test": func(message *sarama.ConsumerMessage) error { logs.Info("Done Message claimed: timestamp = %v, topic = %s offset = %v value = %v \n", message.Timestamp, message.Topic, message.Offset, string(message.Value)) return nil }, "xiangmi-orders": SyncBestshopOrder, }