作者 唐旭辉

更新

... ... @@ -22,3 +22,6 @@ func init() {
KafkaCfg.Servers = strings.Split(kafkaHost, ";")
}
}
// "192.168.190.136:9092",
// "106.52.15.41:9092"
... ...
package configs
import (
"os"
"strings"
"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/constant"
)
... ... @@ -17,17 +14,5 @@ var Cfg = MqConfig{
ConsumerId: constant.KafkaCfg.ConsumerId,
}
func init() {
Cfg = MqConfig{
Servers: []string{"106.52.15.41:9092"},
ConsumerId: "partnermg",
}
if os.Getenv("KAFKA_HOST") != "" {
kafkaHost := os.Getenv("KAFKA_HOST")
Cfg.Servers = strings.Split(kafkaHost, ";")
}
}
// "192.168.190.136:9092",
// "106.52.15.41:9092"
... ...
... ... @@ -59,9 +59,9 @@ func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession,
}
if err = topicHandle(message); err != nil {
logs.Error("Message claimed: kafka消息处理错误 topic =", message.Topic, message.Offset, err)
} else {
groupSession.MarkMessage(message, "")
}
groupSession.MarkMessage(message, "")
}
return nil
}
... ...
... ... @@ -20,13 +20,14 @@ func SyncBestshopOrder(message *sarama.ConsumerMessage) error {
)
err = json.Unmarshal(message.Value, &cmd)
if err != nil {
return fmt.Errorf("[SyncBestshopOrder] 解析kafka数据失败;%s", err)
return fmt.Errorf("[Consumer][SyncBestshopOrder] 解析kafka数据失败;%s", err)
}
if cmd.PartnerId <= 0 {
logs.Info("[SyncBestshopOrder] PartnerId<=0 ,不处理消息")
logs.Info("[Consumer][SyncBestshopOrder] PartnerId<=0 ,不处理消息")
return nil
}
srv := syncOrderSrv.NewOrderInfoService(nil)
err = srv.SyncOrderFromBestshop(cmd)
return err
e := fmt.Errorf("[Consumer][SyncBestshopOrder] %s", err)
return e
}
... ...