作者 唐旭辉

更新

... ... @@ -19,16 +19,16 @@ func main() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt, os.Kill, syscall.SIGINT, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())
go func() {
logs.Info("应用启动")
beego.Run()
}()
closeConsumer, err := consumer.StartConsumer(ctx)
if err != nil {
fmt.Printf("启动kafka消息消费者失败 err%s \n", err)
logs.Error("启动kafka消息消费者失败:%s", err)
return
}
go func() {
logs.Info("应用启动")
beego.Run()
}()
for {
select {
case <-sigs:
... ...
... ... @@ -78,7 +78,7 @@ func CreateBusinessBonusRepository(options map[string]interface{}) (domain.Busin
return repository.NewBusinessBonusRepository(transactionContext)
}
//CreateOrderGoodBestshopRepository 海鲜干货订单商品信息
//CreateOrderGoodBestshopRepository 小米(海鲜干货改)的订单商品信息
func CreateOrderGoodBestshopRepository(options map[string]interface{}) (domain.OrderGoodBestshopRepository, error) {
var transactionContext *transaction.TransactionContext
if value, ok := options["transactionContext"]; ok {
... ... @@ -87,7 +87,7 @@ func CreateOrderGoodBestshopRepository(options map[string]interface{}) (domain.O
return repository.NewOrderGoodBestshopRepository(transactionContext)
}
//CreateOrderGoodBestshopRepository 海鲜干货订单信息
//CreateOrderGoodBestshopRepository 小米(海鲜干货改)订单信息
func CreateOrderBestshopRepository(options map[string]interface{}) (domain.OrderBestshopRepository, error) {
var transactionContext *transaction.TransactionContext
if value, ok := options["transactionContext"]; ok {
... ... @@ -96,7 +96,7 @@ func CreateOrderBestshopRepository(options map[string]interface{}) (domain.Order
return repository.NewOrderBestshopRepository(transactionContext)
}
//CreateOrderGoodBestshopRepository 海鲜干货订单信息
//CreateOrderGoodBestshopRepository小米(海鲜干货改)订单信息
func CreateOrderLogRepository(options map[string]interface{}) (domain.OrderLogRepository, error) {
var transactionContext *transaction.TransactionContext
if value, ok := options["transactionContext"]; ok {
... ...
... ... @@ -28,13 +28,14 @@ type OrderBestShop struct {
//d订单总额
OrderAmount float64 `json:"orderAmount"`
//发货时间
DeliveryTime time.Time `json:"deliveryTime"`
DeliveryTime string `json:"deliveryTime"`
//创建时间
CreateTime time.Time `json:"createTime"`
PartnerId int64 `json:"partnerId"`
Goods []OrderGoodBestShop `json:"goods"`
//是否将数据同步到 order_base ,order_good
IsCopy bool `json:"isCopy"`
CompanyId int64 `json:"companyId"`
}
func (order OrderBestShop) CopyToOrderBase(o *OrderBase) {
... ... @@ -51,7 +52,7 @@ func (order OrderBestShop) CopyToOrderBase(o *OrderBase) {
o.OrderCode = order.OrderCode
o.OrderCompute.PlanOrderAmount = order.OrderAmount
o.OrderCompute.PlanOrderCount = order.OrderCount
o.DeliveryTime = order.DeliveryTime
o.DeliveryTime, _ = time.Parse("2006-01-02 15:04:05", order.DeliveryTime)
return
}
... ...
... ... @@ -28,10 +28,11 @@ type OrderBestshop struct {
//d订单总额
OrderAmount float64
//发货时间
DeliveryTime time.Time
DeliveryTime string
//创建时间
CreateTime time.Time
PartnerId int64
//是否将数据同步到 order_base ,order_good
IsCopy bool `pg:",use_zero"`
CompanyId int64
}
... ...
... ... @@ -42,6 +42,7 @@ func (respository OrderBestshopRepository) transformPgModelToDomainModel(orderMo
CreateTime: orderModel.CreateTime,
PartnerId: orderModel.PartnerId,
IsCopy: orderModel.IsCopy,
CompanyId: orderModel.CompanyId,
}, nil
}
... ... @@ -63,6 +64,7 @@ func (respository OrderBestshopRepository) Add(order *domain.OrderBestShop) erro
CreateTime: time.Now(),
PartnerId: order.PartnerId,
IsCopy: order.IsCopy,
CompanyId: order.CompanyId,
}
_, err := tx.Model(&m).Insert()
order.Id = m.Id
... ... @@ -88,6 +90,7 @@ func (respository OrderBestshopRepository) Edit(order *domain.OrderBestShop) err
CreateTime: order.CreateTime,
PartnerId: order.PartnerId,
IsCopy: order.IsCopy,
CompanyId: order.CompanyId,
}
_, err := tx.Model(&m).Where("id=?", order.Id).Update()
order.Id = m.Id
... ...
package configs
import (
"os"
"strings"
)
type MqConfig struct {
Servers []string `json:"servers"`
ConsumerId string `json:"consumerGroup"`
... ... @@ -8,10 +13,15 @@ type MqConfig struct {
var Cfg MqConfig
func init() {
Cfg = MqConfig{
Servers: []string{"106.52.15.41:9092"},
ConsumerId: "partnermg",
}
if os.Getenv("KAFKA_HOST") != "" {
kafkaHost := os.Getenv("KAFKA_HOST")
Cfg.Servers = strings.Split(kafkaHost, ";")
}
}
// "192.168.190.136:9092",
... ...
... ... @@ -16,5 +16,5 @@ var TopicHandleRouters = map[string]TopicHandle{
message.Timestamp, message.Topic, message.Offset, string(message.Value))
return nil
},
"bestshop_order": SyncBestshopOrder,
"xiangmi-orders": SyncBestshopOrder,
}
... ...