作者 yangfu

fix:query set preview

... ... @@ -4,9 +4,12 @@ import (
"context"
"fmt"
"github.com/beego/beego/v2/task"
pgTransaction "github.com/linmadan/egglib-go/transaction/pg"
"github.com/linmadan/egglib-go/utils/xtime"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/application/factory"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/dao"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/infrastructure/starrocks"
"gitlab.fjmaimaimai.com/allied-creation/character-library-metadata-bastion/pkg/log"
"io/fs"
"os"
... ... @@ -28,6 +31,9 @@ func (crontabService *CrontabService) initTask() {
autoRemovePublicDownloadFile := task.NewTask("定时清理缓存文件", "0 20 */1 * * *", AutoRemovePublicDownloadFile)
task.AddTask("autoRemovePublicDownloadFile", autoRemovePublicDownloadFile)
autoRemoveTemporaryTable := task.NewTask("定时清理临时表", "0 57 */1 * * *", AutoRemoveTemporaryTable)
task.AddTask("autoRemoveTemporaryTable", autoRemoveTemporaryTable)
}
func (crontabService *CrontabService) StartCrontabTask() {
... ... @@ -115,3 +121,52 @@ func AutoRemovePublicDownloadFile(ctx context.Context) error {
}
return nil
}
func AutoRemoveTemporaryTable(ctx context.Context) error {
defer func() {
if r := recover(); r != nil {
log.Logger.Error(fmt.Sprintf("%v", r), map[string]interface{}{"task": "定时清理过期临时文件记录"})
}
}()
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return err
}
if err := transactionContext.StartTransaction(); err != nil {
return err
}
defer func() {
if err != nil {
log.Logger.Error("【定时清理临时表】 失败:" + err.Error())
}
transactionContext.RollbackTransaction()
}()
log.Logger.Debug("【定时清理临时表】 启动")
end := xtime.New(time.Now()).BeginningOfDay().Add(-time.Hour * 12)
begin := end.AddDate(0, 0, -7)
tableRepository, _, _ := factory.FastPgTable(transactionContext, 0)
_, tables, err := tableRepository.Find(map[string]interface{}{"beginTime": begin, "endTime": end, "tableTypes": []string{domain.TemporaryTable.ToString()}})
if err != nil {
return nil
}
for i, t := range tables {
if err = dao.TableDelete(transactionContext.(*pgTransaction.TransactionContext), t.TableId, domain.TemporaryTable); err != nil {
log.Logger.Error(err.Error())
return nil
}
log.Logger.Info(fmt.Sprintf("序号:%d 清理临时表 %v", i, t.SQLName))
if err = starrocks.DropView(starrocks.DB, t.SQLName); err != nil {
log.Logger.Error(err.Error())
return nil
}
}
//if err = dao.TableDeleteByTime(transactionContext.(*pgTransaction.TransactionContext), domain.TemporaryTable, begin, end); err != nil {
// return err
//}
if err = transactionContext.CommitTransaction(); err != nil {
return err
}
return nil
}
... ...
... ... @@ -344,6 +344,35 @@ func (querySetService *QuerySetService) UpdateQuerySet(ctx *domain.Context, upda
return struct{}{}, nil
}
func (querySetService *QuerySetService) PreviewPrepare(ctx *domain.Context, updateQuerySetCommand *command.UpdateQuerySetCommand) (interface{}, error) {
if err := updateQuerySetCommand.ValidateCommand(); err != nil {
return nil, application.ThrowError(application.ARG_ERROR, err.Error())
}
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
if err := transactionContext.StartTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
defer func() {
transactionContext.RollbackTransaction()
}()
svr, _ := factory.FastQuerySetServices(transactionContext)
table, err := svr.PreviewPrepare(ctx, updateQuerySetCommand.QuerySetId, updateQuerySetCommand.QueryComponents)
if err != nil {
return nil, factory.FastError(err)
}
if err := transactionContext.CommitTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
return map[string]interface{}{
"objectId": table.TableId,
"sqlName": table.SQLName,
}, nil
}
func NewQuerySetService(options map[string]interface{}) *QuerySetService {
newQuerySetService := &QuerySetService{}
return newQuerySetService
... ...
... ... @@ -523,11 +523,8 @@ func (tableService *TableService) CheckRowDuplicateV2(ctx *domain.Context, cmd *
}
querySetRepository, _, _ := factory.FastPgQuerySet(transactionContext, 0)
querySet, err := querySetRepository.FindOne(map[string]interface{}{"BindTableId": table.TableId, "context": ctx})
if err != nil {
return nil, factory.FastError(err)
}
if len(querySet.QueryComponents) == 1 {
querySet, _ := querySetRepository.FindOne(map[string]interface{}{"BindTableId": table.TableId, "context": ctx})
if querySet != nil && len(querySet.QueryComponents) == 1 {
return defaultResponse, nil
}
... ...
... ... @@ -75,10 +75,11 @@ var (
)
var (
MainTable TableType = "MainTable"
SideTable TableType = "SideTable"
SubTable TableType = "SubTable"
ExcelTable TableType = "ExcelTable"
MainTable TableType = "MainTable"
SideTable TableType = "SideTable"
SubTable TableType = "SubTable"
ExcelTable TableType = "ExcelTable"
TemporaryTable TableType = "TemporaryTable"
)
var (
... ...
... ... @@ -265,7 +265,7 @@ func (gateway ApiByteLib) FormulasGenerate(param domain.ReqFormulasGenerate) (*d
}
func (gateway ApiByteLib) FormulasClear(param domain.ReqFormulasClear) (*domain.DataFormulasClear, error) {
url := gateway.Host() + "/formulas/generate"
url := gateway.Host() + "/formulas/clear"
method := "post"
var data FormulasGenerateResponse
request := FormulasClearRequest{
... ...
... ... @@ -36,15 +36,15 @@ type (
}
FormulaDataHandleRule struct {
RuleType int `json:"ruleType"` //Int, 规则类型 1.拆分赋值2.正常赋值
FormulaJoinCondition FormulaJoinCondition `json:"formulaJoinCondition"` // 拆分赋值
AssignValueCondition AssignValueCondition `json:"assignValueCondition"` // 正常赋值
RuleType int `json:"ruleType"` //Int, 规则类型 1.拆分赋值2.正常赋值
FormulaJoinCondition *FormulaJoinCondition `json:"formulaJoinCondition"` // 拆分赋值
AssignValueCondition *AssignValueCondition `json:"assignValueCondition"` // 正常赋值
}
// FormulaJoinCondition 拆分链接条件
FormulaJoinCondition struct {
ConditionLeftField FormulaField `json:"conditionLeftField"`
ConditionRightField FormulaField `json:"conditionRightField"`
JoinAssignValues []AssignValueCondition `json:"joinAssignValues"`
ConditionLeftField FormulaField `json:"conditionLeftField"`
ConditionRightField FormulaField `json:"conditionRightField"`
JoinAssignValues []*AssignValueCondition `json:"joinAssignValues"`
}
// AssignValueCondition 拆分规则
AssignValueCondition struct {
... ... @@ -170,10 +170,11 @@ func NewFormulaDataHandleRule(s domain.SelectExprGroup) FormulaDataHandleRule {
return res
}
func NewFormulaJoinCondition(s domain.SelectExprGroup) FormulaJoinCondition {
var res = FormulaJoinCondition{
func NewFormulaJoinCondition(s domain.SelectExprGroup) *FormulaJoinCondition {
var res = &FormulaJoinCondition{
ConditionLeftField: NewFormulaField(s.FieldLeft, true),
ConditionRightField: NewFormulaField(s.FieldRight),
JoinAssignValues: make([]*AssignValueCondition, 0),
}
for _, sub := range s.SubSelects {
res.JoinAssignValues = append(res.JoinAssignValues, NewAssignValueCondition(sub))
... ... @@ -181,8 +182,8 @@ func NewFormulaJoinCondition(s domain.SelectExprGroup) FormulaJoinCondition {
return res
}
func NewAssignValueCondition(selectExpr domain.SelectExpr) AssignValueCondition {
var res = AssignValueCondition{
func NewAssignValueCondition(selectExpr domain.SelectExpr) *AssignValueCondition {
var res = &AssignValueCondition{
ConditionLeftField: NewFormulaField(selectExpr.FieldLeft, true),
ConditionRightField: NewFormulaField(selectExpr.FieldRight),
}
... ...
... ... @@ -17,3 +17,9 @@ func TableSoftDelete(ptr *pgTransaction.TransactionContext, tableId int, tableTy
_, err := ptr.PgTx.Exec(sql, time.Now(), tableId, tableType.ToString())
return err
}
func TableDeleteByTime(ptr *pgTransaction.TransactionContext, tableType domain.TableType, begin time.Time, end time.Time) error {
sql := "delete from metadata.tables where table_type = ? and created_at >=? and created_at< ?"
_, err := ptr.PgTx.Exec(sql, tableType.ToString(), begin, end)
return err
}
... ...
... ... @@ -162,6 +162,7 @@ func NewCopyTable(tableType domain.TableType, fileName string, dataFields []*dom
table.CreatedAt = time.Now()
table.UpdatedAt = time.Now()
table.RowCount = rowCount
table.TableInfo = domain.NewTableInfo()
return table
}
... ...
... ... @@ -136,6 +136,60 @@ func (ptr *QuerySetService) Update(ctx *domain.Context, querySetId int, queryCom
return nil
}
func (ptr *QuerySetService) PreviewPrepare(ctx *domain.Context, querySetId int, queryComponents []*domain.QueryComponent) (*domain.Table, error) {
querySetRepository, _ := repository.NewQuerySetRepository(ptr.transactionContext)
querySet, err := querySetRepository.FindOne(map[string]interface{}{"querySetId": querySetId})
tableRepository, _ := repository.NewTableRepository(ptr.transactionContext)
if err != nil {
return nil, err
}
if queryComponentsHasEdit(ctx, querySet, queryComponents) && querySet.QuerySetInfo.BindTableId > 0 {
if t, _ := tableRepository.FindOne(map[string]interface{}{"context": ctx, "tableId": querySet.QuerySetInfo.BindTableId}); t != nil {
return t, nil
}
}
// 验证
if err = ptr.validQueryComponents(queryComponents); err != nil {
return nil, err
}
var (
foundMasterTable *domain.Table
)
// 生成Table
masterTable := queryComponents[0].MasterTable
dependencyTables := querySet.GetDependencyTables(queryComponents)
foundMasterTable, err = tableRepository.FindOne(map[string]interface{}{"context": ctx, "tableId": masterTable.TableId})
if err != nil {
return nil, err
}
masterTable = domain.NewQueryComponentTable(foundMasterTable)
var table *domain.Table = NewCopyTable(domain.TableType(domain.TemporaryTable), querySet.Name, domain.RangeFields(masterTable.Fields, domain.ChangeFieldFlag), 0).
WithContext(ctx).
WithPrefix(strings.ToLower(string(domain.TemporaryTable)))
// 循环依赖判断
if err = ptr.validDependentCircle(ctx, querySet, queryComponents); err != nil {
return nil, err
}
table.TableInfo.ApplyOnModule = domain.ModuleAll
table.TableInfo.DependencyTables = dependencyTables
table, err = tableRepository.Save(table)
if err != nil {
return nil, err
}
// 调用底层的组装sql
if _, err = ByteCore.FormulasGenerate(domain.ReqFormulasGenerate{
QuerySet: querySet,
Table: table,
QueryComponents: queryComponents,
}); err != nil {
return nil, err
}
return table, nil
}
func (ptr *QuerySetService) validQueryComponents(queryComponents []*domain.QueryComponent) error {
if len(queryComponents) == 0 {
return nil
... ... @@ -154,6 +208,42 @@ func (ptr *QuerySetService) validQueryComponents(queryComponents []*domain.Query
return nil
}
func (ptr *QuerySetService) validDependentCircle(ctx *domain.Context, querySet *domain.QuerySet, queryComponents []*domain.QueryComponent) error {
tableRepository, _ := repository.NewTableRepository(ptr.transactionContext)
var validTables = make([]*domain.Table, 0)
dependencyTables := querySet.GetDependencyTables(queryComponents)
if len(dependencyTables) > 0 {
_, tables, err := tableRepository.Find(map[string]interface{}{"context": ctx, "tableIds": dependencyTables})
if err != nil {
return err
}
tableMap := make(map[int]*domain.Table)
for i := range tables {
tableMap[tables[i].TableId] = tables[i]
}
for _, c := range queryComponents {
if t, ok := tableMap[c.MasterTable.TableId]; ok {
c.MasterTable = domain.NewQueryComponentTable(t)
}
}
for _, t := range validTables {
if t.TableType == domain.SchemaTable.ToString() || t.TableType == domain.SubProcessTable.ToString() {
validTables = append(validTables, t)
}
}
}
// 循环依赖判断
tableDependencyService, _ := NewTableDependencyService(ptr.transactionContext)
if len(validTables) > 0 {
tree := tableDependencyService.TableDependTree(validTables, querySet.QuerySetInfo.BindTableId)
if tableDependencyService.Detect(ctx, tree.EdgesArray()) {
return NewCircleDependError(tableDependencyService.CircleTable(), querySet)
}
}
return nil
}
func (ptr *QuerySetService) UpdateQuerySetLog(ctx *domain.Context, querySet *domain.QuerySet, queryComponents []*domain.QueryComponent) error {
var res = make([]FastSourceLog, 0)
if logs := conditionsEditLog(ctx, querySet, queryComponents); len(logs) > 0 {
... ... @@ -254,10 +344,14 @@ func conditionsEditLog(ctx *domain.Context, querySet *domain.QuerySet, queryComp
return res
}
func queryComponentsHasEdit(ctx *domain.Context, querySet *domain.QuerySet, queryComponents []*domain.QueryComponent) bool {
logs := selectsEditLog(ctx, querySet, queryComponents)
return len(logs) == 0
}
func selectsEditLog(ctx *domain.Context, querySet *domain.QuerySet, queryComponents []*domain.QueryComponent) []FastSourceLog {
var res = make([]FastSourceLog, 0)
oldQCs := domain.QueryComponentsToMapById(querySet.QueryComponents)
//newQCs := domain.QueryComponentsToMapById(queryComponents)
sourceId := querySet.QuerySetId
entry := domain.NewLogEntry(querySet.Name, querySet.Type, domain.UnKnown, ctx)
... ... @@ -351,7 +445,7 @@ func (ptr *QuerySetService) CreateOrUpdateQuerySetTable(ctx *domain.Context, que
return nil, err
}
masterTable = domain.NewQueryComponentTable(foundMasterTable)
var table *domain.Table = NewTable(domain.TableType(querySet.Type), querySet.Name, domain.RangeFields(masterTable.Fields, domain.ChangeFieldFlag), 0).WithContext(ctx).WithPrefix(strings.ToLower(querySet.Type))
var table *domain.Table = NewCopyTable(domain.TableType(querySet.Type), querySet.Name, domain.RangeFields(masterTable.Fields, domain.ChangeFieldFlag), 0).WithContext(ctx).WithPrefix(strings.ToLower(querySet.Type))
if querySet.QuerySetInfo.BindTableId > 0 {
table, err = tableRepository.FindOne(map[string]interface{}{"context": ctx, "tableId": querySet.QuerySetInfo.BindTableId})
if err != nil {
... ... @@ -360,36 +454,41 @@ func (ptr *QuerySetService) CreateOrUpdateQuerySetTable(ctx *domain.Context, que
table.DataFields = masterTable.Fields
table.UpdatedAt = time.Now()
}
var validTables = make([]*domain.Table, 0)
if len(dependencyTables) > 0 {
_, tables, err := tableRepository.Find(map[string]interface{}{"context": ctx, "tableIds": dependencyTables})
if err != nil {
return nil, err
}
tableMap := make(map[int]*domain.Table)
for i := range tables {
tableMap[tables[i].TableId] = tables[i]
}
for _, c := range queryComponents {
if t, ok := tableMap[c.MasterTable.TableId]; ok {
c.MasterTable = domain.NewQueryComponentTable(t)
}
}
for _, t := range validTables {
if t.TableType == domain.SchemaTable.ToString() || t.TableType == domain.SubProcessTable.ToString() {
validTables = append(validTables, t)
}
}
}
// 循环依赖判断
tableDependencyService, _ := NewTableDependencyService(ptr.transactionContext)
if len(validTables) > 0 {
tree := tableDependencyService.TableDependTree(validTables, querySet.QuerySetInfo.BindTableId)
if tableDependencyService.Detect(ctx, tree.EdgesArray()) {
return nil, NewCircleDependError(tableDependencyService.CircleTable(), querySet)
}
if err = ptr.validDependentCircle(ctx, querySet, queryComponents); err != nil {
return nil, err
}
//var validTables = make([]*domain.Table, 0)
//if len(dependencyTables) > 0 {
// _, tables, err := tableRepository.Find(map[string]interface{}{"context": ctx, "tableIds": dependencyTables})
// if err != nil {
// return nil, err
// }
// tableMap := make(map[int]*domain.Table)
// for i := range tables {
// tableMap[tables[i].TableId] = tables[i]
// }
// for _, c := range queryComponents {
// if t, ok := tableMap[c.MasterTable.TableId]; ok {
// c.MasterTable = domain.NewQueryComponentTable(t)
// }
// }
// for _, t := range validTables {
// if t.TableType == domain.SchemaTable.ToString() || t.TableType == domain.SubProcessTable.ToString() {
// validTables = append(validTables, t)
// }
// }
//}
//
//// 循环依赖判断
//tableDependencyService, _ := NewTableDependencyService(ptr.transactionContext)
//if len(validTables) > 0 {
// tree := tableDependencyService.TableDependTree(validTables, querySet.QuerySetInfo.BindTableId)
// if tableDependencyService.Detect(ctx, tree.EdgesArray()) {
// return nil, NewCircleDependError(tableDependencyService.CircleTable(), querySet)
// }
//}
table.TableInfo.ApplyOnModule = domain.ModuleAll
table.TableInfo.DependencyTables = dependencyTables
... ...
... ... @@ -182,6 +182,9 @@ func (repository *TableRepository) Find(queryOptions map[string]interface{}) (in
query.Where(`table_info->'dependencyTables' @> '[?]'`, v.(int))
}
query.SetWhereByQueryOption("created_at >= ?", "beginTime")
query.SetWhereByQueryOption("created_at < ?", "endTime")
//query.SetOffsetAndLimit(20)
query.SetOrderDirect("table_id", "DESC")
if count, err := query.SelectAndCount(); err != nil {
... ...
... ... @@ -65,3 +65,10 @@ func Delete(db *gorm.DB, tableName string, fields []*domain.FieldValue) error {
tx := db.Exec(sql)
return tx.Error
}
//DROP VIEW IF EXISTS
func DropView(db *gorm.DB, tableName string) error {
tx := db.Exec("DROP VIEW IF EXISTS " + tableName)
return tx.Error
}
... ...
... ... @@ -29,6 +29,14 @@ func (controller *QuerySetController) UpdateQuerySet() {
controller.Response(data, err)
}
func (controller *QuerySetController) PreviewPrepare() {
querySetService := service.NewQuerySetService(nil)
updateQuerySetCommand := &command.UpdateQuerySetCommand{}
Must(controller.Unmarshal(updateQuerySetCommand))
data, err := querySetService.PreviewPrepare(ParseContext(controller.BaseController), updateQuerySetCommand)
controller.Response(data, err)
}
func (controller *QuerySetController) GetQuerySet() {
querySetService := service.NewQuerySetService(nil)
getQuerySetQuery := &query.GetQuerySetQuery{}
... ...
... ... @@ -18,4 +18,5 @@ func init() {
web.Router("/data/query-sets/move", &controllers.QuerySetController{}, "Post:Move")
web.Router("/data/query-sets/rename", &controllers.QuerySetController{}, "Post:Rename")
web.Router("/data/query-sets/search", &controllers.QuerySetController{}, "Post:SearchQuerySet")
web.Router("/data/query-sets/preview-prepare", &controllers.QuerySetController{}, "Post:PreviewPrepare")
}
... ...