作者 郑周

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	pkg/application/factory/reposetory.go
{
// 使用 IntelliSense 了解相关属性。
// 悬停以查看现有属性的描述。
// 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch file",
"type": "go",
"request": "launch",
"mode": "debug",
"buildFlags": "--tags=local",
"program": "./main.go"
},
]
}
\ No newline at end of file
... ...
package company
import (
"encoding/json"
"time"
"github.com/linmadan/egglib-go/core/application"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/company/command"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/factory"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log"
)
type CompanyServices struct {
type SyncDataCompanyService struct {
}
// type BusinessAdminCommand struct {
// //company:公司
// Module string `json:"module"`
// // add:添加,edit:编辑,setCompanyCharge:更改公司主管,changeAdmin换管理员
// Action string `json:"action"`
// // 具体的对象JSON数据
// Datas json.RawMessage `json:"data"`
// }
//从BusinessAdmins 接收消息,变更公司数据
//
func (c CompanyServices) BusinessAdminCompany() error {
return nil
func (c SyncDataCompanyService) FromBusinessAdmin(param *domain.MessageBody) error {
action := param.Module + "/" + param.Action
var err error
switch action {
case "company/add":
var param1 command.SaveCompanyCommand
err = json.Unmarshal(param.Datas, &param1)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = c.addCompany(&param1)
case "company/edit":
var param2 command.SaveCompanyCommand
err = json.Unmarshal(param.Datas, &param2)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = c.editCompany(&param2)
case "company/setCompanyCharge":
var param3 command.SetCompanyCharge
err = json.Unmarshal(param.Datas, &param3)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = c.setCompanyCharge(&param3)
case "company/changeAdmin":
var param3 command.ChangeAdminCommand
err = json.Unmarshal(param.Datas, &param3)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = c.changeAdmin(&param3)
default:
log.Logger.Error("action err:" + action)
}
return err
}
//addCompany
//从BusinessAdmins 接收消息 添加公司
func (c CompanyServices) addCompany(param *command.SaveCompanyCommand) error {
//module="company" action="add"
func (c SyncDataCompanyService) addCompany(param *command.SaveCompanyCommand) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
... ... @@ -63,11 +109,11 @@ func (c CompanyServices) addCompany(param *command.SaveCompanyCommand) error {
})
_, err = companyRepo.Insert(&newCompany)
if err != nil {
return err
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
_, err = userRepo.Insert(&newUser)
if err != nil {
return err
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
if err := transactionContext.CommitTransaction(); err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
... ... @@ -77,7 +123,8 @@ func (c CompanyServices) addCompany(param *command.SaveCompanyCommand) error {
//editCompany
//从BusinessAdmins 接收消息 更新公司
func (c CompanyServices) editCompany(param *command.SaveCompanyCommand) error {
//module="company" action="edit"
func (c SyncDataCompanyService) editCompany(param *command.SaveCompanyCommand) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
... ... @@ -100,14 +147,14 @@ func (c CompanyServices) editCompany(param *command.SaveCompanyCommand) error {
"id": param.Comapany.Id,
})
if err != nil {
return err
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
_, userList, err := userRepo.Find(map[string]interface{}{
"limit": 1,
"id": param.User.Id,
})
if err != nil {
return err
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
var (
newCompany *domain.Company
... ... @@ -146,32 +193,33 @@ func (c CompanyServices) editCompany(param *command.SaveCompanyCommand) error {
if len(companyList) > 0 {
_, err = companyRepo.Update(newCompany)
if err != nil {
return err
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
} else {
_, err = companyRepo.Insert(newCompany)
if err != nil {
return err
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
}
if len(userList) > 0 {
_, err = userRepo.Update(newUser)
if err != nil {
return err
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
} else {
_, err = userRepo.Insert(newUser)
if err != nil {
return err
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
}
if err := transactionContext.CommitTransaction(); err != nil {
return err
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
return nil
}
func (srv CompanyServices) setCompanyCharge(param *command.SetCompanyCharge) error {
//module="company" action="setCompanyCharge"
func (srv SyncDataCompanyService) setCompanyCharge(param *command.SetCompanyCharge) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
... ... @@ -193,14 +241,14 @@ func (srv CompanyServices) setCompanyCharge(param *command.SetCompanyCharge) err
})
if err != nil {
return err
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
for i := range companyList {
companyList[i].ChargeUserIds = param.ChargeUserIds
companyList[i].UpdateAt = time.Now()
_, err = companyRepo.Update(companyList[i])
if err != nil {
return err
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
}
if err := transactionContext.CommitTransaction(); err != nil {
... ... @@ -211,7 +259,8 @@ func (srv CompanyServices) setCompanyCharge(param *command.SetCompanyCharge) err
//changeAdmin
//从BusinessAdmins 接收消息 变更主管
func (srv CompanyServices) changeAdmin(param *command.ChangeAdminCommand) error {
//module="company" action="changeAdmin"
func (srv SyncDataCompanyService) changeAdmin(param *command.ChangeAdminCommand) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
... ... @@ -233,7 +282,7 @@ func (srv CompanyServices) changeAdmin(param *command.ChangeAdminCommand) error
"adminType": domain.UserTypeManager,
})
if err != nil {
return err
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
//修改旧管理员 为普通用户
for i := range userList {
... ... @@ -241,7 +290,7 @@ func (srv CompanyServices) changeAdmin(param *command.ChangeAdminCommand) error
userList[i].UpdateAt = time.Now()
_, err := userRepo.Update(userList[i])
if err != nil {
return err
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
}
//获取新管理员
... ... @@ -251,7 +300,7 @@ func (srv CompanyServices) changeAdmin(param *command.ChangeAdminCommand) error
"account": param.UserAccount,
})
if err != nil {
return err
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
//修改为管理员用户
for i := range userList2 {
... ... @@ -259,7 +308,7 @@ func (srv CompanyServices) changeAdmin(param *command.ChangeAdminCommand) error
userList[i].UpdateAt = time.Now()
_, err := userRepo.Update(userList[i])
if err != nil {
return err
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
}
if err := transactionContext.CommitTransaction(); err != nil {
... ...
package department
import (
"encoding/json"
"time"
"github.com/linmadan/egglib-go/core/application"
... ... @@ -11,10 +12,55 @@ import (
type SyncDataDepartmentService struct{}
// type BusinessAdminCommand struct {
// // department:部门
// Module string `json:"module"`
// // add:添加,edit:编辑,batchDelete:批量删除,import:导入部门
// Action string `json:"action"`
// // 具体的对象JSON数据
// Datas json.RawMessage `json:"data"`
// }
func (srv SyncDataDepartmentService) FromBusinessAdmin(param *domain.MessageBody) error {
action := param.Module + "/" + param.Action
var err error
switch action {
case "department/add":
var param1 command.AddDepartmentCommand
err = json.Unmarshal(param.Datas, &param1)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = srv.addDepartment(&param1)
case "department/edit":
var param1 command.EditDepartmentCommand
err = json.Unmarshal(param.Datas, &param1)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = srv.editDepartment(&param1)
case "department/batchDelete":
var param1 command.BatchDeleteCommand
err = json.Unmarshal(param.Datas, &param1)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = srv.batchDeleteDepartment(&param1)
case "department/import":
var param1 []command.ImportDepartmentCommand
err = json.Unmarshal(param.Datas, &param1)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = srv.importDepartment(param1)
}
return err
}
//AddDepartment
//从BusinessAdmins 接收消息 添加部门
//module="department" action="add"
func (srv SyncDataDepartmentService) addDepartment(param command.AddDepartmentCommand) error {
func (srv SyncDataDepartmentService) addDepartment(param *command.AddDepartmentCommand) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
... ... @@ -56,7 +102,7 @@ func (srv SyncDataDepartmentService) addDepartment(param command.AddDepartmentCo
//EditDepartment
//从BusinessAdmins 接收消息 编辑部门
//module="department" action="edit"
func (srv SyncDataDepartmentService) editDepartment(param command.EditDepartmentCommand) error {
func (srv SyncDataDepartmentService) editDepartment(param *command.EditDepartmentCommand) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
... ... @@ -115,7 +161,7 @@ func (srv SyncDataDepartmentService) editDepartment(param command.EditDepartment
//batchDelete
//从BusinessAdmins 接收消息 删除部门
//module="department" action="batchDelete"
func (srv SyncDataDepartmentService) batchDeleteDepartment(param command.BatchDeleteCommand) error {
func (srv SyncDataDepartmentService) batchDeleteDepartment(param *command.BatchDeleteCommand) error {
if len(param.Ids) == 0 {
return nil
}
... ...
... ... @@ -72,3 +72,11 @@ func CreateRoleUserRepository(options map[string]interface{}) domain.RoleUserRep
}
return repository.NewRoleUserRepository(transactionContext)
}
func CreateReceivedMessageRepository(options map[string]interface{}) domain.ReceivedMessageRepository {
var transactionContext *pg.TransactionContext
if value, ok := options["transactionContext"]; ok {
transactionContext = value.(*pg.TransactionContext)
}
return repository.NewReceivedMessageRepository(transactionContext)
}
... ...
package user
import (
"encoding/json"
"time"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log"
"github.com/linmadan/egglib-go/core/application"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/factory"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/user/command"
... ... @@ -11,10 +14,65 @@ import (
type SyncDataUserService struct{}
// type BusinessAdminCommand struct {
// // employee:员工
// Module string `json:"module"`
// // add:添加,edit:编辑,batchDelete:批量删除,batchForbid:批量禁用用户,batchRemove:批量更改用户部门,import:导入用户
// Action string `json:"action"`
// // 具体的对象JSON数据
// Datas json.RawMessage `json:"data"`
// }
func (srv SyncDataUserService) FromBusinessAdmin(param *domain.MessageBody) error {
action := param.Module + "/" + param.Action
var err error
switch action {
case "employee/add":
var param1 command.SaveUserCommand
err = json.Unmarshal(param.Datas, &param1)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = srv.AddUser(&param1)
case "employee/edit":
var param2 command.SaveUserCommand
err = json.Unmarshal(param.Datas, &param2)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = srv.UpdateUser(&param2)
case "employee/batchDelete":
var param3 command.BatchDeleteCommand
err = json.Unmarshal(param.Datas, &param3)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = srv.batchDelete(&param3)
case "company/batchForbid":
var param4 command.BatchForbidCommand
err = json.Unmarshal(param.Datas, &param4)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = srv.batchForbid(&param4)
case "company/import":
var param4 command.ImportUserCommand
err = json.Unmarshal(param.Datas, &param4)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = srv.importUser(&param4)
default:
log.Logger.Error("action err:" + action)
}
return err
}
//AddUser
//从BusinessAdmins 接收消息 添加用户
//module="employee" action="add"
func (srv SyncDataUserService) AddUser(param command.SaveUserCommand) error {
func (srv SyncDataUserService) AddUser(param *command.SaveUserCommand) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
... ... @@ -59,7 +117,7 @@ func (srv SyncDataUserService) AddUser(param command.SaveUserCommand) error {
//UpdateUser
//从BusinessAdmins 接收消息 更新用户
//module="employee" action="edit"
func (srv SyncDataUserService) UpdateUser(param command.SaveUserCommand) error {
func (srv SyncDataUserService) UpdateUser(param *command.SaveUserCommand) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
... ... @@ -119,7 +177,7 @@ func (srv SyncDataUserService) UpdateUser(param command.SaveUserCommand) error {
//batchDelete
//从BusinessAdmins 接收消息 删除用户
//module="employee" action="batchDelete"
func (srv SyncDataUserService) batchDelete(param command.BatchDeleteCommand) error {
func (srv SyncDataUserService) batchDelete(param *command.BatchDeleteCommand) error {
if len(param.Uids) == 0 {
return nil
}
... ... @@ -150,7 +208,7 @@ func (srv SyncDataUserService) batchDelete(param command.BatchDeleteCommand) err
//batchForbid
//从BusinessAdmins 接收消息 禁用,启用用户
//module="employee" action="batchForbid"
func (srv SyncDataUserService) batchForbid(param command.BatchForbidCommand) error {
func (srv SyncDataUserService) batchForbid(param *command.BatchForbidCommand) error {
if len(param.Uids) == 0 {
return nil
}
... ... @@ -190,7 +248,7 @@ func (srv SyncDataUserService) batchForbid(param command.BatchForbidCommand) err
//importUser
//从BusinessAdmins 接收消息 导入用户数据
//module="employee" action="import"
func (srv SyncDataUserService) importUser(param command.ImportUserCommand) error {
func (srv SyncDataUserService) importUser(param *command.ImportUserCommand) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
... ...
package domain
import (
"encoding/json"
"time"
)
type ReceivedMessage struct {
MessageId int64 `json:"MessageId"`
MessageType string `json:"MessageType"`
MessageBody string `json:"MessageBody"`
OccurredOn time.Time `json:"OccurredOn"`
CreateAt time.Time `json:"-"`
}
type MessageBody struct {
Module string `json:"module"`
Action string `json:"action"`
Datas json.RawMessage `json:"datas"` // 具体的对象JSON数据
}
type ReceivedMessageRepository interface {
SaveMessage(param *ReceivedMessage) error
FindMessage(id int64) (*ReceivedMessage, error)
}
... ...
... ... @@ -6,11 +6,12 @@ import (
"github.com/linmadan/egglib-go/persistent/pg/comment"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/infrastructure/pg/models"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log"
"github.com/go-pg/pg/v10"
"github.com/go-pg/pg/v10/orm"
"github.com/linmadan/egglib-go/persistent/pg/hooks"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/constant"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/infrastructure/pg/models"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log"
)
var DB *pg.DB
... ... @@ -27,6 +28,24 @@ func init() {
Logger: log.Logger,
})
}
if !constant.DISABLE_CREATE_TABLE {
tables := []interface{}{
&models.Company{},
&models.Department{},
&models.User{},
&models.ReceivedMessage{},
}
for _, model := range tables {
err := DB.Model(model).CreateTable(&orm.CreateTableOptions{
Temp: false,
IfNotExists: true,
FKConstraints: true,
})
if err != nil {
panic(err)
}
}
}
models := []interface{}{
&models.User{},
... ...
package models
import "time"
type ReceivedMessage struct {
tableName struct{} `pg:"received_message"`
MessageId int64 `pg:"pk:message_id"`
MessageType string
MessageBody string
OccurredOn time.Time
CreateAt time.Time
}
... ...
package repository
import (
"time"
"github.com/go-pg/pg/v10"
pgTransaction "github.com/linmadan/egglib-go/transaction/pg"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/infrastructure/pg/models"
)
type ReceivedMessageRepository struct {
transactionContext *pgTransaction.TransactionContext
}
var _ domain.ReceivedMessageRepository = (*ReceivedMessageRepository)(nil)
func NewReceivedMessageRepository(tx *pgTransaction.TransactionContext) *ReceivedMessageRepository {
return &ReceivedMessageRepository{
transactionContext: tx,
}
}
func (repo *ReceivedMessageRepository) SaveMessage(param *domain.ReceivedMessage) error {
message := &models.ReceivedMessage{
MessageId: param.MessageId,
MessageType: param.MessageType,
MessageBody: param.MessageBody,
OccurredOn: param.OccurredOn,
CreateAt: time.Now(),
}
tx := repo.transactionContext.PgTx
_, err := tx.Model(message).
Insert()
return err
}
func (repo *ReceivedMessageRepository) FindMessage(messageId int64) (*domain.ReceivedMessage, error) {
tx := repo.transactionContext.PgTx
receivedMessageModel := new(models.ReceivedMessage)
query := tx.Model(receivedMessageModel).
Where("message_id = ?", messageId)
if err := query.First(); err != nil {
if err != pg.ErrNoRows {
return nil, err
}
}
message := &domain.ReceivedMessage{
MessageId: receivedMessageModel.MessageId,
MessageType: receivedMessageModel.MessageType,
MessageBody: receivedMessageModel.MessageBody,
OccurredOn: receivedMessageModel.OccurredOn,
CreateAt: receivedMessageModel.CreateAt,
}
return message, nil
}
... ...
package handle
import (
"encoding/json"
"strconv"
"github.com/Shopify/sarama"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/company"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/department"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/factory"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/user"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log"
)
func SyncDataBusinessAdmin(msgData *sarama.ConsumerMessage) error {
var receivedMsg domain.ReceivedMessage
log.Logger.Debug(string(msgData.Value))
err := json.Unmarshal(msgData.Value, &receivedMsg)
if err != nil {
log.Logger.Error("解析ReceivedMessage 失败" + err.Error())
return err
}
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return err
}
_ = transactionContext.StartTransaction()
defer func() {
_ = transactionContext.RollbackTransaction()
}()
msgRepo := factory.CreateReceivedMessageRepository(map[string]interface{}{
"transactionContext": transactionContext,
})
oldMsg, err := msgRepo.FindMessage(receivedMsg.MessageId)
if err != nil {
log.Logger.Error(" 查询旧消息发生错误 " + err.Error())
log.Logger.Info("异常消息message_id=" + strconv.FormatInt(receivedMsg.MessageId, 10))
return err
}
if oldMsg.MessageId == receivedMsg.MessageId {
log.Logger.Info("消息重复,message_id=" + strconv.FormatInt(receivedMsg.MessageId, 10))
return nil
}
var msgBody domain.MessageBody
err = json.Unmarshal([]byte(receivedMsg.MessageBody), &msgBody)
if err != nil {
log.Logger.Error("解析messageBody发生错误" + err.Error())
return err
}
switch msgBody.Module {
case "company":
companySrv := company.SyncDataCompanyService{}
err = companySrv.FromBusinessAdmin(&msgBody)
if err != nil {
log.Logger.Error("处理company消息失败"+err.Error(), map[string]interface{}{
"data": msgBody,
})
return err
}
case "department":
departmentSrv := department.SyncDataDepartmentService{}
err = departmentSrv.FromBusinessAdmin(&msgBody)
if err != nil {
log.Logger.Error("处理department消息失败"+err.Error(), map[string]interface{}{
"data": msgBody,
})
return err
}
case "employee":
employeeSrv := user.SyncDataUserService{}
err = employeeSrv.FromBusinessAdmin(&msgBody)
if err != nil {
log.Logger.Error("处理employee消息失败"+err.Error(), map[string]interface{}{
"data": msgBody,
})
return err
}
}
err = msgRepo.SaveMessage(&receivedMsg)
if err != nil {
log.Logger.Error(" 保存新消息发生错误 " + err.Error())
log.Logger.Info("异常消息message_id=" + strconv.FormatInt(receivedMsg.MessageId, 10))
}
_ = transactionContext.CommitTransaction()
return nil
}
... ...
... ... @@ -5,12 +5,14 @@ import (
saramaConsumer "github.com/linmadan/egglib-go/mom/kafka/sarama"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/constant"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/port/consumer/handle"
)
func Run() {
messageHandlerMap := make(map[string]func(message *sarama.ConsumerMessage) error)
messageHandlerMap["demo-v1"] = Demo
//"指定topic" => 对应的处理方法
messageHandlerMap["mmm-business-admin-dev"] = handle.SyncDataBusinessAdmin
err := saramaConsumer.StartConsume(constant.KAFKA_HOSTS, constant.SERVICE_NAME, messageHandlerMap, log.Logger)
log.Logger.Error(err.Error())
}
... ...