正在显示
6 个修改的文件
包含
92 行增加
和
13 行删除
@@ -2,15 +2,18 @@ package constant | @@ -2,15 +2,18 @@ package constant | ||
2 | 2 | ||
3 | import "os" | 3 | import "os" |
4 | 4 | ||
5 | +var MQTT_TOPIC = "/MQTT" | ||
5 | //设备商提供的测试地址 | 6 | //设备商提供的测试地址 |
6 | -var MQTT_HOST = "175.24.122.87" | 7 | +//var MQTT_HOST = "175.24.122.87" |
8 | +//var MQTT_PORT = "1883" | ||
9 | +//var MQTT_USER = "user111" | ||
10 | +//var MQTT_PASSWORD = "user111" | ||
7 | //内网测试地址 | 11 | //内网测试地址 |
8 | -//var MQTT_HOST = "192.168.100.222" | 12 | +var MQTT_HOST = "192.168.100.222" |
9 | var MQTT_PORT = "1883" | 13 | var MQTT_PORT = "1883" |
14 | +var MQTT_USER = "admin" | ||
15 | +var MQTT_PASSWORD = "123456" | ||
10 | 16 | ||
11 | -var MQTT_USER = "" | ||
12 | - | ||
13 | -var MQTT_PASSWORD = "" | ||
14 | 17 | ||
15 | func init(){ | 18 | func init(){ |
16 | if os.Getenv("MQTT_HOST") != "" { | 19 | if os.Getenv("MQTT_HOST") != "" { |
@@ -14,6 +14,8 @@ var ( | @@ -14,6 +14,8 @@ var ( | ||
14 | REDIS_ADDRESS = "" | 14 | REDIS_ADDRESS = "" |
15 | // redis 考勤机打卡消息队列 | 15 | // redis 考勤机打卡消息队列 |
16 | REDIS_ZKTECO_KEY = "allied-creation-zkteco" | 16 | REDIS_ZKTECO_KEY = "allied-creation-zkteco" |
17 | + // redis 车间数据消息队列 | ||
18 | + REDIS_WORKSHOP_KEY = "allied-creation-workshop" | ||
17 | ) | 19 | ) |
18 | 20 | ||
19 | func init() { | 21 | func init() { |
pkg/domain/device_workshop.go
0 → 100644
1 | +package domain | ||
2 | + | ||
3 | +import "time" | ||
4 | + | ||
5 | +type DeviceWorkShop struct { | ||
6 | + WorkShop string `json:"WorkShop"` // 车间名 | ||
7 | + DeviceSn string `json:"DeviceSn"` // 设备名称 | ||
8 | + CurrTime time.Time `json:"CurrTime"` // 当前时间 | ||
9 | + StartupState int64 `json:"StartupState"` // 启动状态:1:启动,0:停止 | ||
10 | + ComStatus int64 `json:"ComStatus"` // 通讯状态:1:通讯正常,0:设备未上电或与采集端通讯故障 | ||
11 | + InterSpeed int64 `json:"InterSpeed"` // 内包材速度:内包材运行速率 | ||
12 | + ExterSpeed int64 `json:"ExterSpeed"` // 外包材速度:内包材运行速率 | ||
13 | + KnifeSpeed int64 `json:"KnifeSpeed"` // 切刀速度:切刀运行速率 | ||
14 | + TransSpeed int64 `json:"TransSpeed"` // 输送速度:输送带运行速率 | ||
15 | + FrontTemp float64 `json:"FontTemp"` // 炸机前段温度:炸机前段当前温度 | ||
16 | + BackTemp float64 `json:"BackTemp"` // 炸机后段温度:炸机后段当前温度 | ||
17 | + TankTemp float64 `json:"TankTemp"` // 储油罐温度 :储油罐当前温度 | ||
18 | + TubeTemp float64 `json:"TubeTemp"` // 管路温度:管路当前温度 | ||
19 | + Temp1 float64 `json:"Temp1"` // 温度1:温度1当前温度 | ||
20 | + Temp2 float64 `json:"Temp2"` // 温度2:温度2当前温度 | ||
21 | + Temp3 float64 `json:"Temp3"` // 温度3:温度3当前温度 | ||
22 | + Year string `json:"Year"` // 年 | ||
23 | + Month string `json:"Month"` // 月 | ||
24 | + Day string `json:"Day"` // 日 | ||
25 | + ProductType string `json:"ProductType"` // 产品类型:当前产品种类 | ||
26 | + CurrTemp float64 `json:"CurrTemp"` // 当前温度:当前温度 | ||
27 | +} |
@@ -25,6 +25,9 @@ func (subscribeClient *SubscribeClient) options() *pahomqtt.ClientOptions { | @@ -25,6 +25,9 @@ func (subscribeClient *SubscribeClient) options() *pahomqtt.ClientOptions { | ||
25 | opts.SetPassword(constant.MQTT_PASSWORD) | 25 | opts.SetPassword(constant.MQTT_PASSWORD) |
26 | opts.SetKeepAlive(2 * time.Second) | 26 | opts.SetKeepAlive(2 * time.Second) |
27 | opts.SetPingTimeout(1 * time.Second) | 27 | opts.SetPingTimeout(1 * time.Second) |
28 | + opts.CleanSession = false | ||
29 | + opts.SetClientID("test") | ||
30 | + //opts.Order = true | ||
28 | return opts | 31 | return opts |
29 | } | 32 | } |
30 | 33 |
@@ -37,11 +37,12 @@ func (controller *DeviceZKTecoController) PostCdata() { | @@ -37,11 +37,12 @@ func (controller *DeviceZKTecoController) PostCdata() { | ||
37 | 37 | ||
38 | func (controller *DeviceZKTecoController) GetCdata() { | 38 | func (controller *DeviceZKTecoController) GetCdata() { |
39 | //sn := controller.Ctx.Input.Query("SN") | 39 | //sn := controller.Ctx.Input.Query("SN") |
40 | - | 40 | + controller.Ctx.WriteString("OK") |
41 | } | 41 | } |
42 | 42 | ||
43 | func (controller *DeviceZKTecoController) GetRequest() { | 43 | func (controller *DeviceZKTecoController) GetRequest() { |
44 | - | 44 | + //controller.Ctx.WriteString("C:11:DATA\tQUERY\tUSERINFO\tPIN=10086") |
45 | + controller.Ctx.WriteString("OK") | ||
45 | } | 46 | } |
46 | 47 | ||
47 | func (controller *DeviceZKTecoController) Ping() { | 48 | func (controller *DeviceZKTecoController) Ping() { |
1 | package mqtt | 1 | package mqtt |
2 | 2 | ||
3 | import ( | 3 | import ( |
4 | - "fmt" | 4 | + "encoding/json" |
5 | pahomqtt "github.com/eclipse/paho.mqtt.golang" | 5 | pahomqtt "github.com/eclipse/paho.mqtt.golang" |
6 | + "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant" | ||
7 | + "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/domain" | ||
6 | "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/mqtt" | 8 | "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/mqtt" |
7 | - "strconv" | 9 | + "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/redis" |
10 | + "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log" | ||
8 | "time" | 11 | "time" |
9 | ) | 12 | ) |
10 | 13 | ||
11 | -func Start(){ | ||
12 | - mqtt.StartSubscribe("test", func(client pahomqtt.Client, message pahomqtt.Message) { | ||
13 | - fmt.Println(time.Now(),"Topic:"+message.Topic()+" MessageId:"+ strconv.Itoa(int(message.MessageID()))+" Message:"+ string(message.Payload())) | 14 | +func Start() { |
15 | + mqtt.StartSubscribe(constant.MQTT_TOPIC, func(client pahomqtt.Client, message pahomqtt.Message) { | ||
16 | + payload := make(map[string]interface{}) | ||
17 | + err := json.Unmarshal(message.Payload(), &payload) | ||
18 | + if err != nil { | ||
19 | + log.Logger.Error("车间数据json解析失败:" + err.Error()) | ||
20 | + return | ||
21 | + } | ||
22 | + if workShop, ok := payload["WorkShop"]; ok { | ||
23 | + for key, item := range payload { | ||
24 | + if key == "WorkShop" { | ||
25 | + continue | ||
26 | + } | ||
27 | + deviceWorkShop := &domain.DeviceWorkShop{} | ||
28 | + mBytes, err := json.Marshal(item) | ||
29 | + if err != nil { | ||
30 | + continue | ||
31 | + } | ||
32 | + err = json.Unmarshal(mBytes, deviceWorkShop) | ||
33 | + if err != nil { | ||
34 | + continue | ||
35 | + } | ||
36 | + // 获取当前时间 | ||
37 | + deviceWorkShop.CurrTime = time.Now() | ||
38 | + // 车间名称 | ||
39 | + deviceWorkShop.WorkShop = workShop.(string) | ||
40 | + // 设备名称 | ||
41 | + deviceWorkShop.DeviceSn = key | ||
42 | + workShopBytes, err := json.Marshal(deviceWorkShop) | ||
43 | + if err != nil { | ||
44 | + continue | ||
45 | + } | ||
46 | + err = redis.GetRedis().LPush(constant.REDIS_WORKSHOP_KEY, string(workShopBytes)).Err() | ||
47 | + if err != nil { | ||
48 | + log.Logger.Error("车间设备数据加入redis失败:" + err.Error()) | ||
49 | + } | ||
50 | + } | ||
51 | + } | ||
52 | + log.Logger.Info("MQTT", map[string]interface{}{ | ||
53 | + "Topic": message.Topic(), | ||
54 | + "MessageId": message.MessageID(), | ||
55 | + "Message": payload, | ||
56 | + }) | ||
14 | }) | 57 | }) |
15 | -} | ||
58 | +} |
-
请 注册 或 登录 后发表评论