作者 陈志颖

合并分支 'dev' 到 'test'

Dev



查看合并请求 !53
@@ -3,11 +3,12 @@ package factory @@ -3,11 +3,12 @@ package factory
3 import ( 3 import (
4 "github.com/linmadan/egglib-go/core/application" 4 "github.com/linmadan/egglib-go/core/application"
5 "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/infrastructure/pg" 5 "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/infrastructure/pg"
6 - "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/infrastructure/pg/transaction" 6 + pG "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/infrastructure/pg/transaction"
7 ) 7 )
8 8
9 func CreateTransactionContext(options map[string]interface{}) (application.TransactionContext, error) { 9 func CreateTransactionContext(options map[string]interface{}) (application.TransactionContext, error) {
10 - return &transaction.TransactionContext{  
11 - PgDd: pg.DB,  
12 - }, nil 10 + //return &transaction.TransactionContext{
  11 + // PgDd: pg.DB,
  12 + //}, nil
  13 + return pG.NewPGTransactionContext(pg.DB), nil
13 } 14 }
@@ -29,6 +29,8 @@ type CreateOrderFromBestshop struct { @@ -29,6 +29,8 @@ type CreateOrderFromBestshop struct {
29 DeliveryTime string `json:"deliveryTime"` 29 DeliveryTime string `json:"deliveryTime"`
30 PartnerId int64 `json:"partnerId"` 30 PartnerId int64 `json:"partnerId"`
31 OrderArea string `json:"orderArea"` 31 OrderArea string `json:"orderArea"`
  32 + //小程序id
  33 + WxAppletId string `json:"wxAppletId"`
32 Goods []struct { 34 Goods []struct {
33 Id int64 `json:"id"` 35 Id int64 `json:"id"`
34 //货品编号 36 //货品编号
@@ -37,7 +37,7 @@ func (s SyncOrderService) SyncOrderFromBestshop(cmd command.CreateOrderFromBests @@ -37,7 +37,7 @@ func (s SyncOrderService) SyncOrderFromBestshop(cmd command.CreateOrderFromBests
37 return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, err.Error()) 37 return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, err.Error())
38 } 38 }
39 defer func() { 39 defer func() {
40 - transactionContext.RollbackTransaction() 40 + _ = transactionContext.RollbackTransaction()
41 }() 41 }()
42 42
43 //检查账号是否存在 43 //检查账号是否存在
@@ -101,7 +101,7 @@ func (s SyncOrderService) CreateOrderFromBestshop(cmd command.CreateOrderFromBes @@ -101,7 +101,7 @@ func (s SyncOrderService) CreateOrderFromBestshop(cmd command.CreateOrderFromBes
101 return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, err.Error()) 101 return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, err.Error())
102 } 102 }
103 defer func() { 103 defer func() {
104 - transactionContext.RollbackTransaction() 104 + _ = transactionContext.RollbackTransaction()
105 }() 105 }()
106 var ( 106 var (
107 orderBestshopRepository domain.OrderBestshopRepository 107 orderBestshopRepository domain.OrderBestshopRepository
@@ -117,6 +117,7 @@ func (s SyncOrderService) CreateOrderFromBestshop(cmd command.CreateOrderFromBes @@ -117,6 +117,7 @@ func (s SyncOrderService) CreateOrderFromBestshop(cmd command.CreateOrderFromBes
117 }); err != nil { 117 }); err != nil {
118 return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, err.Error()) 118 return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, err.Error())
119 } 119 }
  120 + // TODO 增加小程序id
120 order := domain.OrderBestShop{ 121 order := domain.OrderBestShop{
121 OrderCode: cmd.OrderCode, 122 OrderCode: cmd.OrderCode,
122 OrderTime: cmd.OrderTime, 123 OrderTime: cmd.OrderTime,
@@ -135,6 +136,7 @@ func (s SyncOrderService) CreateOrderFromBestshop(cmd command.CreateOrderFromBes @@ -135,6 +136,7 @@ func (s SyncOrderService) CreateOrderFromBestshop(cmd command.CreateOrderFromBes
135 IsCopy: false, 136 IsCopy: false,
136 CompanyId: cmd.CompanyId, 137 CompanyId: cmd.CompanyId,
137 OrderArea: cmd.OrderArea, 138 OrderArea: cmd.OrderArea,
  139 + WxAppletId: cmd.WxAppletId,
138 } 140 }
139 err = orderBestshopRepository.Add(&order) 141 err = orderBestshopRepository.Add(&order)
140 if err != nil { 142 if err != nil {
@@ -232,11 +234,16 @@ func (s SyncOrderService) copyOrderBestshopToOrderBase(orderBestshop *domain.Ord @@ -232,11 +234,16 @@ func (s SyncOrderService) copyOrderBestshopToOrderBase(orderBestshop *domain.Ord
232 e := fmt.Sprintf("未找到指定的合伙人的公司(partner_id=%d,company_id=%d)数据,%s", orderBestshop.PartnerId, partnerData.CompanyId, err) 234 e := fmt.Sprintf("未找到指定的合伙人的公司(partner_id=%d,company_id=%d)数据,%s", orderBestshop.PartnerId, partnerData.CompanyId, err)
233 return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, e) 235 return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, e)
234 } 236 }
  237 + // TODO 判断多小程序
235 for _, v := range companyData.Applets { 238 for _, v := range companyData.Applets {
236 //BEST_SHOP_UNIONID string = "gh_18eb644002fb" //香米小程序原始id 239 //BEST_SHOP_UNIONID string = "gh_18eb644002fb" //香米小程序原始id
237 //接收香米小程序的订单数据 240 //接收香米小程序的订单数据
238 - if len(v.Id) > 0 { 241 + //if len(v.Id) > 0 {
  242 + // canCopyOrder = true
  243 + //}
  244 + if v.Id == orderBestshop.WxAppletId {
239 canCopyOrder = true 245 canCopyOrder = true
  246 + break
240 } 247 }
241 } 248 }
242 if !canCopyOrder { 249 if !canCopyOrder {
@@ -329,7 +336,7 @@ func (s SyncOrderService) UpdateOrderFromBestshop(cmd command.CreateOrderFromBes @@ -329,7 +336,7 @@ func (s SyncOrderService) UpdateOrderFromBestshop(cmd command.CreateOrderFromBes
329 if err != nil { 336 if err != nil {
330 return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, "获取orderBestshop(order_code=%s)数据失败,err=%s", cmd.OrderCode, err.Error()) 337 return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, "获取orderBestshop(order_code=%s)数据失败,err=%s", cmd.OrderCode, err.Error())
331 } 338 }
332 - 339 + // TODO 增加小程序id
333 orderData.OrderCode = cmd.OrderCode 340 orderData.OrderCode = cmd.OrderCode
334 orderData.OrderTime = cmd.OrderTime 341 orderData.OrderTime = cmd.OrderTime
335 orderData.OrderState = cmd.OrderState 342 orderData.OrderState = cmd.OrderState
@@ -344,6 +351,7 @@ func (s SyncOrderService) UpdateOrderFromBestshop(cmd command.CreateOrderFromBes @@ -344,6 +351,7 @@ func (s SyncOrderService) UpdateOrderFromBestshop(cmd command.CreateOrderFromBes
344 orderData.DeliveryState = cmd.DeliveryState 351 orderData.DeliveryState = cmd.DeliveryState
345 orderData.DeliveryTime = cmd.DeliveryTime 352 orderData.DeliveryTime = cmd.DeliveryTime
346 orderData.CompanyId = cmd.CompanyId 353 orderData.CompanyId = cmd.CompanyId
  354 + orderData.WxAppletId = cmd.WxAppletId
347 err = orderBestshopRepository.Edit(orderData) 355 err = orderBestshopRepository.Edit(orderData)
348 if err != nil { 356 if err != nil {
349 return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, "编辑order_bestshop失败:"+err.Error()) 357 return lib.ThrowError(lib.INTERNAL_SERVER_ERROR, "编辑order_bestshop失败:"+err.Error())
@@ -41,6 +41,8 @@ type OrderBestShop struct { @@ -41,6 +41,8 @@ type OrderBestShop struct {
41 CompanyId int64 `json:"companyId"` 41 CompanyId int64 `json:"companyId"`
42 //订单区域 42 //订单区域
43 OrderArea string `json:"orderArea"` 43 OrderArea string `json:"orderArea"`
  44 + // 微信小程序id
  45 + WxAppletId string `jsons:"wxAppletId"`
44 } 46 }
45 47
46 func (order OrderBestShop) CopyToOrderBase(o *OrderBase) { 48 func (order OrderBestShop) CopyToOrderBase(o *OrderBase) {
@@ -21,7 +21,7 @@ type OrderBestshop struct { @@ -21,7 +21,7 @@ type OrderBestshop struct {
21 BuyerAddress string 21 BuyerAddress string
22 //买家备注 22 //买家备注
23 BuyerRemark string 23 BuyerRemark string
24 - // 24 + //买家id
25 BuyerId int64 25 BuyerId int64
26 //订单总数 26 //订单总数
27 OrderCount int 27 OrderCount int
@@ -31,9 +31,14 @@ type OrderBestshop struct { @@ -31,9 +31,14 @@ type OrderBestshop struct {
31 DeliveryTime string 31 DeliveryTime string
32 //创建时间 32 //创建时间
33 CreateTime time.Time 33 CreateTime time.Time
  34 + //合伙人id
34 PartnerId int64 35 PartnerId int64
35 //是否将数据同步到 order_base ,order_good 36 //是否将数据同步到 order_base ,order_good
36 IsCopy bool `pg:",use_zero"` 37 IsCopy bool `pg:",use_zero"`
  38 + //公司id
37 CompanyId int64 39 CompanyId int64
  40 + //订单区域
38 OrderArea string 41 OrderArea string
  42 + //微信小程序id
  43 + WxAppletId string
39 } 44 }
@@ -44,6 +44,7 @@ func (respository OrderBestshopRepository) transformPgModelToDomainModel(orderMo @@ -44,6 +44,7 @@ func (respository OrderBestshopRepository) transformPgModelToDomainModel(orderMo
44 IsCopy: orderModel.IsCopy, 44 IsCopy: orderModel.IsCopy,
45 CompanyId: orderModel.CompanyId, 45 CompanyId: orderModel.CompanyId,
46 OrderArea: orderModel.OrderArea, 46 OrderArea: orderModel.OrderArea,
  47 + WxAppletId: orderModel.WxAppletId,
47 }, nil 48 }, nil
48 } 49 }
49 50
@@ -67,6 +68,7 @@ func (respository OrderBestshopRepository) Add(order *domain.OrderBestShop) erro @@ -67,6 +68,7 @@ func (respository OrderBestshopRepository) Add(order *domain.OrderBestShop) erro
67 IsCopy: order.IsCopy, 68 IsCopy: order.IsCopy,
68 CompanyId: order.CompanyId, 69 CompanyId: order.CompanyId,
69 OrderArea: order.OrderArea, 70 OrderArea: order.OrderArea,
  71 + WxAppletId: order.WxAppletId,
70 } 72 }
71 _, err := tx.Model(&m).Insert() 73 _, err := tx.Model(&m).Insert()
72 order.Id = m.Id 74 order.Id = m.Id
@@ -94,6 +96,7 @@ func (respository OrderBestshopRepository) Edit(order *domain.OrderBestShop) err @@ -94,6 +96,7 @@ func (respository OrderBestshopRepository) Edit(order *domain.OrderBestShop) err
94 IsCopy: order.IsCopy, 96 IsCopy: order.IsCopy,
95 CompanyId: order.CompanyId, 97 CompanyId: order.CompanyId,
96 OrderArea: order.OrderArea, 98 OrderArea: order.OrderArea,
  99 + WxAppletId: order.WxAppletId,
97 } 100 }
98 _, err := tx.Model(&m).Where("id=?", order.Id).Update() 101 _, err := tx.Model(&m).Where("id=?", order.Id).Update()
99 order.Id = m.Id 102 order.Id = m.Id
@@ -13,14 +13,18 @@ import ( @@ -13,14 +13,18 @@ import (
13 func DataFromXiangMi(message *sarama.ConsumerMessage) error { 13 func DataFromXiangMi(message *sarama.ConsumerMessage) error {
14 logs.Info("Done Message claimed: timestamp = %v, topic = %s offset = %v value = %v \n", 14 logs.Info("Done Message claimed: timestamp = %v, topic = %s offset = %v value = %v \n",
15 message.Timestamp, message.Topic, message.Offset, string(message.Value)) 15 message.Timestamp, message.Topic, message.Offset, string(message.Value))
  16 +
16 var ( 17 var (
17 msgData DataFromMessage 18 msgData DataFromMessage
18 err error 19 err error
19 ) 20 )
  21 +
20 err = json.Unmarshal(message.Value, &msgData) 22 err = json.Unmarshal(message.Value, &msgData)
21 if err != nil { 23 if err != nil {
22 return fmt.Errorf("[Consumer][SyncBestshopOrder] 解析kafka数据失败;%s", err) 24 return fmt.Errorf("[Consumer][SyncBestshopOrder] 解析kafka数据失败;%s", err)
23 } 25 }
  26 +
  27 + // TODO 使用小程序id作为module
24 dataAction := msgData.Module + "/" + msgData.Action 28 dataAction := msgData.Module + "/" + msgData.Action
25 switch dataAction { 29 switch dataAction {
26 case "xiangmi.order/ship": 30 case "xiangmi.order/ship":
@@ -29,6 +33,13 @@ func DataFromXiangMi(message *sarama.ConsumerMessage) error { @@ -29,6 +33,13 @@ func DataFromXiangMi(message *sarama.ConsumerMessage) error {
29 e := fmt.Errorf("[Consumer][SyncBestshopOrder] %s", err) 33 e := fmt.Errorf("[Consumer][SyncBestshopOrder] %s", err)
30 return e 34 return e
31 } 35 }
  36 + // TODO 统一操作小程序订单
  37 + case "wxapplet.order/ship":
  38 + err = syncBestshopOrder(msgData.Data)
  39 + if err != nil {
  40 + e := fmt.Errorf("[Consumer][SyncBestshopOrder] %s", err)
  41 + return e
  42 + }
32 default: 43 default:
33 logs.Error("未找到执行动作:Module=%s,Action=%s", msgData.Module, msgData.Action) 44 logs.Error("未找到执行动作:Module=%s,Action=%s", msgData.Module, msgData.Action)
34 } 45 }
  1 +/**
  2 + @author: stevechan
  3 + @date: 2021/3/11
  4 + @note:
  5 +**/
  6 +
  7 +package configs
  8 +
  9 +type MqConfig struct {
  10 + Topics []string `json:"topics"`
  11 + Servers []string `json:"servers"`
  12 + ConsumerId string `json:"consumerGroup"`
  13 +}
  14 +
  15 +var Cfg = MqConfig{
  16 + Topics: []string{"xiangmi_project_dev"},
  17 + Servers: []string{"127.0.0.1:9092"},
  18 + ConsumerId: "test",
  19 +}
  1 +/**
  2 + @author: stevechan
  3 + @date: 2021/3/11
  4 + @note:
  5 +**/
  6 +
  7 +package sync_order
  8 +
  9 +import (
  10 + "encoding/json"
  11 + "fmt"
  12 + "github.com/Shopify/sarama"
  13 + "github.com/gavv/httpexpect"
  14 + "github.com/go-pg/pg"
  15 + pg2 "github.com/go-pg/pg/v10"
  16 + . "github.com/onsi/ginkgo"
  17 + . "github.com/onsi/gomega"
  18 + pG "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/infrastructure/pg"
  19 + "gitlab.fjmaimaimai.com/mmm-go/partnermg/test/integration/consumer/sync_order/configs"
  20 + "net/http"
  21 + "strconv"
  22 + "time"
  23 +)
  24 +
  25 +var (
  26 + cfg *configs.MqConfig
  27 + producer sarama.SyncProducer
  28 +)
  29 +
  30 +func produce(topic string, key string, content interface{}) error {
  31 + data, _ := json.Marshal(content)
  32 +
  33 + msg := &sarama.ProducerMessage{
  34 + Topic: topic,
  35 + Key: sarama.StringEncoder(key),
  36 + Value: sarama.ByteEncoder(data),
  37 + Timestamp: time.Now(),
  38 + }
  39 +
  40 + _, _, err2 := producer.SendMessage(msg)
  41 + if err2 != nil {
  42 + msg1 := fmt.Sprintf("Send Error topic: %v. key: %v. content: %v", topic, key, content)
  43 + fmt.Println(msg1)
  44 + return err2
  45 + }
  46 +
  47 + fmt.Printf("Send OK topic:%s key:%s value:%s\n", topic, key, content)
  48 +
  49 + return nil
  50 +}
  51 +
  52 +var _ = Describe("同步小程序订单", func() {
  53 + var companyId int64
  54 + var partnerId int64
  55 + BeforeEach(func() {
  56 + fmt.Print("init kafka producer, it may take a few seconds to init the connection\n")
  57 + var err error
  58 + cfg = &configs.Cfg
  59 + mqConfig := sarama.NewConfig()
  60 + mqConfig.Producer.Return.Successes = true
  61 + mqConfig.Version = sarama.V0_10_2_1
  62 + if err = mqConfig.Validate(); err != nil {
  63 + msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", *cfg, err)
  64 + fmt.Println(msg)
  65 + panic(msg)
  66 + }
  67 + producer, err = sarama.NewSyncProducer(cfg.Servers, mqConfig)
  68 + if err != nil {
  69 + msg := fmt.Sprintf("Kafak producer create fail. err: %v", err)
  70 + fmt.Println(msg)
  71 + panic(msg)
  72 + }
  73 +
  74 + // 新增公司
  75 + _, err2 := pG.DB.QueryOne(
  76 + pg.Scan(&companyId),
  77 + "INSERT INTO company (name, phone, logo, admin_company, status, create_at, update_at, delete_at, remarks, enable, abbreviation, applets) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING id",
  78 + "测试公司", "13162672794", "http://suplus-file-dev.fjmaimaimai.com/upload/image/2020091709304822611.jpg", 431, Default, "2020-09-17 09:30:05.588579+08", "2020-09-17 09:30:05.588579+08", pg2.NullTime{}, "", "测试", 1, "公司", []map[string]interface{}{{"id": "gh_18eb644002fb", "url": "pages/index/index", "name": "海鲜干货啦啦啦啦啦", "imageUrl": ""}})
  79 + Expect(err2).NotTo(HaveOccurred())
  80 +
  81 + // 新增合伙人
  82 + _, err3 := pG.DB.QueryOne(
  83 + pg.Scan(&partnerId),
  84 + "INSERT INTO partner_info (partner_name, account, password, status, partner_category, region_info, cooperate_time, create_at, update_at, salesman, partner_category_infos, company_id, remark) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING id",
  85 + "test_partner", "19121619631", "7c4a8d09ca3762af61e59520943dc26494f8941b", 1, 1, map[string]interface{}{"regionName": "123"}, "2020-09-01 00:00:00+08", "2020-09-01 00:00:00+08", "2020-09-01 00:00:00+08", []map[string]interface{}{{"name": "123", "telephone": "13322223333"}}, []map[string]interface{}{{"id": 2, "code": "001"}}, 1, "测试合伙人")
  86 + Expect(err3).NotTo(HaveOccurred())
  87 +
  88 + key := strconv.FormatInt(time.Now().UTC().UnixNano(), 10)
  89 + value := map[string]interface{}{
  90 + "module": "wxapplet.order",
  91 + "action": "ship",
  92 + "data": map[string]interface{}{
  93 + "orderCode": "1234567895",
  94 + "orderCount": 1,
  95 + "orderAmount": 2,
  96 + "orderTime": "2021-03-05 12:01:54",
  97 + "orderState": 1,
  98 + "deliveryState": 1,
  99 + "buyerName": "陈志颖",
  100 + "buyerPhone": "19121619631",
  101 + "buyerAddress": "福建省 福州市 仓山区",
  102 + "buyerRemark": "不辣",
  103 + "buyerId": 123,
  104 + "partnerId": partnerId,
  105 + "goods": []map[string]interface{}{
  106 + {
  107 + "sn": "wertyuf",
  108 + "id": 1,
  109 + "bn": "a;slkdjgaj",
  110 + "name": "地瓜",
  111 + "price": 2,
  112 + "nums": 1,
  113 + "amount": 2,
  114 + },
  115 + },
  116 + "companyId": companyId,
  117 + "deliveryTime": "2021-03-05 12:01:54",
  118 + "orderArea": "福建省 福州市 台江区",
  119 + "wxAppletId": "gh_18eb644002fb",
  120 + },
  121 + }
  122 + err4 := produce(cfg.Topics[0], key, value)
  123 + Expect(err4).NotTo(HaveOccurred())
  124 + })
  125 + Describe("提交小程序订单消息", func() {
  126 + Context("提交正确的小程序订单", func() {
  127 + It("返回正确的订单数据", func() {
  128 + httpExpect := httpexpect.New(GinkgoT(), server.URL)
  129 + httpExpect.GET("").
  130 + Expect().
  131 + Status(http.StatusOK).
  132 + JSON().
  133 + Object().
  134 + ContainsKey("code").ValueEqual("code", 0)
  135 + })
  136 + })
  137 + })
  138 + AfterEach(func() {
  139 + _, err1 := pG.DB.Exec("DELETE FROM order_bestshop WHERE true")
  140 + Expect(err1).NotTo(HaveOccurred())
  141 + _, err2 := pG.DB.Exec("DELETE FROM company WHERE true")
  142 + Expect(err2).NotTo(HaveOccurred())
  143 + _, err3 := pG.DB.Exec("DELETE FROM partner_info WHERE true")
  144 + Expect(err3).NotTo(HaveOccurred())
  145 + })
  146 +})
  1 +/**
  2 + @author: stevechan
  3 + @date: 2021/3/11
  4 + @note:
  5 +**/
  6 +
  7 +package sync_order
  8 +
  9 +import (
  10 + "github.com/astaxie/beego"
  11 + "net/http"
  12 + "net/http/httptest"
  13 + "testing"
  14 +
  15 + . "github.com/onsi/ginkgo"
  16 + . "github.com/onsi/gomega"
  17 + _ "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/infrastructure/pg"
  18 + _ "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/beego"
  19 +)
  20 +
  21 +func TestColumnSetting(t *testing.T) {
  22 + RegisterFailHandler(Fail)
  23 + RunSpecs(t, "Kafka Consumer Sync Order Correlations Test Case Suite")
  24 +}
  25 +
  26 +var handler http.Handler
  27 +var server *httptest.Server
  28 +
  29 +// 初始化kafka
  30 +var _ = BeforeSuite(func() {
  31 + handler = beego.BeeApp.Handlers
  32 + server = httptest.NewServer(handler)
  33 +})
  34 +
  35 +var _ = AfterSuite(func() {
  36 + server.Close()
  37 +})