正在显示
4 个修改的文件
包含
72 行增加
和
32 行删除
@@ -34,6 +34,6 @@ KqConsumerConf: | @@ -34,6 +34,6 @@ KqConsumerConf: | ||
34 | Name: KqConsumer | 34 | Name: KqConsumer |
35 | Brokers: | 35 | Brokers: |
36 | - 47.97.5.102:9092 | 36 | - 47.97.5.102:9092 |
37 | - Group: bchart_dev | 37 | + Group: bchart_local |
38 | Topic: allied_creation_metadata_table_sync_notice | 38 | Topic: allied_creation_metadata_table_sync_notice |
39 | Processors: 1 | 39 | Processors: 1 |
@@ -61,35 +61,64 @@ func (l *UpdateChartLogic) UpdateChart(req *types.ChartUpdateRequest) (resp *typ | @@ -61,35 +61,64 @@ func (l *UpdateChartLogic) UpdateChart(req *types.ChartUpdateRequest) (resp *typ | ||
61 | if chartSetting, err = l.svcCtx.ChartSettingRepository.UpdateWithVersion(l.ctx, conn, chartSetting); err != nil { | 61 | if chartSetting, err = l.svcCtx.ChartSettingRepository.UpdateWithVersion(l.ctx, conn, chartSetting); err != nil { |
62 | return err | 62 | return err |
63 | } | 63 | } |
64 | - //对比更新前后数据源 | ||
65 | - left, right := lo.Difference(chartSetting.DataSourceIds, oldDataSource) | ||
66 | - //同步 | ||
67 | - if len(left) > 0 { | ||
68 | - for _, sourceId := range left { | ||
69 | - pusher := &types.SyncTableDataPusher{ | ||
70 | - CompanyId: tenantId, | ||
71 | - ObjectId: int(sourceId), | ||
72 | - } | ||
73 | - mBytes, _ := json.Marshal(pusher) | ||
74 | - _, _ = l.svcCtx.Redis.LpushCtx(l.ctx, l.svcCtx.Config.Name+":table_data", string(mBytes)) | 64 | + err = l.SyncTableData(tenantId, conn, chartSetting.DataSourceIds, oldDataSource) |
65 | + return err | ||
66 | + }, true); err != nil { | ||
67 | + return nil, xerr.NewErrMsgErr("创建失败", err) | ||
68 | + } | ||
69 | + return | ||
70 | +} | ||
71 | + | ||
72 | +func (l *UpdateChartLogic) SyncTableData(tenantId int64, conn transaction.Conn, newDataSourceIds, oldDataSourceIds []int64) error { | ||
73 | + if len(newDataSourceIds) > 0 { | ||
74 | + for _, sourceId := range newDataSourceIds { | ||
75 | + pusher := &types.SyncTableDataPusher{ | ||
76 | + CompanyId: tenantId, | ||
77 | + ObjectId: int(sourceId), | ||
78 | + } | ||
79 | + mBytes, _ := json.Marshal(pusher) | ||
80 | + _, _ = l.svcCtx.Redis.LpushCtx(l.ctx, l.svcCtx.Config.Name+":table_data", string(mBytes)) | ||
81 | + } | ||
82 | + } | ||
83 | + //对比更新前后数据源 | ||
84 | + left, right := lo.Difference(newDataSourceIds, oldDataSourceIds) | ||
85 | + //同步 | ||
86 | + if len(left) > 0 { | ||
87 | + for _, sourceId := range left { | ||
88 | + pusher := &types.SyncTableDataPusher{ | ||
89 | + CompanyId: tenantId, | ||
90 | + ObjectId: int(sourceId), | ||
75 | } | 91 | } |
92 | + mBytes, _ := json.Marshal(pusher) | ||
93 | + _, _ = l.svcCtx.Redis.LpushCtx(l.ctx, l.svcCtx.Config.Name+":table_data", string(mBytes)) | ||
76 | } | 94 | } |
77 | - //删除 | ||
78 | - if len(right) > 0 { | ||
79 | - for _, sourceId := range right { | ||
80 | - //验证其他图表是否使用 | ||
81 | - used, err := l.svcCtx.ChartSettingRepository.CheckUseDataSource(l.ctx, conn, int(sourceId)) | ||
82 | - if err == nil && !used { //未使用,删除 | ||
83 | - err = l.svcCtx.ObjectTableDataRepository.DropTable(l.ctx, conn, int(sourceId)) | 95 | + } |
96 | + //删除 | ||
97 | + if len(right) > 0 { | ||
98 | + for _, sourceId := range right { | ||
99 | + //验证其他图表是否使用 | ||
100 | + used, err := l.svcCtx.ChartSettingRepository.CheckUseDataSource(l.ctx, conn, int(sourceId)) | ||
101 | + if err == nil && !used { //未使用,删除 | ||
102 | + err = l.svcCtx.ObjectTableDataRepository.DropTable(l.ctx, conn, int(sourceId)) | ||
103 | + if err != nil { | ||
104 | + return err | ||
105 | + } | ||
106 | + //是否远程已删除 | ||
107 | + objectTable, err := l.svcCtx.ObjectTableRepository.FindOne(l.ctx, conn, int(sourceId)) | ||
108 | + if err == nil && objectTable.Id > 0 && objectTable.RemoteDeleted == 1 { | ||
109 | + //删除表 | ||
110 | + _, err = l.svcCtx.ObjectTableRepository.Delete(l.ctx, conn, &domain.ObjectTable{Id: int(sourceId)}) | ||
111 | + if err != nil { | ||
112 | + return err | ||
113 | + } | ||
114 | + //删除字段 | ||
115 | + _, err = l.svcCtx.ObjectFieldRepository.Delete(l.ctx, conn, &domain.ObjectField{Id: sourceId}) | ||
84 | if err != nil { | 116 | if err != nil { |
85 | return err | 117 | return err |
86 | } | 118 | } |
87 | } | 119 | } |
88 | } | 120 | } |
89 | } | 121 | } |
90 | - return nil | ||
91 | - }, true); err != nil { | ||
92 | - return nil, xerr.NewErrMsgErr("创建失败", err) | ||
93 | } | 122 | } |
94 | - return | 123 | + return nil |
95 | } | 124 | } |
@@ -64,7 +64,7 @@ func (logic *ByteNoticeLogic) handleNotice(conn transaction.Conn, notice *domain | @@ -64,7 +64,7 @@ func (logic *ByteNoticeLogic) handleNotice(conn transaction.Conn, notice *domain | ||
64 | if notice.StructChanged { | 64 | if notice.StructChanged { |
65 | request := bytelib.ObjectTableSearchRequest{ | 65 | request := bytelib.ObjectTableSearchRequest{ |
66 | Token: accessToken, | 66 | Token: accessToken, |
67 | - Module: bytelib.ModuleDigitalCenter, | 67 | + Module: bytelib.ModuleChartTemplate, |
68 | } | 68 | } |
69 | if notice.ObjectType == "导入模块" { | 69 | if notice.ObjectType == "导入模块" { |
70 | request.TableTypes = []string{bytelib.MainTable, bytelib.SubTable, bytelib.SideTable} | 70 | request.TableTypes = []string{bytelib.MainTable, bytelib.SubTable, bytelib.SideTable} |
@@ -164,13 +164,23 @@ func (logic *ByteNoticeLogic) handleDelete(conn transaction.Conn, notice *domain | @@ -164,13 +164,23 @@ func (logic *ByteNoticeLogic) handleDelete(conn transaction.Conn, notice *domain | ||
164 | if err != nil { | 164 | if err != nil { |
165 | return err | 165 | return err |
166 | } | 166 | } |
167 | - } | ||
168 | - //是否有使用数据源 | ||
169 | - used, err := logic.svcCtx.ChartSettingRepository.CheckUseDataSource(logic.ctx, conn, notice.TableId) | ||
170 | - if err == nil && !used { | ||
171 | - err = logic.svcCtx.ObjectTableDataRepository.DropTable(logic.ctx, conn, notice.TableId) | ||
172 | - if err != nil { | ||
173 | - return err | 167 | + //是否有使用数据源 |
168 | + used, err := logic.svcCtx.ChartSettingRepository.CheckUseDataSource(logic.ctx, conn, notice.TableId) | ||
169 | + if err == nil && !used { | ||
170 | + err = logic.svcCtx.ObjectTableDataRepository.DropTable(logic.ctx, conn, notice.TableId) | ||
171 | + if err != nil { | ||
172 | + return err | ||
173 | + } | ||
174 | + //删除表 | ||
175 | + _, err = logic.svcCtx.ObjectTableRepository.Delete(logic.ctx, conn, objectTable) | ||
176 | + if err != nil { | ||
177 | + return err | ||
178 | + } | ||
179 | + //删除字段 | ||
180 | + _, err = logic.svcCtx.ObjectFieldRepository.Delete(logic.ctx, conn, &domain.ObjectField{Id: int64(objectTable.Id)}) | ||
181 | + if err != nil { | ||
182 | + return err | ||
183 | + } | ||
174 | } | 184 | } |
175 | } | 185 | } |
176 | return nil | 186 | return nil |
@@ -2,6 +2,7 @@ package repository | @@ -2,6 +2,7 @@ package repository | ||
2 | 2 | ||
3 | import ( | 3 | import ( |
4 | "context" | 4 | "context" |
5 | + "fmt" | ||
5 | "github.com/jinzhu/copier" | 6 | "github.com/jinzhu/copier" |
6 | "github.com/pkg/errors" | 7 | "github.com/pkg/errors" |
7 | "github.com/tiptok/gocomm/pkg/cache" | 8 | "github.com/tiptok/gocomm/pkg/cache" |
@@ -143,7 +144,7 @@ func (repository *ChartSettingRepository) Find(ctx context.Context, conn transac | @@ -143,7 +144,7 @@ func (repository *ChartSettingRepository) Find(ctx context.Context, conn transac | ||
143 | // CheckUseDataSource 检验是否使用数据源 | 144 | // CheckUseDataSource 检验是否使用数据源 |
144 | func (repository *ChartSettingRepository) CheckUseDataSource(ctx context.Context, conn transaction.Conn, objectId int) (bool, error) { | 145 | func (repository *ChartSettingRepository) CheckUseDataSource(ctx context.Context, conn transaction.Conn, objectId int) (bool, error) { |
145 | var count int64 | 146 | var count int64 |
146 | - err := conn.DB().Model(&models.ChartSetting{}).Where("data_source_ids::jsonb @>'[?]'", objectId).Count(&count).Error | 147 | + err := conn.DB().Model(&models.ChartSetting{}).Where(fmt.Sprintf("data_source_ids::jsonb @>'[%v]'", objectId)).Count(&count).Error |
147 | if err != nil { | 148 | if err != nil { |
148 | return false, err | 149 | return false, err |
149 | } | 150 | } |
-
请 注册 或 登录 后发表评论