作者 yangfu

fet: support batch message handler

@@ -33,6 +33,7 @@ type ( @@ -33,6 +33,7 @@ type (
33 BatchSize int `json:",default=100"` 33 BatchSize int `json:",default=100"`
34 ThreadSize int `json:",default=2"` 34 ThreadSize int `json:",default=2"`
35 Interval int `json:",default=5"` 35 Interval int `json:",default=5"`
  36 + SleepInterval int `json:",default=2"`
36 } 37 }
37 38
38 Filter struct { 39 Filter struct {
@@ -35,13 +35,14 @@ func NewMessageHandlerPG(constant config.PostgresqlConf) *MessageHandlerPG { @@ -35,13 +35,14 @@ func NewMessageHandlerPG(constant config.PostgresqlConf) *MessageHandlerPG {
35 db: DB, 35 db: DB,
36 conf: constant, 36 conf: constant,
37 queue: collection.NewQueue(constant.MaxQueueSize), 37 queue: collection.NewQueue(constant.MaxQueueSize),
38 - limitCount: int32(float64(constant.MaxQueueSize) * 0.9), 38 + limitCount: int32(float64(constant.MaxQueueSize) * 0.8),
39 } 39 }
40 40
41 for i := 0; i < constant.ThreadSize; i++ { 41 for i := 0; i < constant.ThreadSize; i++ {
42 - go handler.TimerConsume() 42 + thread :=i
  43 + go handler.TimerConsume(thread)
43 } 44 }
44 - go handler.timerCreateLogTable() 45 + go handler.TimerCreateLogTable()
45 return handler 46 return handler
46 } 47 }
47 48
@@ -63,22 +64,22 @@ func (mh *MessageHandlerPG) Consume(_, val string) error { @@ -63,22 +64,22 @@ func (mh *MessageHandlerPG) Consume(_, val string) error {
63 mh.queue.Put(m) 64 mh.queue.Put(m)
64 atomic.AddInt32(&mh.count, 1) 65 atomic.AddInt32(&mh.count, 1)
65 if mh.count > mh.limitCount { 66 if mh.count > mh.limitCount {
66 - if _, ok := mh.queue.Take(); ok {  
67 - atomic.AddInt32(&mh.count, -1)  
68 - } 67 + time.Sleep(time.Second * time.Duration(mh.conf.SleepInterval))
69 } 68 }
70 return nil 69 return nil
71 } 70 }
72 71
73 -func (mh *MessageHandlerPG) TimerConsume() { 72 +func (mh *MessageHandlerPG) TimerConsume(threadId int) {
  73 + fmt.Printf("[logstash] Begin TimerConsume : %v \n",threadId)
74 t := time.Tick(time.Second * time.Duration(mh.conf.Interval)) 74 t := time.Tick(time.Second * time.Duration(mh.conf.Interval))
75 for range t { 75 for range t {
76 threading.RunSafe( 76 threading.RunSafe(
77 func() { 77 func() {
  78 + for mh.count > 0 {
78 var logs []*Logs 79 var logs []*Logs
79 for i := 0; i < mh.conf.BatchSize; i++ { 80 for i := 0; i < mh.conf.BatchSize; i++ {
80 item, ok := mh.queue.Take() 81 item, ok := mh.queue.Take()
81 - if !ok { 82 + if !ok || item == nil {
82 break 83 break
83 } 84 }
84 atomic.AddInt32(&mh.count, -1) 85 atomic.AddInt32(&mh.count, -1)
@@ -90,27 +91,33 @@ func (mh *MessageHandlerPG) TimerConsume() { @@ -90,27 +91,33 @@ func (mh *MessageHandlerPG) TimerConsume() {
90 } 91 }
91 } 92 }
92 if len(logs) > 0 { 93 if len(logs) > 0 {
93 - _, err := mh.db.Model(&logs).Insert()  
94 - fmt.Println(err) 94 + if _, err := mh.db.Model(&logs).Insert(); err != nil {
  95 + fmt.Println("[logstash] Insert Error:", err)
  96 + }
  97 + }
  98 + if mh.count>0{
  99 + fmt.Printf("[logstash] Thread:%v Queue:%v \n",threadId, mh.count)
  100 + }
95 } 101 }
96 - fmt.Printf("logstash thread:%v queue:%v \n", threading.RoutineId(), mh.count)  
97 }, 102 },
98 ) 103 )
99 } 104 }
100 } 105 }
101 106
102 -func (mh *MessageHandlerPG) timerCreateLogTable() { 107 +func (mh *MessageHandlerPG) TimerCreateLogTable() {
103 t := time.NewTimer(time.Hour * 6) 108 t := time.NewTimer(time.Hour * 6)
  109 + fmt.Printf("[logstash] Begin TimerCreateLogTable \n")
  110 + mh.createLogTable()
104 for range t.C { 111 for range t.C {
105 threading.RunSafe( 112 threading.RunSafe(
106 func() { 113 func() {
107 - mh.timerCreateLogTable() 114 + mh.createLogTable()
108 }, 115 },
109 ) 116 )
110 } 117 }
111 } 118 }
112 119
113 -func (mh *MessageHandlerPG) TimerCreateLogTable() { 120 +func (mh *MessageHandlerPG) createLogTable() {
114 var err error 121 var err error
115 // creates database schema for Log models. 122 // creates database schema for Log models.
116 err = mh.db.Model(&Logs{}).CreateTable(&orm.CreateTableOptions{ 123 err = mh.db.Model(&Logs{}).CreateTable(&orm.CreateTableOptions{
@@ -128,6 +135,7 @@ func (mh *MessageHandlerPG) TimerCreateLogTable() { @@ -128,6 +135,7 @@ func (mh *MessageHandlerPG) TimerCreateLogTable() {
128 if err != nil { 135 if err != nil {
129 log.Fatal(err) 136 log.Fatal(err)
130 } 137 }
  138 +
131 // logData := &Logs{ 139 // logData := &Logs{
132 // Log: map[string]interface{}{"msg":"test"}, 140 // Log: map[string]interface{}{"msg":"test"},
133 // LogTime: logStartTime, 141 // LogTime: logStartTime,
@@ -152,7 +160,7 @@ func createNewPartition(db *pg.DB, currentTime time.Time) error { @@ -152,7 +160,7 @@ func createNewPartition(db *pg.DB, currentTime time.Time) error {
152 firstOfMonth.Format(time.RFC3339Nano), 160 firstOfMonth.Format(time.RFC3339Nano),
153 firstOfNextMonth.Format(time.RFC3339Nano), 161 firstOfNextMonth.Format(time.RFC3339Nano),
154 ) 162 )
155 - 163 + fmt.Println("[logstash] Create Partition:",sql)
156 _, err := db.Exec(sql) 164 _, err := db.Exec(sql)
157 return err 165 return err
158 } 166 }
@@ -80,8 +80,6 @@ func main() { @@ -80,8 +80,6 @@ func main() {
80 if len(processor.Output.Postgresql.Host) > 0 { 80 if len(processor.Output.Postgresql.Host) > 0 {
81 handle := handler.NewMessageHandlerPG(processor.Output.Postgresql) 81 handle := handler.NewMessageHandlerPG(processor.Output.Postgresql)
82 handle.AddFilters(filters...) 82 handle.AddFilters(filters...)
83 - handle.TimerCreateLogTable()  
84 -  
85 83
86 for _, k := range toKqConf(processor.Input.Kafka) { 84 for _, k := range toKqConf(processor.Input.Kafka) {
87 group.Add(kq.MustNewQueue(k, handle)) 85 group.Add(kq.MustNewQueue(k, handle))