作者 庄敏学

kafka topic

//go:build !local
// +build !local
package constant
... ... @@ -8,14 +9,14 @@ var KAFKA_HOSTS = "127.0.0.1:9092" // 1.116.151.79:9092
var KAFKA_GROUP_ID = "performance_dev"
var KAFKA_BUSINESS_ADMIN_TOPIC = "mmm-business-admin-dev"
var KAFKA_BUSINESS_TOPIC = "mmm-business-admin-dev"
func init() {
if os.Getenv("KAFKA_HOSTS") != "" {
KAFKA_HOSTS = os.Getenv("KAFKA_HOSTS")
}
if os.Getenv("KAFKA_BUSINESS_ADMIN_TOPIC") != "" {
KAFKA_BUSINESS_ADMIN_TOPIC = os.Getenv("KAFKA_BUSINESS_ADMIN_TOPIC")
if os.Getenv("KAFKA_BUSINESS_TOPIC") != "" {
KAFKA_BUSINESS_TOPIC = os.Getenv("KAFKA_BUSINESS_TOPIC")
}
if os.Getenv("KAFKA_GROUP_ID") != "" {
KAFKA_GROUP_ID = os.Getenv("KAFKA_GROUP_ID")
... ...
... ... @@ -11,13 +11,14 @@ type User struct {
Name string // 用户姓名
Email string // 邮箱
Status int // 用户状态(1正常 2禁用)
DepartmentId []int // 用户归属的部门
DepartmentId []int64 // 用户归属的部门
PositionId []int64 //用户职位
UpdateAt time.Time // 更新时间
DeleteAt *time.Time
CreateAt time.Time
}
//1普通员工 2 主管理员
// 1普通员工 2 主管理员
const (
UserTypeCommon int = 1
UserTypeManager int = 2
... ...
... ... @@ -12,7 +12,7 @@ func Run() {
messageHandlerMap := make(map[string]func(message *sarama.ConsumerMessage) error)
messageHandlerMap["demo-v1"] = Demo
//"指定topic" => 对应的处理方法
messageHandlerMap["mmm-business-admin-dev"] = handle.SyncDataBusinessAdmin
messageHandlerMap[constant.KAFKA_BUSINESS_TOPIC] = handle.SyncDataBusinessAdmin
err := saramaConsumer.StartConsume(constant.KAFKA_HOSTS, constant.SERVICE_NAME, messageHandlerMap, log.Logger)
log.Logger.Error(err.Error())
}
... ...