作者 yangfu

Merge remote-tracking branch 'origin/dev'

@@ -14,7 +14,7 @@ require ( @@ -14,7 +14,7 @@ require (
14 //github.com/opentracing/opentracing-go v1.1.1-0.20190913142402-a7454ce5950e 14 //github.com/opentracing/opentracing-go v1.1.1-0.20190913142402-a7454ce5950e
15 //github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5 15 //github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5
16 //github.com/openzipkin/zipkin-go v0.2.5 16 //github.com/openzipkin/zipkin-go v0.2.5
17 - github.com/tiptok/gocomm v1.0.6 17 + github.com/tiptok/gocomm v1.0.9
18 ) 18 )
19 19
20 -//replace github.com/tiptok/gocomm v1.0.5 => F:\go\src\learn_project\gocomm 20 +//replace github.com/tiptok/gocomm v1.0.8 => F:\go\src\learn_project\gocomm
@@ -57,5 +57,8 @@ func main() { @@ -57,5 +57,8 @@ func main() {
57 //} 57 //}
58 log.Debug(constant.REDIS_HOST, constant.REDIS_PORT) 58 log.Debug(constant.REDIS_HOST, constant.REDIS_PORT)
59 59
  60 + //go message.RunConsumer()
  61 + //go message.RunPublish()
  62 +
60 beego.Run() 63 beego.Run()
61 } 64 }
@@ -10,9 +10,9 @@ type CreateClientVersionRequest struct { @@ -10,9 +10,9 @@ type CreateClientVersionRequest struct {
10 // 提交人 10 // 提交人
11 //Commiter int64 `json:"commiter,omitempty"` 11 //Commiter int64 `json:"commiter,omitempty"`
12 // 项目名称 12 // 项目名称
13 - ProjectName string `json:"projectName,omitempty"` 13 + ProjectName string `json:"projectName,omitempty" valid:"Required"`
14 // 版本号 14 // 版本号
15 - Version string `json:"version,omitempty"` 15 + Version string `json:"version,omitempty" valid:"Required"`
16 // 标题 16 // 标题
17 Title string `json:"title,omitempty"` 17 Title string `json:"title,omitempty"`
18 // 其他备注信息 18 // 其他备注信息
@@ -163,11 +163,15 @@ func (svr *ClientVersionService) ListClientVersion(header *protocol.RequestHeade @@ -163,11 +163,15 @@ func (svr *ClientVersionService) ListClientVersion(header *protocol.RequestHeade
163 if total, clientVersion, err = ClientVersionRepository.Find(common.ObjectToMap(request)); err != nil { 163 if total, clientVersion, err = ClientVersionRepository.Find(common.ObjectToMap(request)); err != nil {
164 return 164 return
165 } 165 }
166 - rsp = map[string]interface{}{  
167 - "totalRow": total,  
168 - "pageNumber": (request.Offset + request.Limit) / request.Limit,  
169 - "lists": utils.LoadCustomField(clientVersion, "Id", "ProjectName", "Version", "Title", "Remark", "CreateTime"), 166 + rspData := map[string]interface{}{
  167 + "totalRow": total,
  168 + "lists": utils.LoadCustomField(clientVersion, "Id", "ProjectName", "Version", "Title", "Remark", "ClientPackageInfo", "CreateTime"),
170 } 169 }
  170 + if request.Limit > 0 {
  171 + rspData["pageNumber"] = (request.Offset + request.Limit) / request.Limit
  172 + }
  173 + rsp = rspData
  174 +
171 err = transactionContext.CommitTransaction() 175 err = transactionContext.CommitTransaction()
172 return 176 return
173 } 177 }
  1 +package message
  2 +
  3 +import (
  4 + "github.com/tiptok/gocomm/pkg/broker"
  5 + "github.com/tiptok/gocomm/pkg/broker/local"
  6 + "github.com/tiptok/gocomm/pkg/broker/models"
  7 + "github.com/tiptok/gocomm/pkg/log"
  8 + "gitlab.fjmaimaimai.com/mmm-go/godevp/pkg/domain"
  9 + "gitlab.fjmaimaimai.com/mmm-go/godevp/pkg/infrastructure/pg"
  10 + "time"
  11 +)
  12 +
  13 +func RunPublish() {
  14 + var (
  15 + err error
  16 + )
  17 +
  18 + producer := broker.NewMessageProducer(
  19 + models.WithKafkaHost(KAFKA_HOSTS),
  20 + models.WithMessageProduceRepository(local.NewPgMessageProduceRepository(pg.DB, nil)),
  21 + )
  22 + for {
  23 + messages := []interface{}{
  24 + domain.Users{
  25 + Id: 20200129,
  26 + Name: "user_2020",
  27 + Phone: "18860183058",
  28 + Status: 1,
  29 + Roles: []int64{7, 8},
  30 + CreateTime: time.Now(),
  31 + UpdateTime: time.Now(),
  32 + },
  33 + }
  34 + err = producer.Publish(TOPIC_BUSSINESS_ADMIN, messages, models.WithMessageProduceRepository(local.NewPgMessageProduceRepository(pg.DB, nil)))
  35 + if err != nil {
  36 + log.Error(err)
  37 + }
  38 + log.Debug("publish :", messages)
  39 + time.Sleep(time.Hour * 24)
  40 + }
  41 +}
  1 +package message
  2 +
  3 +import (
  4 + "github.com/tiptok/gocomm/common"
  5 + "github.com/tiptok/gocomm/pkg/broker/kafkax"
  6 + "github.com/tiptok/gocomm/pkg/broker/local"
  7 + "github.com/tiptok/gocomm/pkg/broker/models"
  8 + "github.com/tiptok/gocomm/pkg/log"
  9 + "gitlab.fjmaimaimai.com/mmm-go/godevp/pkg/domain"
  10 + "gitlab.fjmaimaimai.com/mmm-go/godevp/pkg/infrastructure/pg"
  11 +)
  12 +
  13 +const (
  14 + KAFKA_HOSTS = "106.52.15.41:9092" // "192.168.0.250:9092,192.168.0.251:9092,192.168.0.252:9092" //"106.52.15.41:9092" //
  15 + GroupId = "godevp_test" // "calf_basic_test" //"godevp"
  16 + TOPIC_BUSSINESS_ADMIN = "mmm_business_test"
  17 +)
  18 +
  19 +func RunConsumer() {
  20 + saramaConsumer := kafkax.NewSaramaConsumer(KAFKA_HOSTS, GroupId) //models.WithHandlerOriginalMessageFlag(true) models.WithVersion("0.10.2.1")
  21 + saramaConsumer.WithTopicHandler(TOPIC_BUSSINESS_ADMIN, UserLoginHandler)
  22 + saramaConsumer.WithMessageReceiver(local.NewPgMessageReceiverRepository(pg.DB, nil)) // 持久化
  23 +
  24 + err := saramaConsumer.StartConsume()
  25 + if err != nil {
  26 + log.Error(err)
  27 + }
  28 +}
  29 +
  30 +func UserLoginHandler(message interface{}) error {
  31 + msg, ok := message.(*models.Message)
  32 + if ok {
  33 + var user = &domain.Users{}
  34 + common.JsonUnmarshal(string(msg.Value), user)
  35 + log.Info("消费消息:", msg.Id, msg.Topic, msg.Value)
  36 + log.Info("登录用户信息:", user.Id, user.Name)
  37 + }
  38 + return nil
  39 +}