作者 yangfu

pg chunk message

  1 +package handler
  2 +
  3 +import (
  4 + "fmt"
  5 + "log"
  6 + "time"
  7 +
  8 + "github.com/go-pg/pg/v10"
  9 + "github.com/go-pg/pg/v10/orm"
  10 + jsoniter "github.com/json-iterator/go"
  11 + "github.com/tal-tech/go-stash/stash/config"
  12 + "github.com/tal-tech/go-stash/stash/filter"
  13 + "github.com/tal-tech/go-zero/core/executors"
  14 + "github.com/tal-tech/go-zero/core/threading"
  15 +)
  16 +
  17 +const timestampKey = "time"
  18 +
  19 +type MessageChunkHandlerPG struct {
  20 + filters []filter.FilterFunc
  21 + db *pg.DB
  22 + conf config.PostgresqlConf
  23 + inserter *executors.ChunkExecutor
  24 +}
  25 +
  26 +func NewMessageChunkHandlerPG(constant config.PostgresqlConf) *MessageChunkHandlerPG {
  27 + DB := pg.Connect(&pg.Options{
  28 + User: constant.User,
  29 + Password: constant.Password,
  30 + Database: constant.DBName,
  31 + Addr: fmt.Sprintf("%s:%s", constant.Host, constant.Port),
  32 + })
  33 + handler := &MessageChunkHandlerPG{
  34 + db: DB,
  35 + conf: constant,
  36 + }
  37 + handler.inserter = executors.NewChunkExecutor(handler.execute, executors.WithChunkBytes(constant.BatchSize*1024)) //大小 1KB * 100
  38 + go handler.TimerCreateLogTable()
  39 + return handler
  40 +}
  41 +
  42 +func (mh *MessageChunkHandlerPG) AddFilters(filters ...filter.FilterFunc) {
  43 + mh.filters = append(mh.filters, filters...)
  44 +}
  45 +
  46 +func (mh *MessageChunkHandlerPG) Consume(_, val string) error {
  47 + var m map[string]interface{}
  48 + if err := jsoniter.Unmarshal([]byte(val), &m); err != nil {
  49 + return err
  50 + }
  51 +
  52 + for _, proc := range mh.filters {
  53 + if m = proc(m); m == nil {
  54 + return nil
  55 + }
  56 + }
  57 + return mh.inserter.Add(m, len([]byte(val)))
  58 +}
  59 +
  60 +func (mh *MessageChunkHandlerPG) execute(tasks []interface{}) {
  61 + begin :=time.Now()
  62 + var logs []*Logs
  63 + for i := 0; i < len(tasks); i++ {
  64 + item := tasks[i]
  65 + if m, ok := item.(map[string]interface{}); ok {
  66 + logs = append(logs, &Logs{
  67 + Log: m,
  68 + LogTime: getTime(m),
  69 + })
  70 + }
  71 + }
  72 + if len(logs) > 0 {
  73 + if _, err := mh.db.Model(&logs).Insert(); err != nil {
  74 + fmt.Println("[logstash] Insert Error:", err)
  75 + }
  76 +
  77 + end :=time.Now()
  78 + fmt.Printf("[logstash] Insert task:%v cost:%v \n", len(tasks),end.Sub(begin))
  79 + }
  80 +}
  81 +
  82 +func (mh *MessageChunkHandlerPG) TimerCreateLogTable() {
  83 + t := time.NewTimer(time.Hour * 6)
  84 + fmt.Printf("[logstash] Begin TimerCreateLogTable \n")
  85 + mh.createLogTable()
  86 + for range t.C {
  87 + threading.RunSafe(
  88 + func() {
  89 + mh.createLogTable()
  90 + },
  91 + )
  92 + }
  93 +}
  94 +
  95 +func (mh *MessageChunkHandlerPG) createLogTable() {
  96 + var err error
  97 + // creates database schema for Log models.
  98 + err = mh.db.Model(&Logs{}).CreateTable(&orm.CreateTableOptions{
  99 + IfNotExists: true,
  100 + })
  101 + if err != nil {
  102 + log.Fatal(err)
  103 + }
  104 +
  105 + logStartTime := time.Now()
  106 + logEndTime := logStartTime.AddDate(0, 3, 0)
  107 + for logStartTime.Unix() <= logEndTime.Unix() {
  108 + // Before insert, always try create partition
  109 + err = createNewPartition(mh.db, logStartTime)
  110 + if err != nil {
  111 + log.Fatal(err)
  112 + }
  113 + logStartTime = logStartTime.AddDate(0, 1, 0)
  114 + }
  115 +}
  116 +
  117 +func getTime(m map[string]interface{}) time.Time {
  118 + if ti, ok := m[timestampKey]; ok {
  119 + if ts, ok := ti.(string); ok {
  120 + if t, err := time.Parse(time.RFC3339, ts); err == nil {
  121 + return t
  122 + }
  123 + }
  124 + }
  125 +
  126 + return time.Now()
  127 +}
@@ -82,7 +82,7 @@ func main() { @@ -82,7 +82,7 @@ func main() {
82 } 82 }
83 } 83 }
84 if len(processor.Output.Postgresql.Host) > 0 { 84 if len(processor.Output.Postgresql.Host) > 0 {
85 - handle := handler.NewMessageHandlerPG(processor.Output.Postgresql) 85 + handle := handler.NewMessageChunkHandlerPG(processor.Output.Postgresql)
86 handle.AddFilters(filters...) 86 handle.AddFilters(filters...)
87 87
88 for _, k := range toKqConf(processor.Input.Kafka) { 88 for _, k := range toKqConf(processor.Input.Kafka) {
@@ -98,7 +98,7 @@ func main() { @@ -98,7 +98,7 @@ func main() {
98 } 98 }
99 99
100 var Logger log.Logger 100 var Logger log.Logger
101 -const kafkaHost = "192.168.0.250:9092,192.168.0.251:9092,192.168.0.252:9092"//"192.168.139.129:9092" 101 +const kafkaHost = "192.168.0.250:9092,192.168.0.251:9092,192.168.0.252:9092"//
102 const topic ="go_stash_dev" 102 const topic ="go_stash_dev"
103 103
104 func InitLog() { 104 func InitLog() {