|
|
package consumer
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/svc"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/types"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/db/transaction"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/domain"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/gateway/bytelib"
|
|
|
)
|
|
|
|
|
|
type ByteNoticeLogic struct {
|
|
|
ctx context.Context
|
|
|
svcCtx *svc.ServiceContext
|
|
|
conn transaction.Conn
|
|
|
}
|
|
|
|
|
|
func NewByteNoticeLogic(svcCtx *svc.ServiceContext) *ByteNoticeLogic {
|
|
|
return &ByteNoticeLogic{
|
|
|
ctx: context.Background(),
|
|
|
svcCtx: svcCtx,
|
|
|
conn: svcCtx.DefaultDBConn(),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func (logic *ByteNoticeLogic) Consume(key, value string) error {
|
|
|
fmt.Println(key, value)
|
|
|
notice := &domain.ObjectNotice{}
|
|
|
err := json.Unmarshal([]byte(value), notice)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
//保存推送消息
|
|
|
_, err = logic.svcCtx.ObjectNoticeRepository.Insert(logic.ctx, logic.conn, notice)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
//处理消息
|
|
|
err = transaction.UseTrans(logic.ctx, logic.conn.DB(), func(ctx context.Context, conn transaction.Conn) error {
|
|
|
return logic.handleNotice(conn, notice)
|
|
|
}, true)
|
|
|
//更新处理结果
|
|
|
if err != nil {
|
|
|
notice.Status = domain.ObjectNoticeStatusError
|
|
|
notice.Message = err.Error()
|
|
|
} else {
|
|
|
notice.Status = domain.ObjectNoticeStatusDone
|
|
|
notice.Message = "OK"
|
|
|
}
|
|
|
_, _ = logic.svcCtx.ObjectNoticeRepository.Update(logic.ctx, logic.conn, notice)
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
// handleNotice 处理消息
|
|
|
func (logic *ByteNoticeLogic) handleNotice(conn transaction.Conn, notice *domain.ObjectNotice) error {
|
|
|
//是否删除消息
|
|
|
if notice.IsDeletedEvent() {
|
|
|
return logic.handleDelete(conn, notice)
|
|
|
}
|
|
|
accessToken, _ := types.TableAccessToken{CompanyId: notice.CompanyId}.GenerateToken()
|
|
|
//结构变更
|
|
|
if notice.StructChanged {
|
|
|
request := bytelib.ObjectTableSearchRequest{
|
|
|
Token: accessToken,
|
|
|
Module: bytelib.ModuleDigitalCenter,
|
|
|
}
|
|
|
if notice.ObjectType == "导入模块" {
|
|
|
request.TableTypes = []string{bytelib.MainTable, bytelib.SubTable, bytelib.SideTable}
|
|
|
}
|
|
|
if notice.ObjectType == "拆解模块" {
|
|
|
request.TableTypes = []string{bytelib.SchemaTable}
|
|
|
request.ReturnGroupItem = true
|
|
|
}
|
|
|
if notice.ObjectType == "计算模块" {
|
|
|
request.TableTypes = []string{bytelib.CalculateItem, bytelib.CalculateSet}
|
|
|
request.ReturnGroupItem = true
|
|
|
request.ExcludeTables = []int{0}
|
|
|
}
|
|
|
list, err := logic.svcCtx.ByteMetadataService.ObjectTableSearch(logic.ctx, request)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
tableInfo, err := logic.svcCtx.ByteMetadataService.TableInfo(logic.ctx, &bytelib.TableInfoRequest{
|
|
|
Token: accessToken,
|
|
|
TableId: notice.TableId,
|
|
|
})
|
|
|
if err != nil {
|
|
|
return nil
|
|
|
}
|
|
|
if len(list.List) > 0 {
|
|
|
objectTables := make([]*domain.ObjectTable, 0)
|
|
|
for _, item := range list.List {
|
|
|
if item.TableId == notice.TableId {
|
|
|
objectTables = append(objectTables, &domain.ObjectTable{
|
|
|
Id: item.Id,
|
|
|
TableId: item.TableId,
|
|
|
Name: item.Name,
|
|
|
TableType: item.TableType,
|
|
|
CompanyId: notice.CompanyId,
|
|
|
ParentId: item.ParentId,
|
|
|
Flag: item.Flag,
|
|
|
})
|
|
|
//父级节点
|
|
|
objectTables = append(objectTables, logic.getParents(notice.CompanyId, item.ParentId, list.List)...)
|
|
|
}
|
|
|
}
|
|
|
err = logic.saveTables(conn, objectTables)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
//保存字段
|
|
|
_, err = logic.saveFields(conn, &domain.ObjectField{
|
|
|
Id: int64(tableInfo.TableId),
|
|
|
Name: tableInfo.Name,
|
|
|
Fields: tableInfo.Fields,
|
|
|
})
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
//数据变更
|
|
|
if notice.DataChanged {
|
|
|
data := &types.SyncTableDataPusher{
|
|
|
CompanyId: notice.CompanyId,
|
|
|
ObjectId: notice.TableId,
|
|
|
}
|
|
|
mBytes, _ := json.Marshal(data)
|
|
|
_, err := logic.svcCtx.Redis.LpushCtx(logic.ctx, logic.svcCtx.Config.Name+":table_data", string(mBytes))
|
|
|
return err
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
func (logic *ByteNoticeLogic) getParents(companyId int64, parentId int, list []*bytelib.Table) []*domain.ObjectTable {
|
|
|
result := make([]*domain.ObjectTable, 0)
|
|
|
for _, item := range list {
|
|
|
if item.Id == parentId {
|
|
|
result = append(result, &domain.ObjectTable{
|
|
|
Id: item.Id,
|
|
|
TableId: item.TableId,
|
|
|
Name: item.Name,
|
|
|
TableType: item.TableType,
|
|
|
CompanyId: companyId,
|
|
|
ParentId: item.ParentId,
|
|
|
Flag: item.Flag,
|
|
|
})
|
|
|
if item.ParentId > 0 {
|
|
|
result = append(result, logic.getParents(companyId, item.ParentId, list)...)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
return result
|
|
|
}
|
|
|
|
|
|
// handleDelete 删除
|
|
|
func (logic *ByteNoticeLogic) handleDelete(conn transaction.Conn, notice *domain.ObjectNotice) error {
|
|
|
objectTable, err := logic.svcCtx.ObjectTableRepository.FindOneByTableId(logic.ctx, conn, notice.TableId)
|
|
|
if err == nil && objectTable.Id > 0 {
|
|
|
objectTable.RemoteDeleted = 1
|
|
|
_, err := logic.svcCtx.ObjectTableRepository.Update(logic.ctx, conn, objectTable)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
//是否有使用数据源
|
|
|
used, err := logic.svcCtx.ChartSettingRepository.CheckUseDataSource(logic.ctx, conn, notice.TableId)
|
|
|
if err == nil && !used {
|
|
|
err = logic.svcCtx.ObjectTableDataRepository.DropTable(logic.ctx, conn, notice.TableId)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
// saveTables 保存表结构
|
|
|
func (logic *ByteNoticeLogic) saveTables(conn transaction.Conn, tables []*domain.ObjectTable) error {
|
|
|
if len(tables) > 0 {
|
|
|
for _, item := range tables {
|
|
|
objectTable, err := logic.svcCtx.ObjectTableRepository.FindOne(logic.ctx, conn, item.Id)
|
|
|
if err == nil && objectTable.Id > 0 {
|
|
|
item.Id = objectTable.Id
|
|
|
item.Version = objectTable.Version + 1
|
|
|
_, err := logic.svcCtx.ObjectTableRepository.Update(logic.ctx, conn, item)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
} else {
|
|
|
_, err := logic.svcCtx.ObjectTableRepository.Insert(logic.ctx, conn, item)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
// saveFields 保存表字段
|
|
|
func (logic *ByteNoticeLogic) saveFields(conn transaction.Conn, objectField *domain.ObjectField) (*domain.ObjectField, error) {
|
|
|
mField, err := logic.svcCtx.ObjectFieldRepository.FindOne(logic.ctx, conn, objectField.Id)
|
|
|
if err == nil && mField.Id > 0 { //已存在 - 更新
|
|
|
objectField.Version = mField.Version + 1
|
|
|
return logic.svcCtx.ObjectFieldRepository.Update(logic.ctx, conn, objectField)
|
|
|
} else {
|
|
|
return logic.svcCtx.ObjectFieldRepository.Insert(logic.ctx, conn, objectField)
|
|
|
}
|
|
|
} |
...
|
...
|
|