message.go 2.3 KB
package factory

import (
	"fmt"
	"gitlab.fjmaimaimai.com/allied-creation/allied-creation-cooperation/pkg/log"

	"github.com/linmadan/egglib-go/core/application"
	"github.com/linmadan/egglib-go/message/publisher/kafka_message/sarama"
	localMessagePublisher "github.com/linmadan/egglib-go/message/publisher/local_message"
	localMessageReceiver "github.com/linmadan/egglib-go/message/receiver/local_message"
	"gitlab.fjmaimaimai.com/allied-creation/allied-creation-cooperation/pkg/constant"
)

func CreateMessagePublisher(options map[string]interface{}) (application.MessagePublisher, error) {
	if localMessageOptions, ok := options["localMessageOptions"]; ok {
		var storeType string
		var storeOptions map[string]interface{}
		if value, ok := localMessageOptions.(map[string]interface{})["storeType"]; ok {
			storeType = value.(string)
		} else {
			return nil, fmt.Errorf("LocalMessagePublisher缺少参数storeType")
		}
		if value, ok := localMessageOptions.(map[string]interface{})["storeOptions"]; ok {
			storeOptions = value.(map[string]interface{})
		} else {
			return nil, fmt.Errorf("LocalMessagePublisher缺少参数storeOptions")
		}
		return localMessagePublisher.NewLocalMessagePublisher(storeType, storeOptions)
	} else {
		return sarama.NewKafkaSaramaMessagePublisher(constant.KAFKA_HOSTS, log.Logger)
	}
}

func CreateMessageReceiver(options map[string]interface{}) (application.MessageReceiver, error) {
	if localMessageOptions, ok := options["localMessageOptions"]; ok {
		var converterType string
		if value, ok := localMessageOptions.(map[string]interface{})["converterType"]; ok {
			converterType = value.(string)
		} else {
			return nil, fmt.Errorf("LocalMessageReceiver缺少参数converterType")
		}
		var storeType string
		var storeOptions map[string]interface{}
		if value, ok := localMessageOptions.(map[string]interface{})["storeType"]; ok {
			storeType = value.(string)
		} else {
			return nil, fmt.Errorf("LocalMessageReceiver缺少参数storeType")
		}
		if value, ok := localMessageOptions.(map[string]interface{})["storeOptions"]; ok {
			storeOptions = value.(map[string]interface{})
		} else {
			return nil, fmt.Errorf("LocalMessageReceiver缺少参数storeOptions")
		}
		return localMessageReceiver.NewLocalMessageReceiver(converterType, nil, storeType, storeOptions)
	} else {
		return nil, fmt.Errorf("缺少参数localMessageOptions")
	}
}