package consumer import ( "os" "github.com/Shopify/sarama" "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/handles" ) //TopicHandle 处理kafka中得消息 type TopicHandle func(*sarama.ConsumerMessage) error //TopicHandleRouters 根据topic区分消息并进行处理 var TopicHandleRouters = map[string]TopicHandle{ // "topic_test": func(message *sarama.ConsumerMessage) error { // logs.Info("Done Message claimed: timestamp = %v, topic = %s offset = %v value = %v \n", // message.Timestamp, message.Topic, message.Offset, string(message.Value)) // return nil // }, } func init() { var runEnv string if os.Getenv("KAFKA_CONSUMER_ID") != "" { runEnv = os.Getenv("KAFKA_CONSUMER_ID") } if runEnv == "partnermg_test" { initHandleRoutersTest() } if runEnv == "partnermg_prd" { initHandleRoutersProd() } if runEnv == "partnermg_dev" { initHandleRoutersDev() } } func initHandleRoutersTest() { TopicHandleRouters["xiangmi_project_test"] = handles.DataFromXiangMi } func initHandleRoutersProd() { TopicHandleRouters["xiangmi_project"] = handles.DataFromXiangMi } func initHandleRoutersDev() { TopicHandleRouters["xiangmi_project_dev"] = handles.DataFromXiangMi }