作者 yangfu

refactor:优化日志记录,mqtt重连修改

@@ -39,7 +39,7 @@ func main() { @@ -39,7 +39,7 @@ func main() {
39 log.Logger.Info("server start ....") 39 log.Logger.Info("server start ....")
40 log.Logger.Debug(fmt.Sprintf("ENABLE_KAFKA_LOG:%v", constant.ENABLE_KAFKA_LOG)) 40 log.Logger.Debug(fmt.Sprintf("ENABLE_KAFKA_LOG:%v", constant.ENABLE_KAFKA_LOG))
41 41
42 - go mqtt.Start() 42 + go mqtt.Start(log.Logger)
43 go task.Run() 43 go task.Run()
44 cron := crontab.NewCrontabService(nil) 44 cron := crontab.NewCrontabService(nil)
45 cron.StartCrontabTask() 45 cron.StartCrontabTask()
@@ -15,7 +15,7 @@ import ( @@ -15,7 +15,7 @@ import (
15 func AutoApproveProductAttendanceRecord(ctx context.Context) error { 15 func AutoApproveProductAttendanceRecord(ctx context.Context) error {
16 defer func() { 16 defer func() {
17 if r := recover(); r != nil { 17 if r := recover(); r != nil {
18 - log.Logger.Error(fmt.Sprintf("%v", r)) 18 + log.Logger.Error(fmt.Sprintf("%v", r), map[string]interface{}{"task": "定时审核生产考勤记录"})
19 } 19 }
20 }() 20 }()
21 transactionContext, err := factory.CreateTransactionContext(nil) 21 transactionContext, err := factory.CreateTransactionContext(nil)
@@ -36,12 +36,14 @@ func AutoApproveProductAttendanceRecord(ctx context.Context) error { @@ -36,12 +36,14 @@ func AutoApproveProductAttendanceRecord(ctx context.Context) error {
36 var records []*domain.ProductAttendanceRecord 36 var records []*domain.ProductAttendanceRecord
37 _, records, err = attendanceRecordDao.RecentUnApprovedAttendanceRecord(24, 2) 37 _, records, err = attendanceRecordDao.RecentUnApprovedAttendanceRecord(24, 2)
38 if err != nil { 38 if err != nil {
  39 + log.Logger.Error(err.Error(), map[string]interface{}{"task": "定时审核生产考勤记录"})
39 return err 40 return err
40 } 41 }
41 42
42 approveAttendanceRecordsService, _ := domainService.NewPGApproveAttendanceRecordsService(transactionContext.(*pgTransaction.TransactionContext)) 43 approveAttendanceRecordsService, _ := domainService.NewPGApproveAttendanceRecordsService(transactionContext.(*pgTransaction.TransactionContext))
43 44
44 if _, err = approveAttendanceRecordsService.BatchApproveAttendanceRecords(nil, records, 0, 0, domain.AttendanceAutoApproved); err != nil { 45 if _, err = approveAttendanceRecordsService.BatchApproveAttendanceRecords(nil, records, 0, 0, domain.AttendanceAutoApproved); err != nil {
  46 + log.Logger.Error(err.Error(), map[string]interface{}{"task": "定时审核生产考勤记录"})
45 return err 47 return err
46 } 48 }
47 if err = transactionContext.CommitTransaction(); err != nil { 49 if err = transactionContext.CommitTransaction(); err != nil {
@@ -15,7 +15,7 @@ import ( @@ -15,7 +15,7 @@ import (
15 func AutoApproveProductRecord(ctx context.Context) error { 15 func AutoApproveProductRecord(ctx context.Context) error {
16 defer func() { 16 defer func() {
17 if r := recover(); r != nil { 17 if r := recover(); r != nil {
18 - log.Logger.Error(fmt.Sprintf("%v", r)) 18 + log.Logger.Error(fmt.Sprintf("%v", r), map[string]interface{}{"task": "定时审核生产记录"})
19 } 19 }
20 }() 20 }()
21 transactionContext, err := factory.CreateTransactionContext(nil) 21 transactionContext, err := factory.CreateTransactionContext(nil)
@@ -42,6 +42,7 @@ func AutoApproveProductRecord(ctx context.Context) error { @@ -42,6 +42,7 @@ func AutoApproveProductRecord(ctx context.Context) error {
42 approveAttendanceRecordsService, _ := domainService.NewPGProductRecordService(transactionContext.(*pgTransaction.TransactionContext)) 42 approveAttendanceRecordsService, _ := domainService.NewPGProductRecordService(transactionContext.(*pgTransaction.TransactionContext))
43 43
44 if _, err = approveAttendanceRecordsService.BatchApprove(records, 0, 0, domain.AttendanceAutoApproved); err != nil { 44 if _, err = approveAttendanceRecordsService.BatchApprove(records, 0, 0, domain.AttendanceAutoApproved); err != nil {
  45 + log.Logger.Error(err.Error(), map[string]interface{}{"task": "定时审核生产记录"})
45 return err 46 return err
46 } 47 }
47 if err = transactionContext.CommitTransaction(); err != nil { 48 if err = transactionContext.CommitTransaction(); err != nil {
@@ -13,7 +13,7 @@ import ( @@ -13,7 +13,7 @@ import (
13 func AutoFlushDeviceDailyRunningRecord(ctx context.Context) error { 13 func AutoFlushDeviceDailyRunningRecord(ctx context.Context) error {
14 defer func() { 14 defer func() {
15 if r := recover(); r != nil { 15 if r := recover(); r != nil {
16 - log.Logger.Error(fmt.Sprintf("%v", r)) 16 + log.Logger.Error(fmt.Sprintf("%v", r), map[string]interface{}{"task": "定时刷新设备每日运行记录"})
17 } 17 }
18 }() 18 }()
19 transactionContext, err := factory.CreateTransactionContext(nil) 19 transactionContext, err := factory.CreateTransactionContext(nil)
@@ -18,7 +18,7 @@ import ( @@ -18,7 +18,7 @@ import (
18 func AutoFlushDeviceDailyRunningRecordOEE(ctx context.Context) error { 18 func AutoFlushDeviceDailyRunningRecordOEE(ctx context.Context) error {
19 defer func() { 19 defer func() {
20 if r := recover(); r != nil { 20 if r := recover(); r != nil {
21 - log.Logger.Error(fmt.Sprintf("%v", r)) 21 + log.Logger.Error(fmt.Sprintf("%v", r), map[string]interface{}{"task": "定时刷新设备每日运行记录"})
22 } 22 }
23 }() 23 }()
24 transactionContext, err := factory.CreateTransactionContext(nil) 24 transactionContext, err := factory.CreateTransactionContext(nil)
@@ -92,7 +92,7 @@ func AutoFlushDeviceDailyRunningRecordOEE(ctx context.Context) error { @@ -92,7 +92,7 @@ func AutoFlushDeviceDailyRunningRecordOEE(ctx context.Context) error {
92 continue 92 continue
93 } 93 }
94 if err := redis.SaveDeviceDailyRunningRecord(v); err != nil { 94 if err := redis.SaveDeviceDailyRunningRecord(v); err != nil {
95 - log.Logger.Error(err.Error()) 95 + log.Logger.Error(err.Error(), map[string]interface{}{"task": "定时刷新设备每日运行记录"})
96 continue 96 continue
97 } 97 }
98 log.Logger.Debug(fmt.Sprintf("【定时刷新设备每日运行记录-OEE】 刷新记录 %v", v)) 98 log.Logger.Debug(fmt.Sprintf("【定时刷新设备每日运行记录-OEE】 刷新记录 %v", v))
@@ -15,7 +15,7 @@ import ( @@ -15,7 +15,7 @@ import (
15 func AutoWorkshopPlanCompletionRecord(ctx context.Context) error { 15 func AutoWorkshopPlanCompletionRecord(ctx context.Context) error {
16 defer func() { 16 defer func() {
17 if r := recover(); r != nil { 17 if r := recover(); r != nil {
18 - log.Logger.Error(fmt.Sprintf("%v", r)) 18 + log.Logger.Error(fmt.Sprintf("%v", r), map[string]interface{}{"task": "定时刷新车间计划完成纪录"})
19 } 19 }
20 }() 20 }()
21 transactionContext, err := factory.CreateTransactionContext(nil) 21 transactionContext, err := factory.CreateTransactionContext(nil)
@@ -38,6 +38,7 @@ func AutoWorkshopPlanCompletionRecord(ctx context.Context) error { @@ -38,6 +38,7 @@ func AutoWorkshopPlanCompletionRecord(ctx context.Context) error {
38 approveAttendanceRecordsService, _ := domainService.NewPGWorkshopPlanCompletionRecordService(transactionContext.(*pgTransaction.TransactionContext)) 38 approveAttendanceRecordsService, _ := domainService.NewPGWorkshopPlanCompletionRecordService(transactionContext.(*pgTransaction.TransactionContext))
39 39
40 if err = approveAttendanceRecordsService.WorkshopPlanCompletion(begin, end); err != nil { 40 if err = approveAttendanceRecordsService.WorkshopPlanCompletion(begin, end); err != nil {
  41 + log.Logger.Error(err.Error(), map[string]interface{}{"task": "定时刷新车间计划完成纪录"})
41 return err 42 return err
42 } 43 }
43 44
@@ -74,6 +75,7 @@ func AutoTodayWorkshopPlanCompletionRecord(ctx context.Context) error { @@ -74,6 +75,7 @@ func AutoTodayWorkshopPlanCompletionRecord(ctx context.Context) error {
74 approveAttendanceRecordsService, _ := domainService.NewPGWorkshopPlanCompletionRecordService(transactionContext.(*pgTransaction.TransactionContext)) 75 approveAttendanceRecordsService, _ := domainService.NewPGWorkshopPlanCompletionRecordService(transactionContext.(*pgTransaction.TransactionContext))
75 76
76 if err = approveAttendanceRecordsService.WorkshopPlanCompletion(begin, end); err != nil { 77 if err = approveAttendanceRecordsService.WorkshopPlanCompletion(begin, end); err != nil {
  78 + log.Logger.Error(err.Error(), map[string]interface{}{"task": "定时刷新当天车间计划完成纪录"})
77 return err 79 return err
78 } 80 }
79 81
@@ -8,6 +8,7 @@ import ( @@ -8,6 +8,7 @@ import (
8 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/syncdata" 8 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/syncdata"
9 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/redis" 9 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/redis"
10 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log" 10 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log"
  11 + "runtime/debug"
11 "time" 12 "time"
12 ) 13 )
13 14
@@ -15,7 +16,7 @@ import ( @@ -15,7 +16,7 @@ import (
15 func SyncProduct(ctx context.Context) error { 16 func SyncProduct(ctx context.Context) error {
16 defer func() { 17 defer func() {
17 if r := recover(); r != nil { 18 if r := recover(); r != nil {
18 - log.Logger.Error(fmt.Sprintf("%v", r)) 19 + log.Logger.Error(fmt.Sprintf("%v", r), map[string]interface{}{"task": "定时同步产品", "stack": string(debug.Stack())})
19 } 20 }
20 }() 21 }()
21 transactionContext, err := factory.CreateTransactionContext(nil) 22 transactionContext, err := factory.CreateTransactionContext(nil)
@@ -36,9 +37,11 @@ func SyncProduct(ctx context.Context) error { @@ -36,9 +37,11 @@ func SyncProduct(ctx context.Context) error {
36 pullK3CloudService := syncdata.PullDataK3CloudService{} 37 pullK3CloudService := syncdata.PullDataK3CloudService{}
37 t := ctx.Value("fromTime") 38 t := ctx.Value("fromTime")
38 var fromTime time.Time 39 var fromTime time.Time
39 - if t != nil {  
40 - if v, ok := t.(time.Time); ok {  
41 - fromTime = v 40 + if ctx != nil {
  41 + if t != nil {
  42 + if v, ok := t.(time.Time); ok {
  43 + fromTime = v
  44 + }
42 } 45 }
43 } 46 }
44 if fromTime.IsZero() { 47 if fromTime.IsZero() {
@@ -49,7 +52,7 @@ func SyncProduct(ctx context.Context) error { @@ -49,7 +52,7 @@ func SyncProduct(ctx context.Context) error {
49 } 52 }
50 } 53 }
51 if err := pullK3CloudService.SyncDataProduct(transactionContext.(*pg.TransactionContext), fromTime); err != nil { 54 if err := pullK3CloudService.SyncDataProduct(transactionContext.(*pg.TransactionContext), fromTime); err != nil {
52 - log.Logger.Error(err.Error()) 55 + log.Logger.Error(err.Error(), map[string]interface{}{"task": "定时同步产品"})
53 return nil 56 return nil
54 } 57 }
55 if err = transactionContext.CommitTransaction(); err != nil { 58 if err = transactionContext.CommitTransaction(); err != nil {
@@ -7,6 +7,7 @@ import ( @@ -7,6 +7,7 @@ import (
7 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/factory" 7 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/factory"
8 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/syncdata" 8 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/syncdata"
9 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log" 9 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log"
  10 + "runtime/debug"
10 "time" 11 "time"
11 ) 12 )
12 13
@@ -14,7 +15,7 @@ import ( @@ -14,7 +15,7 @@ import (
14 func SyncProductPlan(ctx context.Context) error { 15 func SyncProductPlan(ctx context.Context) error {
15 defer func() { 16 defer func() {
16 if r := recover(); r != nil { 17 if r := recover(); r != nil {
17 - log.Logger.Error(fmt.Sprintf("%v", r)) 18 + log.Logger.Error(fmt.Sprintf("%v", r), map[string]interface{}{"task": "定时同步车间计划", "stack": debug.Stack()})
18 } 19 }
19 }() 20 }()
20 transactionContext, err := factory.CreateTransactionContext(nil) 21 transactionContext, err := factory.CreateTransactionContext(nil)
@@ -33,15 +34,18 @@ func SyncProductPlan(ctx context.Context) error { @@ -33,15 +34,18 @@ func SyncProductPlan(ctx context.Context) error {
33 34
34 log.Logger.Debug("【定时同步车间计划】 启动") 35 log.Logger.Debug("【定时同步车间计划】 启动")
35 pullK3CloudService := syncdata.PullDataK3CloudService{} 36 pullK3CloudService := syncdata.PullDataK3CloudService{}
36 - t := ctx.Value("fromTime")  
37 var fromTime time.Time 37 var fromTime time.Time
38 - if t != nil {  
39 - if v, ok := t.(time.Time); ok {  
40 - fromTime = v 38 + if ctx != nil {
  39 + t := ctx.Value("fromTime")
  40 + if t != nil {
  41 + if v, ok := t.(time.Time); ok {
  42 + fromTime = v
  43 + }
41 } 44 }
42 } 45 }
  46 +
43 if err := pullK3CloudService.SyncDataProductPlan(transactionContext.(*pg.TransactionContext), fromTime); err != nil { 47 if err := pullK3CloudService.SyncDataProductPlan(transactionContext.(*pg.TransactionContext), fromTime); err != nil {
44 - log.Logger.Error(err.Error()) 48 + log.Logger.Error(err.Error(), map[string]interface{}{"task": "定时同步车间计划"})
45 return nil 49 return nil
46 } 50 }
47 if err = transactionContext.CommitTransaction(); err != nil { 51 if err = transactionContext.CommitTransaction(); err != nil {
@@ -3,8 +3,9 @@ package mqtt @@ -3,8 +3,9 @@ package mqtt
3 import ( 3 import (
4 "fmt" 4 "fmt"
5 pahomqtt "github.com/eclipse/paho.mqtt.golang" 5 pahomqtt "github.com/eclipse/paho.mqtt.golang"
  6 + "github.com/linmadan/egglib-go/log"
6 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant" 7 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
7 - "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log" 8 + "runtime/debug"
8 "time" 9 "time"
9 ) 10 )
10 11
@@ -12,12 +13,15 @@ type SubscribeClient struct { @@ -12,12 +13,15 @@ type SubscribeClient struct {
12 topic string 13 topic string
13 handler pahomqtt.MessageHandler 14 handler pahomqtt.MessageHandler
14 client pahomqtt.Client 15 client pahomqtt.Client
  16 + log log.Logger
15 } 17 }
16 18
17 type MessageHandler pahomqtt.MessageHandler 19 type MessageHandler pahomqtt.MessageHandler
18 20
19 -func NewSubscribeClient() *SubscribeClient {  
20 - return &SubscribeClient{} 21 +func NewSubscribeClient(log log.Logger) *SubscribeClient {
  22 + return &SubscribeClient{
  23 + log: log,
  24 + }
21 } 25 }
22 26
23 func (subscribeClient *SubscribeClient) options() *pahomqtt.ClientOptions { 27 func (subscribeClient *SubscribeClient) options() *pahomqtt.ClientOptions {
@@ -34,28 +38,32 @@ func (subscribeClient *SubscribeClient) options() *pahomqtt.ClientOptions { @@ -34,28 +38,32 @@ func (subscribeClient *SubscribeClient) options() *pahomqtt.ClientOptions {
34 38
35 func (subscribeClient *SubscribeClient) Connect() *SubscribeClient { 39 func (subscribeClient *SubscribeClient) Connect() *SubscribeClient {
36 opts := subscribeClient.options() 40 opts := subscribeClient.options()
37 - fmt.Println("start connect......") 41 + subscribeClient.log.Info("mqtt start connect......")
38 opts.OnConnectionLost = func(c pahomqtt.Client, err error) { 42 opts.OnConnectionLost = func(c pahomqtt.Client, err error) {
39 - defer func() {  
40 - if r := recover(); r != nil {  
41 - log.Logger.Error(fmt.Sprintf("%v", r))  
42 - }  
43 - }()  
44 - fmt.Println("Connect error:", err)  
45 - for {  
46 - fmt.Println("reconnect server")  
47 - token := subscribeClient.client.Connect()  
48 - token.Wait()  
49 - fmt.Println("server Connect status:", subscribeClient.client.IsConnectionOpen())  
50 - if subscribeClient.client.IsConnectionOpen() {  
51 - break  
52 - }  
53 - time.Sleep(3 * time.Second)  
54 - } 43 + //defer func() {
  44 + // if r := recover(); r != nil {
  45 + // subscribeClient.log.Info(fmt.Sprintf("%v %s", r, debug.Stack()))
  46 + // }
  47 + //}()
  48 + subscribeClient.log.Info("mqtt connect lost,error:" + err.Error())
  49 + //for {
  50 + // subscribeClient.log.Info("reconnect server")
  51 + // token := subscribeClient.client.Connect()
  52 + // token.Wait()
  53 + // subscribeClient.log.Info(fmt.Sprintf("server Connect status:%v", subscribeClient.client.IsConnectionOpen()))
  54 + // if subscribeClient.client.IsConnectionOpen() {
  55 + // break
  56 + // }
  57 + // time.Sleep(3 * time.Second)
  58 + //}
55 } 59 }
56 opts.OnConnect = func(c pahomqtt.Client) { 60 opts.OnConnect = func(c pahomqtt.Client) {
  61 + subscribeClient.log.Info("mqtt reconnected")
57 c.Subscribe(subscribeClient.topic, 0, subscribeClient.handler) 62 c.Subscribe(subscribeClient.topic, 0, subscribeClient.handler)
58 } 63 }
  64 + opts.OnReconnecting = func(client pahomqtt.Client, options *pahomqtt.ClientOptions) {
  65 + subscribeClient.log.Info("mqtt reconnecting...")
  66 + }
59 subscribeClient.client = pahomqtt.NewClient(opts) 67 subscribeClient.client = pahomqtt.NewClient(opts)
60 token := subscribeClient.client.Connect() 68 token := subscribeClient.client.Connect()
61 token.Wait() 69 token.Wait()
@@ -70,13 +78,13 @@ func (subscribeClient *SubscribeClient) Subscribe(topic string, messageHandler p @@ -70,13 +78,13 @@ func (subscribeClient *SubscribeClient) Subscribe(topic string, messageHandler p
70 token.Done() 78 token.Done()
71 } 79 }
72 80
73 -func StartSubscribe(topic string, handler MessageHandler) { 81 +func StartSubscribe(topic string, handler MessageHandler, log log.Logger) {
74 defer func() { 82 defer func() {
75 if err := recover(); err != nil { 83 if err := recover(); err != nil {
76 - fmt.Println(err)  
77 - StartSubscribe(topic, handler) 84 + log.Error(fmt.Sprintf("%s", debug.Stack()))
  85 + StartSubscribe(topic, handler, log)
78 } 86 }
79 }() 87 }()
80 - fmt.Println("start subscribe...")  
81 - NewSubscribeClient().Connect().Subscribe(topic, pahomqtt.MessageHandler(handler)) 88 + log.Info("mqtt start subscribe...")
  89 + NewSubscribeClient(log).Connect().Subscribe(topic, pahomqtt.MessageHandler(handler))
82 } 90 }
1 package log 1 package log
2 2
3 import ( 3 import (
  4 + "github.com/linmadan/egglib-go/log"
4 "github.com/sirupsen/logrus" 5 "github.com/sirupsen/logrus"
5 "github.com/sirupsen/logrus/hooks/writer" 6 "github.com/sirupsen/logrus/hooks/writer"
6 "io" 7 "io"
@@ -47,7 +48,7 @@ func (logger *LogrusLogger) AddHook(w io.Writer) { @@ -47,7 +48,7 @@ func (logger *LogrusLogger) AddHook(w io.Writer) {
47 level := logger.logrus.Level 48 level := logger.logrus.Level
48 var levels []logrus.Level 49 var levels []logrus.Level
49 // 默认已经添加了一个当前log level的hook,所以此处 level+1 50 // 默认已经添加了一个当前log level的hook,所以此处 level+1
50 - for i := 0; i < (int(level)); i++ { 51 + for i := 0; i <= (int(level)); i++ {
51 levels = append(levels, logrus.Level(i)) 52 levels = append(levels, logrus.Level(i))
52 } 53 }
53 logger.logrus.AddHook(&writer.Hook{ 54 logger.logrus.AddHook(&writer.Hook{
@@ -120,3 +121,5 @@ func NewLogrusLogger() *LogrusLogger { @@ -120,3 +121,5 @@ func NewLogrusLogger() *LogrusLogger {
120 logrus: logger, 121 logrus: logger,
121 } 122 }
122 } 123 }
  124 +
  125 +var _ log.Logger = (*LogrusLogger)(nil)
@@ -2,6 +2,7 @@ package mqtt @@ -2,6 +2,7 @@ package mqtt
2 2
3 import ( 3 import (
4 pahomqtt "github.com/eclipse/paho.mqtt.golang" 4 pahomqtt "github.com/eclipse/paho.mqtt.golang"
  5 + logimp "github.com/linmadan/egglib-go/log"
5 "github.com/linmadan/egglib-go/utils/json" 6 "github.com/linmadan/egglib-go/utils/json"
6 "github.com/tidwall/gjson" 7 "github.com/tidwall/gjson"
7 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/deviceCollection/command" 8 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/deviceCollection/command"
@@ -14,8 +15,8 @@ import ( @@ -14,8 +15,8 @@ import (
14 "time" 15 "time"
15 ) 16 )
16 17
17 -func Start() {  
18 - mqtt.StartSubscribe(constant.MQTT_TOPIC, OnReceiveData) 18 +func Start(log logimp.Logger) {
  19 + mqtt.StartSubscribe(constant.MQTT_TOPIC, OnReceiveData, log)
19 } 20 }
20 21
21 func OnReceiveData(client pahomqtt.Client, message pahomqtt.Message) { 22 func OnReceiveData(client pahomqtt.Client, message pahomqtt.Message) {