|
|
package consumer
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
"github.com/zeromicro/go-zero/core/stores/redis"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/svc"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/types"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/db/transaction"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/domain"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/gateway/bytelib"
|
|
|
)
|
|
|
|
|
|
type ByteNoticeLogic struct {
|
|
|
ctx context.Context
|
|
|
svcCtx *svc.ServiceContext
|
|
|
conn transaction.Conn
|
|
|
}
|
|
|
|
|
|
func NewByteNoticeLogic(svcCtx *svc.ServiceContext) *ByteNoticeLogic {
|
|
|
return &ByteNoticeLogic{
|
|
|
ctx: context.Background(),
|
|
|
svcCtx: svcCtx,
|
|
|
conn: svcCtx.DefaultDBConn(),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func (logic *ByteNoticeLogic) Consume(key, value string) error {
|
|
|
fmt.Println(key, value)
|
|
|
notice := &domain.ObjectNotice{}
|
|
|
err := json.Unmarshal([]byte(value), notice)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
//保存推送消息
|
|
|
_, err = logic.svcCtx.ObjectNoticeRepository.Insert(logic.ctx, logic.conn, notice)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
//处理消息
|
|
|
err = logic.handleNotice(notice)
|
|
|
//更新处理结果
|
|
|
if err != nil {
|
|
|
notice.Status = domain.ObjectNoticeStatusError
|
|
|
notice.Message = err.Error()
|
|
|
} else {
|
|
|
notice.Status = domain.ObjectNoticeStatusDone
|
|
|
notice.Message = "OK"
|
|
|
}
|
|
|
_, _ = logic.svcCtx.ObjectNoticeRepository.Update(logic.ctx, logic.conn, notice)
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
// handleNotice 处理消息
|
|
|
func (logic *ByteNoticeLogic) handleNotice(notice *domain.ObjectNotice) error {
|
|
|
//是否删除消息
|
|
|
if notice.IsDeletedEvent() {
|
|
|
return logic.handleDelete(notice)
|
|
|
}
|
|
|
accessToken, _ := types.TableAccessToken{CompanyId: notice.CompanyId}.GenerateToken()
|
|
|
//结构变更
|
|
|
if notice.StructChanged {
|
|
|
request := bytelib.ObjectTableSearchRequest{
|
|
|
Token: accessToken,
|
|
|
Module: bytelib.ModuleDigitalCenter,
|
|
|
}
|
|
|
if notice.ObjectType == "导入模块" {
|
|
|
request.TableTypes = []string{bytelib.MainTable, bytelib.SubTable, bytelib.SideTable}
|
|
|
}
|
|
|
if notice.ObjectType == "拆解模块" {
|
|
|
request.TableTypes = []string{bytelib.SchemaTable}
|
|
|
request.ReturnGroupItem = true
|
|
|
}
|
|
|
if notice.ObjectType == "计算模块" {
|
|
|
request.TableTypes = []string{bytelib.CalculateItem, bytelib.CalculateSet}
|
|
|
request.ReturnGroupItem = true
|
|
|
request.ExcludeTables = []int{0}
|
|
|
}
|
|
|
list, err := logic.svcCtx.ByteMetadataService.ObjectTableSearch(logic.ctx, request)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
tableInfo, err := logic.svcCtx.ByteMetadataService.TableInfo(logic.ctx, &bytelib.TableInfoRequest{
|
|
|
Token: accessToken,
|
|
|
TableId: notice.TableId,
|
|
|
})
|
|
|
if err != nil {
|
|
|
return nil
|
|
|
}
|
|
|
if len(list.List) > 0 {
|
|
|
objectTables := make([]*domain.ObjectTable, 0)
|
|
|
for _, item := range list.List {
|
|
|
if item.TableId == notice.TableId {
|
|
|
objectTables = append(objectTables, &domain.ObjectTable{
|
|
|
Id: item.Id,
|
|
|
TableId: item.TableId,
|
|
|
Name: item.Name,
|
|
|
TableType: item.TableType,
|
|
|
CompanyId: notice.CompanyId,
|
|
|
ParentId: item.ParentId,
|
|
|
Flag: item.Flag,
|
|
|
Fields: tableInfo.Fields,
|
|
|
})
|
|
|
//父级节点
|
|
|
objectTables = append(objectTables, logic.getParents(notice.CompanyId, item.ParentId, list.List)...)
|
|
|
}
|
|
|
}
|
|
|
err = logic.saveTables(objectTables)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
//数据变更
|
|
|
if notice.DataChanged {
|
|
|
tableDataPreview, err := logic.svcCtx.ByteMetadataService.TableDataPreview(logic.ctx, &bytelib.TableDataPreviewRequest{
|
|
|
Token: accessToken,
|
|
|
ObjectId: int64(notice.TableId),
|
|
|
ObjectType: bytelib.ObjectMetaTable,
|
|
|
Where: &bytelib.TableQueryWhere{
|
|
|
PageNumber: 1,
|
|
|
PageSize: bytelib.MaxPageSize,
|
|
|
},
|
|
|
UseCache: true,
|
|
|
HiddenData: false,
|
|
|
})
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
//加锁,避免图表在请求读取本地数据时报错
|
|
|
key := logic.svcCtx.Config.Name + ":bytelib:" + fmt.Sprintf("%v", tableDataPreview.ObjectId)
|
|
|
lock := redis.NewRedisLock(logic.svcCtx.Redis, key)
|
|
|
// 设置过期时间
|
|
|
lock.SetExpire(10 * 60)
|
|
|
acquire, err := lock.Acquire()
|
|
|
fmt.Println(acquire, err)
|
|
|
defer lock.Release()
|
|
|
err = transaction.UseTrans(logic.ctx, logic.conn.DB(), func(ctx context.Context, conn transaction.Conn) error {
|
|
|
return logic.svcCtx.ObjectTableDataRepository.InsertWithTableData(logic.ctx, conn, bytelib.TableData(tableDataPreview))
|
|
|
}, true)
|
|
|
fmt.Println(err)
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
func (logic *ByteNoticeLogic) getParents(companyId int64, parentId int, list []*bytelib.Table) []*domain.ObjectTable {
|
|
|
result := make([]*domain.ObjectTable, 0)
|
|
|
for _, item := range list {
|
|
|
if item.Id == parentId {
|
|
|
result = append(result, &domain.ObjectTable{
|
|
|
Id: item.Id,
|
|
|
TableId: item.TableId,
|
|
|
Name: item.Name,
|
|
|
TableType: item.TableType,
|
|
|
CompanyId: companyId,
|
|
|
ParentId: item.ParentId,
|
|
|
Flag: item.Flag,
|
|
|
Fields: item.Fields,
|
|
|
})
|
|
|
if item.ParentId > 0 {
|
|
|
result = append(result, logic.getParents(companyId, item.ParentId, list)...)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
return result
|
|
|
}
|
|
|
|
|
|
// handleDelete 删除
|
|
|
func (logic *ByteNoticeLogic) handleDelete(notice *domain.ObjectNotice) error {
|
|
|
objectTable, err := logic.svcCtx.ObjectTableRepository.FindOneByTableId(logic.ctx, logic.conn, notice.TableId)
|
|
|
if err == nil && objectTable.Id > 0 {
|
|
|
objectTable.RemoteDeleted = 1
|
|
|
_, err := logic.svcCtx.ObjectTableRepository.Update(logic.ctx, logic.conn, objectTable)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
// saveTables 保存表结构
|
|
|
func (logic *ByteNoticeLogic) saveTables(tables []*domain.ObjectTable) error {
|
|
|
err := transaction.UseTrans(logic.ctx, logic.conn.DB(), func(ctx context.Context, conn transaction.Conn) error {
|
|
|
if len(tables) > 0 {
|
|
|
for _, item := range tables {
|
|
|
objectTable, err := logic.svcCtx.ObjectTableRepository.FindOne(logic.ctx, conn, item.Id)
|
|
|
if err == nil && objectTable.Id > 0 {
|
|
|
item.Id = objectTable.Id
|
|
|
_, err := logic.svcCtx.ObjectTableRepository.Update(logic.ctx, conn, item)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
} else {
|
|
|
_, err := logic.svcCtx.ObjectTableRepository.Insert(logic.ctx, conn, item)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
return nil
|
|
|
}, true)
|
|
|
return err
|
|
|
} |
...
|
...
|
|