xiangmi.go 1.6 KB
package handles

import (
	"encoding/json"
	"fmt"

	"github.com/Shopify/sarama"
	"github.com/astaxie/beego/logs"
	syncOrderCmd "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/application/syncOrder/command"
	syncOrderSrv "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/application/syncOrder/service"
)

type DataFromMessage struct {
	Module string          `json:"module"`
	Action string          `json:"action"`
	Data   json.RawMessage `json:"data"`
}

func DataFromXiangMi(message *sarama.ConsumerMessage) error {
	logs.Info("Done Message claimed:  timestamp = %v, topic = %s offset = %v value = %v \n",
		message.Timestamp, message.Topic, message.Offset, string(message.Value))
	var (
		msgData DataFromMessage
		err     error
	)
	err = json.Unmarshal(message.Value, &msgData)
	if err != nil {
		return fmt.Errorf("[Consumer][SyncBestshopOrder] 解析kafka数据失败;%s", err)
	}
	err = SyncBestshopOrder(msgData.Data)
	if err != nil {
		e := fmt.Errorf("[Consumer][SyncBestshopOrder] %s", err)
		return e
	}
	return nil
}

//SyncBestshopOrder 同步
func SyncBestshopOrder(data []byte) error {

	var (
		cmd syncOrderCmd.CreateOrderFromBestshop
		err error
	)
	err = json.Unmarshal(data, &cmd)
	if err != nil {
		return fmt.Errorf("[Consumer][SyncBestshopOrder] 解析kafka数据失败;%s", err)
	}
	if cmd.PartnerId <= 0 {
		logs.Info("[Consumer][SyncBestshopOrder] PartnerId<=0 ,不处理消息")
		return nil
	}
	srv := syncOrderSrv.NewOrderInfoService(nil)
	err = srv.SyncOrderFromBestshop(cmd)
	if err != nil {
		e := fmt.Errorf("[Consumer][SyncBestshopOrder] %s", err)
		return e
	}
	return err
}