topic_handle_router.go 1.2 KB
package consumer

import (
	"os"

	"github.com/Shopify/sarama"
	"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/handles"
)

// TopicHandle is a callback that processes one message consumed from Kafka
// and reports the outcome as an error.
type TopicHandle func(message *sarama.ConsumerMessage) error

// TopicHandleRouters maps a Kafka topic name to the TopicHandle that
// processes messages from that topic. It is declared empty; handlers are
// added to it at package initialization.
//
// Sample registration, kept for reference only (it is not active):
//
//	TopicHandleRouters["topic_test"] = func(message *sarama.ConsumerMessage) error {
//		logs.Info("Done Message claimed:  timestamp = %v, topic = %s offset = %v value = %v \n",
//			message.Timestamp, message.Topic, message.Offset, string(message.Value))
//		return nil
//	}
var TopicHandleRouters = make(map[string]TopicHandle)

// init registers the topic handlers for the deployment environment selected
// by the KAFKA_CONSUMER_ID environment variable:
//
//	partnermg_test -> initHandleRoutersTest
//	partnermg_prd  -> initHandleRoutersProd
//	partnermg_dev  -> initHandleRoutersDev
//
// When the variable is unset or holds any other value, no handler is
// registered and TopicHandleRouters is left as declared.
func init() {
	// Read the variable once; an unset variable yields "", which matches no
	// case, so no separate emptiness check is needed.
	switch os.Getenv("KAFKA_CONSUMER_ID") {
	case "partnermg_test":
		initHandleRoutersTest()
	case "partnermg_prd":
		initHandleRoutersProd()
	case "partnermg_dev":
		initHandleRoutersDev()
	}
}

// initHandleRoutersTest registers handles.DataFromXiangMi as the handler for
// the "xiangmi_project_test" topic in TopicHandleRouters.
func initHandleRoutersTest() {
	const topic = "xiangmi_project_test"
	TopicHandleRouters[topic] = handles.DataFromXiangMi
}

// initHandleRoutersProd registers handles.DataFromXiangMi as the handler for
// the "xiangmi_project" topic in TopicHandleRouters.
func initHandleRoutersProd() {
	const topic = "xiangmi_project"
	TopicHandleRouters[topic] = handles.DataFromXiangMi
}

// initHandleRoutersDev registers handles.DataFromXiangMi as the handler for
// the "xiangmi_project_dev" topic in TopicHandleRouters.
func initHandleRoutersDev() {
	const topic = "xiangmi_project_dev"
	TopicHandleRouters[topic] = handles.DataFromXiangMi
}