作者 yangfu

Merge remote-tracking branch 'origin/test' into dev

# Conflicts:
#	pkg/infrastructure/repository/pg_users_repository.go
... ... @@ -101,6 +101,8 @@ spec:
value: "8082"
- name: SERVICE_ENV
value: "dev"
- name: GROUP_UP_BLOCK_CHAIN
value: "allied_creation_message_dev"
- name: REDIS_HOST
valueFrom:
configMapKeyRef:
... ...
... ... @@ -105,6 +105,8 @@ spec:
value: "8082"
- name: SERVICE_ENV
value: "test"
- name: GROUP_UP_BLOCK_CHAIN
value: "allied_creation_message_test"
- name: REDIS_HOST
valueFrom:
configMapKeyRef:
... ...
... ... @@ -80,6 +80,9 @@ func (dto *UserDto) LoadDto(user *domain.User, company *domain.Company) error {
dto.UserInfo.IcCardNumber = user.Ext.IcCardNumber
dto.UserInfo.EmployeeType = user.Ext.EmployeeType
}
if dto.UserInfo.EmployeeType == 0 {
dto.UserInfo.EmployeeType = 1
}
if len(dto.UserRole) == 0 {
dto.UserRole = make([]*domain.Role, 0)
}
... ...
... ... @@ -46,6 +46,10 @@ type ListUserQuery struct {
AdvancedQuery string `json:"advancedQuery"`
// 在企业范围内
InCompanyIds []int64 `cname:"在企业范围内" json:"inCompanyIds,omitempty"`
// 用户编号 企业内标识
UserCode string `cname:"用户编号" json:"userCode,omitempty"`
// IC卡号
IcCardNumber string `cname:"IC卡号" json:"icCardNumber,omitempty"`
}
func (listUserQuery *ListUserQuery) Valid(validation *validation.Validation) {
... ...
... ... @@ -9,6 +9,8 @@ var (
TOPIC_LOG_STASH = "go_stash_dev" //"pushMessage"
// kafka topic up_block_chain
TOPIC_UP_BLOCK_CHAIN = "allied_creation_message"
// 区块链消息 消费组
GROUP_UP_BLOCK_CHAIN = "allied_creation_message"
// 是否启用日志收集 (本地不启用)
ENABLE_KAFKA_LOG = false
)
... ... @@ -23,4 +25,10 @@ func init() {
if os.Getenv("ENABLE_KAFKA_LOG") != "" {
ENABLE_KAFKA_LOG = true
}
if os.Getenv("TOPIC_UP_BLOCK_CHAIN") != "" {
TOPIC_UP_BLOCK_CHAIN = os.Getenv("TOPIC_UP_BLOCK_CHAIN")
}
if os.Getenv("GROUP_UP_BLOCK_CHAIN") != "" {
GROUP_UP_BLOCK_CHAIN = os.Getenv("GROUP_UP_BLOCK_CHAIN")
}
}
... ...
... ... @@ -109,7 +109,7 @@ func TestBSNBlockChain_UpToChain(t *testing.T) {
PublicKey: pubKey,
PrivatePem: priK,
}
options := NewUpToChainOptions("table", "2", "149848948").WithDesc("desc")
options := NewUpToChainOptions("table", "2", "149848948000").WithDesc("desc")
rsp, err := bc.UpToChain(options)
if err != nil {
t.Fatal(err)
... ...
... ... @@ -210,6 +210,8 @@ func (repository *UserRepository) Find(queryOptions map[string]interface{}) (int
if v, ok := queryOptions["inCompanyIds"]; ok && len(v.([]int64)) > 0 {
query.Where(`company_id in (?)`, pg.In(v))
}
query.SetWhereByQueryOption("user_code = ?", "userCode")
query.SetWhereByQueryOption("ext->>'icCardNumber' = ?", "icCardNumber")
query.SetWhereByQueryOption("user_base_id=?", "userBaseId")
query.SetWhereByQueryOption("(user_type & ?)>0", "userType")
query.SetWhereByQueryOption("enable_status=?", "enableStatus")
... ... @@ -232,7 +234,7 @@ func (repository *UserRepository) Find(queryOptions map[string]interface{}) (int
if v, ok := queryOptions["advancedQuery"]; ok && len(v.(string)) > 0 {
query.Where(v.(string))
}
query.SetOffsetAndLimit(20)
query.SetOffsetAndLimit(domain.MaxQueryRow)
query.SetOrderDirect("user_id", "DESC")
if count, err := query.SelectAndCount(); err != nil {
return 0, users, err
... ...
... ... @@ -12,10 +12,10 @@ import (
func SetUp() {
go func() {
q := kq.MustNewQueue(NewConfig(constant.TOPIC_UP_BLOCK_CHAIN, constant.TOPIC_UP_BLOCK_CHAIN, 2), kq.WithHandle(FilterTopicHandler("up_block_chain", UpToChainHandler)))
q := kq.MustNewQueue(NewConfig(constant.TOPIC_UP_BLOCK_CHAIN, constant.GROUP_UP_BLOCK_CHAIN, 1), kq.WithHandle(FilterTopicHandler("up_block_chain", UpToChainHandler)))
defer func() {
q.Stop()
log.Logger.Info(fmt.Sprintf("goqueue:%v stop!", constant.TOPIC_UP_BLOCK_CHAIN))
log.Logger.Info(fmt.Sprintf("goqueue:%v stop!", constant.GROUP_UP_BLOCK_CHAIN))
}()
q.Start()
}()
... ... @@ -38,7 +38,7 @@ func NewConfig(topic, group string, consumers int) kq.KqConf {
Offset: "first",
Conns: 1,
Consumers: consumers,
Processors: 4,
Processors: 1,
MinBytes: 10200,
MaxBytes: 10485760,
}
... ...
... ... @@ -10,11 +10,11 @@ import (
)
func UpToChainHandler(k, v string) error {
log.Logger.Debug(fmt.Sprintf("%s", v), map[string]interface{}{"handler": "UptoChain"})
blockChainService := service.NewBlockChainService(nil)
upChainCommand := &command.UpChainCommand{}
err := json.UnmarshalFromString(v, upChainCommand)
if err != nil {
log.Logger.Error(err.Error(), map[string]interface{}{"info": "UpToChainHandler 解析json错误", "data": v})
return err
}
_, err = blockChainService.UpChain(upChainCommand)
... ... @@ -26,9 +26,12 @@ func FilterTopicHandler(topic string, handler func(string, string) error) func(s
raw := &RawData{}
err := json.UnmarshalFromString(v, raw)
if err != nil {
log.Logger.Error("【GoQueue】消息解析错误:"+err.Error(), map[string]interface{}{"info": "FilterTopicHandler 解析json错误", "data": v})
return err
}
log.Logger.Debug(fmt.Sprintf("【GoQueue】收到消息 Topic:%s", raw.Topic), map[string]interface{}{"data": v})
if raw.Topic != topic {
log.Logger.Debug(fmt.Sprintf("【GoQueue】 topic not equal get:%v want:%v", raw.Topic, topic), map[string]interface{}{"data": v})
return nil
}
return handler(k, string(raw.Data))
... ...