作者 yangfu

feat:sync_data

package service
import (
"fmt"
"github.com/linmadan/egglib-go/core/application"
pgTransaction "github.com/linmadan/egglib-go/transaction/pg"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/event/command"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/factory"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/cache"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/api/digitalLib"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/domainService"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/log"
"math"
"time"
)
func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *domain.Context, cmd *command.TableEventCommand) (interface{}, error) {
... ... @@ -15,12 +19,12 @@ func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *d
if err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
//if err := transactionContext.StartTransaction(); err != nil {
// return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
//}
//defer func() {
// transactionContext.RollbackTransaction()
//}()
if err := transactionContext.StartTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
defer func() {
transactionContext.RollbackTransaction()
}()
var (
dataChanged = true
... ... @@ -31,29 +35,39 @@ func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *d
tableId := 0
switch data.Type {
case domain.TableDataImportEvent, domain.TableDataEditEvent, domain.TableDeleteEvent:
// dataChanged = true
tableId = data.Table.TableId
case domain.QuerySetUpdateEvent:
case domain.QuerySetUpdateEvent, domain.QuerySetUpdateStatusEvent:
tableId = data.QuerySet.QuerySetInfo.BindTableId
// structChanged = true
if data.QuerySet.Status != domain.StatusOn {
return nil, nil
}
if !domain.AssertTableType(data.QuerySet.Type, domain.SchemaTable, domain.CalculateItem, domain.CalculateSet) {
return nil, nil
}
}
if tableId == 0 {
return nil, nil
}
var notifyData = struct {
DataChanged bool `json:"dataChanged"`
StructChanged bool `json:"structChanged"`
TableId int `json:"tableId"`
Event string `json:"event"`
TableAffectedList []int `json:"tableAffectedList"`
}{
var notifyData = NotifyData{
DataChanged: dataChanged,
StructChanged: structChanged,
TableId: tableId,
Event: data.Type.ToString(),
}
// tableId 相关联的
tableRepository, _, _ := factory.FastPgTable(transactionContext, 0)
tableRepository, table, _ := factory.FastPgTable(transactionContext, tableId)
if table != nil {
notifyData.TableType = domain.EnumsDescription(domain.ObjectTypeMap, table.TableType)
switch domain.TableType(table.TableType) {
case domain.MainTable, domain.SubTable, domain.SideTable:
notifyData.ObjectType = "导入模块"
case domain.SchemaTable, domain.SubProcessTable, domain.CalculateTable:
notifyData.ObjectType = "拆解模块"
case domain.CalculateItem, domain.CalculateSet:
notifyData.ObjectType = "计算模块"
}
}
_, tables, err := tableRepository.Find(map[string]interface{}{"context": data.Context, "tableTypesNotIn": []string{domain.TemporaryTable.ToString(), domain.ExcelTable.ToString()}})
if err != nil {
return nil, err
... ... @@ -63,22 +77,93 @@ func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *d
tableDependTree := tableDependencyService.TableDependTree(tables, tableId)
tree := tableDependTree.Tree
//tableService := tableservice.NewTableService(nil)
querySetRepository, _, _ := factory.FastPgQuerySet(transactionContext, 0)
var mapTableQuerySet = make(map[int]*domain.QuerySet)
if len(tree) > 0 && cmd.EventTable.QuerySet != nil {
_, querySets, _ := querySetRepository.Find(map[string]interface{}{
"types": []string{domain.SchemaTable.ToString(), domain.CalculateItem.ToString(), domain.CalculateSet.ToString()},
"bindTableIds": tree,
"status": domain.StatusOn,
})
for _, q := range querySets {
mapTableQuerySet[q.QuerySetInfo.BindTableId] = q
}
}
for i := range tree {
cache.DefaultDataTableCacheService.DeleteDataTable(tree[i])
// fresh cache
//tableService.TablePreview(data.Context, &tablecommand.TablePreviewCommand{
// TableId: tree[i],
// ObjectType: domain.ObjectMetaTable,
// PageSize: 10000,
// PageNumber: 0,
// UseCache: true,
//})
table, ok := tableDependencyService.TableMap[tree[i]]
if !ok {
continue
}
if notifyData.CompanyId == 0 {
notifyData.CompanyId = table.Context.CompanyId
}
switch table.TableType {
case domain.MainTable.ToString(), domain.SubTable.ToString(), domain.SideTable.ToString():
if table.TableInfo.ApplyOnModule&domain.ModuleDigitalCenter == 0 {
continue
}
break
case domain.SubProcessTable.ToString(), domain.CalculateTable.ToString():
continue
case domain.SchemaTable.ToString(), domain.CalculateSet.ToString(), domain.CalculateItem.ToString():
if querySet, ok := mapTableQuerySet[tree[i]]; !ok {
continue
} else {
// 不是当前的查询集。且状态为关闭的都补推送
if querySet.Status != domain.StatusOn && querySet.QuerySetInfo.BindTableId != 0 && querySet.QuerySetInfo.BindTableId != tableId {
continue
}
}
}
notifyData.TableAffectedList = append(notifyData.TableAffectedList, tree[i])
}
lib := digitalLib.NewDigitalLib("")
if _, err = lib.SyncNotice(digitalLib.RequestSyncNotice{Body: notifyData}); err != nil {
log.Logger.Error(fmt.Sprintf("通知数控失败:%s", err.Error()))
if t, ok := notifyData.Retry(); ok {
tableEventService.TimingWheel.SetTimer(notifyData.Key(), &notifyData, t)
log.Logger.Debug(fmt.Sprintf("通知数控重试 key:%s wait:%vs", notifyData.Key(), t.Seconds()))
}
//if err := transactionContext.CommitTransaction(); err != nil {
// return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
//}
}
if err := transactionContext.CommitTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
return nil, nil
}
type NotifyData struct {
DataChanged bool `json:"dataChanged"` // 数据有变化
StructChanged bool `json:"structChanged"` // 结构有变化
TableId int `json:"tableId"` // 表ID
TableType string `json:"tableType"` // 表类型:导入模块(主表,副表,分表) 拆解模块(方案、子过程、计算表) 计算模块(计算项,计算集)
ObjectType string `json:"objectType"` // 导入模块、拆解模块、计算模块
CompanyId int `json:"companyId"` // 公司
Event string `json:"event"` // 事件名称
TableAffectedList []int `json:"tableAffectedList"` // 级联影响到的表
sendRetry int
}
func (n *NotifyData) Key() string {
return fmt.Sprintf("delay:notify:table:%d", n.TableId)
}
func (n *NotifyData) Retry() (time.Duration, bool) {
n.sendRetry++
if n.sendRetry > 3 {
return n.Delay(), false
}
if n.sendRetry == 1 {
return n.Delay(), true
}
return n.Delay() * time.Duration(int(math.Pow(float64(2), float64(n.sendRetry)))), true
}
func (n *NotifyData) Delay() time.Duration {
return time.Second * 10
}
func (n *NotifyData) RetryTime() int {
return n.sendRetry
}
... ...
... ... @@ -5,17 +5,21 @@ import (
"fmt"
"github.com/linmadan/egglib-go/core/application"
pgTransaction "github.com/linmadan/egglib-go/transaction/pg"
"github.com/zeromicro/go-zero/core/collection"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/event/command"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/factory"
tablecommand "gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/table/command"
tableservice "gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/table/service"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/api/digitalLib"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/cache"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/domainService"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/log"
"time"
)
type TableEventService struct {
TimingWheel *collection.TimingWheel
}
func (tableEventService *TableEventService) Handler(ctx *domain.Context, cmd *command.TableEventCommand) (interface{}, error) {
... ... @@ -37,6 +41,8 @@ func (tableEventService *TableEventService) Handler(ctx *domain.Context, cmd *co
tableId = data.Table.TableId
case domain.QuerySetUpdateEvent:
tableId = data.QuerySet.QuerySetInfo.BindTableId
default:
return nil, err
}
if tableId == 0 {
return nil, nil
... ... @@ -129,5 +135,25 @@ func (tableEventService *TableEventService) HandlerTableAffectedMarkToConflictSt
func NewTableEventService(options map[string]interface{}) *TableEventService {
svr := &TableEventService{}
delayNotifyTimingWheel, _ := collection.NewTimingWheel(time.Second, 10, svr.TimingWheelFunc)
svr.TimingWheel = delayNotifyTimingWheel
return svr
}
func (tableEventService *TableEventService) TimingWheelFunc(key, value interface{}) {
v, ok := value.(*NotifyData)
if !ok {
return
}
lib := digitalLib.NewDigitalLib("")
if _, err := lib.SyncNotice(digitalLib.RequestSyncNotice{Body: v}); err != nil {
log.Logger.Error(fmt.Sprintf("通知数控失败:%s", err.Error()))
if t, ok := v.Retry(); ok {
if err = tableEventService.TimingWheel.SetTimer(v.Key(), v, t); err == nil {
log.Logger.Debug(fmt.Sprintf("通知数控重试(%d) key:%s wait:%vs", v.RetryTime(), v.Key(), t.Seconds()))
return
}
}
}
tableEventService.TimingWheel.RemoveTimer(v.Key())
}
... ...
... ... @@ -28,6 +28,9 @@ var BYTE_CORE_HOST = "http://192.168.100.34:8303"
var AUTH_SERVER_HOST = "http://digital-platform-dev.fjmaimaimai.com"
// 数控服务域名地址
var DIGITAL_SERVER_HOST = "http://digital-platform-dev.fjmaimaimai.com"
var BLACK_LIST_USER int64
var BLACK_LIST_COMPANY int64
var WHITE_LIST_USERS []int
... ... @@ -52,6 +55,7 @@ func init() {
SERVICE_ENV = Configurator.DefaultString("SERVICE_ENV", SERVICE_ENV)
HTTP_PORT = Configurator.DefaultInt("HTTP_PORT", HTTP_PORT)
AUTH_SERVER_HOST = Configurator.DefaultString("AUTH_SERVER_HOST", AUTH_SERVER_HOST)
DIGITAL_SERVER_HOST = Configurator.DefaultString("DIGITAL_SERVER_HOST", DIGITAL_SERVER_HOST)
SERVICE_NAME = fmt.Sprintf("%v-%v", SERVICE_NAME, SERVICE_ENV)
PPROF_ON = Configurator.DefaultBool("PPROF_ON", PPROF_ON)
CACHE_PREFIX = SERVICE_NAME + ":" + SERVICE_ENV
... ...
... ... @@ -14,6 +14,10 @@ const (
TableDeleteEvent EventType = "table.delete"
QuerySetUpdateEvent EventType = "table.query.set.update"
QuerySetUpdateRenameEvent EventType = "table.query.set.update.rename"
TableApplyOnEvent EventType = "table.apply-on"
QuerySetUpdateStatusEvent EventType = "table.query.set.update.status" // 禁用启用
QuerySetDeleteEvent EventType = "table.query.set.delete" // 删除
)
type EventTable struct {
... ...
... ... @@ -176,3 +176,12 @@ func (tables Tables) ToMap() map[int]*Table {
}
return result
}
func AssertTableType(tableType string, types ...TableType) bool {
for _, item := range types {
if tableType == item.ToString() {
return true
}
}
return false
}
... ...
package digitalLib
import (
"github.com/beego/beego/v2/core/logs"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/api"
"net/http"
"time"
)
type DigitalLib struct {
Token string
api.BaseServiceGateway
}
func (gateway *DigitalLib) WithToken(token string) *DigitalLib {
gateway.Token = token
return gateway
}
func (gateway *DigitalLib) DefaultHeader() http.Header {
var header = make(map[string][]string)
header["x-mmm-accesstoken"] = []string{gateway.Token}
return header
}
func NewDigitalLib(host string) *DigitalLib {
gt := api.NewBaseServiceGateway(host)
gt.ConnectTimeout = 360 * time.Second
gt.ReadWriteTimeout = 360 * time.Second
gt.Interceptor = func(msg string) {
//log.Logger.Info(msg)
logs.Debug(msg)
}
gt.ServiceName = "【数控中心】"
return &DigitalLib{
BaseServiceGateway: gt,
}
}
func (gateway *DigitalLib) SyncNotice(param RequestSyncNotice) (*DataSyncNotice, error) {
url := gateway.Host() + "/api/sync/notice"
method := "post"
var data DataSyncNotice
err := gateway.FastDoRequest(url, method, param.Body, &data, api.WithHeader(gateway.DefaultHeader()))
if err != nil {
return nil, err
}
return &data, nil
}
... ...
package digitalLib
type RequestSyncNotice struct {
Body interface{}
}
type DataSyncNotice struct{}
... ...
... ... @@ -958,6 +958,10 @@ func (ptr *QuerySetService) ChangeStatus(ctx *domain.Context, querySetId int, st
if _, err = tableRepository.Save(table); err != nil {
return err
}
defer func() {
AsyncEvent(domain.NewEventTable(ctx, domain.QuerySetUpdateStatusEvent).WithTable(table).WithQuerySet(qs))
}()
}
return nil
}
... ...
... ... @@ -15,4 +15,5 @@ func RegisterEvent() {
event.On(domain.TableDeleteEvent.ToString(), event.ListenerFunc(tableDataChangeHandler), event.High)
event.On(domain.QuerySetUpdateEvent.ToString(), event.ListenerFunc(tableDataChangeHandler), event.High)
event.On(domain.QuerySetUpdateRenameEvent.ToString(), event.ListenerFunc(tableDataChangeHandler), event.High)
event.On(domain.QuerySetUpdateStatusEvent.ToString(), event.ListenerFunc(tableDataChangeHandler), event.High)
}
... ...