Author: yangfu

fix(core): append data error

... ... @@ -13,7 +13,7 @@ type UpdateMappingRuleCommand struct {
// Mapping rule ID
MappingRuleId int `cname:"匹配规则ID" json:"mappingRuleId" valid:"Required"`
// Name
Name string `cname:"名称" json:"name" valid:"Required"`
Name string `cname:"名称" json:"name"`
// Verified file columns
MappingFields []*domain.MappingField `cname:"匹配规则列表" json:"mappingFields" valid:"Required"`
}
... ...
... ... @@ -288,11 +288,14 @@ func (mappingRuleService *MappingRuleService) UpdateMappingRule(ctx *domain.Cont
return nil, application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
if duplicateRule, e := mappingRuleRepository.FindOne(map[string]interface{}{"context": ctx, "name": cmd.Name}); e == nil && duplicateRule != nil && duplicateRule.MappingRuleId != cmd.MappingRuleId {
return nil, application.ThrowError(application.INTERNAL_SERVER_ERROR, "方案名称重复")
if len(cmd.Name) != 0 {
if duplicateRule, e := mappingRuleRepository.FindOne(map[string]interface{}{"context": ctx, "name": cmd.Name}); e == nil && duplicateRule != nil && duplicateRule.MappingRuleId != cmd.MappingRuleId {
return nil, application.ThrowError(application.INTERNAL_SERVER_ERROR, "方案名称重复")
}
}
if len(cmd.Name) > 0 {
mappingRule.Name = cmd.Name
}
mappingRule.Name = cmd.Name
mappingRule.MappingFields = cmd.MappingFields
mappingRule.VerifiedFileFields = fileTable.Fields(false)
mappingRule.UpdatedAt = time.Now()
... ...
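Now that Name is optional on the command, an empty name means "keep the rule's current name": both the duplicate-name lookup and the assignment above are gated on len(cmd.Name). A small sketch of the two call shapes (the fields value is illustrative):

// Rename: a non-empty Name triggers the duplicate check and then the assignment.
_ = UpdateMappingRuleCommand{MappingRuleId: 1, Name: "rule-a", MappingFields: fields}

// No rename: an empty Name skips both; only MappingFields and the derived
// VerifiedFileFields are refreshed.
_ = UpdateMappingRuleCommand{MappingRuleId: 1, MappingFields: fields}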
... ... @@ -165,6 +165,8 @@ type (
FileId int
FileUrl string
Table *Table
From []*Field
To []*Field
}
DataAppendData struct {
... ...
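The new From/To slices on ReqAppendData carry an explicit column mapping for the append: From lists the fields as they appear in the verified file, To lists the matching main-table fields, and the two slices are index-aligned. A minimal sketch of a populated request (field names are illustrative; fileId, fileUrl and table are assumed to be in scope):

// requestData.From[i] is the source column, requestData.To[i] its target.
requestData := domain.ReqAppendData{FileId: fileId, FileUrl: fileUrl, Table: table}
requestData.From = append(requestData.From, &domain.Field{Name: "amount"})
requestData.To = append(requestData.To, &domain.Field{Name: "order_amount"})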
... ... @@ -52,12 +52,18 @@ type Condition struct {
}
func (t *DataTable) OptionalValue() []string {
//set := make(map[string]string)
var values = make([]string, 0)
if len(t.Data) > 0 && len(t.Data[0]) == 1 {
for i := range t.Data {
if len(t.Data[i]) == 0 {
continue
}
//if _, ok := set[t.Data[i][0]]; ok {
// continue
//} else {
// set[t.Data[i][0]] = ""
//}
values = append(values, t.Data[i][0])
}
}
... ...
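OptionalValue only yields values for a single-column table (len(t.Data[0]) == 1), and with the set-based de-duplication left commented out, repeated values are returned as-is. For example, assuming Data is the [][]string used above:

// Single-column table: each non-empty row's first cell is collected, duplicates included.
t := DataTable{Data: [][]string{{"A"}, {"B"}, {"A"}, {}}}
fmt.Println(t.OptionalValue()) // [A B A]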
... ... @@ -86,6 +86,7 @@ var (
var SQLTypeMap = map[string]string{
String.ToString(): "文本",
Int.ToString(): "整数",
BigInt.ToString(): "整数",
Float.ToString(): "小数",
Date.ToString(): "日期",
Datetime.ToString(): "日期时间",
... ... @@ -180,8 +181,8 @@ var DBTables = map[int]*Table{
{
Index: 5,
Name: "操作时间",
SQLName: "created_at",
SQLType: Datetime.ToString(),
SQLName: "log_time", //"created_at",
SQLType: String.ToString(),
Flag: MainTableField,
},
{
... ...
... ... @@ -73,6 +73,7 @@ func (file *File) CopyTo(fileType FileType, ctx *Context) *File {
Url: file.FileInfo.Url,
Ext: file.FileInfo.Ext,
RowCount: file.FileInfo.RowCount,
TableId: file.FileInfo.TableId,
},
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
... ...
... ... @@ -26,6 +26,8 @@ type Log struct {
CreatedAt time.Time `json:"createdAt"`
// Extension
Context *Context `json:"context"`
// Log time
LogTime string `json:"log_time"`
}
type LogRepository interface {
... ...
package domain
import "time"
// LogEntry is the log entry content
type LogEntry struct {
// Object name: data table name / file name
... ... @@ -14,6 +16,8 @@ type LogEntry struct {
OperatorName string `json:"operatorName"`
// Error level
Level string `json:"level"`
// Log time
LogTime string `json:"logTime"`
// Error message
Error string `json:"error"`
ctx *Context `json:"-"`
... ... @@ -33,6 +37,7 @@ func NewLogEntry(fileOrTableName string, objectType string, operationType Operat
ObjectType: objectType,
OperationType: operationType.ToString(),
OperatorName: ctx.OperatorName,
LogTime: time.Now().Local().Format("2006-01-02 15:04:05"),
ctx: ctx,
}
}
... ...
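LogTime is stored as a pre-formatted string rather than a time.Time. The layout passed to Format is Go's reference time (Mon Jan 2 15:04:05 2006), so entries are rendered like this:

// Go time layouts are written against the reference time, not a pattern language.
ts := time.Date(2023, 5, 4, 10, 30, 0, 0, time.Local)
fmt.Println(ts.Format("2006-01-02 15:04:05")) // 2023-05-04 10:30:00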
... ... @@ -226,13 +226,18 @@ type (
)
func NewTableAppendRequest(param domain.ReqAppendData) TableAppendRequest {
return TableAppendRequest{
req := TableAppendRequest{
OriginalTableId: intToString(param.FileId),
CheckoutTableFileUrl: param.FileUrl,
DatabaseTableName: param.Table.SQLName,
ColumnSchemas: DomainFieldsToColumnSchemas(param.Table.DataFields),
FieldSchemas: ToFieldSchemas(param.Table.DataFields),
}
if len(param.From) > 0 {
req.ColumnSchemas = DomainFieldsToColumnSchemas(param.From)
req.FieldSchemas = ToFieldSchemas(param.To)
}
return req
}
func intToString(i int) string {
... ...
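Callers that leave From/To empty keep the previous behaviour (both schemas come from Table.DataFields); callers that pass a mapping get column schemas built from the source (From) fields and field schemas built from the target (To) fields. Roughly, assuming table, fileId and fileUrl are in scope:

// Default path: schemas built from the full table definition.
full := NewTableAppendRequest(domain.ReqAppendData{Table: table, FileId: fileId, FileUrl: fileUrl})

// Mapped path: schemas built from the explicit From/To pair instead.
mapped := NewTableAppendRequest(domain.ReqAppendData{
    Table: table, FileId: fileId, FileUrl: fileUrl,
    From:  []*domain.Field{{Name: "amount"}},
    To:    []*domain.Field{{Name: "order_amount"}},
})
_, _ = full, mapped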
... ... @@ -60,6 +60,8 @@ func (ptr *EditDataTableService) Edit(ctx *domain.Context, req domain.EditTableR
if response != nil && len(response.Fields) > 0 {
// Special handling for convert-column-type errors
options := make([]redis.FileCacheOptionsFunc, 0)
cacheService := redis.NewFileCacheService()
if req.Action == "convert-column-type" {
var toType = req.Params["convertType"].(string)
var fieldName = req.ProcessFieldNames[0]
... ... @@ -68,9 +70,15 @@ func (ptr *EditDataTableService) Edit(ctx *domain.Context, req domain.EditTableR
} else {
options = append(options, redis.WithRemoveConvertTypeErrors([]redis.ConvertTypeError{{FieldName: fieldName, ErrMsg: errMsg, ToType: toType}}))
}
// The underlying layer does not return the field list after the type change, so update the cache manually
if file, err := cacheService.UpdateField(redis.KeyTemporaryFileInfo(file.FileId), file, options...); err != nil {
return nil, err
} else {
response.Fields = file.Fields
}
return response, nil
}
cacheService := redis.NewFileCacheService()
if _, err := cacheService.Update(redis.KeyTemporaryFileInfo(file.FileId), file, response.Fields, response.Total, options...); err != nil {
return nil, err
}
... ...
... ... @@ -31,6 +31,12 @@ func (ptr *PreviewDataTableService) Preview(ctx *domain.Context, fileId int, fie
if tempFile, _ := fileCache.Get(redis.KeyTemporaryFileInfo(fileId)); tempFile != nil {
isSourceFile = false
fields = tempFile.Fields
} else if file.FileInfo.TableId > 0 {
tableRepository, _ := repository.NewTableRepository(ptr.transactionContext)
table, _ := tableRepository.FindOne(map[string]interface{}{"tableId": file.FileInfo.TableId})
if table != nil {
fields = table.Fields(false)
}
}
// Load Data From Excel(python api)
... ... @@ -52,8 +58,11 @@ func (ptr *PreviewDataTableService) Preview(ctx *domain.Context, fileId int, fie
if err != nil {
return nil, err
}
if len(fields) == 0 {
fields = response.Fields
}
cache := redis.NewFileCacheService()
tempFile, err := cache.Update(redis.KeyTemporaryFileInfo(file.FileId), file, response.Fields, response.Total)
tempFile, err := cache.Update(redis.KeyTemporaryFileInfo(file.FileId), file, fields, response.Total)
if err != nil {
return nil, err
}
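The preview's field list now has a clear precedence: fields cached for the temporary file win, then the schema of the table the file is bound to (FileInfo.TableId > 0), and only as a last resort the fields reported by the loader. A hypothetical condensation of that selection, not part of the change:

// resolvePreviewFields restates the precedence wired above.
func resolvePreviewFields(cached, tableFields, loaded []*domain.Field) []*domain.Field {
    if len(cached) > 0 {
        return cached // temporary edits cached in Redis take priority
    }
    if len(tableFields) > 0 {
        return tableFields // the file is bound to a table: use its schema
    }
    return loaded // fall back to what the load call reported
}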
... ... @@ -92,7 +101,15 @@ func (d *FilePreviewDto) Load(fileId int, m *domain.DataLoadDataTable, file *red
d.ObjectId = fileId
d.ObjectType = domain.ObjectFile
d.TableType = domain.ExcelTable.ToString()
d.Fields = m.Fields
for i, f := range file.Fields {
d.Fields = append(d.Fields, &domain.Field{
Index: i + 1,
Name: f.Name,
SQLName: f.Name,
SQLType: f.SQLType,
})
}
var fields []*domain.Field
mapData := domain.ToFieldData(m.Fields, formatData(m.Data, func(s string) string {
if s == "<NA>" {
... ... @@ -103,7 +120,7 @@ func (d *FilePreviewDto) Load(fileId int, m *domain.DataLoadDataTable, file *red
d.Data = domain.GripData(mapData, int64(m.Total))
d.PageNumber = m.PageNumber
for _, f := range m.Fields {
for _, f := range file.Fields {
copyField := f.Copy()
for _, e := range file.ConvertTypeErrors {
if e.FieldName == copyField.Name {
... ...
... ... @@ -43,6 +43,7 @@ func (ptr *PGLogService) Log(logType domain.LogType, sourceId int, logEntry Log)
OperatorName: entry.OperatorName,
CreatedAt: time.Now(),
Context: logEntry.Context(),
LogTime: entry.LogTime,
}
if v, ok := logEntry.Context().GetValue(domain.ContextWithLogLevel); ok {
... ...
... ... @@ -59,7 +59,25 @@ func (ptr *AppendDataToTableService) AppendData(ctx *domain.Context, fileId int,
}
// Notify the underlying layer to append the data
if _, err = ByteCore.AppendData(domain.ReqAppendData{Table: table, FileId: fileId, FileUrl: file.FileInfo.Url}); err != nil {
requestData := domain.ReqAppendData{Table: table, FileId: fileId, FileUrl: file.FileInfo.Url}
if len(mappingFields) > 0 {
for _, m := range mappingFields {
if len(m.VerifiedFileFieldName) == 0 {
continue
}
fromField, ok := excelTable.MatchField(&domain.Field{Name: m.VerifiedFileFieldName})
if !ok {
continue
}
toField, ok := table.MatchField(m.MainTableField)
if !ok {
continue
}
requestData.To = append(requestData.To, toField)
requestData.From = append(requestData.From, fromField)
}
}
if _, err = ByteCore.AppendData(requestData); err != nil {
return nil, err
}
return map[string]interface{}{
... ...
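Because a From/To pair is only appended after both MatchField lookups succeed, the two slices stay index-aligned even when some mapping rows are skipped (an empty VerifiedFileFieldName, or no match in the Excel table or the target table). A hypothetical sanity check for that invariant, not part of the change:

// Every mapped source column must have exactly one target column at the same index.
if len(requestData.From) != len(requestData.To) {
    return nil, fmt.Errorf("append mapping out of sync: %d source vs %d target fields",
        len(requestData.From), len(requestData.To))
}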
... ... @@ -113,7 +113,54 @@ func (s *FileCacheService) Update(key string, file *domain.File, fields []*domai
response.AddConvertTypeError(options.AddConvertTypeErrors[i])
}
for i := range options.RemoveConvertTypeErrors {
convertType := options.RemoveConvertTypeErrors[i]
response.RemoveConvertTypeError(options.RemoveConvertTypeErrors[i])
for j := range response.Fields {
if response.Fields[j].Name == convertType.FieldName {
response.Fields[j].SQLType = convertType.ToType
break
}
}
}
err = ZeroCoreRedis.Setex(key, json.MarshalToString(response), TemporaryFileExpire)
if err != nil {
return nil, err
}
return response, err
}
func (s *FileCacheService) UpdateField(key string, file *domain.File, errors ...FileCacheOptionsFunc) (*TemporaryFileInfo, error) {
ok, err := ZeroCoreRedis.Exists(key)
var response = &TemporaryFileInfo{}
if err != nil {
return response, err
}
if !ok {
return nil, fmt.Errorf("文件不存在")
}
data, err := ZeroCoreRedis.Get(key)
if err != nil {
return nil, err
}
err = json.UnmarshalFromString(data, response)
if err != nil {
return nil, err
}
options := NewFileCacheOptions(errors...)
for i := range options.AddConvertTypeErrors {
response.AddConvertTypeError(options.AddConvertTypeErrors[i])
}
for i := range options.RemoveConvertTypeErrors {
convertType := options.RemoveConvertTypeErrors[i]
response.RemoveConvertTypeError(options.RemoveConvertTypeErrors[i])
for j := range response.Fields {
if response.Fields[j].Name == convertType.FieldName {
response.Fields[j].SQLType = convertType.ToType
break
}
}
}
err = ZeroCoreRedis.Setex(key, json.MarshalToString(response), TemporaryFileExpire)
... ...
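UpdateField reuses the same functional-options plumbing as Update, but apart from the SQLType rewrite it leaves the cached Fields and Total untouched. Assuming the options follow the usual func(*FileCacheOptions) pattern (their definitions are not part of this diff), a call site mirrors the one in EditDataTableService.Edit; the field name and type string below are illustrative:

opts := []FileCacheOptionsFunc{
    WithRemoveConvertTypeErrors([]ConvertTypeError{{FieldName: "amount", ErrMsg: "", ToType: "FLOAT"}}),
}
if updated, err := NewFileCacheService().UpdateField(KeyTemporaryFileInfo(file.FileId), file, opts...); err == nil {
    _ = updated.Fields // the cached fields now carry the converted SQL type
}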
... ... @@ -31,6 +31,7 @@ func (repository *LogRepository) Save(log *domain.Log) (*domain.Log, error) {
"created_at",
"entry",
"context",
"log_time",
}
insertFieldsSnippet := sqlbuilder.SqlFieldsSnippet(sqlbuilder.RemoveSqlFields(sqlBuildFields, "log_id"))
insertPlaceHoldersSnippet := sqlbuilder.SqlPlaceHoldersSnippet(sqlbuilder.RemoveSqlFields(sqlBuildFields, "log_id"))
... ... @@ -52,6 +53,7 @@ func (repository *LogRepository) Save(log *domain.Log) (*domain.Log, error) {
&log.CreatedAt,
&log.Entry,
&log.Context,
&log.LogTime,
),
fmt.Sprintf("INSERT INTO metadata.logs (%s) VALUES (%s) RETURNING %s", insertFieldsSnippet, insertPlaceHoldersSnippet, returningFieldsSnippet),
log.LogType,
... ... @@ -64,6 +66,7 @@ func (repository *LogRepository) Save(log *domain.Log) (*domain.Log, error) {
log.CreatedAt,
log.Entry,
log.Context,
log.LogTime,
); err != nil {
return log, err
}
... ... @@ -81,6 +84,7 @@ func (repository *LogRepository) Save(log *domain.Log) (*domain.Log, error) {
&log.CreatedAt,
&log.Entry,
&log.Context,
&log.LogTime,
),
fmt.Sprintf("UPDATE metadata.logs SET %s WHERE log_id=? RETURNING %s", updateFieldsSnippet, returningFieldsSnippet),
log.LogType,
... ... @@ -93,6 +97,7 @@ func (repository *LogRepository) Save(log *domain.Log) (*domain.Log, error) {
log.CreatedAt,
log.Entry,
log.Context,
log.LogTime,
log.Identify(),
); err != nil {
return log, err
... ...
... ... @@ -142,7 +142,7 @@ var opMap = map[string]string{
func (c Condition) formatByOp(op string, val interface{}) string {
if op == "like" || op == "not like" {
return fmt.Sprintf("'%%%v%%'", AssertString(val))
return fmt.Sprintf("'%%%s%%'", AssertString(val))
}
return c.Arg(val)
}
... ... @@ -155,7 +155,12 @@ func (c Condition) InArgs(args interface{}) string {
func (c Condition) Arg(args interface{}) string {
bytes := make([]byte, 0)
bytes = appendValue(bytes, reflect.ValueOf(args))
v := reflect.ValueOf(args)
if v.Kind() == reflect.Int || v.Kind() == reflect.Int64 || v.Kind() == reflect.Float64 {
bytes = append(bytes, []byte(AssertString(args))...)
return string(bytes)
}
bytes = appendValue(bytes, v)
return string(bytes)
}
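The numeric fast path only covers reflect.Int, reflect.Int64 and reflect.Float64; those values are rendered bare, so they end up unquoted in the generated SQL, while every other kind still goes through appendValue. Roughly:

// Rough illustration of the intended rendering; string handling is whatever
// appendValue already does and is unchanged by this diff.
var c Condition
_ = c.Arg(18)   // "18" (emitted as-is)
_ = c.Arg(3.14) // "3.14" (emitted as-is)
_ = c.Arg("18") // still handled by appendValue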
... ... @@ -171,7 +176,6 @@ func appendIn(b []byte, slice reflect.Value) []byte {
if elem.Kind() == reflect.Interface {
elem = elem.Elem()
}
if elem.Kind() == reflect.Slice {
//b = appendIn(b, elem)
} else {
... ...