|
@@ -9,6 +9,7 @@ import ( |
|
@@ -9,6 +9,7 @@ import ( |
9
|
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/constant"
|
9
|
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/constant"
|
10
|
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/domain"
|
10
|
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/domain"
|
11
|
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/api/digitalLib"
|
11
|
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/api/digitalLib"
|
|
|
12
|
+ "gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/broker"
|
12
|
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/domainService"
|
13
|
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/domainService"
|
13
|
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/log"
|
14
|
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/log"
|
14
|
"math"
|
15
|
"math"
|
|
@@ -30,79 +31,42 @@ func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *d |
|
@@ -30,79 +31,42 @@ func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *d |
30
|
var (
|
31
|
var (
|
31
|
dataChanged = true
|
32
|
dataChanged = true
|
32
|
structChanged = true
|
33
|
structChanged = true
|
33
|
- tableType string
|
34
|
+ ok bool
|
|
|
35
|
+ tableId int
|
34
|
)
|
36
|
)
|
35
|
-
|
|
|
36
|
- data := cmd.EventTable
|
|
|
37
|
- tableId := 0
|
|
|
38
|
- switch data.Type {
|
|
|
39
|
- case domain.TableDataImportEvent, domain.TableDataEditEvent, domain.TableDeleteEvent, domain.TableStructEditEvent:
|
|
|
40
|
- tableId = data.Table.TableId
|
|
|
41
|
- case domain.QuerySetUpdateEvent, domain.QuerySetUpdateRenameEvent:
|
|
|
42
|
- tableId = data.QuerySet.QuerySetInfo.BindTableId
|
|
|
43
|
- if data.QuerySet.Status != domain.StatusOn {
|
|
|
44
|
- return nil, nil
|
|
|
45
|
- }
|
|
|
46
|
- if !domain.AssertTableType(data.QuerySet.Type, domain.SchemaTable, domain.CalculateItem, domain.CalculateSet) {
|
|
|
47
|
- return nil, nil
|
|
|
48
|
- }
|
|
|
49
|
- case domain.QuerySetUpdateStatusEvent:
|
|
|
50
|
- tableId = data.QuerySet.QuerySetInfo.BindTableId
|
|
|
51
|
- if !domain.AssertTableType(data.QuerySet.Type, domain.SchemaTable, domain.CalculateItem, domain.CalculateSet) {
|
|
|
52
|
- return nil, nil
|
|
|
53
|
- }
|
|
|
54
|
- case domain.TableApplyOnEvent:
|
|
|
55
|
- tableId = data.Table.TableId
|
|
|
56
|
- dataChanged = false
|
|
|
57
|
- case domain.QuerySetDeleteEvent:
|
|
|
58
|
- tableId = data.Table.TableId
|
|
|
59
|
- }
|
|
|
60
|
- if tableId == 0 {
|
37
|
+ event := cmd.EventTable
|
|
|
38
|
+ if tableId = resolveTableId(event); tableId == 0 {
|
61
|
return nil, nil
|
39
|
return nil, nil
|
62
|
}
|
40
|
}
|
63
|
- var notifyData = NotifyData{
|
41
|
+ if event.Type == domain.TableApplyOnEvent {
|
|
|
42
|
+ dataChanged = false
|
|
|
43
|
+ }
|
|
|
44
|
+ var notifyData = &NotifyData{
|
64
|
DataChanged: dataChanged,
|
45
|
DataChanged: dataChanged,
|
65
|
StructChanged: structChanged,
|
46
|
StructChanged: structChanged,
|
66
|
TableId: tableId,
|
47
|
TableId: tableId,
|
67
|
- Event: data.Type.ToString(),
|
|
|
68
|
- Metadata: cmd.EventTable.Metadata,
|
48
|
+ Event: event.Type.ToString(),
|
|
|
49
|
+ Metadata: event.Metadata,
|
69
|
}
|
50
|
}
|
70
|
- // tableId 相关联的
|
51
|
+ // 表类型
|
71
|
tableRepository, table, _ := factory.FastPgTable(transactionContext, tableId)
|
52
|
tableRepository, table, _ := factory.FastPgTable(transactionContext, tableId)
|
72
|
- if table == nil && data.Table != nil {
|
|
|
73
|
- table = data.Table
|
53
|
+ if table == nil && event.Table != nil {
|
|
|
54
|
+ table = event.Table
|
74
|
notifyData.CompanyId = table.Context.CompanyId
|
55
|
notifyData.CompanyId = table.Context.CompanyId
|
75
|
}
|
56
|
}
|
76
|
- if tableType == "" && table != nil {
|
|
|
77
|
- tableType = table.TableType
|
|
|
78
|
- }
|
|
|
79
|
- if tableType == "" && data.QuerySet != nil {
|
|
|
80
|
- tableType = data.QuerySet.Type
|
|
|
81
|
- }
|
|
|
82
|
- if table != nil {
|
|
|
83
|
- notifyData.TableType = domain.EnumsDescription(domain.ObjectTypeMap, tableType)
|
|
|
84
|
- switch domain.TableType(tableType) {
|
|
|
85
|
- case domain.MainTable, domain.SubTable, domain.SideTable:
|
|
|
86
|
- notifyData.ObjectType = "导入模块"
|
|
|
87
|
- case domain.SchemaTable, domain.SubProcessTable, domain.CalculateTable:
|
|
|
88
|
- notifyData.ObjectType = "拆解模块"
|
|
|
89
|
- case domain.CalculateItem, domain.CalculateSet:
|
|
|
90
|
- notifyData.ObjectType = "计算模块"
|
|
|
91
|
- }
|
|
|
92
|
- }
|
57
|
+ notifyData.SetType(event, table)
|
93
|
|
58
|
|
94
|
- _, tables, err := tableRepository.Find(map[string]interface{}{"context": data.Context, "tableTypesNotIn": []string{domain.TemporaryTable.ToString(), domain.ExcelTable.ToString()}})
|
59
|
+ // 依赖的表 \ 依赖的查询集合
|
|
|
60
|
+ _, tables, err := tableRepository.Find(map[string]interface{}{"context": event.Context, "tableTypesNotIn": []string{domain.TemporaryTable.ToString(), domain.ExcelTable.ToString()}})
|
95
|
if err != nil {
|
61
|
if err != nil {
|
96
|
return nil, err
|
62
|
return nil, err
|
97
|
}
|
63
|
}
|
98
|
-
|
|
|
99
|
tableDependencyService, _ := domainService.NewTableDependencyService(transactionContext.(*pgTransaction.TransactionContext))
|
64
|
tableDependencyService, _ := domainService.NewTableDependencyService(transactionContext.(*pgTransaction.TransactionContext))
|
100
|
tableDependTree := tableDependencyService.TableDependTree(tables, tableId)
|
65
|
tableDependTree := tableDependencyService.TableDependTree(tables, tableId)
|
101
|
tree := tableDependTree.Tree
|
66
|
tree := tableDependTree.Tree
|
102
|
-
|
|
|
103
|
querySetRepository, _, _ := factory.FastPgQuerySet(transactionContext, 0)
|
67
|
querySetRepository, _, _ := factory.FastPgQuerySet(transactionContext, 0)
|
104
|
var mapTableQuerySet = make(map[int]*domain.QuerySet)
|
68
|
var mapTableQuerySet = make(map[int]*domain.QuerySet)
|
105
|
- if len(tree) > 0 { // && cmd.EventTable.QuerySet != nil
|
69
|
+ if len(tree) > 0 {
|
106
|
_, querySets, _ := querySetRepository.Find(map[string]interface{}{
|
70
|
_, querySets, _ := querySetRepository.Find(map[string]interface{}{
|
107
|
"types": []string{domain.SchemaTable.ToString(), domain.CalculateItem.ToString(), domain.CalculateSet.ToString()},
|
71
|
"types": []string{domain.SchemaTable.ToString(), domain.CalculateItem.ToString(), domain.CalculateSet.ToString()},
|
108
|
"bindTableIds": tree,
|
72
|
"bindTableIds": tree,
|
|
@@ -113,8 +77,9 @@ func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *d |
|
@@ -113,8 +77,9 @@ func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *d |
113
|
}
|
77
|
}
|
114
|
}
|
78
|
}
|
115
|
|
79
|
|
|
|
80
|
+ // 过滤出需要推送的表
|
116
|
for i := range tree {
|
81
|
for i := range tree {
|
117
|
- table, ok := tableDependencyService.TableMap[tree[i]]
|
82
|
+ table, ok = tableDependencyService.TableMap[tree[i]]
|
118
|
if !ok {
|
83
|
if !ok {
|
119
|
continue
|
84
|
continue
|
120
|
}
|
85
|
}
|
|
@@ -123,46 +88,103 @@ func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *d |
|
@@ -123,46 +88,103 @@ func (tableEventService *TableEventService) DigitalPlatformEventSubscribe(ctx *d |
123
|
}
|
88
|
}
|
124
|
switch table.TableType {
|
89
|
switch table.TableType {
|
125
|
case domain.MainTable.ToString(), domain.SubTable.ToString(), domain.SideTable.ToString():
|
90
|
case domain.MainTable.ToString(), domain.SubTable.ToString(), domain.SideTable.ToString():
|
126
|
- if table.TableInfo != nil && table.TableInfo.ApplyOnModule&domain.ModuleDigitalCenter == 0 {
|
|
|
127
|
- continue
|
91
|
+ if table.TableInfo != nil {
|
|
|
92
|
+ applyOn := domain.ModuleDigitalCenter | domain.ModuleChartTemplate
|
|
|
93
|
+ if table.TableInfo.ApplyOnModule&applyOn == 0 {
|
|
|
94
|
+ continue
|
|
|
95
|
+ }
|
128
|
}
|
96
|
}
|
129
|
break
|
97
|
break
|
130
|
case domain.SubProcessTable.ToString(), domain.CalculateTable.ToString():
|
98
|
case domain.SubProcessTable.ToString(), domain.CalculateTable.ToString():
|
131
|
continue
|
99
|
continue
|
132
|
case domain.SchemaTable.ToString(), domain.CalculateSet.ToString(), domain.CalculateItem.ToString():
|
100
|
case domain.SchemaTable.ToString(), domain.CalculateSet.ToString(), domain.CalculateItem.ToString():
|
133
|
- if querySet, ok := mapTableQuerySet[tree[i]]; !ok {
|
101
|
+ var querySet *domain.QuerySet
|
|
|
102
|
+ if querySet, ok = mapTableQuerySet[tree[i]]; !ok {
|
|
|
103
|
+ continue
|
|
|
104
|
+ }
|
|
|
105
|
+ // 不是当前的查询集。且状态为关闭的都补推送
|
|
|
106
|
+ if querySet.Status != domain.StatusOn && querySet.QuerySetInfo.BindTableId != 0 && querySet.QuerySetInfo.BindTableId != tableId {
|
134
|
continue
|
107
|
continue
|
135
|
- } else {
|
|
|
136
|
- // 不是当前的查询集。且状态为关闭的都补推送
|
|
|
137
|
- if querySet.Status != domain.StatusOn && querySet.QuerySetInfo.BindTableId != 0 && querySet.QuerySetInfo.BindTableId != tableId {
|
|
|
138
|
- continue
|
|
|
139
|
- }
|
|
|
140
|
}
|
108
|
}
|
141
|
}
|
109
|
}
|
142
|
notifyData.TableAffectedList = append(notifyData.TableAffectedList, tree[i])
|
110
|
notifyData.TableAffectedList = append(notifyData.TableAffectedList, tree[i])
|
143
|
}
|
111
|
}
|
144
|
- found := false
|
|
|
145
|
- for _, id := range notifyData.TableAffectedList {
|
|
|
146
|
- if id == tableId {
|
|
|
147
|
- found = true
|
|
|
148
|
- }
|
|
|
149
|
- }
|
|
|
150
|
- if !found {
|
112
|
+ // 包含自己
|
|
|
113
|
+ if !exist(notifyData.TableAffectedList, tableId) {
|
151
|
notifyData.TableAffectedList = append(notifyData.TableAffectedList, tableId)
|
114
|
notifyData.TableAffectedList = append(notifyData.TableAffectedList, tableId)
|
152
|
}
|
115
|
}
|
153
|
- lib := digitalLib.NewDigitalLib(constant.DIGITAL_SERVER_HOST)
|
|
|
154
|
- if _, err = lib.SyncNotice(digitalLib.RequestSyncNotice{Body: notifyData}); err != nil {
|
116
|
+ // 通过消息队列发送
|
|
|
117
|
+ if err = tableEventService.send(notifyData, tableEventService.sendBroker); err != nil {
|
|
|
118
|
+ return nil, err
|
|
|
119
|
+ }
|
|
|
120
|
+ if err = transactionContext.CommitTransaction(); err != nil {
|
|
|
121
|
+ return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
|
|
|
122
|
+ }
|
|
|
123
|
+ return nil, nil
|
|
|
124
|
+}
|
|
|
125
|
+
|
|
|
126
|
+func (tableEventService *TableEventService) send(notifyData *NotifyData, sendFunc func(*NotifyData) error) error {
|
|
|
127
|
+ var err error
|
|
|
128
|
+ if err = sendFunc(notifyData); err != nil {
|
155
|
log.Logger.Error(fmt.Sprintf("通知数控失败:%s", err.Error()))
|
129
|
log.Logger.Error(fmt.Sprintf("通知数控失败:%s", err.Error()))
|
156
|
if t, ok := notifyData.Retry(); ok {
|
130
|
if t, ok := notifyData.Retry(); ok {
|
157
|
tableEventService.TimingWheel.SetTimer(notifyData.Key(), ¬ifyData, t)
|
131
|
tableEventService.TimingWheel.SetTimer(notifyData.Key(), ¬ifyData, t)
|
158
|
log.Logger.Debug(fmt.Sprintf("通知数控重试 key:%s wait:%vs", notifyData.Key(), t.Seconds()))
|
132
|
log.Logger.Debug(fmt.Sprintf("通知数控重试 key:%s wait:%vs", notifyData.Key(), t.Seconds()))
|
159
|
}
|
133
|
}
|
|
|
134
|
+ }
|
|
|
135
|
+ return err
|
|
|
136
|
+}
|
160
|
|
137
|
|
|
|
138
|
+func (tableEventService *TableEventService) sendHttp(notifyData *NotifyData) error {
|
|
|
139
|
+ var err error
|
|
|
140
|
+ lib := digitalLib.NewDigitalLib(constant.DIGITAL_SERVER_HOST)
|
|
|
141
|
+ if _, err = lib.SyncNotice(digitalLib.RequestSyncNotice{Body: notifyData}); err != nil {
|
|
|
142
|
+ return err
|
161
|
}
|
143
|
}
|
162
|
- if err := transactionContext.CommitTransaction(); err != nil {
|
|
|
163
|
- return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
|
144
|
+ return nil
|
|
|
145
|
+}
|
|
|
146
|
+
|
|
|
147
|
+func (tableEventService *TableEventService) sendBroker(notifyData *NotifyData) error {
|
|
|
148
|
+ var err error
|
|
|
149
|
+ if err = broker.Push(constant.KAFKA_HOST, constant.TOPIC_TABLE_DATA_SYNC, notifyData); err != nil {
|
|
|
150
|
+ return err
|
164
|
}
|
151
|
}
|
165
|
- return nil, nil
|
152
|
+ return nil
|
|
|
153
|
+}
|
|
|
154
|
+
|
|
|
155
|
+func resolveTableId(event *domain.EventTable) (tableId int) {
|
|
|
156
|
+ switch event.Type {
|
|
|
157
|
+ case domain.TableDataImportEvent, domain.TableDataEditEvent, domain.TableDeleteEvent, domain.TableStructEditEvent:
|
|
|
158
|
+ tableId = event.Table.TableId
|
|
|
159
|
+ case domain.QuerySetUpdateEvent, domain.QuerySetUpdateRenameEvent:
|
|
|
160
|
+ if event.QuerySet.Status != domain.StatusOn {
|
|
|
161
|
+ return
|
|
|
162
|
+ }
|
|
|
163
|
+ if !domain.AssertTableType(event.QuerySet.Type, domain.SchemaTable, domain.CalculateItem, domain.CalculateSet) {
|
|
|
164
|
+ return
|
|
|
165
|
+ }
|
|
|
166
|
+ tableId = event.QuerySet.QuerySetInfo.BindTableId
|
|
|
167
|
+ case domain.QuerySetUpdateStatusEvent:
|
|
|
168
|
+ if !domain.AssertTableType(event.QuerySet.Type, domain.SchemaTable, domain.CalculateItem, domain.CalculateSet) {
|
|
|
169
|
+ return
|
|
|
170
|
+ }
|
|
|
171
|
+ tableId = event.QuerySet.QuerySetInfo.BindTableId
|
|
|
172
|
+ case domain.TableApplyOnEvent:
|
|
|
173
|
+ tableId = event.Table.TableId
|
|
|
174
|
+ case domain.QuerySetDeleteEvent:
|
|
|
175
|
+ tableId = event.Table.TableId
|
|
|
176
|
+ }
|
|
|
177
|
+ return tableId
|
|
|
178
|
+}
|
|
|
179
|
+
|
|
|
180
|
+func exist(idList []int, target int) bool {
|
|
|
181
|
+ found := false
|
|
|
182
|
+ for _, id := range idList {
|
|
|
183
|
+ if id == target {
|
|
|
184
|
+ found = true
|
|
|
185
|
+ }
|
|
|
186
|
+ }
|
|
|
187
|
+ return found
|
166
|
}
|
188
|
}
|
167
|
|
189
|
|
168
|
type NotifyData struct {
|
190
|
type NotifyData struct {
|
|
@@ -178,6 +200,28 @@ type NotifyData struct { |
|
@@ -178,6 +200,28 @@ type NotifyData struct { |
178
|
sendRetry int
|
200
|
sendRetry int
|
179
|
}
|
201
|
}
|
180
|
|
202
|
|
|
|
203
|
+func (n *NotifyData) SetType(event *domain.EventTable, table *domain.Table) {
|
|
|
204
|
+ var tableType string
|
|
|
205
|
+ // 表类型
|
|
|
206
|
+ if tableType == "" && table != nil {
|
|
|
207
|
+ tableType = table.TableType
|
|
|
208
|
+ }
|
|
|
209
|
+ if tableType == "" && event.QuerySet != nil {
|
|
|
210
|
+ tableType = event.QuerySet.Type
|
|
|
211
|
+ }
|
|
|
212
|
+ n.TableType = domain.EnumsDescription(domain.ObjectTypeMap, tableType)
|
|
|
213
|
+ if table != nil {
|
|
|
214
|
+ switch domain.TableType(tableType) {
|
|
|
215
|
+ case domain.MainTable, domain.SubTable, domain.SideTable:
|
|
|
216
|
+ n.ObjectType = "导入模块"
|
|
|
217
|
+ case domain.SchemaTable, domain.SubProcessTable, domain.CalculateTable:
|
|
|
218
|
+ n.ObjectType = "拆解模块"
|
|
|
219
|
+ case domain.CalculateItem, domain.CalculateSet:
|
|
|
220
|
+ n.ObjectType = "计算模块"
|
|
|
221
|
+ }
|
|
|
222
|
+ }
|
|
|
223
|
+}
|
|
|
224
|
+
|
181
|
func (n *NotifyData) Key() string {
|
225
|
func (n *NotifyData) Key() string {
|
182
|
return fmt.Sprintf("delay:notify:table:%d", n.TableId)
|
226
|
return fmt.Sprintf("delay:notify:table:%d", n.TableId)
|
183
|
}
|
227
|
}
|