作者 yangfu

feat:设备数据建模

... ... @@ -2,6 +2,7 @@ package main
import (
"fmt"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/port/mqtt"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/port/task"
"github.com/beego/beego/v2/server/web"
... ... @@ -9,7 +10,6 @@ import (
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/redis"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/port/mqtt"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/crontab"
_ "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
... ...
package query
import (
"fmt"
"reflect"
"strings"
"github.com/beego/beego/v2/core/validation"
)
type CommonStatisticsQuery struct {
Action string `cname:"查询类别" json:"actionType" valid:"Required"`
QueryOptions map[string]interface{} `json:"queryOptions"`
}
func (checkUndertakerQuery *CommonStatisticsQuery) Valid(validation *validation.Validation) {
//validation.SetError("CustomValid", "未实现的自定义认证")
}
func (checkUndertakerQuery *CommonStatisticsQuery) ValidateQuery() error {
valid := validation.Validation{}
b, err := valid.Valid(checkUndertakerQuery)
if err != nil {
return err
}
if !b {
elem := reflect.TypeOf(checkUndertakerQuery).Elem()
for _, validErr := range valid.Errors {
field, isExist := elem.FieldByName(validErr.Field)
if isExist {
return fmt.Errorf(strings.Replace(validErr.Message, validErr.Field, field.Tag.Get("cname"), -1))
} else {
return fmt.Errorf(validErr.Message)
}
}
}
return nil
}
... ...
package service
import (
"github.com/linmadan/egglib-go/core/application"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/factory"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/statistics/query"
)
// CommonStatisticsService 通用的统计服务
type CommonStatisticsService struct {
}
// CommonStatisticsService 通用的统计服务
func (svr *CommonStatisticsService) CommonStatisticsService(contractStatisticsQuery *query.CommonStatisticsQuery) (interface{}, error) {
if err := contractStatisticsQuery.ValidateQuery(); err != nil {
return nil, application.ThrowError(application.ARG_ERROR, err.Error())
}
var err error
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
if err := transactionContext.StartTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
defer func() {
_ = transactionContext.RollbackTransaction()
}()
//statisticsService, err := factory.CreateCooperationStatisticsService(map[string]interface{}{
// "transactionContext": transactionContext,
//})
//var res interface{}
//switch contractStatisticsQuery.Action {
//case domain_service.SearchContractDividends:
// res, err = statisticsService.SearchContractDividends(contractStatisticsQuery.QueryOptions)
//case domain_service.GetContractDividends:
// res, err = statisticsService.GetContractDividends(contractStatisticsQuery.QueryOptions)
//case domain_service.CooperationGoodsStatistics:
// res, err = statisticsService.CooperationGoodsStatistics(contractStatisticsQuery.QueryOptions)
//case domain_service.CooperationModeStatistics:
// res, err = statisticsService.CooperationModeStatistics(contractStatisticsQuery.QueryOptions)
//case domain_service.CompanyDividendsStatistics:
// res, err = statisticsService.CompanyDividendsStatistics(contractStatisticsQuery.QueryOptions)
//case domain_service.CompanyCooperationUsersStatistics:
// res, err = statisticsService.CompanyCooperationUsersStatistics(contractStatisticsQuery.QueryOptions)
//case domain_service.CompanyPaymentHistoryStatistics:
// res, err = statisticsService.CompanyPaymentHistoryStatistics(contractStatisticsQuery.QueryOptions)
//case domain_service.CompanyCooperationProjectContracts:
// res, err = statisticsService.CompanyCooperationProjectContracts(contractStatisticsQuery.QueryOptions)
//case domain_service.PaymentHistoryHistogramStatistics:
// res, err = statisticsService.PaymentHistoryHistogramStatistics(contractStatisticsQuery.QueryOptions)
//case domain_service.CooperationUserModeStatistics:
// res, err = statisticsService.CooperationUserModeStatistics(contractStatisticsQuery.QueryOptions)
//case domain_service.DividendsStatistics:
// res, err = statisticsService.DividendsStatistics(contractStatisticsQuery.QueryOptions)
//case domain_service.SearchDividendsEstimates:
// res, err = statisticsService.SearchDividendsEstimates(contractStatisticsQuery.QueryOptions)
//case domain_service.CooperationCompanyStatistics:
// res, err = statisticsService.CooperationCompanyStatistics(contractStatisticsQuery.QueryOptions)
//case domain_service.PersonCooperationContractStatistics:
// res, err = statisticsService.PersonCooperationContractStatistics(contractStatisticsQuery.QueryOptions)
//case domain_service.PersonCompanyPaymentHistoryStatistics:
// res, err = statisticsService.PersonCompanyPaymentHistoryStatistics(contractStatisticsQuery.QueryOptions)
//case domain_service.PersonCooperationProjectSharedInfo:
// res, err = statisticsService.PersonCooperationProjectSharedInfo(contractStatisticsQuery.QueryOptions)
//case domain_service.PersonCooperationProjectSharedInfoAttachment:
// res, err = statisticsService.PersonCooperationProjectSharedInfoAttachment(contractStatisticsQuery.QueryOptions)
//case domain_service.PersonCooperationCompany:
// res, err = statisticsService.PersonCooperationCompany(contractStatisticsQuery.QueryOptions)
//case domain_service.CreditAccountStatistics:
// res, err = statisticsService.CreditAccountStatistics(contractStatisticsQuery.QueryOptions)
//case domain_service.RelevantCooperationContractNumbers:
// res, err = statisticsService.RelevantCooperationContractNumbers(contractStatisticsQuery.QueryOptions)
//}
if err != nil {
return nil, application.ThrowError(application.BUSINESS_ERROR, err.Error())
}
if err := transactionContext.CommitTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
return struct {
}{}, nil
}
... ...
... ... @@ -3,6 +3,7 @@ package constant
import "os"
var MQTT_TOPIC = "/MQTT"
//设备商提供的测试地址
//var MQTT_HOST = "175.24.122.87"
//var MQTT_PORT = "1883"
... ... @@ -14,8 +15,7 @@ var MQTT_PORT = "1883"
var MQTT_USER = "admin"
var MQTT_PASSWORD = "123456"
func init(){
func init() {
if os.Getenv("MQTT_HOST") != "" {
MQTT_HOST = os.Getenv("MQTT_HOST")
}
... ... @@ -28,4 +28,4 @@ func init(){
if os.Getenv("MQTT_PASSWORD") != "" {
MQTT_PASSWORD = os.Getenv("MQTT_PASSWORD")
}
}
\ No newline at end of file
}
... ...
... ... @@ -7,7 +7,7 @@ type DeviceRunningData struct {
// 通讯状态:1:通讯正常,0:设备未上电或与采集端通讯故障
ComStatus int `json:"comStatus"`
// 匹配数目
Count int `json:"count,string"`
Count int `json:"count"`
// 炸机前段温度:炸机前段当前温度 YZJ1 油炸机
FrontTemp float64 `json:"frontTemp"`
// 炸机前段温度:炸机前段当前温度 YZJ2 油炸机
... ...
package domainService
import (
"fmt"
pgTransaction "github.com/linmadan/egglib-go/transaction/pg"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/utils"
)
const (
// 时段产能统计
HourProductiveStatistics = "HourProductiveStatistics"
)
type PGCommonStatisticsService struct {
transactionContext *pgTransaction.TransactionContext
}
func (ptr *PGCommonStatisticsService) CommonStatistics(actionType, queryOptions map[string]interface{}) (interface{}, error) {
switch actionType {
}
return nil, nil
}
// 时段产能-统计 (传串设备)
func (ptr *PGCommonStatisticsService) HourProductiveStatistics(queryOptions map[string]interface{}) (interface{}, error) {
var request = &HourProductiveStatisticsRequest{}
if err := utils.LoadQueryObject(queryOptions, &request); err != nil {
return nil, err
}
return nil, nil
}
type HourProductiveStatisticsRequest struct {
CompanyId int `json:"companyId" valid:"Required"`
OrgId int `json:"orgId" valid:"Required"`
WorkshopId int `json:"workshopId" valid:"Required"`
}
func NewPGCommonStatisticsService(transactionContext *pgTransaction.TransactionContext) (*PGCommonStatisticsService, error) {
if transactionContext == nil {
return nil, fmt.Errorf("transactionContext参数不能为nil")
} else {
return &PGCommonStatisticsService{
transactionContext: transactionContext,
}, nil
}
}
... ...
... ... @@ -62,7 +62,7 @@ func SendWorkshopWorkTimeStaticJob(productRecord *domain.ProductAttendanceRecord
return SendAsyncJob(domain.TaskKeyWorkshopWorkTimeRecordStatics(), productRecord)
}
func SenDeviceZkTecoReportJob(productRecord *domain.DeviceZkTeco) error {
func SendDeviceZkTecoReportJob(productRecord *domain.DeviceZkTeco) error {
return SendAsyncJob(domain.TaskDeviceZkTecoReport(), productRecord)
}
... ...
... ... @@ -4,6 +4,7 @@ import (
"fmt"
pahomqtt "github.com/eclipse/paho.mqtt.golang"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log"
"time"
)
... ... @@ -26,21 +27,26 @@ func (subscribeClient *SubscribeClient) options() *pahomqtt.ClientOptions {
opts.SetKeepAlive(2 * time.Second)
opts.SetPingTimeout(1 * time.Second)
opts.CleanSession = false
opts.SetClientID("test")
//opts.SetClientID("test")
//opts.Order = true
return opts
}
func (subscribeClient *SubscribeClient) Connect() *SubscribeClient{
func (subscribeClient *SubscribeClient) Connect() *SubscribeClient {
opts := subscribeClient.options()
fmt.Println("start connect......")
opts.OnConnectionLost = func(c pahomqtt.Client, err error) {
defer func() {
if r := recover(); r != nil {
log.Logger.Error(fmt.Sprintf("%v", r))
}
}()
fmt.Println("Connect error:", err)
for {
fmt.Println("reconnect server")
token := subscribeClient.client.Connect()
token.Wait()
fmt.Println("server Connect status:",subscribeClient.client.IsConnectionOpen())
fmt.Println("server Connect status:", subscribeClient.client.IsConnectionOpen())
if subscribeClient.client.IsConnectionOpen() {
break
}
... ... @@ -48,7 +54,7 @@ func (subscribeClient *SubscribeClient) Connect() *SubscribeClient{
}
}
opts.OnConnect = func(c pahomqtt.Client) {
c.Subscribe(subscribeClient.topic,0,subscribeClient.handler)
c.Subscribe(subscribeClient.topic, 0, subscribeClient.handler)
}
subscribeClient.client = pahomqtt.NewClient(opts)
token := subscribeClient.client.Connect()
... ... @@ -56,21 +62,21 @@ func (subscribeClient *SubscribeClient) Connect() *SubscribeClient{
return subscribeClient
}
func (subscribeClient *SubscribeClient) Subscribe(topic string, messageHandler pahomqtt.MessageHandler){
func (subscribeClient *SubscribeClient) Subscribe(topic string, messageHandler pahomqtt.MessageHandler) {
subscribeClient.topic = topic
subscribeClient.handler = messageHandler
token := subscribeClient.client.Subscribe(topic,0,messageHandler)
token := subscribeClient.client.Subscribe(topic, 0, messageHandler)
token.Wait()
token.Done()
}
func StartSubscribe(topic string,handler MessageHandler){
func StartSubscribe(topic string, handler MessageHandler) {
defer func() {
if err := recover();err != nil {
if err := recover(); err != nil {
fmt.Println(err)
StartSubscribe(topic,handler)
StartSubscribe(topic, handler)
}
}()
fmt.Println("start subscribe...")
NewSubscribeClient().Connect().Subscribe(topic,pahomqtt.MessageHandler(handler))
}
\ No newline at end of file
NewSubscribeClient().Connect().Subscribe(topic, pahomqtt.MessageHandler(handler))
}
... ...
... ... @@ -28,7 +28,7 @@ func (controller *DeviceZKTecoController) PostCdata() {
data.ActionTime = mTime
//mBytes, _ := json.Marshal(data)
//redis.GetRedis().LPush(domain.TaskDeviceZkTecoReport(), mBytes)
domainService.SenDeviceZkTecoReportJob(data)
domainService.SendDeviceZkTecoReportJob(data)
}
}
controller.Response(data, nil)
... ...