作者 庄敏学

add mqtt

@@ -5,6 +5,7 @@ go 1.16 @@ -5,6 +5,7 @@ go 1.16
5 require ( 5 require (
6 github.com/ajg/form v1.5.1 // indirect 6 github.com/ajg/form v1.5.1 // indirect
7 github.com/beego/beego/v2 v2.0.1 7 github.com/beego/beego/v2 v2.0.1
  8 + github.com/eclipse/paho.mqtt.golang v1.3.5
8 github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072 // indirect 9 github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072 // indirect
9 github.com/fatih/structs v1.1.0 // indirect 10 github.com/fatih/structs v1.1.0 // indirect
10 github.com/gavv/httpexpect v2.0.0+incompatible 11 github.com/gavv/httpexpect v2.0.0+incompatible
@@ -68,6 +68,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8 @@ -68,6 +68,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8
68 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= 68 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
69 github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= 69 github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
70 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= 70 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
  71 +github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
  72 +github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
71 github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= 73 github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
72 github.com/elastic/go-elasticsearch/v6 v6.8.5/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI= 74 github.com/elastic/go-elasticsearch/v6 v6.8.5/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI=
73 github.com/elazarl/go-bindata-assetfs v1.0.0 h1:G/bYguwHIzWq9ZoyUQqrjTmJbbYn3j3CKKpKinvZLFk= 75 github.com/elazarl/go-bindata-assetfs v1.0.0 h1:G/bYguwHIzWq9ZoyUQqrjTmJbbYn3j3CKKpKinvZLFk=
@@ -474,6 +476,7 @@ golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLL @@ -474,6 +476,7 @@ golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLL
474 golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= 476 golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
475 golang.org/x/net v0.0.0-20191002035440-2ec189313ef0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= 477 golang.org/x/net v0.0.0-20191002035440-2ec189313ef0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
476 golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= 478 golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
  479 +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
477 golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= 480 golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
478 golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= 481 golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
479 golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= 482 golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
@@ -7,6 +7,7 @@ import ( @@ -7,6 +7,7 @@ import (
7 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant" 7 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
8 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/redis" 8 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/redis"
9 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log" 9 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log"
  10 + "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/port/mqtt"
10 11
11 _ "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant" 12 _ "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
12 _ "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/pg" 13 _ "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/pg"
@@ -30,7 +31,7 @@ func main() { @@ -30,7 +31,7 @@ func main() {
30 }) 31 })
31 log.Logger.AddHook(bw) 32 log.Logger.AddHook(bw)
32 redis.InitRedis() 33 redis.InitRedis()
33 - 34 + go mqtt.Start()
34 log.Logger.Info("server start!") 35 log.Logger.Info("server start!")
35 web.Run() 36 web.Run()
36 } 37 }
  1 +package constant
  2 +
  3 +import "os"
  4 +
  5 +//设备商提供的测试地址
  6 +var MQTT_HOST = "175.24.122.87"
  7 +//内网测试地址
  8 +//var MQTT_HOST = "192.168.100.222"
  9 +var MQTT_PORT = "1883"
  10 +
  11 +var MQTT_USER = ""
  12 +
  13 +var MQTT_PASSWORD = ""
  14 +
  15 +func init(){
  16 + if os.Getenv("MQTT_HOST") != "" {
  17 + MQTT_HOST = os.Getenv("MQTT_HOST")
  18 + }
  19 + if os.Getenv("MQTT_PORT") != "" {
  20 + MQTT_PORT = os.Getenv("MQTT_PORT")
  21 + }
  22 + if os.Getenv("MQTT_USER") != "" {
  23 + MQTT_USER = os.Getenv("MQTT_USER")
  24 + }
  25 + if os.Getenv("MQTT_PASSWORD") != "" {
  26 + MQTT_PASSWORD = os.Getenv("MQTT_PASSWORD")
  27 + }
  28 +}
  1 +package mqtt
  2 +
  3 +import (
  4 + "fmt"
  5 + pahomqtt "github.com/eclipse/paho.mqtt.golang"
  6 + "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
  7 + "time"
  8 +)
  9 +
  10 +type ProduceClient struct {
  11 + client pahomqtt.Client
  12 +}
  13 +
  14 +func NewProduceClient() *ProduceClient {
  15 + return &ProduceClient{}
  16 +}
  17 +
  18 +func (produceClient *ProduceClient) options() *pahomqtt.ClientOptions {
  19 + opts := pahomqtt.NewClientOptions().AddBroker(fmt.Sprintf("tcp://%v:%v", constant.MQTT_HOST, constant.MQTT_PORT))
  20 + opts.SetUsername(constant.MQTT_USER)
  21 + opts.SetPassword(constant.MQTT_PASSWORD)
  22 + opts.SetKeepAlive(2 * time.Second)
  23 + opts.SetPingTimeout(1 * time.Second)
  24 + return opts
  25 +}
  26 +
  27 +func (produceClient *ProduceClient) connect(){
  28 + opts := produceClient.options()
  29 + produceClient.client = pahomqtt.NewClient(opts)
  30 + token := produceClient.client.Connect()
  31 + token.Wait()
  32 +}
  33 +
  34 +func (produceClient *ProduceClient) Publish(topic string,data interface{}) error {
  35 + produceClient.connect()
  36 + token := produceClient.client.Publish(topic,0,false,data)
  37 + token.Wait()
  38 + return token.Error()
  39 +}
  1 +package mqtt
  2 +
  3 +import (
  4 + "fmt"
  5 + pahomqtt "github.com/eclipse/paho.mqtt.golang"
  6 + "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
  7 + "time"
  8 +)
  9 +
  10 +type SubscribeClient struct {
  11 + topic string
  12 + handler pahomqtt.MessageHandler
  13 + client pahomqtt.Client
  14 +}
  15 +
  16 +type MessageHandler pahomqtt.MessageHandler
  17 +
  18 +func NewSubscribeClient() *SubscribeClient {
  19 + return &SubscribeClient{}
  20 +}
  21 +
  22 +func (subscribeClient *SubscribeClient) options() *pahomqtt.ClientOptions {
  23 + opts := pahomqtt.NewClientOptions().AddBroker(fmt.Sprintf("tcp://%v:%v", constant.MQTT_HOST, constant.MQTT_PORT))
  24 + opts.SetUsername(constant.MQTT_USER)
  25 + opts.SetPassword(constant.MQTT_PASSWORD)
  26 + opts.SetKeepAlive(2 * time.Second)
  27 + opts.SetPingTimeout(1 * time.Second)
  28 + return opts
  29 +}
  30 +
  31 +func (subscribeClient *SubscribeClient) Connect() *SubscribeClient{
  32 + opts := subscribeClient.options()
  33 + fmt.Println("start connect......")
  34 + opts.OnConnectionLost = func(c pahomqtt.Client, err error) {
  35 + fmt.Println("Connect error:", err)
  36 + for {
  37 + fmt.Println("reconnect server")
  38 + token := subscribeClient.client.Connect()
  39 + token.Wait()
  40 + fmt.Println("server Connect status:",subscribeClient.client.IsConnectionOpen())
  41 + if subscribeClient.client.IsConnectionOpen() {
  42 + break
  43 + }
  44 + time.Sleep(3 * time.Second)
  45 + }
  46 + }
  47 + opts.OnConnect = func(c pahomqtt.Client) {
  48 + c.Subscribe(subscribeClient.topic,0,subscribeClient.handler)
  49 + }
  50 + subscribeClient.client = pahomqtt.NewClient(opts)
  51 + token := subscribeClient.client.Connect()
  52 + token.Wait()
  53 + return subscribeClient
  54 +}
  55 +
  56 +func (subscribeClient *SubscribeClient) Subscribe(topic string, messageHandler pahomqtt.MessageHandler){
  57 + subscribeClient.topic = topic
  58 + subscribeClient.handler = messageHandler
  59 + token := subscribeClient.client.Subscribe(topic,0,messageHandler)
  60 + token.Wait()
  61 + token.Done()
  62 +}
  63 +
  64 +func StartSubscribe(topic string,handler MessageHandler){
  65 + defer func() {
  66 + if err := recover();err != nil {
  67 + fmt.Println(err)
  68 + StartSubscribe(topic,handler)
  69 + }
  70 + }()
  71 + fmt.Println("start subscribe...")
  72 + NewSubscribeClient().Connect().Subscribe(topic,pahomqtt.MessageHandler(handler))
  73 +}
  1 +package mqtt
  2 +
  3 +import (
  4 + "fmt"
  5 + pahomqtt "github.com/eclipse/paho.mqtt.golang"
  6 + "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/mqtt"
  7 + "strconv"
  8 + "time"
  9 +)
  10 +
  11 +func Start(){
  12 + mqtt.StartSubscribe("test", func(client pahomqtt.Client, message pahomqtt.Message) {
  13 + fmt.Println(time.Now(),"Topic:"+message.Topic()+" MessageId:"+ strconv.Itoa(int(message.MessageID()))+" Message:"+ string(message.Payload()))
  14 + })
  15 +}