topic_handles.go 1.0 KB
package consumer

import (
	"encoding/json"
	"fmt"

	"github.com/Shopify/sarama"
	"github.com/astaxie/beego/logs"
	syncOrderCmd "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/application/syncOrder/command"
	syncOrderSrv "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/application/syncOrder/service"
)

// SyncBestshopOrder handles one consumed Kafka message that carries a
// Bestshop order and passes it to the order sync service.
//
// The message value is decoded as JSON into a CreateOrderFromBestshop command.
// A command whose PartnerId is not positive is logged and skipped, and nil is
// returned. A non-nil error is returned when the payload cannot be decoded or
// when SyncOrderFromBestshop fails; nil is returned on success.
func SyncBestshopOrder(message *sarama.ConsumerMessage) error {
	logs.Info("Done Message claimed:  timestamp = %v, topic = %s offset = %v value = %v \n",
		message.Timestamp, message.Topic, message.Offset, string(message.Value))

	var cmd syncOrderCmd.CreateOrderFromBestshop
	if err := json.Unmarshal(message.Value, &cmd); err != nil {
		return fmt.Errorf("[Consumer][SyncBestshopOrder] 解析kafka数据失败;%s", err)
	}
	if cmd.PartnerId <= 0 {
		logs.Info("[Consumer][SyncBestshopOrder] PartnerId<=0 ,不处理消息")
		return nil
	}

	srv := syncOrderSrv.NewOrderInfoService(nil)
	// Wrap the error only when the service actually failed: formatting a nil
	// error unconditionally would yield a non-nil error and make every
	// successful sync look like a failure to the caller.
	if err := srv.SyncOrderFromBestshop(cmd); err != nil {
		return fmt.Errorf("[Consumer][SyncBestshopOrder] %s", err)
	}
	return nil
}