package consumer

import (
	"encoding/json"
	"fmt"

	"github.com/Shopify/sarama"
	"github.com/astaxie/beego/logs"
	syncOrderCmd "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/application/syncOrder/command"
	syncOrderSrv "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/application/syncOrder/service"
)

// SyncBestshopOrder decodes the JSON payload of a Kafka message into a
// CreateOrderFromBestshop command and passes it to the order-sync service.
//
// It returns a wrapped error when the payload cannot be unmarshalled or when
// the service call fails; the underlying error stays reachable through
// errors.Is / errors.As. A message whose PartnerId is not positive is logged
// and skipped, and nil is returned for it.
func SyncBestshopOrder(message *sarama.ConsumerMessage) error {
	// Log the raw message before any processing so undecodable payloads are
	// still visible in the logs.
	logs.Info("Done Message claimed: timestamp = %v, topic = %s offset = %v value = %v \n", message.Timestamp, message.Topic, message.Offset, string(message.Value))

	var cmd syncOrderCmd.CreateOrderFromBestshop
	if err := json.Unmarshal(message.Value, &cmd); err != nil {
		return fmt.Errorf("[Consumer][SyncBestshopOrder] 解析kafka数据失败;%w", err)
	}

	// Messages without a positive partner id are skipped rather than reported
	// as failures.
	if cmd.PartnerId <= 0 {
		logs.Info("[Consumer][SyncBestshopOrder] PartnerId<=0 ,不处理消息")
		return nil
	}

	srv := syncOrderSrv.NewOrderInfoService(nil)
	if err := srv.SyncOrderFromBestshop(cmd); err != nil {
		return fmt.Errorf("[Consumer][SyncBestshopOrder] %w", err)
	}
	return nil
}