Merge remote-tracking branch 'origin/test'
正在显示
43 个修改的文件
包含
1147 行增加
和
51 行删除
| @@ -3,6 +3,7 @@ module gitlab.fjmaimaimai.com/mmm-go/partner | @@ -3,6 +3,7 @@ module gitlab.fjmaimaimai.com/mmm-go/partner | ||
| 3 | go 1.14 | 3 | go 1.14 |
| 4 | 4 | ||
| 5 | require ( | 5 | require ( |
| 6 | + github.com/Shopify/sarama v1.26.4 | ||
| 6 | github.com/astaxie/beego v1.12.1 | 7 | github.com/astaxie/beego v1.12.1 |
| 7 | github.com/dgrijalva/jwt-go v3.2.0+incompatible | 8 | github.com/dgrijalva/jwt-go v3.2.0+incompatible |
| 8 | github.com/gin-gonic/gin v1.5.0 | 9 | github.com/gin-gonic/gin v1.5.0 |
| @@ -10,5 +11,5 @@ require ( | @@ -10,5 +11,5 @@ require ( | ||
| 10 | github.com/linmadan/egglib-go v0.0.0-20191217144343-ca4539f95bf9 | 11 | github.com/linmadan/egglib-go v0.0.0-20191217144343-ca4539f95bf9 |
| 11 | github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 // indirect | 12 | github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 // indirect |
| 12 | github.com/shopspring/decimal v1.2.0 | 13 | github.com/shopspring/decimal v1.2.0 |
| 13 | - github.com/tiptok/gocomm v1.0.1 | 14 | + github.com/tiptok/gocomm v1.0.2 |
| 14 | ) | 15 | ) |
| @@ -168,7 +168,7 @@ func getStatistics(userId int64, transactionContext *transaction.TransactionCont | @@ -168,7 +168,7 @@ func getStatistics(userId int64, transactionContext *transaction.TransactionCont | ||
| 168 | if count, e := PartnerInfoDao.PartnerStatic(map[string]interface{}{"inPartnerIds": partnerIds, "inPartnerCategory": domain.App}); e == nil { | 168 | if count, e := PartnerInfoDao.PartnerStatic(map[string]interface{}{"inPartnerIds": partnerIds, "inPartnerCategory": domain.App}); e == nil { |
| 169 | Statistics["appCount"] = count | 169 | Statistics["appCount"] = count |
| 170 | } | 170 | } |
| 171 | - if bonus, e := OrderBaseDao.OrderBonusStatics(domain.OrderBonusQuery{InPartnerIds: partnerIds, OrderType: domain.OrderReal}); e == nil { | 171 | + if bonus, e := OrderBaseDao.OrderBonusStatics(domain.OrderBonusQuery{InPartnerIds: partnerIds, OrderTypes: domain.UserOrderTypes(domain.Career)}); e == nil { |
| 172 | Statistics["careerOrdersMoney"] = bonus.TotalOrderAmount | 172 | Statistics["careerOrdersMoney"] = bonus.TotalOrderAmount |
| 173 | Statistics["careerDividend"] = bonus.Bonus | 173 | Statistics["careerDividend"] = bonus.Bonus |
| 174 | } | 174 | } |
| @@ -24,7 +24,7 @@ func Statistics(header *protocol.RequestHeader, request *protocol.DividendStatis | @@ -24,7 +24,7 @@ func Statistics(header *protocol.RequestHeader, request *protocol.DividendStatis | ||
| 24 | }() | 24 | }() |
| 25 | 25 | ||
| 26 | // 事业分红统计-查询订单 | 26 | // 事业分红统计-查询订单 |
| 27 | - _, orderAll, e := OrderBaseResponsitory.Find(utils.ObjectJsonToMap(domain.OrderQueryOption{PartnerId: header.UserId, EndTime: time.Now(), SortByCreateTime: domain.DESC, OrderType: domain.OrderReal})) | 27 | + _, orderAll, e := OrderBaseResponsitory.Find(utils.ObjectJsonToMap(domain.OrderQueryOption{PartnerId: header.UserId, EndTime: time.Now(), SortByCreateTime: domain.DESC, OrderTypes: domain.UserOrderTypes(domain.Career)})) |
| 28 | if e != nil { | 28 | if e != nil { |
| 29 | log.Error(e) | 29 | log.Error(e) |
| 30 | } | 30 | } |
| @@ -124,7 +124,7 @@ func OrderList(header *protocol.RequestHeader, request *protocol.DividendOrdersR | @@ -124,7 +124,7 @@ func OrderList(header *protocol.RequestHeader, request *protocol.DividendOrdersR | ||
| 124 | rsp = &protocol.DividendOrdersResponse{List: make([]*protocol.DividendOrderListItem, 0)} | 124 | rsp = &protocol.DividendOrdersResponse{List: make([]*protocol.DividendOrderListItem, 0)} |
| 125 | 125 | ||
| 126 | count, orders, err = OrderDao.DividendOrders(&domain.DividendOrdersQueryOption{ | 126 | count, orders, err = OrderDao.DividendOrders(&domain.DividendOrdersQueryOption{ |
| 127 | - OrderType: domain.OrderReal, | 127 | + OrderTypes: domain.UserOrderTypes(domain.Career), |
| 128 | PartnerId: header.UserId, | 128 | PartnerId: header.UserId, |
| 129 | DetailAction: request.DetailAction, | 129 | DetailAction: request.DetailAction, |
| 130 | DividendAction: request.DividendAction, | 130 | DividendAction: request.DividendAction, |
pkg/application/factory/message.go
0 → 100644
| 1 | +package factory | ||
| 2 | + | ||
| 3 | +//import ( | ||
| 4 | +// "fmt" | ||
| 5 | +// "github.com/linmadan/egglib-go/core/application" | ||
| 6 | +// "github.com/linmadan/egglib-go/message/publisher/kafkaMessage/sarama" | ||
| 7 | +// localMessagePublisher "github.com/linmadan/egglib-go/message/publisher/local_message" | ||
| 8 | +// localMessageReceiver "github.com/linmadan/egglib-go/message/receiver/local_message" | ||
| 9 | +// "suplus-message/pkg/constant" | ||
| 10 | +// "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/log" | ||
| 11 | +//) | ||
| 12 | +// | ||
| 13 | +//func CreateMessagePublisher(options map[string]interface{}) (application.MessagePublisher, error) { | ||
| 14 | +// if localMessageOptions, ok := options["localMessageOptions"]; ok { | ||
| 15 | +// var storeType string | ||
| 16 | +// var storeOptions map[string]interface{} | ||
| 17 | +// if value, ok := localMessageOptions.(map[string]interface{})["storeType"]; ok { | ||
| 18 | +// storeType = value.(string) | ||
| 19 | +// } else { | ||
| 20 | +// return nil, fmt.Errorf("LocalMessagePublisher缺少参数storeType") | ||
| 21 | +// } | ||
| 22 | +// if value, ok := localMessageOptions.(map[string]interface{})["storeOptions"]; ok { | ||
| 23 | +// storeOptions = value.(map[string]interface{}) | ||
| 24 | +// } else { | ||
| 25 | +// return nil, fmt.Errorf("LocalMessagePublisher缺少参数storeOptions") | ||
| 26 | +// } | ||
| 27 | +// return localMessagePublisher.NewLocalMessagePublisher(storeType, storeOptions) | ||
| 28 | +// } else { | ||
| 29 | +// return sarama.NewKafkaSaramaMessagePublisher(constant.KAFKA_HOSTS,log.Logger) | ||
| 30 | +// } | ||
| 31 | +//} | ||
| 32 | +// | ||
| 33 | +//func CreateMessageReceiver(options map[string]interface{}) (application.MessageReceiver, error) { | ||
| 34 | +// if localMessageOptions, ok := options["localMessageOptions"]; ok { | ||
| 35 | +// var converterType string | ||
| 36 | +// if value, ok := localMessageOptions.(map[string]interface{})["converterType"]; ok { | ||
| 37 | +// converterType = value.(string) | ||
| 38 | +// } else { | ||
| 39 | +// return nil, fmt.Errorf("LocalMessageReceiver缺少参数converterType") | ||
| 40 | +// } | ||
| 41 | +// var storeType string | ||
| 42 | +// var storeOptions map[string]interface{} | ||
| 43 | +// if value, ok := localMessageOptions.(map[string]interface{})["storeType"]; ok { | ||
| 44 | +// storeType = value.(string) | ||
| 45 | +// } else { | ||
| 46 | +// return nil, fmt.Errorf("LocalMessageReceiver缺少参数storeType") | ||
| 47 | +// } | ||
| 48 | +// if value, ok := localMessageOptions.(map[string]interface{})["storeOptions"]; ok { | ||
| 49 | +// storeOptions = value.(map[string]interface{}) | ||
| 50 | +// } else { | ||
| 51 | +// return nil, fmt.Errorf("LocalMessageReceiver缺少参数storeOptions") | ||
| 52 | +// } | ||
| 53 | +// return localMessageReceiver.NewLocalMessageReceiver(converterType, nil, storeType, storeOptions) | ||
| 54 | +// } else { | ||
| 55 | +// return nil, fmt.Errorf("缺少参数localMessageOptions") | ||
| 56 | +// } | ||
| 57 | +//} |
| @@ -138,14 +138,14 @@ func Statistics(header *protocol.RequestHeader, request *protocol.OrderStatistic | @@ -138,14 +138,14 @@ func Statistics(header *protocol.RequestHeader, request *protocol.OrderStatistic | ||
| 138 | if rsp.Statistics.TodayRealQuantity, rsp.Statistics.TodayRealMoney, err = OrderDao.OrderStatics(&domain.OrderStaticQuery{ | 138 | if rsp.Statistics.TodayRealQuantity, rsp.Statistics.TodayRealMoney, err = OrderDao.OrderStatics(&domain.OrderStaticQuery{ |
| 139 | BeginTime: utils.GetDayBegin().Unix() * 1000, | 139 | BeginTime: utils.GetDayBegin().Unix() * 1000, |
| 140 | EndTime: utils.GetDayEnd().Unix() * 1000, | 140 | EndTime: utils.GetDayEnd().Unix() * 1000, |
| 141 | - OrderType: domain.OrderReal, | 141 | + OrderTypes: domain.UserOrderTypes(domain.Career), |
| 142 | PartnerId: header.UserId, | 142 | PartnerId: header.UserId, |
| 143 | }); err != nil { | 143 | }); err != nil { |
| 144 | return | 144 | return |
| 145 | } | 145 | } |
| 146 | if rsp.Statistics.CumulativeQuantity, rsp.Statistics.CumulativeMoney, err = OrderDao.OrderStatics(&domain.OrderStaticQuery{ | 146 | if rsp.Statistics.CumulativeQuantity, rsp.Statistics.CumulativeMoney, err = OrderDao.OrderStatics(&domain.OrderStaticQuery{ |
| 147 | EndTime: time.Now().Unix() * 1000, | 147 | EndTime: time.Now().Unix() * 1000, |
| 148 | - OrderType: domain.OrderReal, | 148 | + OrderTypes: domain.UserOrderTypes(domain.Career), |
| 149 | PartnerId: header.UserId, | 149 | PartnerId: header.UserId, |
| 150 | }); err != nil { | 150 | }); err != nil { |
| 151 | return | 151 | return |
| @@ -155,7 +155,7 @@ func Statistics(header *protocol.RequestHeader, request *protocol.OrderStatistic | @@ -155,7 +155,7 @@ func Statistics(header *protocol.RequestHeader, request *protocol.OrderStatistic | ||
| 155 | 155 | ||
| 156 | //事业分红/业务分红 | 156 | //事业分红/业务分红 |
| 157 | var careerBonus, businessBonus, total float64 //,developBonus,appBonus | 157 | var careerBonus, businessBonus, total float64 //,developBonus,appBonus |
| 158 | - if bonus, e := OrderDao.OrderBonusStatics(domain.OrderBonusQuery{PartnerId: header.UserId, OrderType: domain.OrderReal}); e == nil { | 158 | + if bonus, e := OrderDao.OrderBonusStatics(domain.OrderBonusQuery{PartnerId: header.UserId, OrderTypes: domain.UserOrderTypes(domain.Career)}); e == nil { |
| 159 | careerBonus = bonus.Bonus | 159 | careerBonus = bonus.Bonus |
| 160 | total += careerBonus | 160 | total += careerBonus |
| 161 | } | 161 | } |
| @@ -207,6 +207,7 @@ func List(header *protocol.RequestHeader, request *protocol.OrderListRequest) (r | @@ -207,6 +207,7 @@ func List(header *protocol.RequestHeader, request *protocol.OrderListRequest) (r | ||
| 207 | queryOption.EndTime = time.Unix(request.EndTime/1000, 0) | 207 | queryOption.EndTime = time.Unix(request.EndTime/1000, 0) |
| 208 | } | 208 | } |
| 209 | queryOption.OrderType = request.OrderType | 209 | queryOption.OrderType = request.OrderType |
| 210 | + queryOption.OrderTypes = request.OrderTypes | ||
| 210 | total, orders, _ = OrderResponsitory.Find(utils.ObjectJsonToMap(queryOption)) | 211 | total, orders, _ = OrderResponsitory.Find(utils.ObjectJsonToMap(queryOption)) |
| 211 | if len(orders) != 0 { | 212 | if len(orders) != 0 { |
| 212 | for i := range orders { | 213 | for i := range orders { |
| @@ -222,7 +223,7 @@ func List(header *protocol.RequestHeader, request *protocol.OrderListRequest) (r | @@ -222,7 +223,7 @@ func List(header *protocol.RequestHeader, request *protocol.OrderListRequest) (r | ||
| 222 | //累计实发订单 | 223 | //累计实发订单 |
| 223 | cumulativeQuantity, _, err = OrderDao.OrderStatics(&domain.OrderStaticQuery{ | 224 | cumulativeQuantity, _, err = OrderDao.OrderStatics(&domain.OrderStaticQuery{ |
| 224 | EndTime: time.Now().Unix() * 1000, | 225 | EndTime: time.Now().Unix() * 1000, |
| 225 | - OrderType: domain.OrderReal, | 226 | + OrderTypes: domain.UserOrderTypes(domain.Career), |
| 226 | PartnerId: header.UserId, | 227 | PartnerId: header.UserId, |
| 227 | }) | 228 | }) |
| 228 | rsp.Total = cumulativeQuantity | 229 | rsp.Total = cumulativeQuantity |
| @@ -65,7 +65,7 @@ func getDetail(userId int64, transactionContext *transaction.TransactionContext) | @@ -65,7 +65,7 @@ func getDetail(userId int64, transactionContext *transaction.TransactionContext) | ||
| 65 | } | 65 | } |
| 66 | } | 66 | } |
| 67 | p.CooperateTime = partner.CooperateTime.Unix() * 1000 | 67 | p.CooperateTime = partner.CooperateTime.Unix() * 1000 |
| 68 | - if bonus, e := OrderBaseDao.OrderBonusStatics(domain.OrderBonusQuery{PartnerId: userId, OrderType: domain.OrderReal}); e == nil { | 68 | + if bonus, e := OrderBaseDao.OrderBonusStatics(domain.OrderBonusQuery{PartnerId: userId, OrderTypes: domain.UserOrderTypes(domain.Career)}); e == nil { |
| 69 | p.CareerOrdersCount = int(bonus.Total) | 69 | p.CareerOrdersCount = int(bonus.Total) |
| 70 | p.CareerOrdersMoney = utils.Decimal(bonus.TotalOrderAmount) | 70 | p.CareerOrdersMoney = utils.Decimal(bonus.TotalOrderAmount) |
| 71 | p.CareerDividend = utils.Decimal(bonus.Bonus) | 71 | p.CareerDividend = utils.Decimal(bonus.Bonus) |
| @@ -6,6 +6,7 @@ import ( | @@ -6,6 +6,7 @@ import ( | ||
| 6 | "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/application/factory" | 6 | "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/application/factory" |
| 7 | "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/application/partnerInfo/query" | 7 | "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/application/partnerInfo/query" |
| 8 | "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/application/partnerInfo/service" | 8 | "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/application/partnerInfo/service" |
| 9 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/constant" | ||
| 9 | "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/domain" | 10 | "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/domain" |
| 10 | domain_service_i "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/domain/service" | 11 | domain_service_i "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/domain/service" |
| 11 | "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/domain_service" | 12 | "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/domain_service" |
| @@ -13,6 +14,7 @@ import ( | @@ -13,6 +14,7 @@ import ( | ||
| 13 | "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/log" | 14 | "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/log" |
| 14 | "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/protocol" | 15 | "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/protocol" |
| 15 | protocolx "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/protocol/auth" | 16 | protocolx "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/protocol/auth" |
| 17 | + userx "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/protocol/user" | ||
| 16 | "strconv" | 18 | "strconv" |
| 17 | "strings" | 19 | "strings" |
| 18 | ) | 20 | ) |
| @@ -82,10 +84,6 @@ func UserInfo(header *protocol.RequestHeader, request *protocol.UserInfoRequest) | @@ -82,10 +84,6 @@ func UserInfo(header *protocol.RequestHeader, request *protocol.UserInfoRequest) | ||
| 82 | Name: company.Name, | 84 | Name: company.Name, |
| 83 | Phone: company.Phone, | 85 | Phone: company.Phone, |
| 84 | }, | 86 | }, |
| 85 | - //JoinWay: partnerInfo.PartnerCategoryInfo(), | ||
| 86 | - //District: map[string]interface{}{"id": partnerInfo.RegionInfo.RegionId, "name": partnerInfo.RegionInfo.RegionName}, | ||
| 87 | - //SerialNo: partnerInfo.Id, | ||
| 88 | - //CooperateTime: partnerInfo.CooperateTime.Unix() * 1000, | ||
| 89 | } | 87 | } |
| 90 | } | 88 | } |
| 91 | switch header.AdminType { | 89 | switch header.AdminType { |
| @@ -205,28 +203,6 @@ func UserInfoV2(header *protocol.RequestHeader, request *protocol.UserInfoReques | @@ -205,28 +203,6 @@ func UserInfoV2(header *protocol.RequestHeader, request *protocol.UserInfoReques | ||
| 205 | transactionContext.RollbackTransaction() | 203 | transactionContext.RollbackTransaction() |
| 206 | }() | 204 | }() |
| 207 | rsp = &protocol.UserInfoResponse{} | 205 | rsp = &protocol.UserInfoResponse{} |
| 208 | - | ||
| 209 | - type xcompany struct { | ||
| 210 | - Id int64 `json:"id"` | ||
| 211 | - Name string `json:"name"` | ||
| 212 | - Phone string `json:"phone"` | ||
| 213 | - //合作区域 | ||
| 214 | - District interface{} `json:"district"` | ||
| 215 | - //合作编码 | ||
| 216 | - SerialNo int64 `json:"serialNo"` | ||
| 217 | - //合作时间 | ||
| 218 | - CooperateTime int64 `json:"cooperationTime"` | ||
| 219 | - Salesman interface{} `json:"salesman"` | ||
| 220 | - } | ||
| 221 | - type xuser struct { | ||
| 222 | - Id int64 `json:"uid"` | ||
| 223 | - //用户名称 | ||
| 224 | - PartnerName string `json:"uname"` | ||
| 225 | - //手机号 | ||
| 226 | - Phone string `json:"phone"` | ||
| 227 | - //合作公司 | ||
| 228 | - CooperateCompany xcompany `json:"company"` | ||
| 229 | - } | ||
| 230 | rspMap := make(map[string]interface{}) | 206 | rspMap := make(map[string]interface{}) |
| 231 | funcPartnerInfo := func() { | 207 | funcPartnerInfo := func() { |
| 232 | if partnerInfo, err = PartnerInfoService.FindOne(map[string]interface{}{"id": header.UserId}); err != nil { | 208 | if partnerInfo, err = PartnerInfoService.FindOne(map[string]interface{}{"id": header.UserId}); err != nil { |
| @@ -236,18 +212,29 @@ func UserInfoV2(header *protocol.RequestHeader, request *protocol.UserInfoReques | @@ -236,18 +212,29 @@ func UserInfoV2(header *protocol.RequestHeader, request *protocol.UserInfoReques | ||
| 236 | if company, err = CompanyResponsitory.FindOne(map[string]interface{}{"id": header.CompanyId}); err != nil { | 212 | if company, err = CompanyResponsitory.FindOne(map[string]interface{}{"id": header.CompanyId}); err != nil { |
| 237 | return | 213 | return |
| 238 | } | 214 | } |
| 239 | - | ||
| 240 | - u := xuser{ | 215 | + var miniProgram map[string]interface{} |
| 216 | + if len(company.Applets) > 0 { | ||
| 217 | + if company.Applets[0].Valid() { | ||
| 218 | + miniProgram = make(map[string]interface{}) | ||
| 219 | + miniProgram["webpageUrl"] = "www.baidu.com" | ||
| 220 | + miniProgram["userName"] = company.Applets[0].Id | ||
| 221 | + miniProgram["path"] = fmt.Sprintf("%v?inviter_id=%v&company_id=%v", company.Applets[0].URL, partnerInfo.Id, partnerInfo.CompanyId) | ||
| 222 | + miniProgram["hdImageUrl"] = constant.SHARE_SHOP_PREVIEW_IMADGE | ||
| 223 | + //miniProgram["miniprogramType"] = constant.WEHAT_MINI_PROGRAM_VERSION_TYPE | ||
| 224 | + miniProgram["title"] = company.Applets[0].Name | ||
| 225 | + } | ||
| 226 | + } | ||
| 227 | + u := userx.User{ | ||
| 241 | Id: partnerInfo.Id, | 228 | Id: partnerInfo.Id, |
| 242 | PartnerName: partnerInfo.PartnerName, | 229 | PartnerName: partnerInfo.PartnerName, |
| 243 | Phone: partnerInfo.Account, | 230 | Phone: partnerInfo.Account, |
| 244 | - CooperateCompany: xcompany{ | 231 | + CooperateCompany: userx.Company{ |
| 245 | Id: company.Id, | 232 | Id: company.Id, |
| 246 | Name: company.Name, | 233 | Name: company.Name, |
| 247 | Phone: company.Phone, | 234 | Phone: company.Phone, |
| 248 | SerialNo: partnerInfo.Id, | 235 | SerialNo: partnerInfo.Id, |
| 249 | CooperateTime: partnerInfo.CooperateTime.Unix() * 1000, | 236 | CooperateTime: partnerInfo.CooperateTime.Unix() * 1000, |
| 250 | - //JoinWay: partnerInfo.PartnerCategoryInfo(), | 237 | + MiniProgram: miniProgram, |
| 251 | District: map[string]interface{}{"id": partnerInfo.RegionInfo.RegionId, "name": partnerInfo.RegionInfo.RegionName}, | 238 | District: map[string]interface{}{"id": partnerInfo.RegionInfo.RegionId, "name": partnerInfo.RegionInfo.RegionName}, |
| 252 | }, | 239 | }, |
| 253 | } | 240 | } |
| @@ -267,14 +254,15 @@ func UserInfoV2(header *protocol.RequestHeader, request *protocol.UserInfoReques | @@ -267,14 +254,15 @@ func UserInfoV2(header *protocol.RequestHeader, request *protocol.UserInfoReques | ||
| 267 | if company, err = CompanyResponsitory.FindOne(map[string]interface{}{"id": header.CompanyId}); err != nil { | 254 | if company, err = CompanyResponsitory.FindOne(map[string]interface{}{"id": header.CompanyId}); err != nil { |
| 268 | return | 255 | return |
| 269 | } | 256 | } |
| 270 | - rspMap["user"] = xuser{ | 257 | + rspMap["user"] = userx.User{ |
| 271 | Id: user.Id, | 258 | Id: user.Id, |
| 272 | PartnerName: user.Name, | 259 | PartnerName: user.Name, |
| 273 | Phone: user.Phone, | 260 | Phone: user.Phone, |
| 274 | - CooperateCompany: xcompany{ | 261 | + CooperateCompany: userx.Company{ |
| 275 | Id: company.Id, | 262 | Id: company.Id, |
| 276 | Name: company.Name, | 263 | Name: company.Name, |
| 277 | Phone: company.Phone, | 264 | Phone: company.Phone, |
| 265 | + MiniProgram: nil, | ||
| 278 | }, | 266 | }, |
| 279 | } | 267 | } |
| 280 | rsp = rspMap | 268 | rsp = rspMap |
| @@ -19,6 +19,8 @@ var BUSINESS_ADMIN_PLATFORM_ID = "25" //合伙人模块 | @@ -19,6 +19,8 @@ var BUSINESS_ADMIN_PLATFORM_ID = "25" //合伙人模块 | ||
| 19 | 19 | ||
| 20 | var DEFAULT_GUEST_COMPANY int = 10011 | 20 | var DEFAULT_GUEST_COMPANY int = 10011 |
| 21 | 21 | ||
| 22 | +var SHARE_SHOP_PREVIEW_IMADGE = "https://media.fjmaimaimai.com/image/default/F6ABD8ECDB564582BFFACA053F330005-6-2.jpg" //分享店铺预览图 | ||
| 23 | +var WEHAT_MINI_PROGRAM_VERSION_TYPE = 2 //0:正式版 1:开发版 2:体验版 | ||
| 22 | func init() { | 24 | func init() { |
| 23 | if os.Getenv("LOG_LEVEL") != "" { | 25 | if os.Getenv("LOG_LEVEL") != "" { |
| 24 | LOG_LEVEL = os.Getenv("LOG_LEVEL") | 26 | LOG_LEVEL = os.Getenv("LOG_LEVEL") |
pkg/constant/kafka.go
0 → 100644
pkg/domain/company_applets.go
0 → 100644
| 1 | +package domain | ||
| 2 | + | ||
| 3 | +type CompanyApplets struct { | ||
| 4 | + Name string `json:"name"` | ||
| 5 | + URL string `json:"url"` | ||
| 6 | + Id string `json:"id"` | ||
| 7 | +} | ||
| 8 | + | ||
| 9 | +func (applets CompanyApplets) Valid() bool { | ||
| 10 | + if len(applets.Name) == 0 { | ||
| 11 | + return false | ||
| 12 | + } | ||
| 13 | + if len(applets.URL) == 0 { | ||
| 14 | + return false | ||
| 15 | + } | ||
| 16 | + if len(applets.Id) == 0 { | ||
| 17 | + return false | ||
| 18 | + } | ||
| 19 | + return true | ||
| 20 | +} |
| @@ -28,6 +28,8 @@ type Company struct { | @@ -28,6 +28,8 @@ type Company struct { | ||
| 28 | DeleteAt time.Time `json:"deleteAt"` | 28 | DeleteAt time.Time `json:"deleteAt"` |
| 29 | // 是否开启合伙人模块,是否有效【1:有效】【2:无效】 | 29 | // 是否开启合伙人模块,是否有效【1:有效】【2:无效】 |
| 30 | Enable int8 `json:"enable"` | 30 | Enable int8 `json:"enable"` |
| 31 | + // 小程序 | ||
| 32 | + Applets []CompanyApplets `json:"applets"` | ||
| 31 | } | 33 | } |
| 32 | 34 | ||
| 33 | type CompanyRepository interface { | 35 | type CompanyRepository interface { |
| @@ -11,6 +11,7 @@ const ( | @@ -11,6 +11,7 @@ const ( | ||
| 11 | const ( | 11 | const ( |
| 12 | OrderReal = iota + 1 //实发订单 | 12 | OrderReal = iota + 1 //实发订单 |
| 13 | OrderIntention //意向订单 | 13 | OrderIntention //意向订单 |
| 14 | + OrderAppletSeafood //小程序-海鲜干货 | ||
| 14 | ) | 15 | ) |
| 15 | 16 | ||
| 16 | const ( | 17 | const ( |
| @@ -21,3 +22,10 @@ const ( | @@ -21,3 +22,10 @@ const ( | ||
| 21 | var ( | 22 | var ( |
| 22 | QueryNoRow = fmt.Errorf("not row found") | 23 | QueryNoRow = fmt.Errorf("not row found") |
| 23 | ) | 24 | ) |
| 25 | + | ||
| 26 | +// UserOrderTypes | ||
| 27 | +// @category 合伙人类型 | ||
| 28 | +// @desc 根据用户类别获取用户能看到的订单类型 | ||
| 29 | +func UserOrderTypes(category int) []int { | ||
| 30 | + return []int{OrderReal, OrderAppletSeafood} | ||
| 31 | +} |
| @@ -59,6 +59,10 @@ type OrderBase struct { | @@ -59,6 +59,10 @@ type OrderBase struct { | ||
| 59 | BonusStatus int8 | 59 | BonusStatus int8 |
| 60 | //货物列表 | 60 | //货物列表 |
| 61 | OrderGood []*OrderGood | 61 | OrderGood []*OrderGood |
| 62 | + // 数据来源 | ||
| 63 | + DataFrom *OrderDataFrom | ||
| 64 | + // 备注 | ||
| 65 | + Remark string | ||
| 62 | } | 66 | } |
| 63 | 67 | ||
| 64 | func (m *OrderBase) Identify() interface{} { | 68 | func (m *OrderBase) Identify() interface{} { |
| @@ -141,6 +145,7 @@ func (m *OrderBase) OrderBonusStatic() *OrderStatics { | @@ -141,6 +145,7 @@ func (m *OrderBase) OrderBonusStatic() *OrderStatics { | ||
| 141 | type OrderQueryOption struct { | 145 | type OrderQueryOption struct { |
| 142 | PartnerId int64 `json:"partnerId,omitempty"` | 146 | PartnerId int64 `json:"partnerId,omitempty"` |
| 143 | OrderType int `json:"orderType,omitempty"` | 147 | OrderType int `json:"orderType,omitempty"` |
| 148 | + OrderTypes []int `json:"orderTypes,omitempty"` | ||
| 144 | OrderStatus int `json:"orderStatus,omitempty"` | 149 | OrderStatus int `json:"orderStatus,omitempty"` |
| 145 | BeginTime time.Time `json:"beginTime,omitempty"` | 150 | BeginTime time.Time `json:"beginTime,omitempty"` |
| 146 | EndTime time.Time `json:"endTime,omitempty"` | 151 | EndTime time.Time `json:"endTime,omitempty"` |
| @@ -154,6 +159,7 @@ type OrderQueryOption struct { | @@ -154,6 +159,7 @@ type OrderQueryOption struct { | ||
| 154 | type DividendOrdersQueryOption struct { | 159 | type DividendOrdersQueryOption struct { |
| 155 | PartnerId int64 `json:"partnerId"` | 160 | PartnerId int64 `json:"partnerId"` |
| 156 | OrderType int `json:"orderType"` //订单类型 | 161 | OrderType int `json:"orderType"` //订单类型 |
| 162 | + OrderTypes []int `json:"orderTypes,omitempty"` | ||
| 157 | DetailAction int `json:"detailAction"` //明细类型(0已收明细、1未收明细) | 163 | DetailAction int `json:"detailAction"` //明细类型(0已收明细、1未收明细) |
| 158 | DividendAction int `json:"dividendAction"` //分红类型(0累计分红、1分红支出) | 164 | DividendAction int `json:"dividendAction"` //分红类型(0累计分红、1分红支出) |
| 159 | IsDisable string `json:"isDisable,omitempty"` | 165 | IsDisable string `json:"isDisable,omitempty"` |
pkg/domain/order_data_from.go
0 → 100644
| @@ -36,6 +36,10 @@ type OrderGood struct { | @@ -36,6 +36,10 @@ type OrderGood struct { | ||
| 36 | BonusStatus int | 36 | BonusStatus int |
| 37 | //备注信息 | 37 | //备注信息 |
| 38 | Remark string | 38 | Remark string |
| 39 | + // 数据来源 | ||
| 40 | + DataFrom *OrderDataFrom | ||
| 41 | + // 备注原因 | ||
| 42 | + RemarkReason string | ||
| 39 | } | 43 | } |
| 40 | 44 | ||
| 41 | func (g *OrderGood) Status() GoodStatus { | 45 | func (g *OrderGood) Status() GoodStatus { |
| @@ -11,6 +11,7 @@ type OrderStaticQuery struct { | @@ -11,6 +11,7 @@ type OrderStaticQuery struct { | ||
| 11 | EndTime int64 `json:"endTime,omitempty"` | 11 | EndTime int64 `json:"endTime,omitempty"` |
| 12 | OrderStatus int `json:"orderStatus,omitempty"` | 12 | OrderStatus int `json:"orderStatus,omitempty"` |
| 13 | OrderType int `json:"orderType,omitempty"` | 13 | OrderType int `json:"orderType,omitempty"` |
| 14 | + OrderTypes []int `json:"orderTypes,omitempty"` | ||
| 14 | //IsDisable int `json:"isDisable,omitempty"` | 15 | //IsDisable int `json:"isDisable,omitempty"` |
| 15 | } | 16 | } |
| 16 | 17 | ||
| @@ -28,6 +29,7 @@ type OrderBonusQuery struct { | @@ -28,6 +29,7 @@ type OrderBonusQuery struct { | ||
| 28 | InPartnerIds []int64 `json:"inPartnerIds,omitempty"` | 29 | InPartnerIds []int64 `json:"inPartnerIds,omitempty"` |
| 29 | IsDisable int `json:"isDisable,omitempty"` | 30 | IsDisable int `json:"isDisable,omitempty"` |
| 30 | OrderType int `json:"orderType,omitempty"` | 31 | OrderType int `json:"orderType,omitempty"` |
| 32 | + OrderTypes []int `json:"orderTypes,omitempty"` | ||
| 31 | } | 33 | } |
| 32 | 34 | ||
| 33 | // 订单分红统计-应答 | 35 | // 订单分红统计-应答 |
pkg/domain/sys_message_consume.go
0 → 100644
| 1 | +package domain | ||
| 2 | + | ||
| 3 | +// SysMessageConsume | ||
| 4 | +type SysMessageConsume struct { | ||
| 5 | + // 消息ID | ||
| 6 | + Id int64 `json:"id"` | ||
| 7 | + // 主题 | ||
| 8 | + Topic string `json:"topic"` | ||
| 9 | + // 分区信息 | ||
| 10 | + Partition int `json:"partition"` | ||
| 11 | + // 消息偏移序号 | ||
| 12 | + Offset int64 `json:"offset"` | ||
| 13 | + // 键值 | ||
| 14 | + Key string `json:"key"` | ||
| 15 | + // 消息内容 | ||
| 16 | + Value string `json:"value"` | ||
| 17 | + // 消息时间 | ||
| 18 | + MsgTime int64 `json:"msgTime"` | ||
| 19 | + // 创建时间 | ||
| 20 | + CreateAt int64 `json:"createAt"` | ||
| 21 | + // 状态 | ||
| 22 | + Status int64 `json:"status"` | ||
| 23 | +} | ||
| 24 | + | ||
| 25 | +type SysMessageConsumeRepository interface { | ||
| 26 | + Save(dm *SysMessageConsume) (*SysMessageConsume, error) | ||
| 27 | + Remove(dm *SysMessageConsume) (*SysMessageConsume, error) | ||
| 28 | + FindOne(queryOptions map[string]interface{}) (*SysMessageConsume, error) | ||
| 29 | + Find(queryOptions map[string]interface{}) (int64, []*SysMessageConsume, error) | ||
| 30 | +} | ||
| 31 | + | ||
| 32 | +func (m *SysMessageConsume) Identify() interface{} { | ||
| 33 | + if m.Id == 0 { | ||
| 34 | + return nil | ||
| 35 | + } | ||
| 36 | + return m.Id | ||
| 37 | +} |
pkg/domain/sys_message_produce.go
0 → 100644
| 1 | +package domain | ||
| 2 | + | ||
| 3 | +// SysMessageProduce | ||
| 4 | +type SysMessageProduce struct { | ||
| 5 | + // 消息ID | ||
| 6 | + Id int64 `json:"id"` | ||
| 7 | + // 主题 | ||
| 8 | + Topic string `json:"topic"` | ||
| 9 | + // 分区信息 | ||
| 10 | + Partition int `json:"partition"` | ||
| 11 | + // 消息内容 | ||
| 12 | + Value string `json:"value"` | ||
| 13 | + // 消息时间 | ||
| 14 | + MsgTime int64 `json:"msgTime"` | ||
| 15 | + // 状态 | ||
| 16 | + Status int64 `json:"status"` | ||
| 17 | +} | ||
| 18 | + | ||
| 19 | +type SysMessageProduceRepository interface { | ||
| 20 | + Save(dm *SysMessageProduce) (*SysMessageProduce, error) | ||
| 21 | + Remove(dm *SysMessageProduce) (*SysMessageProduce, error) | ||
| 22 | + FindOne(queryOptions map[string]interface{}) (*SysMessageProduce, error) | ||
| 23 | + Find(queryOptions map[string]interface{}) (int64, []*SysMessageProduce, error) | ||
| 24 | +} | ||
| 25 | + | ||
| 26 | +func (m *SysMessageProduce) Identify() interface{} { | ||
| 27 | + if m.Id == 0 { | ||
| 28 | + return nil | ||
| 29 | + } | ||
| 30 | + return m.Id | ||
| 31 | +} |
| @@ -30,6 +30,9 @@ func (dao *OrderBaseDao) OrderStatics(option *domain.OrderStaticQuery) (count in | @@ -30,6 +30,9 @@ func (dao *OrderBaseDao) OrderStatics(option *domain.OrderStaticQuery) (count in | ||
| 30 | if option.OrderType > 0 { | 30 | if option.OrderType > 0 { |
| 31 | q.Where(`"order_base".order_type =?`, option.OrderType) | 31 | q.Where(`"order_base".order_type =?`, option.OrderType) |
| 32 | } | 32 | } |
| 33 | + if len(option.OrderTypes) > 0 { | ||
| 34 | + q.Where(`"order_base".order_type in (?)`, pg.In(option.OrderTypes)) | ||
| 35 | + } | ||
| 33 | if option.BeginTime > 0 { | 36 | if option.BeginTime > 0 { |
| 34 | q.Where(`"order_base".create_time >=?`, time.Unix(option.BeginTime/1000, 0)) | 37 | q.Where(`"order_base".create_time >=?`, time.Unix(option.BeginTime/1000, 0)) |
| 35 | } | 38 | } |
| @@ -68,6 +71,9 @@ func (dao *OrderBaseDao) OrderBonusStatics(option domain.OrderBonusQuery) (rsp d | @@ -68,6 +71,9 @@ func (dao *OrderBaseDao) OrderBonusStatics(option domain.OrderBonusQuery) (rsp d | ||
| 68 | if option.OrderType > 0 { | 71 | if option.OrderType > 0 { |
| 69 | q.Where(`"order_base".order_type =?`, option.OrderType) | 72 | q.Where(`"order_base".order_type =?`, option.OrderType) |
| 70 | } | 73 | } |
| 74 | + if len(option.OrderTypes) > 0 { | ||
| 75 | + q.Where(`"order_base".order_type in (?)`, pg.In(option.OrderTypes)) | ||
| 76 | + } | ||
| 71 | err = q.Select(&rsp.Total, &rsp.Bonus, &rsp.BonusExpense, &rsp.TotalOrderAmount) | 77 | err = q.Select(&rsp.Total, &rsp.Bonus, &rsp.BonusExpense, &rsp.TotalOrderAmount) |
| 72 | return | 78 | return |
| 73 | } | 79 | } |
| @@ -81,6 +87,9 @@ func (dao *OrderBaseDao) DividendOrders(option *domain.DividendOrdersQueryOption | @@ -81,6 +87,9 @@ func (dao *OrderBaseDao) DividendOrders(option *domain.DividendOrdersQueryOption | ||
| 81 | if option.OrderType > 0 { | 87 | if option.OrderType > 0 { |
| 82 | q.Where(`"order_base".order_type=?`, option.OrderType) | 88 | q.Where(`"order_base".order_type=?`, option.OrderType) |
| 83 | } | 89 | } |
| 90 | + if len(option.OrderTypes) > 0 { | ||
| 91 | + q.Where(`"order_base".order_type in (?)`, pg.In(option.OrderTypes)) | ||
| 92 | + } | ||
| 84 | if option.PartnerId > 0 { | 93 | if option.PartnerId > 0 { |
| 85 | q.Where(`"order_base".partner_id=?`, option.PartnerId) | 94 | q.Where(`"order_base".partner_id=?`, option.PartnerId) |
| 86 | } | 95 | } |
| @@ -96,9 +105,9 @@ func (dao *OrderBaseDao) DividendOrders(option *domain.DividendOrdersQueryOption | @@ -96,9 +105,9 @@ func (dao *OrderBaseDao) DividendOrders(option *domain.DividendOrdersQueryOption | ||
| 96 | //} | 105 | //} |
| 97 | if option.DividendAction == 0 { //累计分红 | 106 | if option.DividendAction == 0 { //累计分红 |
| 98 | if option.DetailAction == 0 { //已收明细 | 107 | if option.DetailAction == 0 { //已收明细 |
| 99 | - q.Where(`"order_base".partner_bonus_has>0`) | ||
| 100 | - } else if option.DetailAction == 1 { //未收明细 //实际金额>已支付金额 | ||
| 101 | - q.Where(`"order_base".partner_bonus_not>0`) | 108 | + q.Where(`(partner_bonus_has>0 or (partner_bonus_has=0 and coalesce(partner_bonus_not,0)=0 and bonus_status=2)) `) |
| 109 | + } else if option.DetailAction == 1 { //未收明细 //实际金额>已支付金额 如果应收分红为0,根据支付状态判断是已收(2)还是未收(1) | ||
| 110 | + q.Where(`(partner_bonus_not>0 or (partner_bonus_has=0 and coalesce(partner_bonus_not,0)=0 and bonus_status=1))`) | ||
| 102 | } | 111 | } |
| 103 | } else if option.DividendAction == 1 { //分红支出 | 112 | } else if option.DividendAction == 1 { //分红支出 |
| 104 | q.Where(`"order_base".partner_bonus_expense>0`) | 113 | q.Where(`"order_base".partner_bonus_expense>0`) |
| @@ -112,7 +112,7 @@ A left join | @@ -112,7 +112,7 @@ A left join | ||
| 112 | plan_order_amount amount, | 112 | plan_order_amount amount, |
| 113 | (case when use_order_count>=0 then use_partner_bonus else plan_partner_bonus end) bonus, | 113 | (case when use_order_count>=0 then use_partner_bonus else plan_partner_bonus end) bonus, |
| 114 | partner_bonus_expense bonus_expense FROM "order_base" AS "order_base" | 114 | partner_bonus_expense bonus_expense FROM "order_base" AS "order_base" |
| 115 | - WHERE (partner_id in (?)) and order_type =1 | 115 | + WHERE (partner_id in (?)) and (order_type in (?)) |
| 116 | UNION ALL | 116 | UNION ALL |
| 117 | SELECT partner_info_id partner_id, | 117 | SELECT partner_info_id partner_id, |
| 118 | 0 amount, bonus bonus, bonus_expense bonus_expense FROM business_bonus | 118 | 0 amount, bonus bonus, bonus_expense bonus_expense FROM business_bonus |
| @@ -129,7 +129,7 @@ GROUP BY partner_id | @@ -129,7 +129,7 @@ GROUP BY partner_id | ||
| 129 | sql.WriteString(fmt.Sprintf(" \nOFFSET %v", offset)) | 129 | sql.WriteString(fmt.Sprintf(" \nOFFSET %v", offset)) |
| 130 | } | 130 | } |
| 131 | } | 131 | } |
| 132 | - _, err = tx.Query(&statics, sql.String(), pg.In(partnerIds), pg.In(partnerIds), pg.In(partnerIds)) | 132 | + _, err = tx.Query(&statics, sql.String(), pg.In(partnerIds), pg.In(partnerIds), pg.In(domain.UserOrderTypes(domain.Career)), pg.In(partnerIds)) |
| 133 | return | 133 | return |
| 134 | } | 134 | } |
| 135 | 135 |
| @@ -104,7 +104,7 @@ func (svr *PgLoginService) PartnerStaticInfo() (interface{}, error) { | @@ -104,7 +104,7 @@ func (svr *PgLoginService) PartnerStaticInfo() (interface{}, error) { | ||
| 104 | if len(companyList) == 0 { | 104 | if len(companyList) == 0 { |
| 105 | return response, nil | 105 | return response, nil |
| 106 | } | 106 | } |
| 107 | - totalBonus, e := OrderDao.OrderBonusStatics(domain.OrderBonusQuery{InPartnerIds: doGetPartnerIds(), OrderType: domain.OrderReal}) | 107 | + totalBonus, e := OrderDao.OrderBonusStatics(domain.OrderBonusQuery{InPartnerIds: doGetPartnerIds(), OrderTypes: domain.UserOrderTypes(domain.Career)}) |
| 108 | if e != nil { | 108 | if e != nil { |
| 109 | return response, e | 109 | return response, e |
| 110 | } | 110 | } |
| @@ -153,7 +153,7 @@ func (svr *PgLoginService) PartnerStaticInfo() (interface{}, error) { | @@ -153,7 +153,7 @@ func (svr *PgLoginService) PartnerStaticInfo() (interface{}, error) { | ||
| 153 | continue | 153 | continue |
| 154 | } | 154 | } |
| 155 | 155 | ||
| 156 | - bonus, _ := OrderDao.OrderBonusStatics(domain.OrderBonusQuery{PartnerId: partner.Id, OrderType: domain.OrderReal}) | 156 | + bonus, _ := OrderDao.OrderBonusStatics(domain.OrderBonusQuery{PartnerId: partner.Id, OrderTypes: domain.UserOrderTypes(domain.Career)}) |
| 157 | if v, ok := mapPartnerBussinessBonus[partner.Id]; ok { | 157 | if v, ok := mapPartnerBussinessBonus[partner.Id]; ok { |
| 158 | bonus.Bonus += v.Bonus | 158 | bonus.Bonus += v.Bonus |
| 159 | } | 159 | } |
| @@ -196,7 +196,7 @@ func (svr *PgLoginService) ManagerStaticInfo() (interface{}, error) { | @@ -196,7 +196,7 @@ func (svr *PgLoginService) ManagerStaticInfo() (interface{}, error) { | ||
| 196 | for i := range companyList { | 196 | for i := range companyList { |
| 197 | c := companyList[i] | 197 | c := companyList[i] |
| 198 | 198 | ||
| 199 | - if constant.POSTGRESQL_DB_NAME != "partner_dev1" { | 199 | + if constant.POSTGRESQL_DB_NAME != "partner_dev" { |
| 200 | //通过企业平台 校验模块权限 | 200 | //通过企业平台 校验模块权限 |
| 201 | var user *domain.Users | 201 | var user *domain.Users |
| 202 | for j := range svr.Users { | 202 | for j := range svr.Users { |
pkg/infrastructure/message/consumer.go
0 → 100644
| 1 | +package message | ||
| 2 | + | ||
| 3 | +import ( | ||
| 4 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/kafkax" | ||
| 5 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/models" | ||
| 6 | +) | ||
| 7 | + | ||
| 8 | +//新消费者-消费组 | ||
| 9 | +func NewConsumer(kafkaHosts string, groupId string) models.Consumer { | ||
| 10 | + return kafkax.NewSaramaConsumer(kafkaHosts, groupId) | ||
| 11 | +} |
pkg/infrastructure/message/consumer_test.go
0 → 100644
| 1 | +package message | ||
| 2 | + | ||
| 3 | +import ( | ||
| 4 | + "fmt" | ||
| 5 | + "github.com/Shopify/sarama" | ||
| 6 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/constant" | ||
| 7 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/models" | ||
| 8 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg" | ||
| 9 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/transaction" | ||
| 10 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/utils" | ||
| 11 | + "log" | ||
| 12 | + "testing" | ||
| 13 | +) | ||
| 14 | + | ||
| 15 | +// 消息持久化 | ||
| 16 | +func TestNewConsumer(t *testing.T) { | ||
| 17 | + consumer := NewConsumer(constant.KAFKA_HOSTS, "0") | ||
| 18 | + consumer.WithMessageReceiver(NewPgMessageReceiverRepository(transaction.NewPGTransactionContext(pg.DB))) | ||
| 19 | + consumer.WithTopicHandler("mmm_xcx_orders", func(message interface{}) error { | ||
| 20 | + m, ok := message.(*sarama.Message) | ||
| 21 | + if !ok { | ||
| 22 | + return nil | ||
| 23 | + } | ||
| 24 | + if len(m.Value) > 0 { | ||
| 25 | + var msg models.Message | ||
| 26 | + utils.JsonUnmarshal(string(m.Value), &msg) | ||
| 27 | + t.Log("handler message :", string(m.Value), msg.Id, msg.Topic, msg.Value) | ||
| 28 | + } | ||
| 29 | + return nil | ||
| 30 | + }) | ||
| 31 | + consumer.StartConsume() | ||
| 32 | +} | ||
| 33 | + | ||
| 34 | +// 消息不需要持久化 | ||
| 35 | +func TestNewConsumerNoRepository(t *testing.T) { | ||
| 36 | + consumer := NewConsumer(constant.KAFKA_HOSTS, "0") | ||
| 37 | + consumer.WithTopicHandler("mmm_xcx_orders", func(message interface{}) error { | ||
| 38 | + m, ok := message.(*sarama.Message) | ||
| 39 | + if !ok { | ||
| 40 | + return nil | ||
| 41 | + } | ||
| 42 | + if len(m.Value) > 0 { | ||
| 43 | + var msg models.Message | ||
| 44 | + utils.JsonUnmarshal(string(m.Value), &msg) | ||
| 45 | + t.Log("handler message :", string(m.Value), msg.Id, msg.Topic, msg.Value) | ||
| 46 | + } | ||
| 47 | + return nil | ||
| 48 | + }) | ||
| 49 | + consumer.StartConsume() | ||
| 50 | +} | ||
| 51 | + | ||
| 52 | +type PgMessageReceiverRepository struct { | ||
| 53 | + transactionContext *transaction.TransactionContext | ||
| 54 | +} | ||
| 55 | + | ||
| 56 | +func NewPgMessageReceiverRepository(transactionContext *transaction.TransactionContext) *PgMessageReceiverRepository { | ||
| 57 | + return &PgMessageReceiverRepository{ | ||
| 58 | + transactionContext: transactionContext, | ||
| 59 | + } | ||
| 60 | +} | ||
| 61 | +func (repository *PgMessageReceiverRepository) ReceiveMessage(params map[string]interface{}) error { | ||
| 62 | + var num int | ||
| 63 | + checkSql := `select count(0) from sys_message_consume where "offset" =? and topic=?` | ||
| 64 | + _, err := repository.transactionContext.PgDd.Query(&num, checkSql, params["offset"], params["topic"]) | ||
| 65 | + if err != nil { | ||
| 66 | + return err | ||
| 67 | + } | ||
| 68 | + if num > 0 { | ||
| 69 | + return fmt.Errorf("receive repeate message [%v]", params) | ||
| 70 | + } | ||
| 71 | + | ||
| 72 | + sql := `insert into sys_message_consume(topic,partition,"offset",key,value,msg_time,create_at,status)values(?,?,?,?,?,?,?,?)` | ||
| 73 | + _, err = repository.transactionContext.PgDd.Exec(sql, params["topic"], params["partition"], params["offset"], params["key"], params["value"], params["msg_time"], params["create_at"], params["status"]) | ||
| 74 | + return err | ||
| 75 | +} | ||
| 76 | +func (repository *PgMessageReceiverRepository) ConfirmReceive(params map[string]interface{}) error { | ||
| 77 | + log.Println(params) | ||
| 78 | + _, err := repository.transactionContext.PgDd.Exec(`update sys_message_consume set status=? where "offset" =? and topic=?`, int(models.Finished), params["offset"], params["topic"]) | ||
| 79 | + return err | ||
| 80 | +} |
| 1 | +package kafkax | ||
| 2 | + | ||
| 3 | +import ( | ||
| 4 | + "context" | ||
| 5 | + "fmt" | ||
| 6 | + "github.com/Shopify/sarama" | ||
| 7 | + "github.com/tiptok/gocomm/identity/idgen" | ||
| 8 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/models" | ||
| 9 | + "log" | ||
| 10 | + "strings" | ||
| 11 | + "sync" | ||
| 12 | + "time" | ||
| 13 | +) | ||
| 14 | + | ||
| 15 | +type SaramaConsumer struct { | ||
| 16 | + ready chan bool | ||
| 17 | + messageHandlerMap map[string]func(message interface{}) error | ||
| 18 | + //Logger log.Logger | ||
| 19 | + kafkaHosts string | ||
| 20 | + groupId string | ||
| 21 | + topicMiss map[string]string //记录未被消费的topic | ||
| 22 | + receiver models.MessageReceiverRepository | ||
| 23 | +} | ||
| 24 | + | ||
| 25 | +func (consumer *SaramaConsumer) Setup(sarama.ConsumerGroupSession) error { | ||
| 26 | + close(consumer.ready) | ||
| 27 | + return nil | ||
| 28 | +} | ||
| 29 | +func (consumer *SaramaConsumer) Cleanup(sarama.ConsumerGroupSession) error { | ||
| 30 | + return nil | ||
| 31 | +} | ||
| 32 | +func (consumer *SaramaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { | ||
| 33 | + var err error | ||
| 34 | + for message := range claim.Messages() { | ||
| 35 | + log.Printf("Message claimed: timestamp = %v, topic = %s offset = %v value = %v", message.Timestamp, message.Topic, message.Offset, string(message.Value)) | ||
| 36 | + handler, ok := consumer.messageHandlerMap[message.Topic] | ||
| 37 | + consumer.messageReceiveBefore(message) | ||
| 38 | + if !ok { | ||
| 39 | + continue | ||
| 40 | + } | ||
| 41 | + if err = handler(message); err == nil { | ||
| 42 | + session.MarkMessage(message, "") | ||
| 43 | + } else { | ||
| 44 | + fmt.Println("Message claimed: kafka消息处理错误 topic =", message.Topic, message.Offset, err) | ||
| 45 | + } | ||
| 46 | + session.MarkMessage(message, "") | ||
| 47 | + if err != nil { | ||
| 48 | + continue | ||
| 49 | + } | ||
| 50 | + consumer.messageReceiveAfter(message) | ||
| 51 | + } | ||
| 52 | + return err | ||
| 53 | +} | ||
| 54 | + | ||
| 55 | +func (consumer *SaramaConsumer) messageReceiveBefore(message *sarama.ConsumerMessage) { | ||
| 56 | + if consumer.receiver == nil { | ||
| 57 | + return | ||
| 58 | + } | ||
| 59 | + | ||
| 60 | + var params = make(map[string]interface{}) | ||
| 61 | + var err error | ||
| 62 | + _, ok := consumer.messageHandlerMap[message.Topic] | ||
| 63 | + if !ok { | ||
| 64 | + params["status"] = models.Ignore | ||
| 65 | + _, topicMiss := consumer.topicMiss[message.Topic] | ||
| 66 | + if !topicMiss { | ||
| 67 | + fmt.Printf("topic:[%v] has not consumer handler", message.Topic) | ||
| 68 | + } | ||
| 69 | + return | ||
| 70 | + } | ||
| 71 | + | ||
| 72 | + _, err = consumer.storeMessage(params, message) | ||
| 73 | + if err != nil { | ||
| 74 | + log.Println("ConsumeClaim:", err) | ||
| 75 | + } | ||
| 76 | +} | ||
| 77 | +func (consumer *SaramaConsumer) messageReceiveAfter(message *sarama.ConsumerMessage) { | ||
| 78 | + if consumer.receiver == nil { | ||
| 79 | + return | ||
| 80 | + } | ||
| 81 | + consumer.finishMessage(map[string]interface{}{"offset": message.Offset, "topic": message.Topic}) | ||
| 82 | +} | ||
| 83 | + | ||
| 84 | +func (consumer *SaramaConsumer) storeMessage(params map[string]interface{}, message *sarama.ConsumerMessage) (id int64, err error) { | ||
| 85 | + defer func() { | ||
| 86 | + if e := recover(); e != nil { | ||
| 87 | + log.Println(e) | ||
| 88 | + } | ||
| 89 | + }() | ||
| 90 | + id = idgen.Next() | ||
| 91 | + params = make(map[string]interface{}) | ||
| 92 | + params["id"] = message.Offset | ||
| 93 | + params["topic"] = message.Topic | ||
| 94 | + params["partition"] = message.Partition | ||
| 95 | + params["offset"] = message.Offset | ||
| 96 | + params["key"] = string(message.Key) | ||
| 97 | + params["value"] = string(message.Value) | ||
| 98 | + params["msg_time"] = message.Timestamp.Unix() | ||
| 99 | + params["create_at"] = time.Now().Unix() | ||
| 100 | + params["status"] = models.UnFinished //0:未完成 1:已完成 2:未命中 | ||
| 101 | + err = consumer.receiver.ReceiveMessage(params) | ||
| 102 | + return | ||
| 103 | +} | ||
| 104 | +func (consumer *SaramaConsumer) finishMessage(params map[string]interface{}) error { | ||
| 105 | + defer func() { | ||
| 106 | + if e := recover(); e != nil { | ||
| 107 | + log.Println(e) | ||
| 108 | + } | ||
| 109 | + }() | ||
| 110 | + consumer.receiver.ConfirmReceive(params) | ||
| 111 | + return nil | ||
| 112 | +} | ||
| 113 | + | ||
| 114 | +func (consumer *SaramaConsumer) StartConsume() error { | ||
| 115 | + config := sarama.NewConfig() | ||
| 116 | + config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin | ||
| 117 | + config.Consumer.Offsets.Initial = sarama.OffsetNewest | ||
| 118 | + config.Version = sarama.V0_11_0_0 | ||
| 119 | + brokerList := strings.Split(consumer.kafkaHosts, ",") | ||
| 120 | + consumerGroup, err := sarama.NewConsumerGroup(brokerList, consumer.groupId, config) | ||
| 121 | + if err != nil { | ||
| 122 | + return err | ||
| 123 | + } | ||
| 124 | + ctx, cancel := context.WithCancel(context.Background()) | ||
| 125 | + wg := &sync.WaitGroup{} | ||
| 126 | + wg.Add(1) | ||
| 127 | + consumer.ready = make(chan bool) | ||
| 128 | + go func() { | ||
| 129 | + defer wg.Done() | ||
| 130 | + for { | ||
| 131 | + var topics []string | ||
| 132 | + for key := range consumer.messageHandlerMap { | ||
| 133 | + topics = append(topics, key) | ||
| 134 | + } | ||
| 135 | + if err := consumerGroup.Consume(ctx, topics, consumer); err != nil { | ||
| 136 | + log.Println(err.Error()) | ||
| 137 | + return | ||
| 138 | + } | ||
| 139 | + if ctx.Err() != nil { | ||
| 140 | + return | ||
| 141 | + } | ||
| 142 | + } | ||
| 143 | + }() | ||
| 144 | + <-consumer.ready | ||
| 145 | + log.Println("Sarama consumer up and running!...") | ||
| 146 | + select { | ||
| 147 | + case <-ctx.Done(): | ||
| 148 | + log.Println("Sarama consumer : context cancelled") | ||
| 149 | + } | ||
| 150 | + cancel() | ||
| 151 | + wg.Wait() | ||
| 152 | + if err := consumerGroup.Close(); err != nil { | ||
| 153 | + return err | ||
| 154 | + } | ||
| 155 | + return nil | ||
| 156 | +} | ||
| 157 | +func (consumer *SaramaConsumer) WithTopicHandler(topic string, handler func(message interface{}) error) { //*sarama.ConsumerMessage | ||
| 158 | + consumer.messageHandlerMap[topic] = handler | ||
| 159 | +} | ||
| 160 | +func (consumer *SaramaConsumer) WithMessageReceiver(receiver models.MessageReceiverRepository) { | ||
| 161 | + consumer.receiver = receiver | ||
| 162 | +} | ||
| 163 | + | ||
| 164 | +func NewSaramaConsumer(kafkaHosts string, groupId string) models.Consumer { | ||
| 165 | + return &SaramaConsumer{ | ||
| 166 | + kafkaHosts: kafkaHosts, | ||
| 167 | + groupId: groupId, | ||
| 168 | + topicMiss: make(map[string]string), | ||
| 169 | + messageHandlerMap: make(map[string]func(message interface{}) error), | ||
| 170 | + } | ||
| 171 | +} |
| 1 | +package kafkax | ||
| 2 | + | ||
| 3 | +import ( | ||
| 4 | + "encoding/json" | ||
| 5 | + "fmt" | ||
| 6 | + "github.com/Shopify/sarama" | ||
| 7 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/models" | ||
| 8 | + "log" | ||
| 9 | + "strings" | ||
| 10 | + "time" | ||
| 11 | +) | ||
| 12 | + | ||
| 13 | +// sarame kafka 消息生产 | ||
| 14 | +type KafkaMessageProducer struct { | ||
| 15 | + KafkaHosts string | ||
| 16 | + LogInfo models.LogInfo | ||
| 17 | +} | ||
| 18 | + | ||
| 19 | +// 同步发送 | ||
| 20 | +func (engine *KafkaMessageProducer) Publish(messages []*models.Message, option map[string]interface{}) (*models.MessagePublishResult, error) { | ||
| 21 | + config := sarama.NewConfig() | ||
| 22 | + config.Producer.Return.Successes = true | ||
| 23 | + config.Producer.Return.Errors = true | ||
| 24 | + config.Producer.Partitioner = sarama.NewRandomPartitioner | ||
| 25 | + config.Producer.Retry.Max = 10 | ||
| 26 | + config.Producer.RequiredAcks = sarama.WaitForAll | ||
| 27 | + config.Version = sarama.V0_11_0_0 | ||
| 28 | + brokerList := strings.Split(engine.KafkaHosts, ",") | ||
| 29 | + producer, err := sarama.NewSyncProducer(brokerList, config) | ||
| 30 | + if err != nil { | ||
| 31 | + return nil, err | ||
| 32 | + } | ||
| 33 | + defer func() { | ||
| 34 | + if err := producer.Close(); err != nil { | ||
| 35 | + log.Println(err) | ||
| 36 | + } | ||
| 37 | + }() | ||
| 38 | + var successMessageIds []int64 | ||
| 39 | + var errMessageIds []int64 | ||
| 40 | + for _, message := range messages { | ||
| 41 | + if value, err := json.Marshal(message); err == nil { | ||
| 42 | + msg := &sarama.ProducerMessage{ | ||
| 43 | + Topic: message.Topic, | ||
| 44 | + Value: sarama.StringEncoder(value), | ||
| 45 | + Timestamp: time.Now(), | ||
| 46 | + } | ||
| 47 | + partition, offset, err := producer.SendMessage(msg) | ||
| 48 | + if err != nil { | ||
| 49 | + errMessageIds = append(errMessageIds, message.Id) | ||
| 50 | + log.Println(err) | ||
| 51 | + } else { | ||
| 52 | + successMessageIds = append(successMessageIds, message.Id) | ||
| 53 | + var append = make(map[string]interface{}) | ||
| 54 | + append["topic"] = message.Topic | ||
| 55 | + append["partition"] = partition | ||
| 56 | + append["offset"] = offset | ||
| 57 | + log.Println("kafka消息发送", append) | ||
| 58 | + } | ||
| 59 | + } | ||
| 60 | + } | ||
| 61 | + return &models.MessagePublishResult{SuccessMessageIds: successMessageIds, ErrorMessageIds: errMessageIds}, nil | ||
| 62 | +} | ||
| 63 | + | ||
| 64 | +// 消息调度器 | ||
| 65 | +type MessageDispatcher struct { | ||
| 66 | + notifications chan struct{} | ||
| 67 | + messageChan chan *models.Message | ||
| 68 | + dispatchTicker *time.Ticker | ||
| 69 | + messageRepository models.MessageRepository | ||
| 70 | + producer models.MessageProducer | ||
| 71 | +} | ||
| 72 | + | ||
| 73 | +func (dispatcher *MessageDispatcher) MessagePublishedNotice() error { | ||
| 74 | + time.Sleep(time.Second * 2) | ||
| 75 | + dispatcher.notifications <- struct{}{} | ||
| 76 | + return nil | ||
| 77 | +} | ||
| 78 | + | ||
| 79 | +func (dispatcher *MessageDispatcher) MessagePublish(messages []*models.Message) error { | ||
| 80 | + for i := range messages { | ||
| 81 | + dispatcher.messageChan <- messages[i] | ||
| 82 | + } | ||
| 83 | + return nil | ||
| 84 | +} | ||
| 85 | + | ||
| 86 | +// go dispatcher.Dispatch() 启动一个独立协程 | ||
| 87 | +func (dispatcher *MessageDispatcher) Dispatch() { | ||
| 88 | + for { | ||
| 89 | + select { | ||
| 90 | + case <-dispatcher.dispatchTicker.C: | ||
| 91 | + go func(dispatcher *MessageDispatcher) { | ||
| 92 | + dispatcher.notifications <- struct{}{} | ||
| 93 | + }(dispatcher) | ||
| 94 | + case <-dispatcher.notifications: | ||
| 95 | + if dispatcher.messageRepository == nil { | ||
| 96 | + continue | ||
| 97 | + } | ||
| 98 | + messages, _ := dispatcher.messageRepository.FindNoPublishedStoredMessages() | ||
| 99 | + var messagesInProcessIds []int64 | ||
| 100 | + for i := range messages { | ||
| 101 | + messagesInProcessIds = append(messagesInProcessIds, messages[i].Id) | ||
| 102 | + } | ||
| 103 | + if messages != nil && len(messages) > 0 { | ||
| 104 | + dispatcher.messageRepository.FinishMessagesStatus(messagesInProcessIds, int(models.InProcess)) | ||
| 105 | + | ||
| 106 | + reuslt, err := dispatcher.producer.Publish(messages, nil) | ||
| 107 | + if err == nil && len(reuslt.SuccessMessageIds) > 0 { | ||
| 108 | + dispatcher.messageRepository.FinishMessagesStatus(reuslt.SuccessMessageIds, int(models.Finished)) | ||
| 109 | + } | ||
| 110 | + //发送失败的消息ID列表 更新状态 进行中->未开始 | ||
| 111 | + if len(reuslt.ErrorMessageIds) > 0 { | ||
| 112 | + dispatcher.messageRepository.FinishMessagesStatus(reuslt.ErrorMessageIds, int(models.UnFinished)) | ||
| 113 | + } | ||
| 114 | + } | ||
| 115 | + case msg := <-dispatcher.messageChan: | ||
| 116 | + dispatcher.producer.Publish([]*models.Message{msg}, nil) | ||
| 117 | + } | ||
| 118 | + } | ||
| 119 | +} | ||
| 120 | + | ||
| 121 | +type MessageDirector struct { | ||
| 122 | + messageRepository models.MessageRepository | ||
| 123 | + dispatcher *MessageDispatcher | ||
| 124 | +} | ||
| 125 | + | ||
| 126 | +func (d *MessageDirector) PublishMessages(messages []*models.Message) error { | ||
| 127 | + if d.dispatcher == nil { | ||
| 128 | + return fmt.Errorf("dispatcher还没有启动") | ||
| 129 | + } | ||
| 130 | + if d.messageRepository == nil { | ||
| 131 | + d.dispatcher.MessagePublish(messages) | ||
| 132 | + return nil | ||
| 133 | + } | ||
| 134 | + for _, message := range messages { | ||
| 135 | + if err := d.messageRepository.SaveMessage(message); err != nil { | ||
| 136 | + return err | ||
| 137 | + } | ||
| 138 | + } | ||
| 139 | + if err := d.dispatcher.MessagePublishedNotice(); err != nil { | ||
| 140 | + return err | ||
| 141 | + } | ||
| 142 | + return nil | ||
| 143 | +} | ||
| 144 | + | ||
| 145 | +// 消息发布器 | ||
| 146 | +// options["kafkaHosts"]="localhost:9092" | ||
| 147 | +// options["timeInterval"]=time.Second*60*5 | ||
| 148 | +func NewMessageDirector(messageRepository models.MessageRepository, options map[string]interface{}) *MessageDirector { | ||
| 149 | + dispatcher := &MessageDispatcher{ | ||
| 150 | + notifications: make(chan struct{}), | ||
| 151 | + messageRepository: messageRepository, | ||
| 152 | + messageChan: make(chan *models.Message, 100), | ||
| 153 | + } | ||
| 154 | + | ||
| 155 | + var hosts string | ||
| 156 | + if kafkaHosts, ok := options["kafkaHosts"]; ok { | ||
| 157 | + hosts = kafkaHosts.(string) | ||
| 158 | + } else { | ||
| 159 | + hosts = "localhost:9092" | ||
| 160 | + } | ||
| 161 | + dispatcher.producer = &KafkaMessageProducer{KafkaHosts: hosts, LogInfo: models.DefaultLog} | ||
| 162 | + | ||
| 163 | + if interval, ok := options["timeInterval"]; ok { | ||
| 164 | + dispatcher.dispatchTicker = time.NewTicker(interval.(time.Duration)) | ||
| 165 | + } else { | ||
| 166 | + dispatcher.dispatchTicker = time.NewTicker(time.Second * 60 * 5) | ||
| 167 | + } | ||
| 168 | + go dispatcher.Dispatch() | ||
| 169 | + | ||
| 170 | + return &MessageDirector{ | ||
| 171 | + messageRepository: messageRepository, | ||
| 172 | + dispatcher: dispatcher, | ||
| 173 | + } | ||
| 174 | +} |
pkg/infrastructure/message/models/message.go
0 → 100644
| 1 | +package models | ||
| 2 | + | ||
| 3 | +import "log" | ||
| 4 | + | ||
| 5 | +// 消息存储-发布 | ||
| 6 | +type MessageRepository interface { | ||
| 7 | + SaveMessage(message *Message) error | ||
| 8 | + FindNoPublishedStoredMessages() ([]*Message, error) | ||
| 9 | + FinishMessagesStatus(messageIds []int64, finishStatus int) error | ||
| 10 | +} | ||
| 11 | + | ||
| 12 | +// 消息存储-接收 | ||
| 13 | +type MessageReceiverRepository interface { | ||
| 14 | + ReceiveMessage(params map[string]interface{}) error | ||
| 15 | + ConfirmReceive(params map[string]interface{}) error | ||
| 16 | +} | ||
| 17 | + | ||
| 18 | +// 消费者 | ||
| 19 | +type Consumer interface { | ||
| 20 | + StartConsume() error | ||
| 21 | + WithTopicHandler(topic string, handler func(message interface{}) error) | ||
| 22 | + WithMessageReceiver(receiver MessageReceiverRepository) | ||
| 23 | +} | ||
| 24 | + | ||
| 25 | +// 生产者 | ||
| 26 | +type MessageProducer interface { | ||
| 27 | + Publish(messages []*Message, option map[string]interface{}) (*MessagePublishResult, error) | ||
| 28 | +} | ||
| 29 | + | ||
| 30 | +type LogInfo func(params ...interface{}) | ||
| 31 | + | ||
| 32 | +var DefaultLog LogInfo = func(params ...interface{}) { | ||
| 33 | + log.Println(params...) | ||
| 34 | +} |
pkg/infrastructure/message/models/models.go
0 → 100644
| 1 | +package models | ||
| 2 | + | ||
| 3 | +type Message struct { | ||
| 4 | + Id int64 `json:"id"` | ||
| 5 | + Topic string `json:"topic"` | ||
| 6 | + Value string `json:"value"` | ||
| 7 | + MsgTime int64 `json:"msg_time"` | ||
| 8 | + FinishStatus int `json:"-"` //0:未完成 2:已完成 1:进行中 3:忽略 | ||
| 9 | +} | ||
| 10 | + | ||
| 11 | +//结束状态 | ||
| 12 | +type FinishStatus int | ||
| 13 | + | ||
| 14 | +const ( | ||
| 15 | + UnFinished FinishStatus = 0 | ||
| 16 | + InProcess FinishStatus = 1 | ||
| 17 | + Finished FinishStatus = 2 | ||
| 18 | + Ignore FinishStatus = 3 | ||
| 19 | +) | ||
| 20 | + | ||
| 21 | +type MessagePublishResult struct { | ||
| 22 | + SuccessMessageIds []int64 | ||
| 23 | + ErrorMessageIds []int64 | ||
| 24 | +} |
pkg/infrastructure/message/producer.go
0 → 100644
| 1 | +package message | ||
| 2 | + | ||
| 3 | +import ( | ||
| 4 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/kafkax" | ||
| 5 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/models" | ||
| 6 | +) | ||
| 7 | + | ||
| 8 | +// 消息发布器 | ||
| 9 | +// options["kafkaHosts"]="localhost:9092" | ||
| 10 | +// options["timeInterval"]=time.Second*60*5 | ||
| 11 | +func NewMessageProducer(messageRepository models.MessageRepository, options map[string]interface{}) *kafkax.MessageDirector { | ||
| 12 | + dispatcher := kafkax.NewMessageDirector(messageRepository, options) | ||
| 13 | + return dispatcher | ||
| 14 | +} |
pkg/infrastructure/message/producer_test.go
0 → 100644
| 1 | +package message | ||
| 2 | + | ||
| 3 | +import ( | ||
| 4 | + "github.com/go-pg/pg/v10" | ||
| 5 | + "github.com/tiptok/gocomm/identity/idgen" | ||
| 6 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/constant" | ||
| 7 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/kafkax" | ||
| 8 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/models" | ||
| 9 | + pgDB "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg" | ||
| 10 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/transaction" | ||
| 11 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/utils" | ||
| 12 | + "testing" | ||
| 13 | + "time" | ||
| 14 | +) | ||
| 15 | + | ||
| 16 | +// 发布消息本地持久化 | ||
| 17 | +func TestNewMessageProducer(t *testing.T) { | ||
| 18 | + var ( | ||
| 19 | + transactionContext = transaction.NewPGTransactionContext(pgDB.DB) | ||
| 20 | + err error | ||
| 21 | + ) | ||
| 22 | + | ||
| 23 | + producer := NewMessageProducer(NewPgMessageRepository(transactionContext), map[string]interface{}{"kafkaHosts": constant.KAFKA_HOSTS}) | ||
| 24 | + err = producer.PublishMessages([]*models.Message{ | ||
| 25 | + &models.Message{Id: idgen.Next(), Topic: "chat", MsgTime: time.Now().Unix(), Value: "hello world! tip tip!", FinishStatus: 0}, | ||
| 26 | + }) | ||
| 27 | + | ||
| 28 | + if err != nil { | ||
| 29 | + return | ||
| 30 | + } | ||
| 31 | + time.Sleep(time.Second * 2) | ||
| 32 | +} | ||
| 33 | + | ||
| 34 | +// 发布消息无本地持久化 | ||
| 35 | +func TestNewMessageProducerNoRepository(t *testing.T) { | ||
| 36 | + var ( | ||
| 37 | + err error | ||
| 38 | + ) | ||
| 39 | + | ||
| 40 | + producer := NewMessageProducer(nil, map[string]interface{}{"kafkaHosts": constant.KAFKA_HOSTS}) | ||
| 41 | + err = producer.PublishMessages([]*models.Message{ | ||
| 42 | + &models.Message{Id: idgen.Next(), Topic: "chat", MsgTime: time.Now().Unix(), Value: "hello world! tip tip!", FinishStatus: 0}, | ||
| 43 | + }) | ||
| 44 | + | ||
| 45 | + if err != nil { | ||
| 46 | + return | ||
| 47 | + } | ||
| 48 | + time.Sleep(time.Second * 2) | ||
| 49 | +} | ||
| 50 | + | ||
| 51 | +// 简单发布消息 | ||
| 52 | +func TestSampleProducer(t *testing.T) { | ||
| 53 | + var producer models.MessageProducer = &kafkax.KafkaMessageProducer{ | ||
| 54 | + KafkaHosts: constant.KAFKA_HOSTS, | ||
| 55 | + } | ||
| 56 | + _, err := producer.Publish([]*models.Message{{Id: 22, Topic: "mmm_xcx_orders", MsgTime: time.Now().Unix(), Value: "hello ccc20201009!"}}, nil) | ||
| 57 | + if err != nil { | ||
| 58 | + t.Fatal(err) | ||
| 59 | + } | ||
| 60 | +} | ||
| 61 | + | ||
| 62 | +type PgMessageRepository struct { | ||
| 63 | + transactionContext *transaction.TransactionContext | ||
| 64 | +} | ||
| 65 | + | ||
| 66 | +func (repository *PgMessageRepository) SaveMessage(message *models.Message) error { | ||
| 67 | + sql := `insert into sys_message_produce (id,topic,value,msg_time,status)values(?,?,?,?,?)` | ||
| 68 | + _, err := repository.transactionContext.PgDd.Exec(sql, message.Id, message.Topic, utils.JsonAssertString(message), message.MsgTime, int64(models.UnFinished)) | ||
| 69 | + return err | ||
| 70 | +} | ||
| 71 | +func (repository *PgMessageRepository) FindNoPublishedStoredMessages() ([]*models.Message, error) { | ||
| 72 | + sql := `select value from sys_message_produce where status=?` | ||
| 73 | + var values []string | ||
| 74 | + _, e := repository.transactionContext.PgDd.Query(&values, sql, int64(models.UnFinished)) | ||
| 75 | + var messages = make([]*models.Message, 0) | ||
| 76 | + if e != nil { | ||
| 77 | + return messages, nil | ||
| 78 | + } | ||
| 79 | + for _, v := range values { | ||
| 80 | + item := &models.Message{} | ||
| 81 | + utils.JsonUnmarshal(v, item) | ||
| 82 | + if item.Id != 0 { | ||
| 83 | + messages = append(messages, item) | ||
| 84 | + } | ||
| 85 | + } | ||
| 86 | + return messages, nil | ||
| 87 | +} | ||
| 88 | +func (repository *PgMessageRepository) FinishMessagesStatus(messageIds []int64, finishStatus int) error { | ||
| 89 | + _, err := repository.transactionContext.PgDd.Exec("update sys_message_produce set status=? where id in (?)", finishStatus, pg.In(messageIds)) | ||
| 90 | + return err | ||
| 91 | +} | ||
| 92 | +func NewPgMessageRepository(transactionContext *transaction.TransactionContext) *PgMessageRepository { | ||
| 93 | + return &PgMessageRepository{ | ||
| 94 | + transactionContext: transactionContext, | ||
| 95 | + } | ||
| 96 | +} |
| @@ -28,6 +28,8 @@ func init() { | @@ -28,6 +28,8 @@ func init() { | ||
| 28 | (*models.PartnerInfo)(nil), | 28 | (*models.PartnerInfo)(nil), |
| 29 | (*models.PartnerSubAccount)(nil), | 29 | (*models.PartnerSubAccount)(nil), |
| 30 | (*models.Company)(nil), | 30 | (*models.Company)(nil), |
| 31 | + //(*models.SysMessageConsume)(nil), | ||
| 32 | + //(*models.SysMessageProduce)(nil), | ||
| 31 | (*models.OrderBase)(nil), | 33 | (*models.OrderBase)(nil), |
| 32 | (*models.OrderGood)(nil), | 34 | (*models.OrderGood)(nil), |
| 33 | (*models.ImInfo)(nil), | 35 | (*models.ImInfo)(nil), |
| @@ -60,4 +60,8 @@ type OrderBase struct { | @@ -60,4 +60,8 @@ type OrderBase struct { | ||
| 60 | BonusStatus int8 | 60 | BonusStatus int8 |
| 61 | //货物列表 | 61 | //货物列表 |
| 62 | OrderGood []*OrderGood `pg:"fk:order_id"` | 62 | OrderGood []*OrderGood `pg:"fk:order_id"` |
| 63 | + // 数据来源 | ||
| 64 | + DataFrom *domain.OrderDataFrom | ||
| 65 | + // 备注 | ||
| 66 | + Remark string | ||
| 63 | } | 67 | } |
| 1 | package models | 1 | package models |
| 2 | 2 | ||
| 3 | +import "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/domain" | ||
| 4 | + | ||
| 3 | //OrderGood 订单中的货品 | 5 | //OrderGood 订单中的货品 |
| 4 | type OrderGood struct { | 6 | type OrderGood struct { |
| 5 | tableName struct{} `pg:"order_good"` | 7 | tableName struct{} `pg:"order_good"` |
| @@ -37,4 +39,8 @@ type OrderGood struct { | @@ -37,4 +39,8 @@ type OrderGood struct { | ||
| 37 | BonusStatus int | 39 | BonusStatus int |
| 38 | //备注信息 | 40 | //备注信息 |
| 39 | Remark string | 41 | Remark string |
| 42 | + // 数据来源 | ||
| 43 | + DataFrom *domain.OrderDataFrom | ||
| 44 | + // 备注原因 | ||
| 45 | + RemarkReason string | ||
| 40 | } | 46 | } |
| 1 | package models | 1 | package models |
| 2 | 2 | ||
| 3 | -import "time" | 3 | +import ( |
| 4 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/domain" | ||
| 5 | + "time" | ||
| 6 | +) | ||
| 4 | 7 | ||
| 5 | // 公司信息 | 8 | // 公司信息 |
| 6 | type Company struct { | 9 | type Company struct { |
| @@ -29,4 +32,6 @@ type Company struct { | @@ -29,4 +32,6 @@ type Company struct { | ||
| 29 | DeleteAt time.Time | 32 | DeleteAt time.Time |
| 30 | // 是否开启合伙人模块,是否有效【1:有效】【2:无效】 | 33 | // 是否开启合伙人模块,是否有效【1:有效】【2:无效】 |
| 31 | Enable int8 | 34 | Enable int8 |
| 35 | + // 小程序 | ||
| 36 | + Applets []domain.CompanyApplets | ||
| 32 | } | 37 | } |
| 1 | +package models | ||
| 2 | + | ||
| 3 | +// SysMessageConsume | ||
| 4 | +type SysMessageConsume struct { | ||
| 5 | + tableName struct{} `pg:"sys_message_consume"` | ||
| 6 | + // 消息ID | ||
| 7 | + Id int64 | ||
| 8 | + // 主题 | ||
| 9 | + Topic string | ||
| 10 | + // 分区信息 | ||
| 11 | + Partition int | ||
| 12 | + // 消息偏移序号 | ||
| 13 | + Offset int64 | ||
| 14 | + // 键值 | ||
| 15 | + Key string | ||
| 16 | + // 消息内容 | ||
| 17 | + Value string | ||
| 18 | + // 消息时间 | ||
| 19 | + MsgTime int64 | ||
| 20 | + // 创建时间 | ||
| 21 | + CreateAt int64 | ||
| 22 | + // 状态 | ||
| 23 | + Status int64 | ||
| 24 | +} |
| 1 | +package models | ||
| 2 | + | ||
| 3 | +// SysMessageProduce | ||
| 4 | +type SysMessageProduce struct { | ||
| 5 | + tableName struct{} `pg:"sys_message_produce"` | ||
| 6 | + // 消息ID | ||
| 7 | + Id int64 | ||
| 8 | + // 主题 | ||
| 9 | + Topic string | ||
| 10 | + // 分区信息 | ||
| 11 | + Partition int | ||
| 12 | + // 消息内容 | ||
| 13 | + Value string | ||
| 14 | + // 消息时间 | ||
| 15 | + MsgTime int64 | ||
| 16 | + // 状态 | ||
| 17 | + Status int64 | ||
| 18 | +} |
| 1 | package repository | 1 | package repository |
| 2 | 2 | ||
| 3 | import ( | 3 | import ( |
| 4 | + "github.com/go-pg/pg/v10" | ||
| 4 | "github.com/go-pg/pg/v10/orm" | 5 | "github.com/go-pg/pg/v10/orm" |
| 5 | "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/domain" | 6 | "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/domain" |
| 6 | "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/models" | 7 | "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/models" |
| @@ -84,6 +85,9 @@ func (repository *OrderBaseRepository) Find(queryOptions map[string]interface{}) | @@ -84,6 +85,9 @@ func (repository *OrderBaseRepository) Find(queryOptions map[string]interface{}) | ||
| 84 | SetLimit(). | 85 | SetLimit(). |
| 85 | SetOrder(`order_base.create_time`, "sortByCreateTime"). | 86 | SetOrder(`order_base.create_time`, "sortByCreateTime"). |
| 86 | SetOrder(`order_base.update_time`, "sortByUpdateTime") | 87 | SetOrder(`order_base.update_time`, "sortByUpdateTime") |
| 88 | + if v, ok := queryOptions["orderTypes"]; ok { | ||
| 89 | + query.Where(`"order_base".order_type in (?)`, pg.In(v)) | ||
| 90 | + } | ||
| 87 | var err error | 91 | var err error |
| 88 | if query.AffectRow, err = query.SelectAndCount(); err != nil { | 92 | if query.AffectRow, err = query.SelectAndCount(); err != nil { |
| 89 | return 0, OrderBases, err | 93 | return 0, OrderBases, err |
| 1 | +package repository | ||
| 2 | + | ||
| 3 | +import ( | ||
| 4 | + "fmt" | ||
| 5 | + "github.com/tiptok/gocomm/common" | ||
| 6 | + . "github.com/tiptok/gocomm/pkg/orm/pgx" | ||
| 7 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/domain" | ||
| 8 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/models" | ||
| 9 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/transaction" | ||
| 10 | +) | ||
| 11 | + | ||
| 12 | +type SysMessageConsumeRepository struct { | ||
| 13 | + transactionContext *transaction.TransactionContext | ||
| 14 | +} | ||
| 15 | + | ||
| 16 | +func (repository *SysMessageConsumeRepository) Save(dm *domain.SysMessageConsume) (*domain.SysMessageConsume, error) { | ||
| 17 | + var ( | ||
| 18 | + err error | ||
| 19 | + m = &models.SysMessageConsume{} | ||
| 20 | + tx = repository.transactionContext.PgTx | ||
| 21 | + ) | ||
| 22 | + if err = common.GobModelTransform(m, dm); err != nil { | ||
| 23 | + return nil, err | ||
| 24 | + } | ||
| 25 | + if dm.Identify() == nil { | ||
| 26 | + if err = tx.Insert(m); err != nil { | ||
| 27 | + return nil, err | ||
| 28 | + } | ||
| 29 | + return dm, nil | ||
| 30 | + } | ||
| 31 | + if err = tx.Update(m); err != nil { | ||
| 32 | + return nil, err | ||
| 33 | + } | ||
| 34 | + return dm, nil | ||
| 35 | +} | ||
| 36 | + | ||
| 37 | +func (repository *SysMessageConsumeRepository) Remove(SysMessageConsume *domain.SysMessageConsume) (*domain.SysMessageConsume, error) { | ||
| 38 | + var ( | ||
| 39 | + tx = repository.transactionContext.PgTx | ||
| 40 | + SysMessageConsumeModel = &models.SysMessageConsume{Id: SysMessageConsume.Identify().(int64)} | ||
| 41 | + ) | ||
| 42 | + if _, err := tx.Model(SysMessageConsumeModel).Where("id = ?", SysMessageConsume.Id).Delete(); err != nil { | ||
| 43 | + return SysMessageConsume, err | ||
| 44 | + } | ||
| 45 | + return SysMessageConsume, nil | ||
| 46 | +} | ||
| 47 | + | ||
| 48 | +func (repository *SysMessageConsumeRepository) FindOne(queryOptions map[string]interface{}) (*domain.SysMessageConsume, error) { | ||
| 49 | + tx := repository.transactionContext.PgTx | ||
| 50 | + SysMessageConsumeModel := new(models.SysMessageConsume) | ||
| 51 | + query := NewQuery(tx.Model(SysMessageConsumeModel), queryOptions) | ||
| 52 | + query.SetWhere("id = ?", "id") | ||
| 53 | + if err := query.First(); err != nil { | ||
| 54 | + return nil, fmt.Errorf("query row not found") | ||
| 55 | + } | ||
| 56 | + if SysMessageConsumeModel.Id == 0 { | ||
| 57 | + return nil, fmt.Errorf("query row not found") | ||
| 58 | + } | ||
| 59 | + return repository.transformPgModelToDomainModel(SysMessageConsumeModel) | ||
| 60 | +} | ||
| 61 | + | ||
| 62 | +func (repository *SysMessageConsumeRepository) Find(queryOptions map[string]interface{}) (int64, []*domain.SysMessageConsume, error) { | ||
| 63 | + tx := repository.transactionContext.PgTx | ||
| 64 | + var SysMessageConsumeModels []*models.SysMessageConsume | ||
| 65 | + SysMessageConsumes := make([]*domain.SysMessageConsume, 0) | ||
| 66 | + query := NewQuery(tx.Model(&SysMessageConsumeModels), queryOptions). | ||
| 67 | + SetOrder("create_at", "sortByCreateTime") | ||
| 68 | + var err error | ||
| 69 | + if query.AffectRow, err = query.SelectAndCount(); err != nil { | ||
| 70 | + return 0, SysMessageConsumes, err | ||
| 71 | + } | ||
| 72 | + for _, SysMessageConsumeModel := range SysMessageConsumeModels { | ||
| 73 | + if SysMessageConsume, err := repository.transformPgModelToDomainModel(SysMessageConsumeModel); err != nil { | ||
| 74 | + return 0, SysMessageConsumes, err | ||
| 75 | + } else { | ||
| 76 | + SysMessageConsumes = append(SysMessageConsumes, SysMessageConsume) | ||
| 77 | + } | ||
| 78 | + } | ||
| 79 | + return int64(query.AffectRow), SysMessageConsumes, nil | ||
| 80 | +} | ||
| 81 | + | ||
| 82 | +func (repository *SysMessageConsumeRepository) transformPgModelToDomainModel(SysMessageConsumeModel *models.SysMessageConsume) (*domain.SysMessageConsume, error) { | ||
| 83 | + m := &domain.SysMessageConsume{} | ||
| 84 | + err := common.GobModelTransform(m, SysMessageConsumeModel) | ||
| 85 | + return m, err | ||
| 86 | +} | ||
| 87 | + | ||
| 88 | +func NewSysMessageConsumeRepository(transactionContext *transaction.TransactionContext) (*SysMessageConsumeRepository, error) { | ||
| 89 | + if transactionContext == nil { | ||
| 90 | + return nil, fmt.Errorf("transactionContext参数不能为nil") | ||
| 91 | + } | ||
| 92 | + return &SysMessageConsumeRepository{transactionContext: transactionContext}, nil | ||
| 93 | +} |
| 1 | +package repository | ||
| 2 | + | ||
| 3 | +import ( | ||
| 4 | + "fmt" | ||
| 5 | + "github.com/tiptok/gocomm/common" | ||
| 6 | + . "github.com/tiptok/gocomm/pkg/orm/pgx" | ||
| 7 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/domain" | ||
| 8 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/models" | ||
| 9 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/transaction" | ||
| 10 | +) | ||
| 11 | + | ||
| 12 | +type SysMessageProduceRepository struct { | ||
| 13 | + transactionContext *transaction.TransactionContext | ||
| 14 | +} | ||
| 15 | + | ||
| 16 | +func (repository *SysMessageProduceRepository) Save(dm *domain.SysMessageProduce) (*domain.SysMessageProduce, error) { | ||
| 17 | + var ( | ||
| 18 | + err error | ||
| 19 | + m = &models.SysMessageProduce{} | ||
| 20 | + tx = repository.transactionContext.PgTx | ||
| 21 | + ) | ||
| 22 | + if err = common.GobModelTransform(m, dm); err != nil { | ||
| 23 | + return nil, err | ||
| 24 | + } | ||
| 25 | + | ||
| 26 | + if err = tx.Insert(m); err != nil { | ||
| 27 | + return nil, err | ||
| 28 | + } | ||
| 29 | + return dm, nil | ||
| 30 | +} | ||
| 31 | + | ||
| 32 | +func (repository *SysMessageProduceRepository) Remove(SysMessageProduce *domain.SysMessageProduce) (*domain.SysMessageProduce, error) { | ||
| 33 | + var ( | ||
| 34 | + tx = repository.transactionContext.PgTx | ||
| 35 | + SysMessageProduceModel = &models.SysMessageProduce{Id: SysMessageProduce.Identify().(int64)} | ||
| 36 | + ) | ||
| 37 | + if _, err := tx.Model(SysMessageProduceModel).Where("id = ?", SysMessageProduce.Id).Delete(); err != nil { | ||
| 38 | + return SysMessageProduce, err | ||
| 39 | + } | ||
| 40 | + return SysMessageProduce, nil | ||
| 41 | +} | ||
| 42 | + | ||
| 43 | +func (repository *SysMessageProduceRepository) FindOne(queryOptions map[string]interface{}) (*domain.SysMessageProduce, error) { | ||
| 44 | + tx := repository.transactionContext.PgTx | ||
| 45 | + SysMessageProduceModel := new(models.SysMessageProduce) | ||
| 46 | + query := NewQuery(tx.Model(SysMessageProduceModel), queryOptions) | ||
| 47 | + query.SetWhere("id = ?", "id") | ||
| 48 | + if err := query.First(); err != nil { | ||
| 49 | + return nil, fmt.Errorf("query row not found") | ||
| 50 | + } | ||
| 51 | + if SysMessageProduceModel.Id == 0 { | ||
| 52 | + return nil, fmt.Errorf("query row not found") | ||
| 53 | + } | ||
| 54 | + return repository.transformPgModelToDomainModel(SysMessageProduceModel) | ||
| 55 | +} | ||
| 56 | + | ||
| 57 | +func (repository *SysMessageProduceRepository) Find(queryOptions map[string]interface{}) (int64, []*domain.SysMessageProduce, error) { | ||
| 58 | + tx := repository.transactionContext.PgDd | ||
| 59 | + var SysMessageProduceModels []*models.SysMessageProduce | ||
| 60 | + SysMessageProduces := make([]*domain.SysMessageProduce, 0) | ||
| 61 | + query := NewQuery(tx.Model(&SysMessageProduceModels), queryOptions). | ||
| 62 | + SetWhere("status = ?", "status"). | ||
| 63 | + SetOrder("update_time", "sortByUpdateTime") | ||
| 64 | + var err error | ||
| 65 | + if query.AffectRow, err = query.SelectAndCount(); err != nil { | ||
| 66 | + return 0, SysMessageProduces, err | ||
| 67 | + } | ||
| 68 | + for _, SysMessageProduceModel := range SysMessageProduceModels { | ||
| 69 | + if SysMessageProduce, err := repository.transformPgModelToDomainModel(SysMessageProduceModel); err != nil { | ||
| 70 | + return 0, SysMessageProduces, err | ||
| 71 | + } else { | ||
| 72 | + SysMessageProduces = append(SysMessageProduces, SysMessageProduce) | ||
| 73 | + } | ||
| 74 | + } | ||
| 75 | + return int64(query.AffectRow), SysMessageProduces, nil | ||
| 76 | +} | ||
| 77 | + | ||
| 78 | +func (repository *SysMessageProduceRepository) transformPgModelToDomainModel(SysMessageProduceModel *models.SysMessageProduce) (*domain.SysMessageProduce, error) { | ||
| 79 | + m := &domain.SysMessageProduce{} | ||
| 80 | + err := common.GobModelTransform(m, SysMessageProduceModel) | ||
| 81 | + return m, err | ||
| 82 | +} | ||
| 83 | + | ||
| 84 | +func NewSysMessageProduceRepository(transactionContext *transaction.TransactionContext) (*SysMessageProduceRepository, error) { | ||
| 85 | + if transactionContext == nil { | ||
| 86 | + return nil, fmt.Errorf("transactionContext参数不能为nil") | ||
| 87 | + } | ||
| 88 | + return &SysMessageProduceRepository{transactionContext: transactionContext}, nil | ||
| 89 | +} |
| @@ -66,7 +66,7 @@ func (this *OrderController) OrderList() { | @@ -66,7 +66,7 @@ func (this *OrderController) OrderList() { | ||
| 66 | msg = m | 66 | msg = m |
| 67 | return | 67 | return |
| 68 | } | 68 | } |
| 69 | - request.OrderType = domain.OrderReal | 69 | + request.OrderTypes = domain.UserOrderTypes(domain.Career) |
| 70 | header := this.GetRequestHeader(this.Ctx) | 70 | header := this.GetRequestHeader(this.Ctx) |
| 71 | msg = protocol.NewReturnResponse(order.List(header, request)) | 71 | msg = protocol.NewReturnResponse(order.List(header, request)) |
| 72 | } | 72 | } |
| 1 | +package messageHandler | ||
| 2 | + | ||
| 3 | +import "github.com/Shopify/sarama" | ||
| 4 | + | ||
| 5 | +type UcenterMessageCommand struct { | ||
| 6 | +} | ||
| 7 | + | ||
| 8 | +func (c *UcenterMessageCommand) ChangePhoneHandler(message interface{}) error { | ||
| 9 | + msg, ok := message.(*sarama.Message) | ||
| 10 | + if !ok && msg == nil { | ||
| 11 | + return nil | ||
| 12 | + } | ||
| 13 | + return nil | ||
| 14 | +} |
pkg/port/sarama/sarama.go
0 → 100644
| 1 | +package sarama | ||
| 2 | + | ||
| 3 | +import ( | ||
| 4 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/constant" | ||
| 5 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/kafkax" | ||
| 6 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/log" | ||
| 7 | + "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/port/sarama/messageHandler" | ||
| 8 | + //"suplus-message/pkg/constant" | ||
| 9 | + //"suplus-message/pkg/port/sarama/messageHandler" | ||
| 10 | +) | ||
| 11 | + | ||
| 12 | +func Run() { | ||
| 13 | + var ( | ||
| 14 | + ucenterMessage = &messageHandler.UcenterMessageCommand{} | ||
| 15 | + ) | ||
| 16 | + | ||
| 17 | + saramaConsumer := kafkax.NewSaramaConsumer(constant.KAFKA_HOSTS, constant.SERVICE_NAME) | ||
| 18 | + saramaConsumer.WithTopicHandler(constant.TOPIC_UCENT_USER_CHANGE_PHONE, ucenterMessage.ChangePhoneHandler) | ||
| 19 | + | ||
| 20 | + err := saramaConsumer.StartConsume() | ||
| 21 | + if err != nil { | ||
| 22 | + log.Error(err) | ||
| 23 | + } | ||
| 24 | +} |
| @@ -71,6 +71,7 @@ type OrderListRequest struct { | @@ -71,6 +71,7 @@ type OrderListRequest struct { | ||
| 71 | PageIndex int `json:"pageIndex"` | 71 | PageIndex int `json:"pageIndex"` |
| 72 | PageSize int `json:"pageSize" valid:"Required"` | 72 | PageSize int `json:"pageSize" valid:"Required"` |
| 73 | OrderType int `json:"-"` | 73 | OrderType int `json:"-"` |
| 74 | + OrderTypes []int `json:"-"` | ||
| 74 | } | 75 | } |
| 75 | type OrderListResponse struct { | 76 | type OrderListResponse struct { |
| 76 | List []*OrderListItem `json:"list"` | 77 | List []*OrderListItem `json:"list"` |
pkg/protocol/user/user.go
0 → 100644
| 1 | +package user | ||
| 2 | + | ||
| 3 | +type Company struct { | ||
| 4 | + Id int64 `json:"id"` | ||
| 5 | + Name string `json:"name"` | ||
| 6 | + Phone string `json:"phone"` | ||
| 7 | + //合作区域 | ||
| 8 | + District interface{} `json:"district"` | ||
| 9 | + //合作编码 | ||
| 10 | + SerialNo int64 `json:"serialNo"` | ||
| 11 | + //合作时间 | ||
| 12 | + CooperateTime int64 `json:"cooperationTime"` | ||
| 13 | + Salesman interface{} `json:"salesman"` | ||
| 14 | + MiniProgram interface{} `json:"miniProgram"` | ||
| 15 | +} | ||
| 16 | +type User struct { | ||
| 17 | + Id int64 `json:"uid"` | ||
| 18 | + //用户名称 | ||
| 19 | + PartnerName string `json:"uname"` | ||
| 20 | + //手机号 | ||
| 21 | + Phone string `json:"phone"` | ||
| 22 | + //合作公司 | ||
| 23 | + CooperateCompany Company `json:"company"` | ||
| 24 | +} |
-
请 注册 或 登录 后发表评论