// xiangmi.go

package handles

import (
	"encoding/json"
	"fmt"

	"github.com/Shopify/sarama"
	"github.com/astaxie/beego/logs"
	syncOrderCmd "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/application/syncOrder/command"
	syncOrderSrv "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/application/syncOrder/service"
)

// DataFromXiangMi handles one consumed Kafka message.
//
// The message value is logged, decoded as JSON into a DataFromMessage, and
// dispatched on the "<Module>/<Action>" key. Both "xiangmi.order/ship" and
// "wxapplet.order/ship" pass msgData.Data to syncBestshopOrder. Any other
// key is logged as an error and nil is returned, so an unrecognized action
// is not reported to the caller as a failure.
//
// A non-nil error is returned when the envelope cannot be decoded or when
// syncBestshopOrder fails. The cause is wrapped with %w so callers can
// inspect it with errors.Is / errors.As; the rendered message is unchanged.
func DataFromXiangMi(message *sarama.ConsumerMessage) error {
	logs.Info("Done Message claimed:  timestamp = %v, topic = %s offset = %v value = %v \n",
		message.Timestamp, message.Topic, message.Offset, string(message.Value))

	var msgData DataFromMessage
	if err := json.Unmarshal(message.Value, &msgData); err != nil {
		return fmt.Errorf("[Consumer][SyncBestshopOrder] 解析kafka数据失败;%w", err)
	}

	// TODO: use the mini-program ID as the module.
	dataAction := msgData.Module + "/" + msgData.Action
	switch dataAction {
	// Both sources currently share the same handling.
	// TODO: unify the handling of mini-program orders.
	case "xiangmi.order/ship", "wxapplet.order/ship":
		if err := syncBestshopOrder(msgData.Data); err != nil {
			return fmt.Errorf("[Consumer][SyncBestshopOrder] %w", err)
		}
	default:
		logs.Error("未找到执行动作:Module=%s,Action=%s", msgData.Module, msgData.Action)
	}
	return nil
}

// syncBestshopOrder decodes data as a JSON CreateOrderFromBestshop command
// and passes it to the order-sync service's SyncOrderFromBestshop.
//
// A command whose PartnerId is <= 0 is logged and skipped; nil is returned
// in that case. Decode failures and service failures are returned wrapped
// with %w so the underlying cause stays reachable via errors.Is / errors.As.
func syncBestshopOrder(data []byte) error {
	var cmd syncOrderCmd.CreateOrderFromBestshop
	if err := json.Unmarshal(data, &cmd); err != nil {
		return fmt.Errorf("[Consumer][syncBestshopOrder] 解析kafka数据失败;%w", err)
	}
	if cmd.PartnerId <= 0 {
		logs.Info("[Consumer][syncBestshopOrder] PartnerId<=0 ,不处理消息")
		return nil
	}

	srv := syncOrderSrv.NewOrderInfoService(nil)
	if err := srv.SyncOrderFromBestshop(cmd); err != nil {
		return fmt.Errorf("[Consumer][syncBestshopOrder] %w", err)
	}
	return nil
}