作者 唐旭辉

调试

... ... @@ -95,9 +95,9 @@ func NewRuner() *Runer {
func (r *Runer) InitConsumer() error {
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
//config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetNewest
config.Version = sarama.V0_10_2_1
config.Version = sarama.V0_10_2_0
consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config)
if err != nil {
return err
... ...
... ... @@ -21,7 +21,7 @@ func init() {
var err error
mqConfig := sarama.NewConfig()
mqConfig.Producer.Return.Successes = true
mqConfig.Version = sarama.V0_10_2_1
mqConfig.Version = sarama.V0_10_2_0
if err = mqConfig.Validate(); err != nil {
msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", configs.Cfg, err)
logs.Info(msg)
... ...