producer.go 5.1 KB
package kafkax

import (
	"encoding/json"
	"fmt"
	"github.com/Shopify/sarama"
	"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/models"
	"log"
	"strings"
	"time"
)

// sarame kafka 消息生产
type KafkaMessageProducer struct {
	KafkaHosts string
	LogInfo    models.LogInfo
}

// 同步发送
func (engine *KafkaMessageProducer) Publish(messages []*models.Message, option map[string]interface{}) (*models.MessagePublishResult, error) {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Retry.Max = 10
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Version = sarama.V0_11_0_0
	brokerList := strings.Split(engine.KafkaHosts, ",")
	producer, err := sarama.NewSyncProducer(brokerList, config)
	if err != nil {
		return nil, err
	}
	defer func() {
		if err := producer.Close(); err != nil {
			log.Println(err)
		}
	}()
	var successMessageIds []int64
	var errMessageIds []int64
	for _, message := range messages {
		if value, err := json.Marshal(message); err == nil {
			msg := &sarama.ProducerMessage{
				Topic:     message.Topic,
				Value:     sarama.StringEncoder(value),
				Timestamp: time.Now(),
			}
			partition, offset, err := producer.SendMessage(msg)
			if err != nil {
				errMessageIds = append(errMessageIds, message.Id)
				log.Println(err)
			} else {
				successMessageIds = append(successMessageIds, message.Id)
				var append = make(map[string]interface{})
				append["topic"] = message.Topic
				append["partition"] = partition
				append["offset"] = offset
				log.Println("kafka消息发送", append)
			}
		}
	}
	return &models.MessagePublishResult{SuccessMessageIds: successMessageIds, ErrorMessageIds: errMessageIds}, nil
}

// 消息调度器
type MessageDispatcher struct {
	notifications     chan struct{}
	messageChan       chan *models.Message
	dispatchTicker    *time.Ticker
	messageRepository models.MessageRepository
	producer          models.MessageProducer
}

func (dispatcher *MessageDispatcher) MessagePublishedNotice() error {
	time.Sleep(time.Second * 2)
	dispatcher.notifications <- struct{}{}
	return nil
}

func (dispatcher *MessageDispatcher) MessagePublish(messages []*models.Message) error {
	for i := range messages {
		dispatcher.messageChan <- messages[i]
	}
	return nil
}

// go dispatcher.Dispatch() 启动一个独立协程
func (dispatcher *MessageDispatcher) Dispatch() {
	for {
		select {
		case <-dispatcher.dispatchTicker.C:
			go func(dispatcher *MessageDispatcher) {
				dispatcher.notifications <- struct{}{}
			}(dispatcher)
		case <-dispatcher.notifications:
			if dispatcher.messageRepository == nil {
				continue
			}
			messages, _ := dispatcher.messageRepository.FindNoPublishedStoredMessages()
			var messagesInProcessIds []int64
			for i := range messages {
				messagesInProcessIds = append(messagesInProcessIds, messages[i].Id)
			}
			if messages != nil && len(messages) > 0 {
				dispatcher.messageRepository.FinishMessagesStatus(messagesInProcessIds, int(models.InProcess))

				reuslt, err := dispatcher.producer.Publish(messages, nil)
				if err == nil && len(reuslt.SuccessMessageIds) > 0 {
					dispatcher.messageRepository.FinishMessagesStatus(reuslt.SuccessMessageIds, int(models.Finished))
				}
				//发送失败的消息ID列表 更新状态 进行中->未开始
				if len(reuslt.ErrorMessageIds) > 0 {
					dispatcher.messageRepository.FinishMessagesStatus(reuslt.ErrorMessageIds, int(models.UnFinished))
				}
			}
		case msg := <-dispatcher.messageChan:
			dispatcher.producer.Publish([]*models.Message{msg}, nil)
		}
	}
}

type MessageDirector struct {
	messageRepository models.MessageRepository
	dispatcher        *MessageDispatcher
}

func (d *MessageDirector) PublishMessages(messages []*models.Message) error {
	if d.dispatcher == nil {
		return fmt.Errorf("dispatcher还没有启动")
	}
	if d.messageRepository == nil {
		d.dispatcher.MessagePublish(messages)
		return nil
	}
	for _, message := range messages {
		if err := d.messageRepository.SaveMessage(message); err != nil {
			return err
		}
	}
	if err := d.dispatcher.MessagePublishedNotice(); err != nil {
		return err
	}
	return nil
}

// 消息发布器
// options["kafkaHosts"]="localhost:9092"
// options["timeInterval"]=time.Second*60*5
func NewMessageDirector(messageRepository models.MessageRepository, options map[string]interface{}) *MessageDirector {
	dispatcher := &MessageDispatcher{
		notifications:     make(chan struct{}),
		messageRepository: messageRepository,
		messageChan:       make(chan *models.Message, 100),
	}

	var hosts string
	if kafkaHosts, ok := options["kafkaHosts"]; ok {
		hosts = kafkaHosts.(string)
	} else {
		hosts = "localhost:9092"
	}
	dispatcher.producer = &KafkaMessageProducer{KafkaHosts: hosts, LogInfo: models.DefaultLog}

	if interval, ok := options["timeInterval"]; ok {
		dispatcher.dispatchTicker = time.NewTicker(interval.(time.Duration))
	} else {
		dispatcher.dispatchTicker = time.NewTicker(time.Second * 60 * 5)
	}
	go dispatcher.Dispatch()

	return &MessageDirector{
		messageRepository: messageRepository,
		dispatcher:        dispatcher,
	}
}