作者 yangfu

feat:1.MQTT数据汇报处理

... ... @@ -26,6 +26,8 @@ type CreateDeviceCollectionCommand struct {
ComStatus int64 `cname:"通讯状态 1-通讯正常 0-设备未上电或与采集端通讯故障" json:"comStatus"`
// 设备数据值
Values map[string]interface{} `cname:"设备数据值" json:"values" valid:"Required"`
// 预查
PreCheck bool `json:"check"`
}
func (createDeviceCollectionCommand *CreateDeviceCollectionCommand) Valid(validation *validation.Validation) {
... ...
package service
import (
"errors"
"fmt"
"github.com/linmadan/egglib-go/core/application"
"github.com/linmadan/egglib-go/utils/tool_funs"
... ... @@ -8,26 +9,27 @@ import (
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/deviceCollection/query"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/factory"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/domainService"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/redis"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log"
"strconv"
"sync"
"time"
)
type DeviceCollectionService struct {
}
var (
DeviceDataCache = NewDeviceDataInstance()
)
// 创建
func (deviceCollectionService *DeviceCollectionService) CreateDeviceCollection(createDeviceCollectionCommand *command.CreateDeviceCollectionCommand) (interface{}, error) {
if err := createDeviceCollectionCommand.ValidateCommand(); err != nil {
return nil, application.ThrowError(application.ARG_ERROR, err.Error())
}
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
if err := transactionContext.StartTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
defer func() {
transactionContext.RollbackTransaction()
}()
newDeviceCollection := &domain.DeviceCollection{
//DeviceCollectionId: createDeviceCollectionCommand.DeviceCollectionId,
WorkShopName: createDeviceCollectionCommand.WorkShopName,
... ... @@ -38,6 +40,31 @@ func (deviceCollectionService *DeviceCollectionService) CreateDeviceCollection(c
CollectionTime: createDeviceCollectionCommand.CollectionTime,
Values: createDeviceCollectionCommand.Values,
}
var lastDeviceCollectionRecord = &domain.DeviceCollection{}
var err error
if createDeviceCollectionCommand.PreCheck {
//前置验证,限制设备上报速率
if lastDeviceCollectionRecord, err = DeviceDataCache.Add(newDeviceCollection.DeviceSn, newDeviceCollection.DeviceType, newDeviceCollection); err != nil {
//log.Logger.Error(err.Error()+newDeviceCollection.DeviceSn)
return nil, application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
if lastDeviceCollectionRecord == nil {
//log.Logger.Error("未找到上一条设备数据")
return nil, nil
}
}
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
if err := transactionContext.StartTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
defer func() {
transactionContext.RollbackTransaction()
}()
var deviceCollectionRepository domain.DeviceCollectionRepository
if value, err := factory.CreateDeviceCollectionRepository(map[string]interface{}{
"transactionContext": transactionContext,
... ... @@ -46,16 +73,49 @@ func (deviceCollectionService *DeviceCollectionService) CreateDeviceCollection(c
} else {
deviceCollectionRepository = value
}
if deviceCollection, err := deviceCollectionRepository.Save(newDeviceCollection); err != nil {
deviceCollection, err := deviceCollectionRepository.Save(newDeviceCollection)
if err != nil {
return nil, application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
//处理设备数据
switch deviceCollection.DeviceType {
case domain.DeviceTypeBaoXianJi, domain.DeviceTypeChuanChuanJi, domain.DeviceTypeFengKouJi, domain.DeviceTypeFengXiangJi:
if v, ok := deviceCollection.Values["Count"]; ok {
curCount, errCurCount := strconv.Atoi(fmt.Sprintf("%v", v))
v, ok = lastDeviceCollectionRecord.Values["Count"]
if ok {
lastCount, errLastCount := strconv.Atoi(fmt.Sprintf("%v", v))
if errLastCount == nil && errCurCount == nil && lastCount <= curCount {
deviceCollection.Values["Count"] = curCount - lastCount
} else {
deviceCollection.Values["Count"] = 0
/*
设备统计的数量超过一定范围会重置为0,特殊处理0操作
*/
if lastCount > 10000000 && curCount < 1000 {
deviceCollection.Values["Count"] = curCount
}
}
} else {
deviceCollection.Values["Count"] = 0
}
}
break
}
err = domainService.SendWorkshopDeviceData(deviceCollection)
if err != nil {
log.Logger.Error("车间设备数据加入redis失败:" + err.Error())
return nil, application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
if err := transactionContext.CommitTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
return map[string]interface{}{
"deviceCollection": deviceCollection,
}, nil
}
}
// 返回
... ... @@ -219,3 +279,47 @@ func NewDeviceCollectionService(options map[string]interface{}) *DeviceCollectio
newDeviceCollectionService := &DeviceCollectionService{}
return newDeviceCollectionService
}
const DefaultReceiveSpan = 60 // 60 sec
type DeviceDataInstance struct {
//deviceDataCache sync.Map
deviceDataLastTime sync.Map
}
func NewDeviceDataInstance() *DeviceDataInstance {
return &DeviceDataInstance{
deviceDataLastTime: sync.Map{},
}
}
func (d *DeviceDataInstance) Add(deviceSn, deviceType string, data interface{}) (*domain.DeviceCollection, error) {
// 获取数据上一次的
var v interface{}
var ok bool
var now = time.Now().Unix()
var t = now
if v, ok = d.deviceDataLastTime.Load(deviceSn); ok {
t = v.(int64)
} else {
d.deviceDataLastTime.Store(deviceSn, t)
redis.SaveDeviceRealTimeData(deviceSn, data, true)
return nil, errors.New(fmt.Sprintf("ingnore this record"))
}
// 60秒接受一次数据,暂时不用太多数据
if now < t+DefaultReceiveSpan {
return nil, errors.New(fmt.Sprintf("receive too fast wait %v sec", t+DefaultReceiveSpan-now))
}
// 从redis获取最后的数据进行处理
var result = &domain.DeviceCollection{}
if err := redis.GetDeviceRealTimeData(deviceSn, result); err != nil {
if err == domain.ErrorNotFound {
redis.SaveDeviceRealTimeData(deviceSn, data, true)
return nil, nil
}
return nil, err
}
//log.Logger.Debug("",map[string]interface{}{"t":t,"device":deviceSn})
d.deviceDataLastTime.Store(deviceSn, now)
return result, redis.SaveDeviceRealTimeData(deviceSn, data, false)
}
... ...
... ... @@ -2,6 +2,7 @@ package query
import (
"fmt"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/domain"
"reflect"
"strings"
... ... @@ -31,18 +32,18 @@ type SearchProductGroupQuery struct {
LineName string `cname:"生产线名称" json:"lineName,omitempty"`
}
func (listProductGroupQuery *SearchProductGroupQuery) Valid(validation *validation.Validation) {
//validation.SetError("CustomValid", "未实现的自定义认证")
func (cmd *SearchProductGroupQuery) Valid(validation *validation.Validation) {
cmd.Offset, cmd.Limit = domain.Pagination(cmd.PageNumber, cmd.PageSize)
}
func (listProductGroupQuery *SearchProductGroupQuery) ValidateQuery() error {
func (cmd *SearchProductGroupQuery) ValidateQuery() error {
valid := validation.Validation{}
b, err := valid.Valid(listProductGroupQuery)
b, err := valid.Valid(cmd)
if err != nil {
return err
}
if !b {
elem := reflect.TypeOf(listProductGroupQuery).Elem()
elem := reflect.TypeOf(cmd).Elem()
for _, validErr := range valid.Errors {
field, isExist := elem.FieldByName(validErr.Field)
if isExist {
... ...
... ... @@ -27,10 +27,11 @@ type DeviceYouZhaJi2 struct {
// 串串机
type DeviceChuanChuanJi struct {
Count int64 `json:"Count"` // 生产计数:生产统计数量
Year string `json:"Year"` // 年
Month string `json:"Month"` // 月
Day string `json:"Day"` // 日
ProductType string `json:"ProductType"` // 产品类型:当前产品种类
Year int `json:"Year"` // 年
Month int `json:"Month"` // 月
Day int `json:"Day"` // 日
ProductType int `json:"ProductType"`
ProductType1 string `json:"ProductType1"` // 产品类型:当前产品种类
}
// 速冻线
... ... @@ -41,17 +42,19 @@ type DeviceSuDongXian struct {
// 封口机
type DeviceFengKouJi struct {
Count int64 `json:"Count"` // 生产计数:生产统计数量
Year string `json:"Year"` // 年
Month string `json:"Month"` // 月
Day string `json:"Day"` // 日
ProductType string `json:"ProductType"` // 产品类型:当前产品种类
Year int `json:"Year"` // 年
Month int `json:"Month"` // 月
Day int `json:"Day"` // 日
ProductType int `json:"ProductType"`
ProductType1 string `json:"ProductType1"` // 产品类型:当前产品种类
}
// 封箱机
type DeviceFengXiangJi struct {
Count int64 `json:"Count"` // 生产计数:生产统计数量
Year string `json:"Year"` // 年
Month string `json:"Month"` // 月
Day string `json:"Day"` // 日
ProductType string `json:"ProductType"` // 产品类型:当前产品种类
Year int `json:"Year"` // 年
Month int `json:"Month"` // 月
Day int `json:"Day"` // 日
ProductType int `json:"ProductType"`
ProductType1 string `json:"ProductType1"` // 产品类型:当前产品种类
}
... ...
... ... @@ -10,7 +10,6 @@ import (
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/repository"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/utils"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log"
"strconv"
"time"
)
... ... @@ -127,11 +126,11 @@ func (ptr *PGWorkshopDataConsumeService) newDeviceRunningData(record *domain.Dev
if mBytes, err = json.Marshal(record.Values); err != nil {
return nil, err
}
var formatDate = func(y, m, d string) (string, error) {
yd, _ := strconv.Atoi(y)
md, _ := strconv.Atoi(m)
dd, _ := strconv.Atoi(d)
t := time.Date(yd, time.Month(md), dd, 0, 0, 0, 0, time.Local)
var formatDate = func(y, m, d int) (string, error) {
//yd, _ := strconv.Atoi(y)
//md, _ := strconv.Atoi(m)
//dd, _ := strconv.Atoi(d)
t := time.Date(y, time.Month(m), d, 0, 0, 0, 0, time.Local)
return t.Local().Format("2006-01-02"), nil
}
switch record.DeviceType {
... ... @@ -172,7 +171,7 @@ func (ptr *PGWorkshopDataConsumeService) newDeviceRunningData(record *domain.Dev
break
}
data.Count = int(deviceChuanChuanJi.Count)
data.ProductType = deviceChuanChuanJi.ProductType
data.ProductType = deviceChuanChuanJi.ProductType1
if data.Date, err = formatDate(deviceChuanChuanJi.Year, deviceChuanChuanJi.Month, deviceChuanChuanJi.Day); err != nil {
return nil, err
}
... ... @@ -195,7 +194,7 @@ func (ptr *PGWorkshopDataConsumeService) newDeviceRunningData(record *domain.Dev
break
}
data.Count = int(deviceFengKouJi.Count)
data.ProductType = deviceFengKouJi.ProductType
data.ProductType = deviceFengKouJi.ProductType1
if data.Date, err = formatDate(deviceFengKouJi.Year, deviceFengKouJi.Month, deviceFengKouJi.Day); err != nil {
return nil, err
}
... ... @@ -208,7 +207,7 @@ func (ptr *PGWorkshopDataConsumeService) newDeviceRunningData(record *domain.Dev
break
}
data.Count = int(deviceFengXiangJi.Count)
data.ProductType = deviceFengXiangJi.ProductType
data.ProductType = deviceFengXiangJi.ProductType1
if data.Date, err = formatDate(deviceFengXiangJi.Year, deviceFengXiangJi.Month, deviceFengXiangJi.Day); err != nil {
return nil, err
}
... ...
... ... @@ -26,8 +26,8 @@ func (subscribeClient *SubscribeClient) options() *pahomqtt.ClientOptions {
opts.SetPassword(constant.MQTT_PASSWORD)
opts.SetKeepAlive(2 * time.Second)
opts.SetPingTimeout(1 * time.Second)
opts.CleanSession = false
//opts.SetClientID("test")
//opts.CleanSession = false
opts.SetClientID(constant.SERVICE_NAME)
//opts.Order = true
return opts
}
... ...
package redis
import (
"fmt"
"github.com/go-redis/redis"
"github.com/linmadan/egglib-go/utils/json"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/domain"
)
/*
保存实时数据
device 设备号
record 记录
saveNotExists true:不存在才保存 false:直接覆盖
*/
func SaveDeviceRealTimeData(device string, record interface{}, saveNotExists bool) error {
client := GetRedis()
recordData, err := json.Marshal(record)
if err != nil {
return err
}
key := DeviceRealTimeDataKey(device)
// 已存在的不做保存
if saveNotExists {
if exists, err := client.Exists(key).Result(); exists > 0 && err == nil {
return nil
}
}
result := client.Set(key, recordData, 0)
_, err = result.Result()
return err
}
/*
获取实时数据
*/
func GetDeviceRealTimeData(device string, val interface{}) error {
client := GetRedis()
key := DeviceRealTimeDataKey(device)
result := client.Get(key)
data, err := result.Bytes()
if err == redis.Nil {
return domain.ErrorNotFound
}
if err = json.Unmarshal(data, val); err != nil {
return err
}
return nil
}
func DeviceRealTimeDataKey(deviceType string) string {
str := fmt.Sprintf("%v:device-realtime-data:%v", constant.CACHE_PREFIX, deviceType)
return str
}
... ...
... ... @@ -2,7 +2,6 @@ package mqtt
import (
"encoding/json"
"fmt"
pahomqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/linmadan/egglib-go/utils/tool_funs"
"github.com/tidwall/gjson"
... ... @@ -11,14 +10,16 @@ import (
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/mqtt"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/redis"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/utils"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log"
"time"
)
func Start() {
mqtt.StartSubscribe(constant.MQTT_TOPIC, func(client pahomqtt.Client, message pahomqtt.Message) {
mqtt.StartSubscribe(constant.MQTT_TOPIC, OnReceiveData)
}
func OnReceiveData(client pahomqtt.Client, message pahomqtt.Message) {
payload := make(map[string]interface{})
err := json.Unmarshal(message.Payload(), &payload)
if err != nil {
... ... @@ -79,6 +80,7 @@ func Start() {
deviceChuanChuanJi := &domain.DeviceChuanChuanJi{}
err = json.Unmarshal(mBytes, deviceChuanChuanJi)
if err != nil {
log.Logger.Error(err.Error())
continue
}
deviceCollection.Values = tool_funs.SimpleStructToMap(deviceChuanChuanJi)
... ... @@ -112,10 +114,15 @@ func Start() {
break
//打浆机 //面包屑机
case domain.DeviceTypeDaJiangJi:
case domain.DeviceTypeChuanChuanJi:
default:
}
if deviceType != domain.DeviceTypeChuanChuanJi {
continue
}
// 发送数据
deviceCollectionService := service.NewDeviceCollectionService(nil)
resp, err := deviceCollectionService.CreateDeviceCollection(&command.CreateDeviceCollectionCommand{
_, err := deviceCollectionService.CreateDeviceCollection(&command.CreateDeviceCollectionCommand{
WorkShopName: deviceCollection.WorkShopName,
StartupStatus: deviceCollection.StartupStatus,
CollectionTime: deviceCollection.CollectionTime,
... ... @@ -123,37 +130,11 @@ func Start() {
DeviceType: deviceCollection.DeviceType,
ComStatus: deviceCollection.ComStatus,
Values: deviceCollection.Values,
PreCheck: true,
})
if err != nil {
continue
}
result := resp.(map[string]interface{})
if deviceCollectionResult, ok := result["deviceCollection"]; ok {
fmt.Println(deviceCollectionResult)
deviceCollection.DeviceCollectionId = deviceCollectionResult.(*domain.DeviceCollection).DeviceCollectionId
workShopBytes, err := json.Marshal(deviceCollection)
if err != nil {
continue
}
err = redis.GetRedis().LPush(constant.REDIS_WORKSHOP_KEY, string(workShopBytes)).Err()
if err != nil {
log.Logger.Error("车间设备数据加入redis失败:" + err.Error())
}
//workShopBytes, err := json.Marshal(deviceCollection)
//if err != nil {
// continue
//}
//err = redis.GetRedis().LPush(constant.REDIS_WORKSHOP_KEY, string(workShopBytes)).Err()
//if err != nil {
// log.Logger.Error("车间设备数据加入redis失败:" + err.Error())
//}
//err = domainService.SendWorkshopDeviceData(deviceCollection)
//if err != nil {
// log.Logger.Error("车间设备数据加入redis失败:" + err.Error())
//
//}
}
}
log.Logger.Info("MQTT", map[string]interface{}{
... ... @@ -162,6 +143,4 @@ func Start() {
"Message": payload,
})
}
}
})
}
... ...