正在显示
6 个修改的文件
包含
46 行增加
和
34 行删除
@@ -65,29 +65,6 @@ func main() { | @@ -65,29 +65,6 @@ func main() { | ||
65 | } | 65 | } |
66 | 66 | ||
67 | func startConsume(c config.Config) { | 67 | func startConsume(c config.Config) { |
68 | - //svcCtx := svc.NewServiceContext(c) | ||
69 | - go func() { | ||
70 | - //for { | ||
71 | - // notice := &domain.ObjectNotice{ | ||
72 | - // CompanyId: 1598224576532189184, | ||
73 | - // TableId: 521, | ||
74 | - // TableType: "主表", | ||
75 | - // ObjectType: "导入模块", | ||
76 | - // Event: "table.data.edit", | ||
77 | - // TableAffectedList: []int{521}, | ||
78 | - // DataChanged: true, | ||
79 | - // StructChanged: true, | ||
80 | - // MetaData: domain.ObjectNoticeMetaData{ | ||
81 | - // Module: 0, | ||
82 | - // Status: 0, | ||
83 | - // }, | ||
84 | - // } | ||
85 | - // mBytes, _ := json.Marshal(notice) | ||
86 | - // err := kq.NewPusher(c.KqConsumerConf.Brokers, c.KqConsumerConf.Topic).Push(string(mBytes)) | ||
87 | - // fmt.Println(err) | ||
88 | - // time.Sleep(10 * 10 * time.Second) | ||
89 | - //} | ||
90 | - }() | ||
91 | //kafka消费队列 处理字库推送事件 | 68 | //kafka消费队列 处理字库推送事件 |
92 | go func() { | 69 | go func() { |
93 | svcCtx := svc.NewServiceContext(c) | 70 | svcCtx := svc.NewServiceContext(c) |
@@ -109,8 +86,4 @@ func startConsume(c config.Config) { | @@ -109,8 +86,4 @@ func startConsume(c config.Config) { | ||
109 | time.Sleep(3 * time.Second) | 86 | time.Sleep(3 * time.Second) |
110 | } | 87 | } |
111 | }() | 88 | }() |
112 | - //kq.MustNewQueue(c.KqConsumerConf, consumer.NewByteNoticeLogic(svcCtx)) | ||
113 | - //for { | ||
114 | - //time.Sleep(1 * time.Second) | ||
115 | - //} | ||
116 | } | 89 | } |
@@ -97,7 +97,7 @@ func (l *SaveChartLogic) SaveChart(req *types.ChartSaveRequest) (resp *types.Cha | @@ -97,7 +97,7 @@ func (l *SaveChartLogic) SaveChart(req *types.ChartSaveRequest) (resp *types.Cha | ||
97 | ObjectId: int(sourceId), | 97 | ObjectId: int(sourceId), |
98 | } | 98 | } |
99 | mBytes, _ := json.Marshal(pusher) | 99 | mBytes, _ := json.Marshal(pusher) |
100 | - _, _ = l.svcCtx.Redis.LpushCtx(l.ctx, l.svcCtx.Config.Name+":table_data", string(mBytes)) | 100 | + _, _ = l.svcCtx.Redis.Lpush(l.svcCtx.Config.Name+":table_data", string(mBytes)) |
101 | } | 101 | } |
102 | } | 102 | } |
103 | resp = &types.ChartSaveResponse{ | 103 | resp = &types.ChartSaveResponse{ |
@@ -77,7 +77,7 @@ func (l *UpdateChartLogic) SyncTableData(tenantId int64, conn transaction.Conn, | @@ -77,7 +77,7 @@ func (l *UpdateChartLogic) SyncTableData(tenantId int64, conn transaction.Conn, | ||
77 | ObjectId: int(sourceId), | 77 | ObjectId: int(sourceId), |
78 | } | 78 | } |
79 | mBytes, _ := json.Marshal(pusher) | 79 | mBytes, _ := json.Marshal(pusher) |
80 | - _, _ = l.svcCtx.Redis.LpushCtx(l.ctx, l.svcCtx.Config.Name+":table_data", string(mBytes)) | 80 | + _, _ = l.svcCtx.Redis.Lpush(l.svcCtx.Config.Name+":table_data", string(mBytes)) |
81 | } | 81 | } |
82 | } | 82 | } |
83 | //对比更新前后数据源 | 83 | //对比更新前后数据源 |
@@ -90,7 +90,7 @@ func (l *UpdateChartLogic) SyncTableData(tenantId int64, conn transaction.Conn, | @@ -90,7 +90,7 @@ func (l *UpdateChartLogic) SyncTableData(tenantId int64, conn transaction.Conn, | ||
90 | ObjectId: int(sourceId), | 90 | ObjectId: int(sourceId), |
91 | } | 91 | } |
92 | mBytes, _ := json.Marshal(pusher) | 92 | mBytes, _ := json.Marshal(pusher) |
93 | - _, _ = l.svcCtx.Redis.LpushCtx(l.ctx, l.svcCtx.Config.Name+":table_data", string(mBytes)) | 93 | + _, _ = l.svcCtx.Redis.Lpush(l.svcCtx.Config.Name+":table_data", string(mBytes)) |
94 | } | 94 | } |
95 | } | 95 | } |
96 | //删除 | 96 | //删除 |
@@ -2,6 +2,8 @@ package table | @@ -2,6 +2,8 @@ package table | ||
2 | 2 | ||
3 | import ( | 3 | import ( |
4 | "context" | 4 | "context" |
5 | + "encoding/json" | ||
6 | + "fmt" | ||
5 | "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/types" | 7 | "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/types" |
6 | "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/db/transaction" | 8 | "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/db/transaction" |
7 | "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/domain" | 9 | "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/domain" |
@@ -81,6 +83,26 @@ func (l *SyncTableLogic) SyncTable() (resp interface{}, err error) { | @@ -81,6 +83,26 @@ func (l *SyncTableLogic) SyncTable() (resp interface{}, err error) { | ||
81 | } | 83 | } |
82 | return nil | 84 | return nil |
83 | }, true) | 85 | }, true) |
86 | + if err != nil { | ||
87 | + return nil, xerr.NewErrMsg("保存表失败:" + err.Error()) | ||
88 | + } | ||
89 | + //同步表数据 | ||
90 | + for _, item := range tables { | ||
91 | + if item.TableId <= 0 { | ||
92 | + continue | ||
93 | + } | ||
94 | + used, err := l.svcCtx.ChartSettingRepository.CheckUseDataSource(l.ctx, l.conn, item.TableId) | ||
95 | + if err == nil && used { | ||
96 | + pusher := &types.SyncTableDataPusher{ | ||
97 | + CompanyId: item.CompanyId, | ||
98 | + ObjectId: int(item.TableId), | ||
99 | + } | ||
100 | + mBytes, _ := json.Marshal(pusher) | ||
101 | + _, _ = l.svcCtx.Redis.Lpush(l.svcCtx.Config.Name+":table_data", string(mBytes)) | ||
102 | + fmt.Println(" ===========================> 加入数据下载队列 " + string(mBytes)) | ||
103 | + } | ||
104 | + } | ||
105 | + fmt.Println("========>数据同步完成") | ||
84 | return companyIds, err | 106 | return companyIds, err |
85 | } | 107 | } |
86 | 108 |
@@ -144,7 +144,10 @@ func (repository *ChartSettingRepository) Find(ctx context.Context, conn transac | @@ -144,7 +144,10 @@ func (repository *ChartSettingRepository) Find(ctx context.Context, conn transac | ||
144 | // CheckUseDataSource 检验是否使用数据源 | 144 | // CheckUseDataSource 检验是否使用数据源 |
145 | func (repository *ChartSettingRepository) CheckUseDataSource(ctx context.Context, conn transaction.Conn, objectId int) (bool, error) { | 145 | func (repository *ChartSettingRepository) CheckUseDataSource(ctx context.Context, conn transaction.Conn, objectId int) (bool, error) { |
146 | var count int64 | 146 | var count int64 |
147 | - err := conn.DB().Model(&models.ChartSetting{}).Where(fmt.Sprintf("data_source_ids::jsonb @>'[%v]'", objectId)).Count(&count).Error | 147 | + err := conn.DB().Model(&models.ChartSetting{}). |
148 | + Joins("left join chart on chart.id=chart_setting.chart_id"). | ||
149 | + Where("chart.is_del = 0"). | ||
150 | + Where(fmt.Sprintf("data_source_ids::jsonb @>'[%v]'", objectId)).Count(&count).Error | ||
148 | if err != nil { | 151 | if err != nil { |
149 | return false, err | 152 | return false, err |
150 | } | 153 | } |
@@ -7,6 +7,7 @@ import ( | @@ -7,6 +7,7 @@ import ( | ||
7 | "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/db/transaction" | 7 | "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/db/transaction" |
8 | "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/domain" | 8 | "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/domain" |
9 | "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/gateway/bytelib" | 9 | "gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/gateway/bytelib" |
10 | + "strconv" | ||
10 | "strings" | 11 | "strings" |
11 | ) | 12 | ) |
12 | 13 | ||
@@ -24,13 +25,26 @@ func (repository *ObjectTableDataRepository) makeDropTableSQL(tableId int) (stri | @@ -24,13 +25,26 @@ func (repository *ObjectTableDataRepository) makeDropTableSQL(tableId int) (stri | ||
24 | } | 25 | } |
25 | 26 | ||
26 | // makeCreateTableSQL 创建表SQL | 27 | // makeCreateTableSQL 创建表SQL |
27 | -func (repository *ObjectTableDataRepository) makeCreateTableSQL(tableId int, fields []*bytelib.Field) (string, error) { | 28 | +func (repository *ObjectTableDataRepository) makeCreateTableSQL(tableId int, tableData bytelib.TableData) (string, error) { |
29 | + fields := tableData.Fields | ||
30 | + list := tableData.Grid.List | ||
28 | if len(fields) <= 0 { | 31 | if len(fields) <= 0 { |
29 | return "", errors.New("缺少字段信息") | 32 | return "", errors.New("缺少字段信息") |
30 | } | 33 | } |
31 | columns := make([]string, 0) | 34 | columns := make([]string, 0) |
32 | for _, item := range fields { | 35 | for _, item := range fields { |
33 | - columns = append(columns, item.SQLName+" text ") | 36 | + fieldType := "text" |
37 | + //判断字段是否为id,并且数据值能转为整型 设置字段类型 为 int8,否则为 text | ||
38 | + if item.SQLName == "id" && len(list) > 0 { | ||
39 | + listItem := list[0] | ||
40 | + if idValue, ok := listItem["id"]; ok { | ||
41 | + _, err := strconv.Atoi(idValue) | ||
42 | + if err == nil { | ||
43 | + fieldType = "int8" | ||
44 | + } | ||
45 | + } | ||
46 | + } | ||
47 | + columns = append(columns, item.SQLName+" "+fieldType+" ") | ||
34 | } | 48 | } |
35 | sql := `Create TABLE data."` + fmt.Sprintf("%v", tableId) + `" (` + strings.Join(columns, ",") + `);` | 49 | sql := `Create TABLE data."` + fmt.Sprintf("%v", tableId) + `" (` + strings.Join(columns, ",") + `);` |
36 | return sql, nil | 50 | return sql, nil |
@@ -92,7 +106,7 @@ func (repository *ObjectTableDataRepository) InsertWithTableData(ctx context.Con | @@ -92,7 +106,7 @@ func (repository *ObjectTableDataRepository) InsertWithTableData(ctx context.Con | ||
92 | return err | 106 | return err |
93 | } | 107 | } |
94 | //创建表 | 108 | //创建表 |
95 | - createTableSql, err := repository.makeCreateTableSQL(int(tableDataPreview.ObjectId), tableDataPreview.Fields) | 109 | + createTableSql, err := repository.makeCreateTableSQL(int(tableDataPreview.ObjectId), tableDataPreview) |
96 | if err != nil { | 110 | if err != nil { |
97 | return err | 111 | return err |
98 | } | 112 | } |
-
请 注册 或 登录 后发表评论