作者 庄敏学

移动端筛选项本地存储获取

... ... @@ -7,17 +7,16 @@ import (
"github.com/golang-jwt/jwt/v4/request"
"github.com/zeromicro/go-queue/kq"
"github.com/zeromicro/go-zero/core/logx"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/config"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/handler"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/logic/consumer"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/svc"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/db"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/domain"
"net/http"
"strings"
"time"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/config"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/handler"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/svc"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/rest"
)
... ... @@ -61,23 +60,23 @@ func main() {
db.Migrate(ctx.DB)
//启动消费队列
go startConsume(c)
startConsume(c)
logx.Infof("Starting server at %s:%d... \n", c.Host, c.Port)
server.Start()
}
func startConsume(c config.Config) {
svcCtx := svc.NewServiceContext(c)
//svcCtx := svc.NewServiceContext(c)
go func() {
for {
notice := &domain.ObjectNotice{
CompanyId: 1594869884284571648,
TableId: 1573,
CompanyId: 1598224576532189184,
TableId: 521,
TableType: "主表",
ObjectType: "导入模块",
Event: "table.data.edit",
TableAffectedList: []int{1573},
TableAffectedList: []int{521},
DataChanged: true,
StructChanged: true,
MetaData: domain.ObjectNoticeMetaData{
... ... @@ -91,18 +90,29 @@ func startConsume(c config.Config) {
time.Sleep(10 * 10 * time.Second)
}
}()
//kafka消费队列 处理字库推送事件
go func() {
}()
//kq.MustNewQueue(c.KqConsumerConf, consumer.NewByteNoticeLogic(svcCtx))
//for {
svcCtx := svc.NewServiceContext(c)
queue, err := kq.NewQueue(c.KqConsumerConf, consumer.NewByteNoticeLogic(svcCtx))
if err != nil {
panic(err)
} else {
queue.Start()
}
}()
//redis消费队列 处理表数据存储到本地
go func() {
for {
svcCtx := svc.NewServiceContext(c)
str, err := svcCtx.Redis.Rpop(c.Name + ":table_data")
if err == nil {
_ = consumer.NewByteTableDataLogic(svcCtx).Sync(str)
}
time.Sleep(3 * time.Second)
}
}()
//kq.MustNewQueue(c.KqConsumerConf, consumer.NewByteNoticeLogic(svcCtx))
//for {
//time.Sleep(1 * time.Second)
//}
}
... ...
... ... @@ -2,6 +2,7 @@ package chart
import (
"context"
"encoding/json"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/db/transaction"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/pkg/contextdata"
... ... @@ -89,6 +90,16 @@ func (l *SaveChartLogic) SaveChart(req *types.ChartSaveRequest) (resp *types.Cha
}, true); err != nil {
return nil, xerr.NewErrMsgErr("创建失败", err)
}
if len(chartSetting.DataSourceIds) > 0 {
for _, sourceId := range chartSetting.DataSourceIds {
pusher := &types.SyncTableDataPusher{
CompanyId: tenantId,
ObjectId: int(sourceId),
}
mBytes, _ := json.Marshal(pusher)
_, _ = l.svcCtx.Redis.LpushCtx(l.ctx, l.svcCtx.Config.Name+":table_data", string(mBytes))
}
}
resp = &types.ChartSaveResponse{
Chart: types.NewChartItemWithSetting(chart, chartSetting),
}
... ...
... ... @@ -2,6 +2,8 @@ package chart
import (
"context"
"encoding/json"
"github.com/samber/lo"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/db/transaction"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/pkg/contextdata"
... ... @@ -41,6 +43,7 @@ func (l *UpdateChartLogic) UpdateChart(req *types.ChartUpdateRequest) (resp *typ
if chartSetting, err = l.svcCtx.ChartSettingRepository.FindOne(l.ctx, conn, req.Id); err != nil {
return nil, xerr.NewErrMsgErr("图表配置不存在", err)
}
oldDataSource := chartSetting.DataSourceIds
if err = transaction.UseTrans(l.ctx, l.svcCtx.DB, func(ctx context.Context, conn transaction.Conn) error {
if len(req.Cover) > 0 && chart.Cover != req.Cover {
... ... @@ -54,9 +57,36 @@ func (l *UpdateChartLogic) UpdateChart(req *types.ChartUpdateRequest) (resp *typ
chartSetting.TableAbility = chartProperty.TableAbility
chartSetting.Series = chartProperty.Series
chartSetting.Other = chartProperty.Other
chartSetting.DataSourceIds = chartProperty.GetAllDataSourceId()
if chartSetting, err = l.svcCtx.ChartSettingRepository.UpdateWithVersion(l.ctx, conn, chartSetting); err != nil {
return err
}
//对比更新前后数据源
left, right := lo.Difference(chartSetting.DataSourceIds, oldDataSource)
//同步
if len(left) > 0 {
for _, sourceId := range left {
pusher := &types.SyncTableDataPusher{
CompanyId: tenantId,
ObjectId: int(sourceId),
}
mBytes, _ := json.Marshal(pusher)
_, _ = l.svcCtx.Redis.LpushCtx(l.ctx, l.svcCtx.Config.Name+":table_data", string(mBytes))
}
}
//删除
if len(right) > 0 {
for _, sourceId := range right {
//验证其他图表是否使用
used, err := l.svcCtx.ChartSettingRepository.CheckUseDataSource(l.ctx, conn, int(sourceId))
if err == nil && !used { //未使用,删除
err = l.svcCtx.ObjectTableDataRepository.DropTable(l.ctx, conn, int(sourceId))
if err != nil {
return err
}
}
}
}
return nil
}, true); err != nil {
return nil, xerr.NewErrMsgErr("创建失败", err)
... ...
... ... @@ -4,8 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/stores/redis"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/svc"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/types"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/db/transaction"
... ... @@ -125,38 +123,14 @@ func (logic *ByteNoticeLogic) handleNotice(conn transaction.Conn, notice *domain
}
//数据变更
if notice.DataChanged {
tableDataPreview, err := logic.svcCtx.ByteMetadataService.TableDataPreview(logic.ctx, &bytelib.TableDataPreviewRequest{
Token: accessToken,
ObjectId: int64(notice.TableId),
ObjectType: bytelib.ObjectMetaTable,
Where: &bytelib.TableQueryWhere{
PageNumber: 1,
PageSize: bytelib.MaxPageSize,
},
UseCache: true,
HiddenData: false,
})
if err != nil {
return err
}
//加锁,避免图表在请求读取本地数据时报错
key := logic.svcCtx.Config.Name + ":bytelib:" + fmt.Sprintf("%v", tableDataPreview.ObjectId)
lock := redis.NewRedisLock(logic.svcCtx.Redis, key)
// 设置过期时间
lock.SetExpire(10 * 60)
acquire, err := lock.Acquire()
fmt.Println(acquire, err)
defer lock.Release()
err = logic.svcCtx.ObjectTableDataRepository.InsertWithTableData(logic.ctx, conn, bytelib.TableData(tableDataPreview))
if err != nil {
return err
data := &types.SyncTableDataPusher{
CompanyId: notice.CompanyId,
ObjectId: notice.TableId,
}
//更新标记本地存储
err = logic.updateTableWithLocal(conn, int(tableDataPreview.ObjectId))
if err != nil {
mBytes, _ := json.Marshal(data)
_, err := logic.svcCtx.Redis.LpushCtx(logic.ctx, logic.svcCtx.Config.Name+":table_data", string(mBytes))
return err
}
}
return nil
}
... ... @@ -191,6 +165,14 @@ func (logic *ByteNoticeLogic) handleDelete(conn transaction.Conn, notice *domain
return err
}
}
//是否有使用数据源
used, err := logic.svcCtx.ChartSettingRepository.CheckUseDataSource(logic.ctx, conn, notice.TableId)
if err == nil && !used {
err = logic.svcCtx.ObjectTableDataRepository.DropTable(logic.ctx, conn, notice.TableId)
if err != nil {
return err
}
}
return nil
}
... ... @@ -227,14 +209,3 @@ func (logic *ByteNoticeLogic) saveFields(conn transaction.Conn, objectField *dom
return logic.svcCtx.ObjectFieldRepository.Insert(logic.ctx, conn, objectField)
}
}
// updateTableWithLocal 更新表标记本地存储
func (logic *ByteNoticeLogic) updateTableWithLocal(conn transaction.Conn, tableId int) error {
objectTable, err := logic.svcCtx.ObjectTableRepository.FindOneByTableId(logic.ctx, conn, tableId)
if err == nil && objectTable.Id > 0 {
objectTable.IsLocal = true
_, err = logic.svcCtx.ObjectTableRepository.Update(logic.ctx, conn, objectTable)
return err
}
return errors.New("表不存在")
}
... ...
package consumer
import (
"context"
"encoding/json"
"github.com/pkg/errors"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/svc"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/types"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/db/transaction"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/gateway/bytelib"
)
type ByteTableDataLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewByteTableDataLogic(svcCtx *svc.ServiceContext) *ByteTableDataLogic {
return &ByteTableDataLogic{
svcCtx: svcCtx,
ctx: context.Background(),
}
}
func (logic *ByteTableDataLogic) Sync(pusherStr string) error {
pusher := &types.SyncTableDataPusher{}
err := json.Unmarshal([]byte(pusherStr), pusher)
if err != nil {
return err
}
conn := logic.svcCtx.DefaultDBConn()
accessToken, _ := types.TableAccessToken{CompanyId: pusher.CompanyId}.GenerateToken()
tableDataPreview, err := logic.svcCtx.ByteMetadataService.TableDataPreview(logic.ctx, &bytelib.TableDataPreviewRequest{
Token: accessToken,
ObjectId: int64(pusher.ObjectId),
ObjectType: bytelib.ObjectMetaTable,
Where: &bytelib.TableQueryWhere{
PageNumber: 1,
PageSize: bytelib.MaxPageSize,
},
UseCache: true,
HiddenData: false,
})
if err != nil {
return err
}
err = transaction.UseTrans(logic.ctx, conn.DB(), func(ctx context.Context, conn transaction.Conn) error {
err = logic.svcCtx.ObjectTableDataRepository.InsertWithTableData(logic.ctx, conn, bytelib.TableData(tableDataPreview))
if err != nil {
return err
}
//更新标记本地存储
err = logic.updateTableWithLocal(conn, int(tableDataPreview.ObjectId))
if err != nil {
return err
}
return nil
}, true)
return err
}
// updateTableWithLocal 更新表标记本地存储
func (logic *ByteTableDataLogic) updateTableWithLocal(conn transaction.Conn, tableId int) error {
objectTable, err := logic.svcCtx.ObjectTableRepository.FindOneByTableId(logic.ctx, conn, tableId)
if err == nil && objectTable.Id > 0 {
objectTable.IsLocal = true
_, err = logic.svcCtx.ObjectTableRepository.Update(logic.ctx, conn, objectTable)
return err
}
return errors.New("表不存在")
}
... ...
... ... @@ -2,16 +2,15 @@ package table
import (
"context"
"fmt"
"github.com/jinzhu/copier"
"github.com/samber/lo"
"github.com/zeromicro/go-zero/core/logx"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/svc"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/types"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/interanl/pkg/gateway/bytelib"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/pkg/xerr"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/svc"
"gitlab.fjmaimaimai.com/allied-creation/sumifcc-bchart/cmd/chart-server/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type SearchTableFieldOptionalValuesLogic struct {
... ... @@ -29,6 +28,12 @@ func NewSearchTableFieldOptionalValuesLogic(ctx context.Context, svcCtx *svc.Ser
}
func (l *SearchTableFieldOptionalValuesLogic) SearchTableFieldOptionalValues(req *types.SearchTableFieldOptionalValuesRequest) (resp *types.SearchTableFieldOptionalValuesResponse, err error) {
//取本地数据
resp, err = l.getLocal(req)
if err == nil {
return resp, err
}
//获取远程数据
fieldOptionalValuesRequest := &bytelib.TableFieldOptionalValuesRequest{
Token: req.Token,
ObjectType: bytelib.ObjectMetaTable,
... ... @@ -103,9 +108,17 @@ func (l *SearchTableFieldOptionalValuesLogic) getLocal(req *types.SearchTableFie
return nil, xerr.NewErrMsg("字段" + item.FieldName + "不存在")
}
}
field := req.Field
for _, item := range objectField.Fields {
if req.Field == item.Name {
field = item.SQLName
}
}
//表数据
_, list, err := l.svcCtx.ObjectTableDataRepository.Find(l.ctx, conn, req.ObjectId, &domain.ObjectTableDataQuery{
Conditions: conditions,
Group: field,
Select: field,
})
if err != nil {
return nil, xerr.NewErrMsg("查询表数据失败")
... ... @@ -116,8 +129,12 @@ func (l *SearchTableFieldOptionalValuesLogic) getLocal(req *types.SearchTableFie
}
if len(list) > 0 {
lo.ForEach(list, func(item map[string]interface{}, index int) {
if _, ok := item[field]; ok {
resp.Values = append(resp.Values, fmt.Sprintf("%v", item[field]))
}
})
resp.Values = lo.Uniq(resp.Values)
resp.Total = int64(len(resp.Values))
}
return resp, nil
}
... ...
... ... @@ -242,6 +242,11 @@ type SearchTableDataRequest struct {
type SearchTableDataResponse struct {
}
type SyncTableDataPusher struct {
CompanyId int64 `json:"companyId,string"` //公司ID
ObjectId int `json:"objectId"`
}
type AppPageGetRequest struct {
Id int64 `path:"id"`
}
... ...
... ... @@ -140,6 +140,16 @@ func (repository *ChartSettingRepository) Find(ctx context.Context, conn transac
return total, dms, nil
}
// CheckUseDataSource 检验是否使用数据源
func (repository *ChartSettingRepository) CheckUseDataSource(ctx context.Context, conn transaction.Conn, objectId int) (bool, error) {
var count int64
err := conn.DB().Model(&models.ChartSetting{}).Where("data_source_ids::jsonb @>'[?]'", objectId).Count(&count).Error
if err != nil {
return false, err
}
return count > 0, nil
}
func (repository *ChartSettingRepository) ModelToDomainModel(from *models.ChartSetting) (*domain.ChartSetting, error) {
to := &domain.ChartSetting{}
err := copier.Copy(to, from)
... ...
... ... @@ -76,7 +76,12 @@ func (repository *ObjectTableDataRepository) DropTable(ctx context.Context, conn
if err != nil {
return err
}
return conn.DB().Exec(sql).Error
err = conn.DB().Exec(sql).Error
if err != nil {
return err
}
//设置数据表本地存储为false
return conn.DB().Exec("update object_table set is_local=false where table_id = ? and is_del=0", tableId).Error
}
// InsertWithTableData 写入表数据
... ... @@ -139,6 +144,12 @@ func (repository *ObjectTableDataRepository) Find(ctx context.Context, conn tran
}
}
}
if query.Group != "" {
tx = tx.Group(query.Group)
}
if query.Select != "" {
tx = tx.Select(query.Select)
}
list := make([]map[string]interface{}, 0)
options := domain.NewQueryOptions()
if query.Size > 0 {
... ... @@ -148,6 +159,11 @@ func (repository *ObjectTableDataRepository) Find(ctx context.Context, conn tran
return total, list, tx.Error
}
// SyncData 同步表数据
//func (repository *ObjectTableDataRepository) SyncData(ctx context.Context, conn transaction.Conn, companyId int64, tableId int) error {
//
//}
func NewObjectTableDataRepository() domain.ObjectTableDataRepository {
return &ObjectTableDataRepository{}
}
... ...
... ... @@ -28,6 +28,7 @@ type ChartProperty struct {
}
type Other struct {
Quarter *Quarter `json:"quarter,optional,omitempty"` // 四分图
Divider *Divider `json:"divider,optional,omitempty"` // 分割线
}
type Quarter struct {
XAxisLabel string `json:"xAxisLabel"` // x轴标签名
... ... @@ -41,6 +42,12 @@ type Quarter struct {
SeriesList []QuarterSeries `json:"seriesList"` // 图形系列
}
type Divider struct {
SelectedIdx string `json:"selectedIdx,optional,omitempty"` // 选择的分割线样式
TextSwitch bool `json:"textSwitch,optional,omitempty"` // 是否展示组件文本
Text string `json:"text"` // 组件文本内容
}
type QuarterSeries struct {
SeriesValue string `json:"seriesValue"`
}
... ...
... ... @@ -30,6 +30,7 @@ type ChartSettingRepository interface {
Delete(ctx context.Context, conn transaction.Conn, dm *ChartSetting) (*ChartSetting, error)
FindOne(ctx context.Context, conn transaction.Conn, id int64) (*ChartSetting, error)
Find(ctx context.Context, conn transaction.Conn, queryOptions map[string]interface{}) (int64, []*ChartSetting, error)
CheckUseDataSource(ctx context.Context, conn transaction.Conn, objectId int) (bool, error)
}
func (m *ChartSetting) Identify() interface{} {
... ...
... ... @@ -38,6 +38,8 @@ type ObjectTableDataQuery struct {
Page int
Size int
Conditions []*TableDataCondition
Group string
Select string
}
type TableDataCondition struct {
... ... @@ -50,6 +52,7 @@ type TableDataCondition struct {
type ObjectTableDataRepository interface {
InsertWithTableData(ctx context.Context, conn transaction.Conn, tableDataPreview bytelib.TableData) error
Find(ctx context.Context, conn transaction.Conn, tableId int, query *ObjectTableDataQuery) (int64, []map[string]interface{}, error)
DropTable(ctx context.Context, conn transaction.Conn, tableId int) error
}
func (m *ObjectTable) Identify() interface{} {
... ...
... ... @@ -96,4 +96,9 @@ type (
}
SearchTableDataResponse {
}
SyncTableDataPusher {
CompanyId int64 `json:"companyId,string"` //公司ID
ObjectId int `json:"objectId"`
}
)
\ No newline at end of file
... ...