|
|
package service
|
|
|
|
|
|
import (
|
|
|
"fmt"
|
|
|
"github.com/linmadan/egglib-go/core/application"
|
|
|
pgTransaction "github.com/linmadan/egglib-go/transaction/pg"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/event/command"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/factory"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/constant"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/domain"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/cache"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/api/digitalLib"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/domainService"
|
|
|
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/log"
|
|
|
"math"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *domain.Context, cmd *command.TableEventCommand) (interface{}, error) {
|
...
|
...
|
@@ -15,12 +20,12 @@ func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *d |
|
|
if err != nil {
|
|
|
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
|
|
|
}
|
|
|
//if err := transactionContext.StartTransaction(); err != nil {
|
|
|
// return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
|
|
|
//}
|
|
|
//defer func() {
|
|
|
// transactionContext.RollbackTransaction()
|
|
|
//}()
|
|
|
if err := transactionContext.StartTransaction(); err != nil {
|
|
|
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
|
|
|
}
|
|
|
defer func() {
|
|
|
transactionContext.RollbackTransaction()
|
|
|
}()
|
|
|
|
|
|
var (
|
|
|
dataChanged = true
|
...
|
...
|
@@ -31,29 +36,54 @@ func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *d |
|
|
tableId := 0
|
|
|
switch data.Type {
|
|
|
case domain.TableDataImportEvent, domain.TableDataEditEvent, domain.TableDeleteEvent:
|
|
|
// dataChanged = true
|
|
|
tableId = data.Table.TableId
|
|
|
case domain.QuerySetUpdateEvent:
|
|
|
case domain.QuerySetUpdateEvent, domain.QuerySetUpdateRenameEvent:
|
|
|
tableId = data.QuerySet.QuerySetInfo.BindTableId
|
|
|
// structChanged = true
|
|
|
if data.QuerySet.Status != domain.StatusOn {
|
|
|
return nil, nil
|
|
|
}
|
|
|
if !domain.AssertTableType(data.QuerySet.Type, domain.SchemaTable, domain.CalculateItem, domain.CalculateSet) {
|
|
|
return nil, nil
|
|
|
}
|
|
|
case domain.QuerySetUpdateStatusEvent:
|
|
|
tableId = data.QuerySet.QuerySetInfo.BindTableId
|
|
|
if !domain.AssertTableType(data.QuerySet.Type, domain.SchemaTable, domain.CalculateItem, domain.CalculateSet) {
|
|
|
return nil, nil
|
|
|
}
|
|
|
case domain.TableApplyOnEvent:
|
|
|
tableId = data.Table.TableId
|
|
|
dataChanged = false
|
|
|
case domain.QuerySetDeleteEvent:
|
|
|
tableId = data.Table.TableId
|
|
|
}
|
|
|
if tableId == 0 {
|
|
|
return nil, nil
|
|
|
}
|
|
|
var notifyData = struct {
|
|
|
DataChanged bool `json:"dataChanged"`
|
|
|
StructChanged bool `json:"structChanged"`
|
|
|
TableId int `json:"tableId"`
|
|
|
Event string `json:"event"`
|
|
|
TableAffectedList []int `json:"tableAffectedList"`
|
|
|
}{
|
|
|
var notifyData = NotifyData{
|
|
|
DataChanged: dataChanged,
|
|
|
StructChanged: structChanged,
|
|
|
TableId: tableId,
|
|
|
Event: data.Type.ToString(),
|
|
|
Metadata: cmd.EventTable.Metadata,
|
|
|
}
|
|
|
// tableId 相关联的
|
|
|
tableRepository, _, _ := factory.FastPgTable(transactionContext, 0)
|
|
|
tableRepository, table, _ := factory.FastPgTable(transactionContext, tableId)
|
|
|
if table == nil && data.Table != nil {
|
|
|
table = data.Table
|
|
|
notifyData.CompanyId = table.Context.CompanyId
|
|
|
}
|
|
|
if table != nil {
|
|
|
notifyData.TableType = domain.EnumsDescription(domain.ObjectTypeMap, table.TableType)
|
|
|
switch domain.TableType(table.TableType) {
|
|
|
case domain.MainTable, domain.SubTable, domain.SideTable:
|
|
|
notifyData.ObjectType = "导入模块"
|
|
|
case domain.SchemaTable, domain.SubProcessTable, domain.CalculateTable:
|
|
|
notifyData.ObjectType = "拆解模块"
|
|
|
case domain.CalculateItem, domain.CalculateSet:
|
|
|
notifyData.ObjectType = "计算模块"
|
|
|
}
|
|
|
}
|
|
|
|
|
|
_, tables, err := tableRepository.Find(map[string]interface{}{"context": data.Context, "tableTypesNotIn": []string{domain.TemporaryTable.ToString(), domain.ExcelTable.ToString()}})
|
|
|
if err != nil {
|
|
|
return nil, err
|
...
|
...
|
@@ -63,22 +93,103 @@ func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *d |
|
|
tableDependTree := tableDependencyService.TableDependTree(tables, tableId)
|
|
|
tree := tableDependTree.Tree
|
|
|
|
|
|
//tableService := tableservice.NewTableService(nil)
|
|
|
querySetRepository, _, _ := factory.FastPgQuerySet(transactionContext, 0)
|
|
|
var mapTableQuerySet = make(map[int]*domain.QuerySet)
|
|
|
if len(tree) > 0 && cmd.EventTable.QuerySet != nil {
|
|
|
_, querySets, _ := querySetRepository.Find(map[string]interface{}{
|
|
|
"types": []string{domain.SchemaTable.ToString(), domain.CalculateItem.ToString(), domain.CalculateSet.ToString()},
|
|
|
"bindTableIds": tree,
|
|
|
"status": domain.StatusOn,
|
|
|
})
|
|
|
for _, q := range querySets {
|
|
|
mapTableQuerySet[q.QuerySetInfo.BindTableId] = q
|
|
|
}
|
|
|
}
|
|
|
|
|
|
for i := range tree {
|
|
|
cache.DefaultDataTableCacheService.DeleteDataTable(tree[i])
|
|
|
// fresh cache
|
|
|
//tableService.TablePreview(data.Context, &tablecommand.TablePreviewCommand{
|
|
|
// TableId: tree[i],
|
|
|
// ObjectType: domain.ObjectMetaTable,
|
|
|
// PageSize: 10000,
|
|
|
// PageNumber: 0,
|
|
|
// UseCache: true,
|
|
|
//})
|
|
|
table, ok := tableDependencyService.TableMap[tree[i]]
|
|
|
if !ok {
|
|
|
continue
|
|
|
}
|
|
|
if notifyData.CompanyId == 0 {
|
|
|
notifyData.CompanyId = table.Context.CompanyId
|
|
|
}
|
|
|
switch table.TableType {
|
|
|
case domain.MainTable.ToString(), domain.SubTable.ToString(), domain.SideTable.ToString():
|
|
|
if table.TableInfo != nil && table.TableInfo.ApplyOnModule&domain.ModuleDigitalCenter == 0 {
|
|
|
continue
|
|
|
}
|
|
|
break
|
|
|
case domain.SubProcessTable.ToString(), domain.CalculateTable.ToString():
|
|
|
continue
|
|
|
case domain.SchemaTable.ToString(), domain.CalculateSet.ToString(), domain.CalculateItem.ToString():
|
|
|
if querySet, ok := mapTableQuerySet[tree[i]]; !ok {
|
|
|
continue
|
|
|
} else {
|
|
|
// 不是当前的查询集。且状态为关闭的都补推送
|
|
|
if querySet.Status != domain.StatusOn && querySet.QuerySetInfo.BindTableId != 0 && querySet.QuerySetInfo.BindTableId != tableId {
|
|
|
continue
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
notifyData.TableAffectedList = append(notifyData.TableAffectedList, tree[i])
|
|
|
}
|
|
|
found := false
|
|
|
for _, id := range notifyData.TableAffectedList {
|
|
|
if id == tableId {
|
|
|
found = true
|
|
|
}
|
|
|
}
|
|
|
if !found {
|
|
|
notifyData.TableAffectedList = append(notifyData.TableAffectedList, tableId)
|
|
|
}
|
|
|
lib := digitalLib.NewDigitalLib(constant.DIGITAL_SERVER_HOST)
|
|
|
if _, err = lib.SyncNotice(digitalLib.RequestSyncNotice{Body: notifyData}); err != nil {
|
|
|
log.Logger.Error(fmt.Sprintf("通知数控失败:%s", err.Error()))
|
|
|
if t, ok := notifyData.Retry(); ok {
|
|
|
tableEventService.TimingWheel.SetTimer(notifyData.Key(), ¬ifyData, t)
|
|
|
log.Logger.Debug(fmt.Sprintf("通知数控重试 key:%s wait:%vs", notifyData.Key(), t.Seconds()))
|
|
|
}
|
|
|
|
|
|
//if err := transactionContext.CommitTransaction(); err != nil {
|
|
|
// return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
|
|
|
//}
|
|
|
}
|
|
|
if err := transactionContext.CommitTransaction(); err != nil {
|
|
|
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
|
|
|
}
|
|
|
return nil, nil
|
|
|
}
|
|
|
|
|
|
type NotifyData struct {
|
|
|
DataChanged bool `json:"dataChanged"` // 数据有变化
|
|
|
StructChanged bool `json:"structChanged"` // 结构有变化
|
|
|
TableId int `json:"tableId"` // 表ID
|
|
|
TableType string `json:"tableType"` // 表类型:导入模块(主表,副表,分表) 拆解模块(方案、子过程、计算表) 计算模块(计算项,计算集)
|
|
|
ObjectType string `json:"objectType"` // 导入模块、拆解模块、计算模块
|
|
|
CompanyId int `json:"companyId"` // 公司
|
|
|
Event string `json:"event"` // 事件名称
|
|
|
TableAffectedList []int `json:"tableAffectedList"` // 级联影响到的表
|
|
|
Metadata map[string]interface{} `json:"metadata"` // 元数据
|
|
|
sendRetry int
|
|
|
}
|
|
|
|
|
|
func (n *NotifyData) Key() string {
|
|
|
return fmt.Sprintf("delay:notify:table:%d", n.TableId)
|
|
|
}
|
|
|
|
|
|
func (n *NotifyData) Retry() (time.Duration, bool) {
|
|
|
n.sendRetry++
|
|
|
if n.sendRetry > 3 {
|
|
|
return n.Delay(), false
|
|
|
}
|
|
|
if n.sendRetry == 1 {
|
|
|
return n.Delay(), true
|
|
|
}
|
|
|
return n.Delay() * time.Duration(int(math.Pow(float64(2), float64(n.sendRetry)))), true
|
|
|
}
|
|
|
|
|
|
func (n *NotifyData) Delay() time.Duration {
|
|
|
return time.Second * 10
|
|
|
}
|
|
|
|
|
|
func (n *NotifyData) RetryTime() int {
|
|
|
return n.sendRetry
|
|
|
} |
...
|
...
|
|