producer_test.go 3.2 KB
package message

import (
	"github.com/go-pg/pg/v10"
	"github.com/tiptok/gocomm/identity/idgen"
	"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/constant"
	"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/kafkax"
	"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/message/models"
	pgDB "gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg"
	"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/pg/transaction"
	"gitlab.fjmaimaimai.com/mmm-go/partner/pkg/infrastructure/utils"
	"testing"
	"time"
)

// TestNewMessageProducer publishes a message through a producer that is given
// a PgMessageRepository, i.e. publishing with local persistence.
//
// NOTE(review): this depends on pgDB.DB and constant.KAFKA_HOSTS pointing at
// reachable services — confirm the test environment provides them.
func TestNewMessageProducer(t *testing.T) {
	transactionContext := transaction.NewPGTransactionContext(pgDB.DB)
	producer := NewMessageProducer(NewPgMessageRepository(transactionContext), map[string]interface{}{"kafkaHosts": constant.KAFKA_HOSTS})

	err := producer.PublishMessages([]*models.Message{
		{Id: idgen.Next(), Topic: "chat", MsgTime: time.Now().Unix(), Value: "hello world! tip tip!", FinishStatus: 0},
	})
	if err != nil {
		// A publish failure must fail the test instead of letting it pass silently.
		t.Fatalf("publish messages: %v", err)
	}

	// Presumably leaves time for asynchronous publishing to finish before the
	// test exits — TODO confirm against the producer implementation.
	time.Sleep(time.Second * 2)
}

// TestNewMessageProducerNoRepository publishes a message through a producer
// created with a nil repository, i.e. publishing without local persistence.
//
// NOTE(review): this depends on constant.KAFKA_HOSTS pointing at a reachable
// broker — confirm the test environment provides one.
func TestNewMessageProducerNoRepository(t *testing.T) {
	producer := NewMessageProducer(nil, map[string]interface{}{"kafkaHosts": constant.KAFKA_HOSTS})

	err := producer.PublishMessages([]*models.Message{
		{Id: idgen.Next(), Topic: "chat", MsgTime: time.Now().Unix(), Value: "hello world! tip tip!", FinishStatus: 0},
	})
	if err != nil {
		// A publish failure must fail the test instead of letting it pass silently.
		t.Fatalf("publish messages: %v", err)
	}

	// Presumably leaves time for asynchronous publishing to finish before the
	// test exits — TODO confirm against the producer implementation.
	time.Sleep(time.Second * 2)
}

// TestSampleProducer publishes a single message directly through a
// kafkax.KafkaMessageProducer, used via the models.MessageProducer interface,
// and fails the test if Publish reports an error.
func TestSampleProducer(t *testing.T) {
	var producer models.MessageProducer = &kafkax.KafkaMessageProducer{KafkaHosts: constant.KAFKA_HOSTS}

	batch := []*models.Message{
		{Id: 22, Topic: "mmm_xcx_orders", MsgTime: time.Now().Unix(), Value: "hello ccc20201009!"},
	}
	if _, err := producer.Publish(batch, nil); err != nil {
		t.Fatal(err)
	}
}

// PgMessageRepository reads and writes produced messages in the
// sys_message_produce table through a transaction context.
type PgMessageRepository struct {
	// transactionContext provides the PgDd handle on which the repository's
	// statements are executed.
	transactionContext *transaction.TransactionContext
}

// SaveMessage inserts message into sys_message_produce.
//
// The value column receives utils.JsonAssertString(message) and the status
// column is always written as models.UnFinished. Any error from the insert is
// returned unchanged.
func (repository *PgMessageRepository) SaveMessage(message *models.Message) error {
	const insertSQL = `insert into sys_message_produce (id,topic,value,msg_time,status)values(?,?,?,?,?)`

	db := repository.transactionContext.PgDd
	if _, err := db.Exec(
		insertSQL,
		message.Id,
		message.Topic,
		utils.JsonAssertString(message),
		message.MsgTime,
		int64(models.UnFinished),
	); err != nil {
		return err
	}
	return nil
}
// FindNoPublishedStoredMessages loads the value column of every
// sys_message_produce row whose status is models.UnFinished and decodes each
// value into a models.Message with utils.JsonUnmarshal.
//
// Decoded messages whose Id is zero are skipped. A query failure is returned
// to the caller rather than being reported as an empty result.
func (repository *PgMessageRepository) FindNoPublishedStoredMessages() ([]*models.Message, error) {
	const query = `select value from sys_message_produce where status=?`

	var values []string
	if _, err := repository.transactionContext.PgDd.Query(&values, query, int64(models.UnFinished)); err != nil {
		return nil, err
	}

	messages := make([]*models.Message, 0, len(values))
	for _, v := range values {
		item := &models.Message{}
		// NOTE(review): the outcome of utils.JsonUnmarshal is not inspected
		// here; the zero-Id check below is the only filter — confirm that is
		// the intended handling of undecodable rows.
		utils.JsonUnmarshal(v, item)
		if item.Id != 0 {
			messages = append(messages, item)
		}
	}
	return messages, nil
}
// FinishMessagesStatus sets the status column to finishStatus for every
// sys_message_produce row whose id is in messageIds.
//
// An empty messageIds is a no-op: there is nothing to update, so no statement
// is sent to the database and nil is returned. Otherwise the error from the
// update is returned unchanged.
func (repository *PgMessageRepository) FinishMessagesStatus(messageIds []int64, finishStatus int) error {
	if len(messageIds) == 0 {
		return nil
	}
	_, err := repository.transactionContext.PgDd.Exec("update sys_message_produce set status=? where id in (?)", finishStatus, pg.In(messageIds))
	return err
}
// NewPgMessageRepository returns a PgMessageRepository that runs its
// statements through the given transaction context.
func NewPgMessageRepository(transactionContext *transaction.TransactionContext) *PgMessageRepository {
	repo := new(PgMessageRepository)
	repo.transactionContext = transactionContext
	return repo
}