consumer.go 5.2 KB
package consumer

import (
	"context"
	"errors"
	"fmt"

	"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"

	"github.com/Shopify/sarama"
	"github.com/astaxie/beego/logs"
	cluster "github.com/bsm/sarama-cluster"
)

// MessageConsumer is the Kafka message consumer. It carries the connection
// settings together with the per-topic handlers, and it implements
// sarama.ConsumerGroupHandler through Setup, Cleanup and ConsumeClaim.
type MessageConsumer struct {
	// ready is closed by Setup; Runer.IsReady hands it out to callers.
	ready         chan struct{}
	// kafkaHosts is filled from configs.Cfg.Servers by NewRuner.
	kafkaHosts    []string
	// groupId is filled from configs.Cfg.ConsumerId by NewRuner.
	groupId       string
	// topics holds the keys of TopicHandleRouters, collected by NewRuner.
	topics        []string
	// topicsHandles maps a topic name to its handler; FindTopichandle reads it.
	topicsHandles map[string]TopicHandle
	// beforeHandles []TopicHandle
	// afterHandles  []TopicHandle
}

// Compile-time check that *MessageConsumer implements sarama.ConsumerGroupHandler.
var _ sarama.ConsumerGroupHandler = (*MessageConsumer)(nil)

// Setup is the sarama.ConsumerGroupHandler hook invoked when a consumer group
// session begins. It signals readiness by closing c.ready and always returns
// nil.
//
// A handler can see more than one session (for example after a rebalance),
// and closing an already closed channel panics, so the channel is closed only
// if it is still open.
func (c *MessageConsumer) Setup(groupSession sarama.ConsumerGroupSession) error {
	select {
	case <-c.ready:
		// Already closed by an earlier session; nothing left to signal.
	default:
		close(c.ready)
	}
	return nil
}

// Cleanup is the sarama.ConsumerGroupHandler hook of the same name. This
// implementation does nothing with the session and always returns nil.
func (c *MessageConsumer) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim processes every message delivered on groupClaim.Messages()
// until that channel is closed, then returns nil.
//
// For each message it:
//   - writes a debug log entry,
//   - looks up the handler registered for the claim's topic; when none is
//     found the error is logged and the message is skipped without being
//     marked,
//   - runs the handler; a handler error is logged but does not stop the loop,
//   - marks the message on the session.
func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession,
	groupClaim sarama.ConsumerGroupClaim) error {
	for msg := range groupClaim.Messages() {
		logs.Debug("Done Message claimed:  timestamp = %v, topic = %s offset = %v value = %v \n",
			msg.Timestamp, msg.Topic, msg.Offset, string(msg.Value))

		handle, lookupErr := c.FindTopichandle(groupClaim.Topic())
		if lookupErr != nil {
			logs.Error("FindTopichandle err:%s \n", lookupErr)
			continue
		}

		if handleErr := handle(msg); handleErr != nil {
			logs.Error("Message claimed: kafka消息处理错误 topic =", msg.Topic, msg.Offset, handleErr)
		}
		// Marked regardless of the handler outcome.
		groupSession.MarkMessage(msg, "")
	}
	return nil
}

// FindTopichandle returns the handler registered for topic in
// c.topicsHandles. When the topic has no entry it returns a nil handler and a
// "TopicHandle not found" error.
func (c *MessageConsumer) FindTopichandle(topic string) (TopicHandle, error) {
	handle, found := c.topicsHandles[topic]
	if !found {
		return nil, errors.New("TopicHandle not found")
	}
	return handle, nil
}

// Runer ties a MessageConsumer (settings and topic handlers) to the Kafka
// consumer that is driven by Start.
type Runer struct {
	// msgConsumer is created by NewRuner.
	msgConsumer   *MessageConsumer
	// NOTE(review): consumerGroup is only referenced from commented-out code
	// in this file — confirm whether it is still needed.
	consumerGroup sarama.ConsumerGroup
	// Consumer is the sarama-cluster consumer; InitConsumer assigns it and
	// Start reads from it.
	Consumer      *cluster.Consumer
}

// NewRuner builds a Runer whose MessageConsumer is populated from package
// state: broker hosts and group id come from configs.Cfg, the handlers are
// TopicHandleRouters, and the topic list is the set of keys of that map (in
// map iteration order, i.e. unspecified). The resulting settings are written
// to the debug log. No Kafka connection is opened here.
func NewRuner() *Runer {
	topicNames := make([]string, 0, len(TopicHandleRouters))
	for name := range TopicHandleRouters {
		topicNames = append(topicNames, name)
	}

	mc := &MessageConsumer{
		ready:         make(chan struct{}),
		kafkaHosts:    configs.Cfg.Servers,
		groupId:       configs.Cfg.ConsumerId,
		topicsHandles: TopicHandleRouters,
		topics:        topicNames,
		// beforeHandles: BeforeHandles,
		// afterHandles:  AfterHandles,
	}
	logs.Debug("kafka_host=%v; topic=%v;groupid=%s ", mc.kafkaHosts,
		mc.topics, mc.groupId)

	return &Runer{msgConsumer: mc}
}

// func (r *Runer) InitConsumer() error {
// 	config := sarama.NewConfig()
// 	//config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
// 	config.Consumer.Offsets.Initial = sarama.OffsetOldest
// 	config.Version = sarama.V0_10_2_1
// 	if err := config.Validate(); err != nil {
// 		msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", configs.Cfg, err)
// 		logs.Error(msg)
// 		panic(msg)
// 	}

// 	consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config)
// 	if err != nil {
// 		return err
// 	}
// 	r.consumerGroup = consumerGroup
// 	return nil
// }

// InitConsumer creates the sarama-cluster consumer and stores it in
// r.Consumer.
//
// The consumer is configured to return errors and rebalance notifications on
// their channels, with Consumer.Offsets.Initial set to sarama.OffsetOldest and
// the protocol version set to sarama.V0_10_2_1. Broker hosts, group id and
// topics are taken from r.msgConsumer (filled in by NewRuner) rather than from
// hard-coded development values.
//
// It returns a non-nil error when the consumer cannot be created; r.Consumer
// is left untouched in that case.
func (r *Runer) InitConsumer() error {
	clusterCfg := cluster.NewConfig()
	clusterCfg.Consumer.Return.Errors = true
	clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	clusterCfg.Group.Return.Notifications = true
	clusterCfg.Version = sarama.V0_10_2_1

	mc := r.msgConsumer
	logs.Debug(mc.kafkaHosts, mc.groupId, mc.topics)

	consumer, err := cluster.NewConsumer(mc.kafkaHosts, mc.groupId, mc.topics, clusterCfg)
	if err != nil {
		// Report through the declared error return instead of panicking. The
		// full config is deliberately not dumped: it is large and may hold
		// credentials. The failure is still logged so it stays visible even
		// if a caller drops the returned error.
		err = fmt.Errorf("create kafka consumer (hosts=%v, group=%s, topics=%v): %v",
			mc.kafkaHosts, mc.groupId, mc.topics, err)
		logs.Error(err.Error())
		return err
	}
	r.Consumer = consumer
	return nil
}

// func (r *Runer) Start(ctx context.Context) {
// 	defer func() {
// 		if e := recover(); e != nil {
// 			logs.Error(e)
// 		}
// 	}()
// 	for {
// 		select {
// 		case <-ctx.Done():
// 			logs.Warning("ctx cancel;consumerGroup.Close()")
// 			r.consumerGroup.Close()
// 			return
// 		default:
// 			if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {
// 				logs.Error("consumerGroup err:%s \n", err)
// 				//等待重试
// 				timer := time.NewTimer(5 * time.Second)
// 				<-timer.C
// 			}
// 			r.msgConsumer.ready = make(chan struct{})
// 		}

// 	}
// }

// Start runs the consume loop. It blocks until ctx is cancelled (the consumer
// is then closed) or until the consumer's message channel is closed.
//
// Each received message is logged and routed to the handler registered for
// its topic, following the same rules as MessageConsumer.ConsumeClaim: when no
// handler is registered the error is logged and the message is skipped without
// being marked; a handler error is logged and the message is still marked.
//
// Start returns immediately, after logging an error, when the Runer has no
// consumer or no message consumer to dispatch to.
func (r *Runer) Start(ctx context.Context) {
	if r.Consumer == nil || r.msgConsumer == nil {
		logs.Error("Kafka consumer is not initialized, call InitConsumer first")
		return
	}

	messages := r.Consumer.Messages()
	errs := r.Consumer.Errors()
	notifications := r.Consumer.Notifications()

	for {
		select {
		case msg, more := <-messages:
			if !more {
				// A closed channel is always ready to receive from, so
				// staying in the loop would spin without doing any work.
				logs.Warning("Kafka consumer message channel closed, stop consuming")
				return
			}
			logs.Info("Partition:%d, Offset:%d, Key:%s, Value:%s Timestamp:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp)
			topicHandle, err := r.msgConsumer.FindTopichandle(msg.Topic)
			if err != nil {
				logs.Error("FindTopichandle err:%s \n", err)
				continue
			}
			if err = topicHandle(msg); err != nil {
				logs.Error("Message claimed: kafka消息处理错误 topic =", msg.Topic, msg.Offset, err)
			}
			r.Consumer.MarkOffset(msg, "") // mark message as processed
		case err, more := <-errs:
			if !more {
				// Receiving from a nil channel blocks forever, which removes
				// this case from the select.
				errs = nil
				continue
			}
			logs.Error("Kafka consumer error: %v", err)
		case ntf, more := <-notifications:
			if !more {
				notifications = nil
				continue
			}
			logs.Info("Kafka consumer rebalance: %v", ntf)
		case <-ctx.Done():
			logs.Info("Stop consumer server...")
			if err := r.Consumer.Close(); err != nil {
				logs.Error("Kafka consumer close error: %v", err)
			}
			return
		}
	}
}

// IsReady returns a receive-only view of the message consumer's ready channel.
//
// NOTE(review): in the code visible in this file the channel is closed only by
// MessageConsumer.Setup, and Start never touches it — confirm that Setup is
// actually reached before blocking on this channel.
func (r *Runer) IsReady() <-chan struct{} {
	var ready <-chan struct{} = r.msgConsumer.ready
	return ready
}