作者 yangfu

Merge remote-tracking branch 'origin/dev' into test

正在显示 38 个修改的文件 包含 1109 行增加47 行删除
@@ -3,6 +3,7 @@ module gitlab.fjmaimaimai.com/mmm-go/partner @@ -3,6 +3,7 @@ module gitlab.fjmaimaimai.com/mmm-go/partner
3 go 1.14 3 go 1.14
4 4
5 require ( 5 require (
  6 + github.com/Shopify/sarama v1.26.4
6 github.com/astaxie/beego v1.12.1 7 github.com/astaxie/beego v1.12.1
7 github.com/dgrijalva/jwt-go v3.2.0+incompatible 8 github.com/dgrijalva/jwt-go v3.2.0+incompatible
8 github.com/gin-gonic/gin v1.5.0 9 github.com/gin-gonic/gin v1.5.0
@@ -10,5 +11,5 @@ require ( @@ -10,5 +11,5 @@ require (
10 github.com/linmadan/egglib-go v0.0.0-20191217144343-ca4539f95bf9 11 github.com/linmadan/egglib-go v0.0.0-20191217144343-ca4539f95bf9
11 github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 // indirect 12 github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 // indirect
12 github.com/shopspring/decimal v1.2.0 13 github.com/shopspring/decimal v1.2.0
13 - github.com/tiptok/gocomm v1.0.1 14 + github.com/tiptok/gocomm v1.0.2
14 ) 15 )
@@ -168,7 +168,7 @@ func getStatistics(userId int64, transactionContext *transaction.TransactionCont @@ -168,7 +168,7 @@ func getStatistics(userId int64, transactionContext *transaction.TransactionCont
168 if count, e := PartnerInfoDao.PartnerStatic(map[string]interface{}{"inPartnerIds": partnerIds, "inPartnerCategory": domain.App}); e == nil { 168 if count, e := PartnerInfoDao.PartnerStatic(map[string]interface{}{"inPartnerIds": partnerIds, "inPartnerCategory": domain.App}); e == nil {
169 Statistics["appCount"] = count 169 Statistics["appCount"] = count
170 } 170 }
171 - if bonus, e := OrderBaseDao.OrderBonusStatics(domain.OrderBonusQuery{InPartnerIds: partnerIds, OrderType: domain.OrderReal}); e == nil { 171 + if bonus, e := OrderBaseDao.OrderBonusStatics(domain.OrderBonusQuery{InPartnerIds: partnerIds, OrderTypes: domain.UserOrderTypes(domain.Career)}); e == nil {
172 Statistics["careerOrdersMoney"] = bonus.TotalOrderAmount 172 Statistics["careerOrdersMoney"] = bonus.TotalOrderAmount
173 Statistics["careerDividend"] = bonus.Bonus 173 Statistics["careerDividend"] = bonus.Bonus
174 } 174 }
@@ -24,7 +24,7 @@ func Statistics(header *protocol.RequestHeader, request *protocol.DividendStatis @@ -24,7 +24,7 @@ func Statistics(header *protocol.RequestHeader, request *protocol.DividendStatis
24 }() 24 }()
25 25
26 // 事业分红统计-查询订单 26 // 事业分红统计-查询订单
27 - _, orderAll, e := OrderBaseResponsitory.Find(utils.ObjectJsonToMap(domain.OrderQueryOption{PartnerId: header.UserId, EndTime: time.Now(), SortByCreateTime: domain.DESC, OrderType: domain.OrderReal})) 27 + _, orderAll, e := OrderBaseResponsitory.Find(utils.ObjectJsonToMap(domain.OrderQueryOption{PartnerId: header.UserId, EndTime: time.Now(), SortByCreateTime: domain.DESC, OrderTypes: domain.UserOrderTypes(domain.Career)}))
28 if e != nil { 28 if e != nil {
29 log.Error(e) 29 log.Error(e)
30 } 30 }
@@ -124,7 +124,7 @@ func OrderList(header *protocol.RequestHeader, request *protocol.DividendOrdersR @@ -124,7 +124,7 @@ func OrderList(header *protocol.RequestHeader, request *protocol.DividendOrdersR
124 rsp = &protocol.DividendOrdersResponse{List: make([]*protocol.DividendOrderListItem, 0)} 124 rsp = &protocol.DividendOrdersResponse{List: make([]*protocol.DividendOrderListItem, 0)}
125 125
126 count, orders, err = OrderDao.DividendOrders(&domain.DividendOrdersQueryOption{ 126 count, orders, err = OrderDao.DividendOrders(&domain.DividendOrdersQueryOption{
127 - OrderType: domain.OrderReal, 127 + OrderTypes: domain.UserOrderTypes(domain.Career),
128 PartnerId: header.UserId, 128 PartnerId: header.UserId,
129 DetailAction: request.DetailAction, 129 DetailAction: request.DetailAction,
130 DividendAction: request.DividendAction, 130 DividendAction: request.DividendAction,
  1 +package factory
  2 +
  3 +//import (
  4 +// "fmt"
  5 +// "github.com/linmadan/egglib-go/core/application"
  6 +// "github.com/linmadan/egglib-go/message/publisher/kafkaMessage/sarama"
  7 +// localMessagePublisher "github.com/linmadan/egglib-go/message/publisher/local_message"
  8 +// localMessageReceiver "github.com/linmadan/egglib-go/message/receiver/local_message"
  9 +// "suplus-message/pkg/constant"
  10 +// "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/log"
  11 +//)
  12 +//
  13 +//func CreateMessagePublisher(options map[string]interface{}) (application.MessagePublisher, error) {
  14 +// if localMessageOptions, ok := options["localMessageOptions"]; ok {
  15 +// var storeType string
  16 +// var storeOptions map[string]interface{}
  17 +// if value, ok := localMessageOptions.(map[string]interface{})["storeType"]; ok {
  18 +// storeType = value.(string)
  19 +// } else {
  20 +// return nil, fmt.Errorf("LocalMessagePublisher缺少参数storeType")
  21 +// }
  22 +// if value, ok := localMessageOptions.(map[string]interface{})["storeOptions"]; ok {
  23 +// storeOptions = value.(map[string]interface{})
  24 +// } else {
  25 +// return nil, fmt.Errorf("LocalMessagePublisher缺少参数storeOptions")
  26 +// }
  27 +// return localMessagePublisher.NewLocalMessagePublisher(storeType, storeOptions)
  28 +// } else {
  29 +// return sarama.NewKafkaSaramaMessagePublisher(constant.KAFKA_HOSTS,log.Logger)
  30 +// }
  31 +//}
  32 +//
  33 +//func CreateMessageReceiver(options map[string]interface{}) (application.MessageReceiver, error) {
  34 +// if localMessageOptions, ok := options["localMessageOptions"]; ok {
  35 +// var converterType string
  36 +// if value, ok := localMessageOptions.(map[string]interface{})["converterType"]; ok {
  37 +// converterType = value.(string)
  38 +// } else {
  39 +// return nil, fmt.Errorf("LocalMessageReceiver缺少参数converterType")
  40 +// }
  41 +// var storeType string
  42 +// var storeOptions map[string]interface{}
  43 +// if value, ok := localMessageOptions.(map[string]interface{})["storeType"]; ok {
  44 +// storeType = value.(string)
  45 +// } else {
  46 +// return nil, fmt.Errorf("LocalMessageReceiver缺少参数storeType")
  47 +// }
  48 +// if value, ok := localMessageOptions.(map[string]interface{})["storeOptions"]; ok {
  49 +// storeOptions = value.(map[string]interface{})
  50 +// } else {
  51 +// return nil, fmt.Errorf("LocalMessageReceiver缺少参数storeOptions")
  52 +// }
  53 +// return localMessageReceiver.NewLocalMessageReceiver(converterType, nil, storeType, storeOptions)
  54 +// } else {
  55 +// return nil, fmt.Errorf("缺少参数localMessageOptions")
  56 +// }
  57 +//}
@@ -138,14 +138,14 @@ func Statistics(header *protocol.RequestHeader, request *protocol.OrderStatistic @@ -138,14 +138,14 @@ func Statistics(header *protocol.RequestHeader, request *protocol.OrderStatistic
138 if rsp.Statistics.TodayRealQuantity, rsp.Statistics.TodayRealMoney, err = OrderDao.OrderStatics(&domain.OrderStaticQuery{ 138 if rsp.Statistics.TodayRealQuantity, rsp.Statistics.TodayRealMoney, err = OrderDao.OrderStatics(&domain.OrderStaticQuery{
139 BeginTime: utils.GetDayBegin().Unix() * 1000, 139 BeginTime: utils.GetDayBegin().Unix() * 1000,
140 EndTime: utils.GetDayEnd().Unix() * 1000, 140 EndTime: utils.GetDayEnd().Unix() * 1000,
141 - OrderType: domain.OrderReal, 141 + OrderTypes: domain.UserOrderTypes(domain.Career),
142 PartnerId: header.UserId, 142 PartnerId: header.UserId,
143 }); err != nil { 143 }); err != nil {
144 return 144 return
145 } 145 }
146 if rsp.Statistics.CumulativeQuantity, rsp.Statistics.CumulativeMoney, err = OrderDao.OrderStatics(&domain.OrderStaticQuery{ 146 if rsp.Statistics.CumulativeQuantity, rsp.Statistics.CumulativeMoney, err = OrderDao.OrderStatics(&domain.OrderStaticQuery{
147 EndTime: time.Now().Unix() * 1000, 147 EndTime: time.Now().Unix() * 1000,
148 - OrderType: domain.OrderReal, 148 + OrderTypes: domain.UserOrderTypes(domain.Career),
149 PartnerId: header.UserId, 149 PartnerId: header.UserId,
150 }); err != nil { 150 }); err != nil {
151 return 151 return
@@ -207,6 +207,7 @@ func List(header *protocol.RequestHeader, request *protocol.OrderListRequest) (r @@ -207,6 +207,7 @@ func List(header *protocol.RequestHeader, request *protocol.OrderListRequest) (r
207 queryOption.EndTime = time.Unix(request.EndTime/1000, 0) 207 queryOption.EndTime = time.Unix(request.EndTime/1000, 0)
208 } 208 }
209 queryOption.OrderType = request.OrderType 209 queryOption.OrderType = request.OrderType
  210 + queryOption.OrderTypes = request.OrderTypes
210 total, orders, _ = OrderResponsitory.Find(utils.ObjectJsonToMap(queryOption)) 211 total, orders, _ = OrderResponsitory.Find(utils.ObjectJsonToMap(queryOption))
211 if len(orders) != 0 { 212 if len(orders) != 0 {
212 for i := range orders { 213 for i := range orders {
@@ -222,7 +223,7 @@ func List(header *protocol.RequestHeader, request *protocol.OrderListRequest) (r @@ -222,7 +223,7 @@ func List(header *protocol.RequestHeader, request *protocol.OrderListRequest) (r
222 //累计实发订单 223 //累计实发订单
223 cumulativeQuantity, _, err = OrderDao.OrderStatics(&domain.OrderStaticQuery{ 224 cumulativeQuantity, _, err = OrderDao.OrderStatics(&domain.OrderStaticQuery{
224 EndTime: time.Now().Unix() * 1000, 225 EndTime: time.Now().Unix() * 1000,
225 - OrderType: domain.OrderReal, 226 + OrderTypes: domain.UserOrderTypes(domain.Career),
226 PartnerId: header.UserId, 227 PartnerId: header.UserId,
227 }) 228 })
228 rsp.Total = cumulativeQuantity 229 rsp.Total = cumulativeQuantity
@@ -65,7 +65,7 @@ func getDetail(userId int64, transactionContext *transaction.TransactionContext) @@ -65,7 +65,7 @@ func getDetail(userId int64, transactionContext *transaction.TransactionContext)
65 } 65 }
66 } 66 }
67 p.CooperateTime = partner.CooperateTime.Unix() * 1000 67 p.CooperateTime = partner.CooperateTime.Unix() * 1000
68 - if bonus, e := OrderBaseDao.OrderBonusStatics(domain.OrderBonusQuery{PartnerId: userId, OrderType: domain.OrderReal}); e == nil { 68 + if bonus, e := OrderBaseDao.OrderBonusStatics(domain.OrderBonusQuery{PartnerId: userId, OrderTypes: domain.UserOrderTypes(domain.Career)}); e == nil {
69 p.CareerOrdersCount = int(bonus.Total) 69 p.CareerOrdersCount = int(bonus.Total)
70 p.CareerOrdersMoney = utils.Decimal(bonus.TotalOrderAmount) 70 p.CareerOrdersMoney = utils.Decimal(bonus.TotalOrderAmount)
71 p.CareerDividend = utils.Decimal(bonus.Bonus) 71 p.CareerDividend = utils.Decimal(bonus.Bonus)
@@ -13,6 +13,7 @@ import ( @@ -13,6 +13,7 @@ import (
13 "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/log" 13 "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/log"
14 "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/protocol" 14 "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/protocol"
15 protocolx "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/protocol/auth" 15 protocolx "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/protocol/auth"
  16 + userx "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/protocol/user"
16 "strconv" 17 "strconv"
17 "strings" 18 "strings"
18 ) 19 )
@@ -82,10 +83,6 @@ func UserInfo(header *protocol.RequestHeader, request *protocol.UserInfoRequest) @@ -82,10 +83,6 @@ func UserInfo(header *protocol.RequestHeader, request *protocol.UserInfoRequest)
82 Name: company.Name, 83 Name: company.Name,
83 Phone: company.Phone, 84 Phone: company.Phone,
84 }, 85 },
85 - //JoinWay: partnerInfo.PartnerCategoryInfo(),  
86 - //District: map[string]interface{}{"id": partnerInfo.RegionInfo.RegionId, "name": partnerInfo.RegionInfo.RegionName},  
87 - //SerialNo: partnerInfo.Id,  
88 - //CooperateTime: partnerInfo.CooperateTime.Unix() * 1000,  
89 } 86 }
90 } 87 }
91 switch header.AdminType { 88 switch header.AdminType {
@@ -205,28 +202,6 @@ func UserInfoV2(header *protocol.RequestHeader, request *protocol.UserInfoReques @@ -205,28 +202,6 @@ func UserInfoV2(header *protocol.RequestHeader, request *protocol.UserInfoReques
205 transactionContext.RollbackTransaction() 202 transactionContext.RollbackTransaction()
206 }() 203 }()
207 rsp = &protocol.UserInfoResponse{} 204 rsp = &protocol.UserInfoResponse{}
208 -  
209 - type xcompany struct {  
210 - Id int64 `json:"id"`  
211 - Name string `json:"name"`  
212 - Phone string `json:"phone"`  
213 - //合作区域  
214 - District interface{} `json:"district"`  
215 - //合作编码  
216 - SerialNo int64 `json:"serialNo"`  
217 - //合作时间  
218 - CooperateTime int64 `json:"cooperationTime"`  
219 - Salesman interface{} `json:"salesman"`  
220 - }  
221 - type xuser struct {  
222 - Id int64 `json:"uid"`  
223 - //用户名称  
224 - PartnerName string `json:"uname"`  
225 - //手机号  
226 - Phone string `json:"phone"`  
227 - //合作公司  
228 - CooperateCompany xcompany `json:"company"`  
229 - }  
230 rspMap := make(map[string]interface{}) 205 rspMap := make(map[string]interface{})
231 funcPartnerInfo := func() { 206 funcPartnerInfo := func() {
232 if partnerInfo, err = PartnerInfoService.FindOne(map[string]interface{}{"id": header.UserId}); err != nil { 207 if partnerInfo, err = PartnerInfoService.FindOne(map[string]interface{}{"id": header.UserId}); err != nil {
@@ -236,18 +211,27 @@ func UserInfoV2(header *protocol.RequestHeader, request *protocol.UserInfoReques @@ -236,18 +211,27 @@ func UserInfoV2(header *protocol.RequestHeader, request *protocol.UserInfoReques
236 if company, err = CompanyResponsitory.FindOne(map[string]interface{}{"id": header.CompanyId}); err != nil { 211 if company, err = CompanyResponsitory.FindOne(map[string]interface{}{"id": header.CompanyId}); err != nil {
237 return 212 return
238 } 213 }
239 -  
240 - u := xuser{ 214 + var miniProgram = make(map[string]interface{})
  215 + if len(company.Applets) > 0 {
  216 + if company.Applets[0].Valid() {
  217 + miniProgram["webpageUrl"] = "www.baidu.com"
  218 + miniProgram["userName"] = company.Applets[0].Id
  219 + miniProgram["path"] = fmt.Sprintf("%v?inviter_id=%v&company_id=%v", company.Applets[0].URL, partnerInfo.Id, partnerInfo.CompanyId)
  220 + miniProgram["hdImageUrl"] = "http://suplus-business-admin-test.fjmaimaimai.com/images/default/default_logo.png"
  221 + miniProgram["title"] = company.Applets[0].Name
  222 + }
  223 + }
  224 + u := userx.User{
241 Id: partnerInfo.Id, 225 Id: partnerInfo.Id,
242 PartnerName: partnerInfo.PartnerName, 226 PartnerName: partnerInfo.PartnerName,
243 Phone: partnerInfo.Account, 227 Phone: partnerInfo.Account,
244 - CooperateCompany: xcompany{ 228 + CooperateCompany: userx.Company{
245 Id: company.Id, 229 Id: company.Id,
246 Name: company.Name, 230 Name: company.Name,
247 Phone: company.Phone, 231 Phone: company.Phone,
248 SerialNo: partnerInfo.Id, 232 SerialNo: partnerInfo.Id,
249 CooperateTime: partnerInfo.CooperateTime.Unix() * 1000, 233 CooperateTime: partnerInfo.CooperateTime.Unix() * 1000,
250 - //JoinWay: partnerInfo.PartnerCategoryInfo(), 234 + MiniProgram: miniProgram,
251 District: map[string]interface{}{"id": partnerInfo.RegionInfo.RegionId, "name": partnerInfo.RegionInfo.RegionName}, 235 District: map[string]interface{}{"id": partnerInfo.RegionInfo.RegionId, "name": partnerInfo.RegionInfo.RegionName},
252 }, 236 },
253 } 237 }
@@ -267,14 +251,15 @@ func UserInfoV2(header *protocol.RequestHeader, request *protocol.UserInfoReques @@ -267,14 +251,15 @@ func UserInfoV2(header *protocol.RequestHeader, request *protocol.UserInfoReques
267 if company, err = CompanyResponsitory.FindOne(map[string]interface{}{"id": header.CompanyId}); err != nil { 251 if company, err = CompanyResponsitory.FindOne(map[string]interface{}{"id": header.CompanyId}); err != nil {
268 return 252 return
269 } 253 }
270 - rspMap["user"] = xuser{ 254 + rspMap["user"] = userx.User{
271 Id: user.Id, 255 Id: user.Id,
272 PartnerName: user.Name, 256 PartnerName: user.Name,
273 Phone: user.Phone, 257 Phone: user.Phone,
274 - CooperateCompany: xcompany{ 258 + CooperateCompany: userx.Company{
275 Id: company.Id, 259 Id: company.Id,
276 Name: company.Name, 260 Name: company.Name,
277 Phone: company.Phone, 261 Phone: company.Phone,
  262 + MiniProgram: struct{}{},
278 }, 263 },
279 } 264 }
280 rsp = rspMap 265 rsp = rspMap
  1 +package constant
  2 +
  3 +const TOPIC_UCENT_USER_CHANGE_PHONE = "ucent-user-changePhone"
  4 +
  5 +const KAFKA_HOSTS = "106.52.15.41:9092"
  1 +package domain
  2 +
  3 +type CompanyApplets struct {
  4 + Name string `json:"name"`
  5 + URL string `json:"url"`
  6 + Id string `json:"id"`
  7 +}
  8 +
  9 +func (applets CompanyApplets) Valid() bool {
  10 + if len(applets.Name) == 0 {
  11 + return false
  12 + }
  13 + if len(applets.URL) == 0 {
  14 + return false
  15 + }
  16 + if len(applets.Id) == 0 {
  17 + return false
  18 + }
  19 + return true
  20 +}
@@ -28,6 +28,8 @@ type Company struct { @@ -28,6 +28,8 @@ type Company struct {
28 DeleteAt time.Time `json:"deleteAt"` 28 DeleteAt time.Time `json:"deleteAt"`
29 // 是否开启合伙人模块,是否有效【1:有效】【2:无效】 29 // 是否开启合伙人模块,是否有效【1:有效】【2:无效】
30 Enable int8 `json:"enable"` 30 Enable int8 `json:"enable"`
  31 + // 小程序
  32 + Applets []CompanyApplets `json:"applets"`
31 } 33 }
32 34
33 type CompanyRepository interface { 35 type CompanyRepository interface {
@@ -11,6 +11,7 @@ const ( @@ -11,6 +11,7 @@ const (
11 const ( 11 const (
12 OrderReal = iota + 1 //实发订单 12 OrderReal = iota + 1 //实发订单
13 OrderIntention //意向订单 13 OrderIntention //意向订单
  14 + OrderAppletSeafood //小程序-海鲜干货
14 ) 15 )
15 16
16 const ( 17 const (
@@ -21,3 +22,10 @@ const ( @@ -21,3 +22,10 @@ const (
21 var ( 22 var (
22 QueryNoRow = fmt.Errorf("not row found") 23 QueryNoRow = fmt.Errorf("not row found")
23 ) 24 )
  25 +
  26 +// UserOrderTypes
  27 +// @category 合伙人类型
  28 +// @desc 根据用户类别获取用户能看到的订单类型
  29 +func UserOrderTypes(category int) []int {
  30 + return []int{OrderReal, OrderAppletSeafood}
  31 +}
@@ -141,6 +141,7 @@ func (m *OrderBase) OrderBonusStatic() *OrderStatics { @@ -141,6 +141,7 @@ func (m *OrderBase) OrderBonusStatic() *OrderStatics {
141 type OrderQueryOption struct { 141 type OrderQueryOption struct {
142 PartnerId int64 `json:"partnerId,omitempty"` 142 PartnerId int64 `json:"partnerId,omitempty"`
143 OrderType int `json:"orderType,omitempty"` 143 OrderType int `json:"orderType,omitempty"`
  144 + OrderTypes []int `json:"orderTypes,omitempty"`
144 OrderStatus int `json:"orderStatus,omitempty"` 145 OrderStatus int `json:"orderStatus,omitempty"`
145 BeginTime time.Time `json:"beginTime,omitempty"` 146 BeginTime time.Time `json:"beginTime,omitempty"`
146 EndTime time.Time `json:"endTime,omitempty"` 147 EndTime time.Time `json:"endTime,omitempty"`
@@ -154,6 +155,7 @@ type OrderQueryOption struct { @@ -154,6 +155,7 @@ type OrderQueryOption struct {
154 type DividendOrdersQueryOption struct { 155 type DividendOrdersQueryOption struct {
155 PartnerId int64 `json:"partnerId"` 156 PartnerId int64 `json:"partnerId"`
156 OrderType int `json:"orderType"` //订单类型 157 OrderType int `json:"orderType"` //订单类型
  158 + OrderTypes []int `json:"orderTypes,omitempty"`
157 DetailAction int `json:"detailAction"` //明细类型(0已收明细、1未收明细) 159 DetailAction int `json:"detailAction"` //明细类型(0已收明细、1未收明细)
158 DividendAction int `json:"dividendAction"` //分红类型(0累计分红、1分红支出) 160 DividendAction int `json:"dividendAction"` //分红类型(0累计分红、1分红支出)
159 IsDisable string `json:"isDisable,omitempty"` 161 IsDisable string `json:"isDisable,omitempty"`
@@ -11,6 +11,7 @@ type OrderStaticQuery struct { @@ -11,6 +11,7 @@ type OrderStaticQuery struct {
11 EndTime int64 `json:"endTime,omitempty"` 11 EndTime int64 `json:"endTime,omitempty"`
12 OrderStatus int `json:"orderStatus,omitempty"` 12 OrderStatus int `json:"orderStatus,omitempty"`
13 OrderType int `json:"orderType,omitempty"` 13 OrderType int `json:"orderType,omitempty"`
  14 + OrderTypes []int `json:"orderTypes,omitempty"`
14 //IsDisable int `json:"isDisable,omitempty"` 15 //IsDisable int `json:"isDisable,omitempty"`
15 } 16 }
16 17
@@ -28,6 +29,7 @@ type OrderBonusQuery struct { @@ -28,6 +29,7 @@ type OrderBonusQuery struct {
28 InPartnerIds []int64 `json:"inPartnerIds,omitempty"` 29 InPartnerIds []int64 `json:"inPartnerIds,omitempty"`
29 IsDisable int `json:"isDisable,omitempty"` 30 IsDisable int `json:"isDisable,omitempty"`
30 OrderType int `json:"orderType,omitempty"` 31 OrderType int `json:"orderType,omitempty"`
  32 + OrderTypes []int `json:"orderTypes,omitempty"`
31 } 33 }
32 34
33 // 订单分红统计-应答 35 // 订单分红统计-应答
  1 +package domain
  2 +
  3 +// SysMessageConsume
  4 +type SysMessageConsume struct {
  5 + // 消息ID
  6 + Id int64 `json:"id"`
  7 + // 主题
  8 + Topic string `json:"topic"`
  9 + // 分区信息
  10 + Partition int `json:"partition"`
  11 + // 消息偏移序号
  12 + Offset int64 `json:"offset"`
  13 + // 键值
  14 + Key string `json:"key"`
  15 + // 消息内容
  16 + Value string `json:"value"`
  17 + // 消息时间
  18 + MsgTime int64 `json:"msgTime"`
  19 + // 创建时间
  20 + CreateAt int64 `json:"createAt"`
  21 + // 状态
  22 + Status int64 `json:"status"`
  23 +}
  24 +
  25 +type SysMessageConsumeRepository interface {
  26 + Save(dm *SysMessageConsume) (*SysMessageConsume, error)
  27 + Remove(dm *SysMessageConsume) (*SysMessageConsume, error)
  28 + FindOne(queryOptions map[string]interface{}) (*SysMessageConsume, error)
  29 + Find(queryOptions map[string]interface{}) (int64, []*SysMessageConsume, error)
  30 +}
  31 +
  32 +func (m *SysMessageConsume) Identify() interface{} {
  33 + if m.Id == 0 {
  34 + return nil
  35 + }
  36 + return m.Id
  37 +}
  1 +package domain
  2 +
  3 +// SysMessageProduce
  4 +type SysMessageProduce struct {
  5 + // 消息ID
  6 + Id int64 `json:"id"`
  7 + // 主题
  8 + Topic string `json:"topic"`
  9 + // 分区信息
  10 + Partition int `json:"partition"`
  11 + // 消息内容
  12 + Value string `json:"value"`
  13 + // 消息时间
  14 + MsgTime int64 `json:"msgTime"`
  15 + // 状态
  16 + Status int64 `json:"status"`
  17 +}
  18 +
  19 +type SysMessageProduceRepository interface {
  20 + Save(dm *SysMessageProduce) (*SysMessageProduce, error)
  21 + Remove(dm *SysMessageProduce) (*SysMessageProduce, error)
  22 + FindOne(queryOptions map[string]interface{}) (*SysMessageProduce, error)
  23 + Find(queryOptions map[string]interface{}) (int64, []*SysMessageProduce, error)
  24 +}
  25 +
  26 +func (m *SysMessageProduce) Identify() interface{} {
  27 + if m.Id == 0 {
  28 + return nil
  29 + }
  30 + return m.Id
  31 +}
@@ -30,6 +30,9 @@ func (dao *OrderBaseDao) OrderStatics(option *domain.OrderStaticQuery) (count in @@ -30,6 +30,9 @@ func (dao *OrderBaseDao) OrderStatics(option *domain.OrderStaticQuery) (count in
30 if option.OrderType > 0 { 30 if option.OrderType > 0 {
31 q.Where(`"order_base".order_type =?`, option.OrderType) 31 q.Where(`"order_base".order_type =?`, option.OrderType)
32 } 32 }
  33 + if len(option.OrderTypes) > 0 {
  34 + q.Where(`"order_base".order_type in (?)`, pg.In(option.OrderTypes))
  35 + }
33 if option.BeginTime > 0 { 36 if option.BeginTime > 0 {
34 q.Where(`"order_base".create_time >=?`, time.Unix(option.BeginTime/1000, 0)) 37 q.Where(`"order_base".create_time >=?`, time.Unix(option.BeginTime/1000, 0))
35 } 38 }
@@ -68,6 +71,9 @@ func (dao *OrderBaseDao) OrderBonusStatics(option domain.OrderBonusQuery) (rsp d @@ -68,6 +71,9 @@ func (dao *OrderBaseDao) OrderBonusStatics(option domain.OrderBonusQuery) (rsp d
68 if option.OrderType > 0 { 71 if option.OrderType > 0 {
69 q.Where(`"order_base".order_type =?`, option.OrderType) 72 q.Where(`"order_base".order_type =?`, option.OrderType)
70 } 73 }
  74 + if len(option.OrderTypes) > 0 {
  75 + q.Where(`"order_base".order_type in (?)`, pg.In(option.OrderTypes))
  76 + }
71 err = q.Select(&rsp.Total, &rsp.Bonus, &rsp.BonusExpense, &rsp.TotalOrderAmount) 77 err = q.Select(&rsp.Total, &rsp.Bonus, &rsp.BonusExpense, &rsp.TotalOrderAmount)
72 return 78 return
73 } 79 }
@@ -81,6 +87,9 @@ func (dao *OrderBaseDao) DividendOrders(option *domain.DividendOrdersQueryOption @@ -81,6 +87,9 @@ func (dao *OrderBaseDao) DividendOrders(option *domain.DividendOrdersQueryOption
81 if option.OrderType > 0 { 87 if option.OrderType > 0 {
82 q.Where(`"order_base".order_type=?`, option.OrderType) 88 q.Where(`"order_base".order_type=?`, option.OrderType)
83 } 89 }
  90 + if len(option.OrderTypes) > 0 {
  91 + q.Where(`"order_base".order_type in (?)`, pg.In(option.OrderTypes))
  92 + }
84 if option.PartnerId > 0 { 93 if option.PartnerId > 0 {
85 q.Where(`"order_base".partner_id=?`, option.PartnerId) 94 q.Where(`"order_base".partner_id=?`, option.PartnerId)
86 } 95 }
@@ -112,7 +112,7 @@ A left join @@ -112,7 +112,7 @@ A left join
112 plan_order_amount amount, 112 plan_order_amount amount,
113 (case when use_order_count>=0 then use_partner_bonus else plan_partner_bonus end) bonus, 113 (case when use_order_count>=0 then use_partner_bonus else plan_partner_bonus end) bonus,
114 partner_bonus_expense bonus_expense FROM "order_base" AS "order_base" 114 partner_bonus_expense bonus_expense FROM "order_base" AS "order_base"
115 - WHERE (partner_id in (?)) and order_type =1 115 + WHERE (partner_id in (?)) and (order_type in (?))
116 UNION ALL 116 UNION ALL
117 SELECT partner_info_id partner_id, 117 SELECT partner_info_id partner_id,
118 0 amount, bonus bonus, bonus_expense bonus_expense FROM business_bonus 118 0 amount, bonus bonus, bonus_expense bonus_expense FROM business_bonus
@@ -129,7 +129,7 @@ GROUP BY partner_id @@ -129,7 +129,7 @@ GROUP BY partner_id
129 sql.WriteString(fmt.Sprintf(" \nOFFSET %v", offset)) 129 sql.WriteString(fmt.Sprintf(" \nOFFSET %v", offset))
130 } 130 }
131 } 131 }
132 - _, err = tx.Query(&statics, sql.String(), pg.In(partnerIds), pg.In(partnerIds), pg.In(partnerIds)) 132 + _, err = tx.Query(&statics, sql.String(), pg.In(partnerIds), pg.In(partnerIds), pg.In(domain.UserOrderTypes(domain.Career)), pg.In(partnerIds))
133 return 133 return
134 } 134 }
135 135
@@ -104,7 +104,7 @@ func (svr *PgLoginService) PartnerStaticInfo() (interface{}, error) { @@ -104,7 +104,7 @@ func (svr *PgLoginService) PartnerStaticInfo() (interface{}, error) {
104 if len(companyList) == 0 { 104 if len(companyList) == 0 {
105 return response, nil 105 return response, nil
106 } 106 }
107 - totalBonus, e := OrderDao.OrderBonusStatics(domain.OrderBonusQuery{InPartnerIds: doGetPartnerIds(), OrderType: domain.OrderReal}) 107 + totalBonus, e := OrderDao.OrderBonusStatics(domain.OrderBonusQuery{InPartnerIds: doGetPartnerIds(), OrderTypes: domain.UserOrderTypes(domain.Career)})
108 if e != nil { 108 if e != nil {
109 return response, e 109 return response, e
110 } 110 }
@@ -153,7 +153,7 @@ func (svr *PgLoginService) PartnerStaticInfo() (interface{}, error) { @@ -153,7 +153,7 @@ func (svr *PgLoginService) PartnerStaticInfo() (interface{}, error) {
153 continue 153 continue
154 } 154 }
155 155
156 - bonus, _ := OrderDao.OrderBonusStatics(domain.OrderBonusQuery{PartnerId: partner.Id, OrderType: domain.OrderReal}) 156 + bonus, _ := OrderDao.OrderBonusStatics(domain.OrderBonusQuery{PartnerId: partner.Id, OrderTypes: domain.UserOrderTypes(domain.Career)})
157 if v, ok := mapPartnerBussinessBonus[partner.Id]; ok { 157 if v, ok := mapPartnerBussinessBonus[partner.Id]; ok {
158 bonus.Bonus += v.Bonus 158 bonus.Bonus += v.Bonus
159 } 159 }
@@ -196,7 +196,7 @@ func (svr *PgLoginService) ManagerStaticInfo() (interface{}, error) { @@ -196,7 +196,7 @@ func (svr *PgLoginService) ManagerStaticInfo() (interface{}, error) {
196 for i := range companyList { 196 for i := range companyList {
197 c := companyList[i] 197 c := companyList[i]
198 198
199 - if constant.POSTGRESQL_DB_NAME != "partner_dev1" { 199 + if constant.POSTGRESQL_DB_NAME != "partner_dev" {
200 //通过企业平台 校验模块权限 200 //通过企业平台 校验模块权限
201 var user *domain.Users 201 var user *domain.Users
202 for j := range svr.Users { 202 for j := range svr.Users {
  1 +package message
  2 +
  3 +import (
  4 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/kafkax"
  5 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/models"
  6 +)
  7 +
  8 +//新消费者-消费组
  9 +func NewConsumer(kafkaHosts string, groupId string) models.Consumer {
  10 + return kafkax.NewSaramaConsumer(kafkaHosts, groupId)
  11 +}
  1 +package message
  2 +
  3 +import (
  4 + "fmt"
  5 + "github.com/Shopify/sarama"
  6 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/constant"
  7 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/models"
  8 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg"
  9 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/transaction"
  10 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/utils"
  11 + "log"
  12 + "testing"
  13 +)
  14 +
  15 +// 消息持久化
  16 +func TestNewConsumer(t *testing.T) {
  17 + consumer := NewConsumer(constant.KAFKA_HOSTS, "0")
  18 + consumer.WithMessageReceiver(NewPgMessageReceiverRepository(transaction.NewPGTransactionContext(pg.DB)))
  19 + consumer.WithTopicHandler("mmm_xcx_orders", func(message interface{}) error {
  20 + m, ok := message.(*sarama.Message)
  21 + if !ok {
  22 + return nil
  23 + }
  24 + if len(m.Value) > 0 {
  25 + var msg models.Message
  26 + utils.JsonUnmarshal(string(m.Value), &msg)
  27 + t.Log("handler message :", string(m.Value), msg.Id, msg.Topic, msg.Value)
  28 + }
  29 + return nil
  30 + })
  31 + consumer.StartConsume()
  32 +}
  33 +
  34 +// 消息不需要持久化
  35 +func TestNewConsumerNoRepository(t *testing.T) {
  36 + consumer := NewConsumer(constant.KAFKA_HOSTS, "0")
  37 + consumer.WithTopicHandler("mmm_xcx_orders", func(message interface{}) error {
  38 + m, ok := message.(*sarama.Message)
  39 + if !ok {
  40 + return nil
  41 + }
  42 + if len(m.Value) > 0 {
  43 + var msg models.Message
  44 + utils.JsonUnmarshal(string(m.Value), &msg)
  45 + t.Log("handler message :", string(m.Value), msg.Id, msg.Topic, msg.Value)
  46 + }
  47 + return nil
  48 + })
  49 + consumer.StartConsume()
  50 +}
  51 +
  52 +type PgMessageReceiverRepository struct {
  53 + transactionContext *transaction.TransactionContext
  54 +}
  55 +
  56 +func NewPgMessageReceiverRepository(transactionContext *transaction.TransactionContext) *PgMessageReceiverRepository {
  57 + return &PgMessageReceiverRepository{
  58 + transactionContext: transactionContext,
  59 + }
  60 +}
  61 +func (repository *PgMessageReceiverRepository) ReceiveMessage(params map[string]interface{}) error {
  62 + var num int
  63 + checkSql := `select count(0) from sys_message_consume where "offset" =? and topic=?`
  64 + _, err := repository.transactionContext.PgDd.Query(&num, checkSql, params["offset"], params["topic"])
  65 + if err != nil {
  66 + return err
  67 + }
  68 + if num > 0 {
  69 + return fmt.Errorf("receive repeate message [%v]", params)
  70 + }
  71 +
  72 + sql := `insert into sys_message_consume(topic,partition,"offset",key,value,msg_time,create_at,status)values(?,?,?,?,?,?,?,?)`
  73 + _, err = repository.transactionContext.PgDd.Exec(sql, params["topic"], params["partition"], params["offset"], params["key"], params["value"], params["msg_time"], params["create_at"], params["status"])
  74 + return err
  75 +}
  76 +func (repository *PgMessageReceiverRepository) ConfirmReceive(params map[string]interface{}) error {
  77 + log.Println(params)
  78 + _, err := repository.transactionContext.PgDd.Exec(`update sys_message_consume set status=? where "offset" =? and topic=?`, int(models.Finished), params["offset"], params["topic"])
  79 + return err
  80 +}
  1 +package kafkax
  2 +
  3 +import (
  4 + "context"
  5 + "fmt"
  6 + "github.com/Shopify/sarama"
  7 + "github.com/tiptok/gocomm/identity/idgen"
  8 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/models"
  9 + "log"
  10 + "strings"
  11 + "sync"
  12 + "time"
  13 +)
  14 +
  15 +type SaramaConsumer struct {
  16 + ready chan bool
  17 + messageHandlerMap map[string]func(message interface{}) error
  18 + //Logger log.Logger
  19 + kafkaHosts string
  20 + groupId string
  21 + topicMiss map[string]string //记录未被消费的topic
  22 + receiver models.MessageReceiverRepository
  23 +}
  24 +
  25 +func (consumer *SaramaConsumer) Setup(sarama.ConsumerGroupSession) error {
  26 + close(consumer.ready)
  27 + return nil
  28 +}
  29 +func (consumer *SaramaConsumer) Cleanup(sarama.ConsumerGroupSession) error {
  30 + return nil
  31 +}
  32 +func (consumer *SaramaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  33 + var err error
  34 + for message := range claim.Messages() {
  35 + log.Printf("Message claimed: timestamp = %v, topic = %s offset = %v value = %v", message.Timestamp, message.Topic, message.Offset, string(message.Value))
  36 + handler, ok := consumer.messageHandlerMap[message.Topic]
  37 + consumer.messageReceiveBefore(message)
  38 + if !ok {
  39 + continue
  40 + }
  41 + if err = handler(message); err == nil {
  42 + session.MarkMessage(message, "")
  43 + } else {
  44 + fmt.Println("Message claimed: kafka消息处理错误 topic =", message.Topic, message.Offset, err)
  45 + }
  46 + session.MarkMessage(message, "")
  47 + if err != nil {
  48 + continue
  49 + }
  50 + consumer.messageReceiveAfter(message)
  51 + }
  52 + return err
  53 +}
  54 +
  55 +func (consumer *SaramaConsumer) messageReceiveBefore(message *sarama.ConsumerMessage) {
  56 + if consumer.receiver == nil {
  57 + return
  58 + }
  59 +
  60 + var params = make(map[string]interface{})
  61 + var err error
  62 + _, ok := consumer.messageHandlerMap[message.Topic]
  63 + if !ok {
  64 + params["status"] = models.Ignore
  65 + _, topicMiss := consumer.topicMiss[message.Topic]
  66 + if !topicMiss {
  67 + fmt.Printf("topic:[%v] has not consumer handler", message.Topic)
  68 + }
  69 + return
  70 + }
  71 +
  72 + _, err = consumer.storeMessage(params, message)
  73 + if err != nil {
  74 + log.Println("ConsumeClaim:", err)
  75 + }
  76 +}
  77 +func (consumer *SaramaConsumer) messageReceiveAfter(message *sarama.ConsumerMessage) {
  78 + if consumer.receiver == nil {
  79 + return
  80 + }
  81 + consumer.finishMessage(map[string]interface{}{"offset": message.Offset, "topic": message.Topic})
  82 +}
  83 +
  84 +func (consumer *SaramaConsumer) storeMessage(params map[string]interface{}, message *sarama.ConsumerMessage) (id int64, err error) {
  85 + defer func() {
  86 + if e := recover(); e != nil {
  87 + log.Println(e)
  88 + }
  89 + }()
  90 + id = idgen.Next()
  91 + params = make(map[string]interface{})
  92 + params["id"] = message.Offset
  93 + params["topic"] = message.Topic
  94 + params["partition"] = message.Partition
  95 + params["offset"] = message.Offset
  96 + params["key"] = string(message.Key)
  97 + params["value"] = string(message.Value)
  98 + params["msg_time"] = message.Timestamp.Unix()
  99 + params["create_at"] = time.Now().Unix()
  100 + params["status"] = models.UnFinished //0:未完成 1:已完成 2:未命中
  101 + err = consumer.receiver.ReceiveMessage(params)
  102 + return
  103 +}
  104 +func (consumer *SaramaConsumer) finishMessage(params map[string]interface{}) error {
  105 + defer func() {
  106 + if e := recover(); e != nil {
  107 + log.Println(e)
  108 + }
  109 + }()
  110 + consumer.receiver.ConfirmReceive(params)
  111 + return nil
  112 +}
  113 +
  114 +func (consumer *SaramaConsumer) StartConsume() error {
  115 + config := sarama.NewConfig()
  116 + config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
  117 + config.Consumer.Offsets.Initial = sarama.OffsetNewest
  118 + config.Version = sarama.V0_11_0_0
  119 + brokerList := strings.Split(consumer.kafkaHosts, ",")
  120 + consumerGroup, err := sarama.NewConsumerGroup(brokerList, consumer.groupId, config)
  121 + if err != nil {
  122 + return err
  123 + }
  124 + ctx, cancel := context.WithCancel(context.Background())
  125 + wg := &sync.WaitGroup{}
  126 + wg.Add(1)
  127 + consumer.ready = make(chan bool)
  128 + go func() {
  129 + defer wg.Done()
  130 + for {
  131 + var topics []string
  132 + for key := range consumer.messageHandlerMap {
  133 + topics = append(topics, key)
  134 + }
  135 + if err := consumerGroup.Consume(ctx, topics, consumer); err != nil {
  136 + log.Println(err.Error())
  137 + return
  138 + }
  139 + if ctx.Err() != nil {
  140 + return
  141 + }
  142 + }
  143 + }()
  144 + <-consumer.ready
  145 + log.Println("Sarama consumer up and running!...")
  146 + select {
  147 + case <-ctx.Done():
  148 + log.Println("Sarama consumer : context cancelled")
  149 + }
  150 + cancel()
  151 + wg.Wait()
  152 + if err := consumerGroup.Close(); err != nil {
  153 + return err
  154 + }
  155 + return nil
  156 +}
  157 +func (consumer *SaramaConsumer) WithTopicHandler(topic string, handler func(message interface{}) error) { //*sarama.ConsumerMessage
  158 + consumer.messageHandlerMap[topic] = handler
  159 +}
  160 +func (consumer *SaramaConsumer) WithMessageReceiver(receiver models.MessageReceiverRepository) {
  161 + consumer.receiver = receiver
  162 +}
  163 +
  164 +func NewSaramaConsumer(kafkaHosts string, groupId string) models.Consumer {
  165 + return &SaramaConsumer{
  166 + kafkaHosts: kafkaHosts,
  167 + groupId: groupId,
  168 + topicMiss: make(map[string]string),
  169 + messageHandlerMap: make(map[string]func(message interface{}) error),
  170 + }
  171 +}
  1 +package kafkax
  2 +
  3 +import (
  4 + "encoding/json"
  5 + "fmt"
  6 + "github.com/Shopify/sarama"
  7 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/models"
  8 + "log"
  9 + "strings"
  10 + "time"
  11 +)
  12 +
  13 +// sarame kafka 消息生产
  14 +type KafkaMessageProducer struct {
  15 + KafkaHosts string
  16 + LogInfo models.LogInfo
  17 +}
  18 +
  19 +// 同步发送
  20 +func (engine *KafkaMessageProducer) Publish(messages []*models.Message, option map[string]interface{}) (*models.MessagePublishResult, error) {
  21 + config := sarama.NewConfig()
  22 + config.Producer.Return.Successes = true
  23 + config.Producer.Return.Errors = true
  24 + config.Producer.Partitioner = sarama.NewRandomPartitioner
  25 + config.Producer.Retry.Max = 10
  26 + config.Producer.RequiredAcks = sarama.WaitForAll
  27 + config.Version = sarama.V0_11_0_0
  28 + brokerList := strings.Split(engine.KafkaHosts, ",")
  29 + producer, err := sarama.NewSyncProducer(brokerList, config)
  30 + if err != nil {
  31 + return nil, err
  32 + }
  33 + defer func() {
  34 + if err := producer.Close(); err != nil {
  35 + log.Println(err)
  36 + }
  37 + }()
  38 + var successMessageIds []int64
  39 + var errMessageIds []int64
  40 + for _, message := range messages {
  41 + if value, err := json.Marshal(message); err == nil {
  42 + msg := &sarama.ProducerMessage{
  43 + Topic: message.Topic,
  44 + Value: sarama.StringEncoder(value),
  45 + Timestamp: time.Now(),
  46 + }
  47 + partition, offset, err := producer.SendMessage(msg)
  48 + if err != nil {
  49 + errMessageIds = append(errMessageIds, message.Id)
  50 + log.Println(err)
  51 + } else {
  52 + successMessageIds = append(successMessageIds, message.Id)
  53 + var append = make(map[string]interface{})
  54 + append["topic"] = message.Topic
  55 + append["partition"] = partition
  56 + append["offset"] = offset
  57 + log.Println("kafka消息发送", append)
  58 + }
  59 + }
  60 + }
  61 + return &models.MessagePublishResult{SuccessMessageIds: successMessageIds, ErrorMessageIds: errMessageIds}, nil
  62 +}
  63 +
  64 +// 消息调度器
  65 +type MessageDispatcher struct {
  66 + notifications chan struct{}
  67 + messageChan chan *models.Message
  68 + dispatchTicker *time.Ticker
  69 + messageRepository models.MessageRepository
  70 + producer models.MessageProducer
  71 +}
  72 +
  73 +func (dispatcher *MessageDispatcher) MessagePublishedNotice() error {
  74 + time.Sleep(time.Second * 2)
  75 + dispatcher.notifications <- struct{}{}
  76 + return nil
  77 +}
  78 +
  79 +func (dispatcher *MessageDispatcher) MessagePublish(messages []*models.Message) error {
  80 + for i := range messages {
  81 + dispatcher.messageChan <- messages[i]
  82 + }
  83 + return nil
  84 +}
  85 +
  86 +// go dispatcher.Dispatch() 启动一个独立协程
  87 +func (dispatcher *MessageDispatcher) Dispatch() {
  88 + for {
  89 + select {
  90 + case <-dispatcher.dispatchTicker.C:
  91 + go func(dispatcher *MessageDispatcher) {
  92 + dispatcher.notifications <- struct{}{}
  93 + }(dispatcher)
  94 + case <-dispatcher.notifications:
  95 + if dispatcher.messageRepository == nil {
  96 + continue
  97 + }
  98 + messages, _ := dispatcher.messageRepository.FindNoPublishedStoredMessages()
  99 + var messagesInProcessIds []int64
  100 + for i := range messages {
  101 + messagesInProcessIds = append(messagesInProcessIds, messages[i].Id)
  102 + }
  103 + if messages != nil && len(messages) > 0 {
  104 + dispatcher.messageRepository.FinishMessagesStatus(messagesInProcessIds, int(models.InProcess))
  105 +
  106 + reuslt, err := dispatcher.producer.Publish(messages, nil)
  107 + if err == nil && len(reuslt.SuccessMessageIds) > 0 {
  108 + dispatcher.messageRepository.FinishMessagesStatus(reuslt.SuccessMessageIds, int(models.Finished))
  109 + }
  110 + //发送失败的消息ID列表 更新状态 进行中->未开始
  111 + if len(reuslt.ErrorMessageIds) > 0 {
  112 + dispatcher.messageRepository.FinishMessagesStatus(reuslt.ErrorMessageIds, int(models.UnFinished))
  113 + }
  114 + }
  115 + case msg := <-dispatcher.messageChan:
  116 + dispatcher.producer.Publish([]*models.Message{msg}, nil)
  117 + }
  118 + }
  119 +}
  120 +
  121 +type MessageDirector struct {
  122 + messageRepository models.MessageRepository
  123 + dispatcher *MessageDispatcher
  124 +}
  125 +
  126 +func (d *MessageDirector) PublishMessages(messages []*models.Message) error {
  127 + if d.dispatcher == nil {
  128 + return fmt.Errorf("dispatcher还没有启动")
  129 + }
  130 + if d.messageRepository == nil {
  131 + d.dispatcher.MessagePublish(messages)
  132 + return nil
  133 + }
  134 + for _, message := range messages {
  135 + if err := d.messageRepository.SaveMessage(message); err != nil {
  136 + return err
  137 + }
  138 + }
  139 + if err := d.dispatcher.MessagePublishedNotice(); err != nil {
  140 + return err
  141 + }
  142 + return nil
  143 +}
  144 +
  145 +// 消息发布器
  146 +// options["kafkaHosts"]="localhost:9092"
  147 +// options["timeInterval"]=time.Second*60*5
  148 +func NewMessageDirector(messageRepository models.MessageRepository, options map[string]interface{}) *MessageDirector {
  149 + dispatcher := &MessageDispatcher{
  150 + notifications: make(chan struct{}),
  151 + messageRepository: messageRepository,
  152 + messageChan: make(chan *models.Message, 100),
  153 + }
  154 +
  155 + var hosts string
  156 + if kafkaHosts, ok := options["kafkaHosts"]; ok {
  157 + hosts = kafkaHosts.(string)
  158 + } else {
  159 + hosts = "localhost:9092"
  160 + }
  161 + dispatcher.producer = &KafkaMessageProducer{KafkaHosts: hosts, LogInfo: models.DefaultLog}
  162 +
  163 + if interval, ok := options["timeInterval"]; ok {
  164 + dispatcher.dispatchTicker = time.NewTicker(interval.(time.Duration))
  165 + } else {
  166 + dispatcher.dispatchTicker = time.NewTicker(time.Second * 60 * 5)
  167 + }
  168 + go dispatcher.Dispatch()
  169 +
  170 + return &MessageDirector{
  171 + messageRepository: messageRepository,
  172 + dispatcher: dispatcher,
  173 + }
  174 +}
  1 +package models
  2 +
  3 +import "log"
  4 +
  5 +// 消息存储-发布
  6 +type MessageRepository interface {
  7 + SaveMessage(message *Message) error
  8 + FindNoPublishedStoredMessages() ([]*Message, error)
  9 + FinishMessagesStatus(messageIds []int64, finishStatus int) error
  10 +}
  11 +
  12 +// 消息存储-接收
  13 +type MessageReceiverRepository interface {
  14 + ReceiveMessage(params map[string]interface{}) error
  15 + ConfirmReceive(params map[string]interface{}) error
  16 +}
  17 +
  18 +// 消费者
  19 +type Consumer interface {
  20 + StartConsume() error
  21 + WithTopicHandler(topic string, handler func(message interface{}) error)
  22 + WithMessageReceiver(receiver MessageReceiverRepository)
  23 +}
  24 +
  25 +// 生产者
  26 +type MessageProducer interface {
  27 + Publish(messages []*Message, option map[string]interface{}) (*MessagePublishResult, error)
  28 +}
  29 +
  30 +type LogInfo func(params ...interface{})
  31 +
  32 +var DefaultLog LogInfo = func(params ...interface{}) {
  33 + log.Println(params...)
  34 +}
  1 +package models
  2 +
  3 +type Message struct {
  4 + Id int64 `json:"id"`
  5 + Topic string `json:"topic"`
  6 + Value string `json:"value"`
  7 + MsgTime int64 `json:"msg_time"`
  8 + FinishStatus int `json:"-"` //0:未完成 2:已完成 1:进行中 3:忽略
  9 +}
  10 +
  11 +//结束状态
  12 +type FinishStatus int
  13 +
  14 +const (
  15 + UnFinished FinishStatus = 0
  16 + InProcess FinishStatus = 1
  17 + Finished FinishStatus = 2
  18 + Ignore FinishStatus = 3
  19 +)
  20 +
  21 +type MessagePublishResult struct {
  22 + SuccessMessageIds []int64
  23 + ErrorMessageIds []int64
  24 +}
  1 +package message
  2 +
  3 +import (
  4 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/kafkax"
  5 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/models"
  6 +)
  7 +
  8 +// 消息发布器
  9 +// options["kafkaHosts"]="localhost:9092"
  10 +// options["timeInterval"]=time.Second*60*5
  11 +func NewMessageProducer(messageRepository models.MessageRepository, options map[string]interface{}) *kafkax.MessageDirector {
  12 + dispatcher := kafkax.NewMessageDirector(messageRepository, options)
  13 + return dispatcher
  14 +}
  1 +package message
  2 +
  3 +import (
  4 + "github.com/go-pg/pg/v10"
  5 + "github.com/tiptok/gocomm/identity/idgen"
  6 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/constant"
  7 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/kafkax"
  8 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/models"
  9 + pgDB "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg"
  10 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/transaction"
  11 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/utils"
  12 + "testing"
  13 + "time"
  14 +)
  15 +
  16 +// 发布消息本地持久化
  17 +func TestNewMessageProducer(t *testing.T) {
  18 + var (
  19 + transactionContext = transaction.NewPGTransactionContext(pgDB.DB)
  20 + err error
  21 + )
  22 +
  23 + producer := NewMessageProducer(NewPgMessageRepository(transactionContext), map[string]interface{}{"kafkaHosts": constant.KAFKA_HOSTS})
  24 + err = producer.PublishMessages([]*models.Message{
  25 + &models.Message{Id: idgen.Next(), Topic: "chat", MsgTime: time.Now().Unix(), Value: "hello world! tip tip!", FinishStatus: 0},
  26 + })
  27 +
  28 + if err != nil {
  29 + return
  30 + }
  31 + time.Sleep(time.Second * 2)
  32 +}
  33 +
  34 +// 发布消息无本地持久化
  35 +func TestNewMessageProducerNoRepository(t *testing.T) {
  36 + var (
  37 + err error
  38 + )
  39 +
  40 + producer := NewMessageProducer(nil, map[string]interface{}{"kafkaHosts": constant.KAFKA_HOSTS})
  41 + err = producer.PublishMessages([]*models.Message{
  42 + &models.Message{Id: idgen.Next(), Topic: "chat", MsgTime: time.Now().Unix(), Value: "hello world! tip tip!", FinishStatus: 0},
  43 + })
  44 +
  45 + if err != nil {
  46 + return
  47 + }
  48 + time.Sleep(time.Second * 2)
  49 +}
  50 +
  51 +// 简单发布消息
  52 +func TestSampleProducer(t *testing.T) {
  53 + var producer models.MessageProducer = &kafkax.KafkaMessageProducer{
  54 + KafkaHosts: constant.KAFKA_HOSTS,
  55 + }
  56 + _, err := producer.Publish([]*models.Message{{Id: 22, Topic: "mmm_xcx_orders", MsgTime: time.Now().Unix(), Value: "hello ccc20201009!"}}, nil)
  57 + if err != nil {
  58 + t.Fatal(err)
  59 + }
  60 +}
  61 +
  62 +type PgMessageRepository struct {
  63 + transactionContext *transaction.TransactionContext
  64 +}
  65 +
  66 +func (repository *PgMessageRepository) SaveMessage(message *models.Message) error {
  67 + sql := `insert into sys_message_produce (id,topic,value,msg_time,status)values(?,?,?,?,?)`
  68 + _, err := repository.transactionContext.PgDd.Exec(sql, message.Id, message.Topic, utils.JsonAssertString(message), message.MsgTime, int64(models.UnFinished))
  69 + return err
  70 +}
  71 +func (repository *PgMessageRepository) FindNoPublishedStoredMessages() ([]*models.Message, error) {
  72 + sql := `select value from sys_message_produce where status=?`
  73 + var values []string
  74 + _, e := repository.transactionContext.PgDd.Query(&values, sql, int64(models.UnFinished))
  75 + var messages = make([]*models.Message, 0)
  76 + if e != nil {
  77 + return messages, nil
  78 + }
  79 + for _, v := range values {
  80 + item := &models.Message{}
  81 + utils.JsonUnmarshal(v, item)
  82 + if item.Id != 0 {
  83 + messages = append(messages, item)
  84 + }
  85 + }
  86 + return messages, nil
  87 +}
  88 +func (repository *PgMessageRepository) FinishMessagesStatus(messageIds []int64, finishStatus int) error {
  89 + _, err := repository.transactionContext.PgDd.Exec("update sys_message_produce set status=? where id in (?)", finishStatus, pg.In(messageIds))
  90 + return err
  91 +}
  92 +func NewPgMessageRepository(transactionContext *transaction.TransactionContext) *PgMessageRepository {
  93 + return &PgMessageRepository{
  94 + transactionContext: transactionContext,
  95 + }
  96 +}
@@ -28,6 +28,8 @@ func init() { @@ -28,6 +28,8 @@ func init() {
28 (*models.PartnerInfo)(nil), 28 (*models.PartnerInfo)(nil),
29 (*models.PartnerSubAccount)(nil), 29 (*models.PartnerSubAccount)(nil),
30 (*models.Company)(nil), 30 (*models.Company)(nil),
  31 + //(*models.SysMessageConsume)(nil),
  32 + //(*models.SysMessageProduce)(nil),
31 (*models.OrderBase)(nil), 33 (*models.OrderBase)(nil),
32 (*models.OrderGood)(nil), 34 (*models.OrderGood)(nil),
33 (*models.ImInfo)(nil), 35 (*models.ImInfo)(nil),
1 package models 1 package models
2 2
3 -import "time" 3 +import (
  4 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/domain"
  5 + "time"
  6 +)
4 7
5 // 公司信息 8 // 公司信息
6 type Company struct { 9 type Company struct {
@@ -29,4 +32,6 @@ type Company struct { @@ -29,4 +32,6 @@ type Company struct {
29 DeleteAt time.Time 32 DeleteAt time.Time
30 // 是否开启合伙人模块,是否有效【1:有效】【2:无效】 33 // 是否开启合伙人模块,是否有效【1:有效】【2:无效】
31 Enable int8 34 Enable int8
  35 + // 小程序
  36 + Applets []domain.CompanyApplets
32 } 37 }
  1 +package models
  2 +
  3 +// SysMessageConsume
  4 +type SysMessageConsume struct {
  5 + tableName struct{} `pg:"sys_message_consume"`
  6 + // 消息ID
  7 + Id int64
  8 + // 主题
  9 + Topic string
  10 + // 分区信息
  11 + Partition int
  12 + // 消息偏移序号
  13 + Offset int64
  14 + // 键值
  15 + Key string
  16 + // 消息内容
  17 + Value string
  18 + // 消息时间
  19 + MsgTime int64
  20 + // 创建时间
  21 + CreateAt int64
  22 + // 状态
  23 + Status int64
  24 +}
  1 +package models
  2 +
  3 +// SysMessageProduce
  4 +type SysMessageProduce struct {
  5 + tableName struct{} `pg:"sys_message_produce"`
  6 + // 消息ID
  7 + Id int64
  8 + // 主题
  9 + Topic string
  10 + // 分区信息
  11 + Partition int
  12 + // 消息内容
  13 + Value string
  14 + // 消息时间
  15 + MsgTime int64
  16 + // 状态
  17 + Status int64
  18 +}
1 package repository 1 package repository
2 2
3 import ( 3 import (
  4 + "github.com/go-pg/pg/v10"
4 "github.com/go-pg/pg/v10/orm" 5 "github.com/go-pg/pg/v10/orm"
5 "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/domain" 6 "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/domain"
6 "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/models" 7 "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/models"
@@ -84,6 +85,9 @@ func (repository *OrderBaseRepository) Find(queryOptions map[string]interface{}) @@ -84,6 +85,9 @@ func (repository *OrderBaseRepository) Find(queryOptions map[string]interface{})
84 SetLimit(). 85 SetLimit().
85 SetOrder(`order_base.create_time`, "sortByCreateTime"). 86 SetOrder(`order_base.create_time`, "sortByCreateTime").
86 SetOrder(`order_base.update_time`, "sortByUpdateTime") 87 SetOrder(`order_base.update_time`, "sortByUpdateTime")
  88 + if v, ok := queryOptions["orderTypes"]; ok {
  89 + query.Where(`"order_base".order_type in (?)`, pg.In(v))
  90 + }
87 var err error 91 var err error
88 if query.AffectRow, err = query.SelectAndCount(); err != nil { 92 if query.AffectRow, err = query.SelectAndCount(); err != nil {
89 return 0, OrderBases, err 93 return 0, OrderBases, err
  1 +package repository
  2 +
  3 +import (
  4 + "fmt"
  5 + "github.com/tiptok/gocomm/common"
  6 + . "github.com/tiptok/gocomm/pkg/orm/pgx"
  7 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/domain"
  8 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/models"
  9 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/transaction"
  10 +)
  11 +
  12 +type SysMessageConsumeRepository struct {
  13 + transactionContext *transaction.TransactionContext
  14 +}
  15 +
  16 +func (repository *SysMessageConsumeRepository) Save(dm *domain.SysMessageConsume) (*domain.SysMessageConsume, error) {
  17 + var (
  18 + err error
  19 + m = &models.SysMessageConsume{}
  20 + tx = repository.transactionContext.PgTx
  21 + )
  22 + if err = common.GobModelTransform(m, dm); err != nil {
  23 + return nil, err
  24 + }
  25 + if dm.Identify() == nil {
  26 + if err = tx.Insert(m); err != nil {
  27 + return nil, err
  28 + }
  29 + return dm, nil
  30 + }
  31 + if err = tx.Update(m); err != nil {
  32 + return nil, err
  33 + }
  34 + return dm, nil
  35 +}
  36 +
  37 +func (repository *SysMessageConsumeRepository) Remove(SysMessageConsume *domain.SysMessageConsume) (*domain.SysMessageConsume, error) {
  38 + var (
  39 + tx = repository.transactionContext.PgTx
  40 + SysMessageConsumeModel = &models.SysMessageConsume{Id: SysMessageConsume.Identify().(int64)}
  41 + )
  42 + if _, err := tx.Model(SysMessageConsumeModel).Where("id = ?", SysMessageConsume.Id).Delete(); err != nil {
  43 + return SysMessageConsume, err
  44 + }
  45 + return SysMessageConsume, nil
  46 +}
  47 +
  48 +func (repository *SysMessageConsumeRepository) FindOne(queryOptions map[string]interface{}) (*domain.SysMessageConsume, error) {
  49 + tx := repository.transactionContext.PgTx
  50 + SysMessageConsumeModel := new(models.SysMessageConsume)
  51 + query := NewQuery(tx.Model(SysMessageConsumeModel), queryOptions)
  52 + query.SetWhere("id = ?", "id")
  53 + if err := query.First(); err != nil {
  54 + return nil, fmt.Errorf("query row not found")
  55 + }
  56 + if SysMessageConsumeModel.Id == 0 {
  57 + return nil, fmt.Errorf("query row not found")
  58 + }
  59 + return repository.transformPgModelToDomainModel(SysMessageConsumeModel)
  60 +}
  61 +
  62 +func (repository *SysMessageConsumeRepository) Find(queryOptions map[string]interface{}) (int64, []*domain.SysMessageConsume, error) {
  63 + tx := repository.transactionContext.PgTx
  64 + var SysMessageConsumeModels []*models.SysMessageConsume
  65 + SysMessageConsumes := make([]*domain.SysMessageConsume, 0)
  66 + query := NewQuery(tx.Model(&SysMessageConsumeModels), queryOptions).
  67 + SetOrder("create_at", "sortByCreateTime")
  68 + var err error
  69 + if query.AffectRow, err = query.SelectAndCount(); err != nil {
  70 + return 0, SysMessageConsumes, err
  71 + }
  72 + for _, SysMessageConsumeModel := range SysMessageConsumeModels {
  73 + if SysMessageConsume, err := repository.transformPgModelToDomainModel(SysMessageConsumeModel); err != nil {
  74 + return 0, SysMessageConsumes, err
  75 + } else {
  76 + SysMessageConsumes = append(SysMessageConsumes, SysMessageConsume)
  77 + }
  78 + }
  79 + return int64(query.AffectRow), SysMessageConsumes, nil
  80 +}
  81 +
  82 +func (repository *SysMessageConsumeRepository) transformPgModelToDomainModel(SysMessageConsumeModel *models.SysMessageConsume) (*domain.SysMessageConsume, error) {
  83 + m := &domain.SysMessageConsume{}
  84 + err := common.GobModelTransform(m, SysMessageConsumeModel)
  85 + return m, err
  86 +}
  87 +
  88 +func NewSysMessageConsumeRepository(transactionContext *transaction.TransactionContext) (*SysMessageConsumeRepository, error) {
  89 + if transactionContext == nil {
  90 + return nil, fmt.Errorf("transactionContext参数不能为nil")
  91 + }
  92 + return &SysMessageConsumeRepository{transactionContext: transactionContext}, nil
  93 +}
  1 +package repository
  2 +
  3 +import (
  4 + "fmt"
  5 + "github.com/tiptok/gocomm/common"
  6 + . "github.com/tiptok/gocomm/pkg/orm/pgx"
  7 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/domain"
  8 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/models"
  9 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/transaction"
  10 +)
  11 +
  12 +type SysMessageProduceRepository struct {
  13 + transactionContext *transaction.TransactionContext
  14 +}
  15 +
  16 +func (repository *SysMessageProduceRepository) Save(dm *domain.SysMessageProduce) (*domain.SysMessageProduce, error) {
  17 + var (
  18 + err error
  19 + m = &models.SysMessageProduce{}
  20 + tx = repository.transactionContext.PgTx
  21 + )
  22 + if err = common.GobModelTransform(m, dm); err != nil {
  23 + return nil, err
  24 + }
  25 +
  26 + if err = tx.Insert(m); err != nil {
  27 + return nil, err
  28 + }
  29 + return dm, nil
  30 +}
  31 +
  32 +func (repository *SysMessageProduceRepository) Remove(SysMessageProduce *domain.SysMessageProduce) (*domain.SysMessageProduce, error) {
  33 + var (
  34 + tx = repository.transactionContext.PgTx
  35 + SysMessageProduceModel = &models.SysMessageProduce{Id: SysMessageProduce.Identify().(int64)}
  36 + )
  37 + if _, err := tx.Model(SysMessageProduceModel).Where("id = ?", SysMessageProduce.Id).Delete(); err != nil {
  38 + return SysMessageProduce, err
  39 + }
  40 + return SysMessageProduce, nil
  41 +}
  42 +
  43 +func (repository *SysMessageProduceRepository) FindOne(queryOptions map[string]interface{}) (*domain.SysMessageProduce, error) {
  44 + tx := repository.transactionContext.PgTx
  45 + SysMessageProduceModel := new(models.SysMessageProduce)
  46 + query := NewQuery(tx.Model(SysMessageProduceModel), queryOptions)
  47 + query.SetWhere("id = ?", "id")
  48 + if err := query.First(); err != nil {
  49 + return nil, fmt.Errorf("query row not found")
  50 + }
  51 + if SysMessageProduceModel.Id == 0 {
  52 + return nil, fmt.Errorf("query row not found")
  53 + }
  54 + return repository.transformPgModelToDomainModel(SysMessageProduceModel)
  55 +}
  56 +
  57 +func (repository *SysMessageProduceRepository) Find(queryOptions map[string]interface{}) (int64, []*domain.SysMessageProduce, error) {
  58 + tx := repository.transactionContext.PgDd
  59 + var SysMessageProduceModels []*models.SysMessageProduce
  60 + SysMessageProduces := make([]*domain.SysMessageProduce, 0)
  61 + query := NewQuery(tx.Model(&SysMessageProduceModels), queryOptions).
  62 + SetWhere("status = ?", "status").
  63 + SetOrder("update_time", "sortByUpdateTime")
  64 + var err error
  65 + if query.AffectRow, err = query.SelectAndCount(); err != nil {
  66 + return 0, SysMessageProduces, err
  67 + }
  68 + for _, SysMessageProduceModel := range SysMessageProduceModels {
  69 + if SysMessageProduce, err := repository.transformPgModelToDomainModel(SysMessageProduceModel); err != nil {
  70 + return 0, SysMessageProduces, err
  71 + } else {
  72 + SysMessageProduces = append(SysMessageProduces, SysMessageProduce)
  73 + }
  74 + }
  75 + return int64(query.AffectRow), SysMessageProduces, nil
  76 +}
  77 +
  78 +func (repository *SysMessageProduceRepository) transformPgModelToDomainModel(SysMessageProduceModel *models.SysMessageProduce) (*domain.SysMessageProduce, error) {
  79 + m := &domain.SysMessageProduce{}
  80 + err := common.GobModelTransform(m, SysMessageProduceModel)
  81 + return m, err
  82 +}
  83 +
  84 +func NewSysMessageProduceRepository(transactionContext *transaction.TransactionContext) (*SysMessageProduceRepository, error) {
  85 + if transactionContext == nil {
  86 + return nil, fmt.Errorf("transactionContext参数不能为nil")
  87 + }
  88 + return &SysMessageProduceRepository{transactionContext: transactionContext}, nil
  89 +}
@@ -66,7 +66,7 @@ func (this *OrderController) OrderList() { @@ -66,7 +66,7 @@ func (this *OrderController) OrderList() {
66 msg = m 66 msg = m
67 return 67 return
68 } 68 }
69 - request.OrderType = domain.OrderReal 69 + request.OrderTypes = domain.UserOrderTypes(domain.Career)
70 header := this.GetRequestHeader(this.Ctx) 70 header := this.GetRequestHeader(this.Ctx)
71 msg = protocol.NewReturnResponse(order.List(header, request)) 71 msg = protocol.NewReturnResponse(order.List(header, request))
72 } 72 }
  1 +package messageHandler
  2 +
  3 +import "github.com/Shopify/sarama"
  4 +
  5 +type UcenterMessageCommand struct {
  6 +}
  7 +
  8 +func (c *UcenterMessageCommand) ChangePhoneHandler(message interface{}) error {
  9 + msg, ok := message.(*sarama.Message)
  10 + if !ok && msg == nil {
  11 + return nil
  12 + }
  13 + return nil
  14 +}
  1 +package sarama
  2 +
  3 +import (
  4 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/constant"
  5 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/kafkax"
  6 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/log"
  7 + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/port/sarama/messageHandler"
  8 + //"suplus-message/pkg/constant"
  9 + //"suplus-message/pkg/port/sarama/messageHandler"
  10 +)
  11 +
  12 +func Run() {
  13 + var (
  14 + ucenterMessage = &messageHandler.UcenterMessageCommand{}
  15 + )
  16 +
  17 + saramaConsumer := kafkax.NewSaramaConsumer(constant.KAFKA_HOSTS, constant.SERVICE_NAME)
  18 + saramaConsumer.WithTopicHandler(constant.TOPIC_UCENT_USER_CHANGE_PHONE, ucenterMessage.ChangePhoneHandler)
  19 +
  20 + err := saramaConsumer.StartConsume()
  21 + if err != nil {
  22 + log.Error(err)
  23 + }
  24 +}
@@ -71,6 +71,7 @@ type OrderListRequest struct { @@ -71,6 +71,7 @@ type OrderListRequest struct {
71 PageIndex int `json:"pageIndex"` 71 PageIndex int `json:"pageIndex"`
72 PageSize int `json:"pageSize" valid:"Required"` 72 PageSize int `json:"pageSize" valid:"Required"`
73 OrderType int `json:"-"` 73 OrderType int `json:"-"`
  74 + OrderTypes []int `json:"-"`
74 } 75 }
75 type OrderListResponse struct { 76 type OrderListResponse struct {
76 List []*OrderListItem `json:"list"` 77 List []*OrderListItem `json:"list"`
  1 +package user
  2 +
  3 +type Company struct {
  4 + Id int64 `json:"id"`
  5 + Name string `json:"name"`
  6 + Phone string `json:"phone"`
  7 + //合作区域
  8 + District interface{} `json:"district"`
  9 + //合作编码
  10 + SerialNo int64 `json:"serialNo"`
  11 + //合作时间
  12 + CooperateTime int64 `json:"cooperationTime"`
  13 + Salesman interface{} `json:"salesman"`
  14 + MiniProgram interface{} `json:"miniProgram"`
  15 +}
  16 +type User struct {
  17 + Id int64 `json:"uid"`
  18 + //用户名称
  19 + PartnerName string `json:"uname"`
  20 + //手机号
  21 + Phone string `json:"phone"`
  22 + //合作公司
  23 + CooperateCompany Company `json:"company"`
  24 +}