作者 庄敏学

kafka

不能预览此文件类型
... ... @@ -20,38 +20,38 @@ type SyncDataCompanyService struct {
// // add:添加,edit:编辑,setCompanyCharge:更改公司主管,changeAdmin换管理员
// Action string `json:"action"`
// // 具体的对象JSON数据
// Datas json.RawMessage `json:"data"`
// Data json.RawMessage `json:"data"`
// }
//从BusinessAdmins 接收消息,变更公司数据
// 从BusinessAdmins 接收消息,变更公司数据
func (c SyncDataCompanyService) FromBusinessAdmin(param *domain.MessageBody) error {
action := param.Module + "/" + param.Action
var err error
switch action {
case "company/add":
var param1 command.SaveCompanyCommand
err = json.Unmarshal(param.Datas, &param1)
err = json.Unmarshal(param.Data, &param1)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = c.addCompany(&param1)
case "company/edit":
var param2 command.SaveCompanyCommand
err = json.Unmarshal(param.Datas, &param2)
err = json.Unmarshal(param.Data, &param2)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = c.editCompany(&param2)
case "company/setCompanyCharge":
var param3 command.SetCompanyCharge
err = json.Unmarshal(param.Datas, &param3)
err = json.Unmarshal(param.Data, &param3)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = c.setCompanyCharge(&param3)
case "company/changeAdmin":
var param3 command.ChangeAdminCommand
err = json.Unmarshal(param.Datas, &param3)
err = json.Unmarshal(param.Data, &param3)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
... ... @@ -63,9 +63,9 @@ func (c SyncDataCompanyService) FromBusinessAdmin(param *domain.MessageBody) err
return err
}
//addCompany
//从BusinessAdmins 接收消息 添加公司
//module="company" action="add"
// addCompany
// 从BusinessAdmins 接收消息 添加公司
// module="company" action="add"
func (c SyncDataCompanyService) addCompany(param *command.SaveCompanyCommand) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
... ... @@ -121,9 +121,9 @@ func (c SyncDataCompanyService) addCompany(param *command.SaveCompanyCommand) er
return nil
}
//editCompany
//从BusinessAdmins 接收消息 更新公司
//module="company" action="edit"
// editCompany
// 从BusinessAdmins 接收消息 更新公司
// module="company" action="edit"
func (c SyncDataCompanyService) editCompany(param *command.SaveCompanyCommand) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
... ... @@ -218,7 +218,7 @@ func (c SyncDataCompanyService) editCompany(param *command.SaveCompanyCommand) e
return nil
}
//module="company" action="setCompanyCharge"
// module="company" action="setCompanyCharge"
func (srv SyncDataCompanyService) setCompanyCharge(param *command.SetCompanyCharge) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
... ... @@ -257,9 +257,9 @@ func (srv SyncDataCompanyService) setCompanyCharge(param *command.SetCompanyChar
return nil
}
//changeAdmin
//从BusinessAdmins 接收消息 变更主管
//module="company" action="changeAdmin"
// changeAdmin
// 从BusinessAdmins 接收消息 变更主管
// module="company" action="changeAdmin"
func (srv SyncDataCompanyService) changeAdmin(param *command.ChangeAdminCommand) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
... ...
... ... @@ -18,7 +18,7 @@ type SyncDataDepartmentService struct{}
// // add:添加,edit:编辑,batchDelete:批量删除,import:导入部门
// Action string `json:"action"`
// // 具体的对象JSON数据
// Datas json.RawMessage `json:"data"`
// Data json.RawMessage `json:"data"`
// }
func (srv SyncDataDepartmentService) FromBusinessAdmin(param *domain.MessageBody) error {
... ... @@ -27,28 +27,28 @@ func (srv SyncDataDepartmentService) FromBusinessAdmin(param *domain.MessageBody
switch action {
case "department/add":
var param1 command.AddDepartmentCommand
err = json.Unmarshal(param.Datas, &param1)
err = json.Unmarshal(param.Data, &param1)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = srv.addDepartment(&param1)
case "department/edit":
var param1 command.EditDepartmentCommand
err = json.Unmarshal(param.Datas, &param1)
err = json.Unmarshal(param.Data, &param1)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = srv.editDepartment(&param1)
case "department/batchDelete":
var param1 command.BatchDeleteCommand
err = json.Unmarshal(param.Datas, &param1)
err = json.Unmarshal(param.Data, &param1)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = srv.batchDeleteDepartment(&param1)
case "department/import":
var param1 []command.ImportDepartmentCommand
err = json.Unmarshal(param.Datas, &param1)
err = json.Unmarshal(param.Data, &param1)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
... ... @@ -57,9 +57,9 @@ func (srv SyncDataDepartmentService) FromBusinessAdmin(param *domain.MessageBody
return err
}
//AddDepartment
//从BusinessAdmins 接收消息 添加部门
//module="department" action="add"
// AddDepartment
// 从BusinessAdmins 接收消息 添加部门
// module="department" action="add"
func (srv SyncDataDepartmentService) addDepartment(param *command.AddDepartmentCommand) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
... ... @@ -99,9 +99,9 @@ func (srv SyncDataDepartmentService) addDepartment(param *command.AddDepartmentC
return nil
}
//EditDepartment
//从BusinessAdmins 接收消息 编辑部门
//module="department" action="edit"
// EditDepartment
// 从BusinessAdmins 接收消息 编辑部门
// module="department" action="edit"
func (srv SyncDataDepartmentService) editDepartment(param *command.EditDepartmentCommand) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
... ... @@ -158,9 +158,9 @@ func (srv SyncDataDepartmentService) editDepartment(param *command.EditDepartmen
return nil
}
//batchDelete
//从BusinessAdmins 接收消息 删除部门
//module="department" action="batchDelete"
// batchDelete
// 从BusinessAdmins 接收消息 删除部门
// module="department" action="batchDelete"
func (srv SyncDataDepartmentService) batchDeleteDepartment(param *command.BatchDeleteCommand) error {
if len(param.Ids) == 0 {
return nil
... ... @@ -188,9 +188,9 @@ func (srv SyncDataDepartmentService) batchDeleteDepartment(param *command.BatchD
return nil
}
//importDepartment
//从BusinessAdmins 接收消息 导入部门数据
//module="department" action="import"
// importDepartment
// 从BusinessAdmins 接收消息 导入部门数据
// module="department" action="import"
func (srv SyncDataDepartmentService) importDepartment(param []command.ImportDepartmentCommand) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
... ...
... ... @@ -20,7 +20,7 @@ type SyncDataUserService struct{}
// // add:添加,edit:编辑,batchDelete:批量删除,batchForbid:批量禁用用户,batchRemove:批量更改用户部门,import:导入用户
// Action string `json:"action"`
// // 具体的对象JSON数据
// Datas json.RawMessage `json:"data"`
// Data json.RawMessage `json:"data"`
// }
func (srv SyncDataUserService) FromBusinessAdmin(param *domain.MessageBody) error {
... ... @@ -29,35 +29,35 @@ func (srv SyncDataUserService) FromBusinessAdmin(param *domain.MessageBody) erro
switch action {
case "employee/add":
var param1 command.SaveUserCommand
err = json.Unmarshal(param.Datas, &param1)
err = json.Unmarshal(param.Data, &param1)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = srv.AddUser(&param1)
case "employee/edit":
var param2 command.SaveUserCommand
err = json.Unmarshal(param.Datas, &param2)
err = json.Unmarshal(param.Data, &param2)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = srv.UpdateUser(&param2)
case "employee/batchDelete":
var param3 command.BatchDeleteCommand
err = json.Unmarshal(param.Datas, &param3)
err = json.Unmarshal(param.Data, &param3)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = srv.batchDelete(&param3)
case "company/batchForbid":
case "employee/batchForbid":
var param4 command.BatchForbidCommand
err = json.Unmarshal(param.Datas, &param4)
err = json.Unmarshal(param.Data, &param4)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
err = srv.batchForbid(&param4)
case "company/import":
case "employee/import":
var param4 command.ImportUserCommand
err = json.Unmarshal(param.Datas, &param4)
err = json.Unmarshal(param.Data, &param4)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
... ... @@ -69,9 +69,9 @@ func (srv SyncDataUserService) FromBusinessAdmin(param *domain.MessageBody) erro
return err
}
//AddUser
//从BusinessAdmins 接收消息 添加用户
//module="employee" action="add"
// AddUser
// 从BusinessAdmins 接收消息 添加用户
// module="employee" action="add"
func (srv SyncDataUserService) AddUser(param *command.SaveUserCommand) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
... ... @@ -114,9 +114,9 @@ func (srv SyncDataUserService) AddUser(param *command.SaveUserCommand) error {
return nil
}
//UpdateUser
//从BusinessAdmins 接收消息 更新用户
//module="employee" action="edit"
// UpdateUser
// 从BusinessAdmins 接收消息 更新用户
// module="employee" action="edit"
func (srv SyncDataUserService) UpdateUser(param *command.SaveUserCommand) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
... ... @@ -174,9 +174,9 @@ func (srv SyncDataUserService) UpdateUser(param *command.SaveUserCommand) error
return nil
}
//batchDelete
//从BusinessAdmins 接收消息 删除用户
//module="employee" action="batchDelete"
// batchDelete
// 从BusinessAdmins 接收消息 删除用户
// module="employee" action="batchDelete"
func (srv SyncDataUserService) batchDelete(param *command.BatchDeleteCommand) error {
if len(param.Uids) == 0 {
return nil
... ... @@ -205,9 +205,9 @@ func (srv SyncDataUserService) batchDelete(param *command.BatchDeleteCommand) er
return nil
}
//batchForbid
//从BusinessAdmins 接收消息 禁用,启用用户
//module="employee" action="batchForbid"
// batchForbid
// 从BusinessAdmins 接收消息 禁用,启用用户
// module="employee" action="batchForbid"
func (srv SyncDataUserService) batchForbid(param *command.BatchForbidCommand) error {
if len(param.Uids) == 0 {
return nil
... ... @@ -245,9 +245,9 @@ func (srv SyncDataUserService) batchForbid(param *command.BatchForbidCommand) er
return nil
}
//importUser
//从BusinessAdmins 接收消息 导入用户数据
//module="employee" action="import"
// importUser
// 从BusinessAdmins 接收消息 导入用户数据
// module="employee" action="import"
func (srv SyncDataUserService) importUser(param *command.ImportUserCommand) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
... ...
... ... @@ -16,7 +16,7 @@ type ReceivedMessage struct {
type MessageBody struct {
Module string `json:"module"`
Action string `json:"action"`
Datas json.RawMessage `json:"datas"` // 具体的对象JSON数据
Data json.RawMessage `json:"data"` // 具体的对象JSON数据
}
type ReceivedMessageRepository interface {
... ...
... ... @@ -15,13 +15,12 @@ import (
func SyncDataBusinessAdmin(msgData *sarama.ConsumerMessage) error {
var receivedMsg domain.ReceivedMessage
log.Logger.Debug(string(msgData.Value))
err := json.Unmarshal(msgData.Value, &receivedMsg)
if err != nil {
log.Logger.Error("解析ReceivedMessage 失败" + err.Error())
return err
}
log.Logger.Debug("解析message:", map[string]interface{}{"data": receivedMsg})
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return err
... ...
... ... @@ -23,10 +23,3 @@ func Run() {
err := saramaConsumer.StartConsume(host, constant.KAFKA_GROUP_ID, messageHandlerMap, log.Logger)
log.Logger.Error(err.Error())
}
func Demo(message *sarama.ConsumerMessage) error {
k := string(message.Key)
v := string(message.Value)
log.Logger.Debug("message===>" + k + ":" + v)
return nil
}
... ...