作者 唐旭辉

提交

@@ -96,7 +96,7 @@ func NewRuner() *Runer { @@ -96,7 +96,7 @@ func NewRuner() *Runer {
96 func (r *Runer) InitConsumer() error { 96 func (r *Runer) InitConsumer() error {
97 config := sarama.NewConfig() 97 config := sarama.NewConfig()
98 //config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin 98 //config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
99 - config.Consumer.Offsets.Initial = sarama.OffsetNewest 99 + config.Consumer.Offsets.Initial = sarama.OffsetOldest
100 config.Version = sarama.V0_10_2_0 100 config.Version = sarama.V0_10_2_0
101 consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config) 101 consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config)
102 if err != nil { 102 if err != nil {