作者 庄敏学

Merge branch 'dev-data' into dev

... ... @@ -65,29 +65,6 @@ func main() {
}
func startConsume(c config.Config) {
//svcCtx := svc.NewServiceContext(c)
go func() {
//for {
// notice := &domain.ObjectNotice{
// CompanyId: 1598224576532189184,
// TableId: 521,
// TableType: "主表",
// ObjectType: "导入模块",
// Event: "table.data.edit",
// TableAffectedList: []int{521},
// DataChanged: true,
// StructChanged: true,
// MetaData: domain.ObjectNoticeMetaData{
// Module: 0,
// Status: 0,
// },
// }
// mBytes, _ := json.Marshal(notice)
// err := kq.NewPusher(c.KqConsumerConf.Brokers, c.KqConsumerConf.Topic).Push(string(mBytes))
// fmt.Println(err)
// time.Sleep(10 * 10 * time.Second)
//}
}()
//kafka消费队列 处理字库推送事件
go func() {
svcCtx := svc.NewServiceContext(c)
... ... @@ -109,8 +86,4 @@ func startConsume(c config.Config) {
time.Sleep(3 * time.Second)
}
}()
//kq.MustNewQueue(c.KqConsumerConf, consumer.NewByteNoticeLogic(svcCtx))
//for {
//time.Sleep(1 * time.Second)
//}
}
... ...
... ... @@ -97,7 +97,7 @@ func (l *SaveChartLogic) SaveChart(req *types.ChartSaveRequest) (resp *types.Cha
ObjectId: int(sourceId),
}
mBytes, _ := json.Marshal(pusher)
_, _ = l.svcCtx.Redis.LpushCtx(l.ctx, l.svcCtx.Config.Name+":table_data", string(mBytes))
_, _ = l.svcCtx.Redis.Lpush(l.svcCtx.Config.Name+":table_data", string(mBytes))
}
}
resp = &types.ChartSaveResponse{
... ...
... ... @@ -77,7 +77,7 @@ func (l *UpdateChartLogic) SyncTableData(tenantId int64, conn transaction.Conn,
ObjectId: int(sourceId),
}
mBytes, _ := json.Marshal(pusher)
_, _ = l.svcCtx.Redis.LpushCtx(l.ctx, l.svcCtx.Config.Name+":table_data", string(mBytes))
_, _ = l.svcCtx.Redis.Lpush(l.svcCtx.Config.Name+":table_data", string(mBytes))
}
}
//对比更新前后数据源
... ... @@ -90,7 +90,7 @@ func (l *UpdateChartLogic) SyncTableData(tenantId int64, conn transaction.Conn,
ObjectId: int(sourceId),
}
mBytes, _ := json.Marshal(pusher)
_, _ = l.svcCtx.Redis.LpushCtx(l.ctx, l.svcCtx.Config.Name+":table_data", string(mBytes))
_, _ = l.svcCtx.Redis.Lpush(l.svcCtx.Config.Name+":table_data", string(mBytes))
}
}
//删除
... ...
... ... @@ -2,6 +2,8 @@ package table
import (
"context"
"encoding/json"
"fmt"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/types"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/db/transaction"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/domain"
... ... @@ -81,6 +83,26 @@ func (l *SyncTableLogic) SyncTable() (resp interface{}, err error) {
}
return nil
}, true)
if err != nil {
return nil, xerr.NewErrMsg("保存表失败:" + err.Error())
}
//同步表数据
for _, item := range tables {
if item.TableId <= 0 {
continue
}
used, err := l.svcCtx.ChartSettingRepository.CheckUseDataSource(l.ctx, l.conn, item.TableId)
if err == nil && used {
pusher := &types.SyncTableDataPusher{
CompanyId: item.CompanyId,
ObjectId: int(item.TableId),
}
mBytes, _ := json.Marshal(pusher)
_, _ = l.svcCtx.Redis.Lpush(l.svcCtx.Config.Name+":table_data", string(mBytes))
fmt.Println(" ===========================> 加入数据下载队列 " + string(mBytes))
}
}
fmt.Println("========>数据同步完成")
return companyIds, err
}
... ...
... ... @@ -144,7 +144,10 @@ func (repository *ChartSettingRepository) Find(ctx context.Context, conn transac
// CheckUseDataSource 检验是否使用数据源
func (repository *ChartSettingRepository) CheckUseDataSource(ctx context.Context, conn transaction.Conn, objectId int) (bool, error) {
var count int64
err := conn.DB().Model(&models.ChartSetting{}).Where(fmt.Sprintf("data_source_ids::jsonb @>'[%v]'", objectId)).Count(&count).Error
err := conn.DB().Model(&models.ChartSetting{}).
Joins("left join chart on chart.id=chart_setting.chart_id").
Where("chart.is_del = 0").
Where(fmt.Sprintf("data_source_ids::jsonb @>'[%v]'", objectId)).Count(&count).Error
if err != nil {
return false, err
}
... ...
... ... @@ -7,6 +7,7 @@ import (
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/db/transaction"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/gateway/bytelib"
"strconv"
"strings"
)
... ... @@ -24,13 +25,26 @@ func (repository *ObjectTableDataRepository) makeDropTableSQL(tableId int) (stri
}
// makeCreateTableSQL 创建表SQL
func (repository *ObjectTableDataRepository) makeCreateTableSQL(tableId int, fields []*bytelib.Field) (string, error) {
func (repository *ObjectTableDataRepository) makeCreateTableSQL(tableId int, tableData bytelib.TableData) (string, error) {
fields := tableData.Fields
list := tableData.Grid.List
if len(fields) <= 0 {
return "", errors.New("缺少字段信息")
}
columns := make([]string, 0)
for _, item := range fields {
columns = append(columns, item.SQLName+" text ")
fieldType := "text"
//判断字段是否为id,并且数据值能转为整型 设置字段类型 为 int8,否则为 text
if item.SQLName == "id" && len(list) > 0 {
listItem := list[0]
if idValue, ok := listItem["id"]; ok {
_, err := strconv.Atoi(idValue)
if err == nil {
fieldType = "int8"
}
}
}
columns = append(columns, item.SQLName+" "+fieldType+" ")
}
sql := `Create TABLE data."` + fmt.Sprintf("%v", tableId) + `" (` + strings.Join(columns, ",") + `);`
return sql, nil
... ... @@ -92,7 +106,7 @@ func (repository *ObjectTableDataRepository) InsertWithTableData(ctx context.Con
return err
}
//创建表
createTableSql, err := repository.makeCreateTableSQL(int(tableDataPreview.ObjectId), tableDataPreview.Fields)
createTableSql, err := repository.makeCreateTableSQL(int(tableDataPreview.ObjectId), tableDataPreview)
if err != nil {
return err
}
... ...