// topic_handle_router.go (725 bytes)

package consumer

import (
	"github.com/Shopify/sarama"
	"github.com/astaxie/beego/logs"
	"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/handles"
)

// TopicHandle is the signature of a function that processes a single message
// consumed from Kafka and reports the outcome as an error.
type TopicHandle func(message *sarama.ConsumerMessage) error

// Package-level handler lists. Both start out empty (non-nil); nothing in this
// file appends to them or invokes them.
var (
	// BeforeHandles holds TopicHandle functions.
	// NOTE(review): presumably run before the topic-specific handler; the
	// call site is outside this file, confirm against the consumer loop.
	BeforeHandles = []TopicHandle{}

	// AfterHandles holds TopicHandle functions.
	// NOTE(review): presumably run after the topic-specific handler; the
	// call site is outside this file, confirm against the consumer loop.
	AfterHandles = []TopicHandle{}
)

// TopicHandleRouters maps a Kafka topic name to the TopicHandle that processes
// messages belonging to that topic.
var TopicHandleRouters = map[string]TopicHandle{
	// Test topic: logs the message's timestamp, topic, offset and value, then
	// reports success.
	"topic_test": func(msg *sarama.ConsumerMessage) error {
		payload := string(msg.Value)
		logs.Info(
			"Done Message claimed:  timestamp = %v, topic = %s offset = %v value = %v \n",
			msg.Timestamp, msg.Topic, msg.Offset, payload,
		)
		return nil
	},
	// Messages on this topic are delegated to handles.DataFromXiangMi.
	"xiangmi-orders": handles.DataFromXiangMi,
}