// handler_chunk_pg.go 3.0 KB
package handler

import (
	"fmt"
	"log"
	"time"

	"github.com/go-pg/pg/v10"
	"github.com/go-pg/pg/v10/orm"
	jsoniter "github.com/json-iterator/go"
	"github.com/tal-tech/go-stash/stash/config"
	"github.com/tal-tech/go-stash/stash/filter"
	"github.com/tal-tech/go-zero/core/executors"
	"github.com/tal-tech/go-zero/core/threading"
)

// timestampKey is the key getTime looks up in a decoded log record to find
// the record's timestamp string.
const timestampKey = "time"

// MessageChunkHandlerPG consumes JSON log messages, runs them through a chain
// of filters and writes the surviving records to PostgreSQL in chunks.
type MessageChunkHandlerPG struct {
	filters  []filter.FilterFunc      // applied in order by Consume; extended via AddFilters
	db       *pg.DB                   // go-pg handle used for inserts and table/partition creation
	conf     config.PostgresqlConf    // configuration the handler was constructed with
	inserter *executors.ChunkExecutor // batches records and hands each batch to execute
}

// NewMessageChunkHandlerPG builds a handler from the given PostgreSQL
// configuration.
//
// It creates the go-pg handle from the configured user, password, database
// name and "host:port" address, wires a chunk executor that flushes batches
// through the handler's execute method, and starts TimerCreateLogTable in a
// background goroutine before returning the handler.
func NewMessageChunkHandlerPG(constant config.PostgresqlConf) *MessageChunkHandlerPG {
	opts := &pg.Options{
		Addr:     fmt.Sprintf("%s:%s", constant.Host, constant.Port),
		User:     constant.User,
		Password: constant.Password,
		Database: constant.DBName,
	}

	handler := &MessageChunkHandlerPG{
		db:   pg.Connect(opts),
		conf: constant,
	}

	// BatchSize is multiplied by 1024 to get the chunk threshold in bytes.
	// The original note reads "size: 1KB * 100", so BatchSize is presumably
	// expressed in KB with 100 as the usual value — TODO confirm.
	chunkBytes := constant.BatchSize * 1024
	handler.inserter = executors.NewChunkExecutor(
		handler.execute,
		executors.WithChunkBytes(chunkBytes),
	)

	go handler.TimerCreateLogTable()

	return handler
}

// AddFilters appends the given filters, in the order supplied, to the chain
// that Consume applies to every message. Calling it with no arguments leaves
// the chain untouched.
func (mh *MessageChunkHandlerPG) AddFilters(filters ...filter.FilterFunc) {
	if len(filters) == 0 {
		return
	}
	mh.filters = append(mh.filters, filters...)
}

// Consume decodes one JSON-encoded message, runs it through the registered
// filters in order and queues the surviving record on the chunk inserter.
//
// The first argument (the message key) is ignored. A decoding error is
// returned to the caller. A message that decodes to no object at all (for
// example a literal JSON null), or that a filter drops by returning nil, is
// skipped and nil is returned. Otherwise the result of the inserter's Add is
// returned.
func (mh *MessageChunkHandlerPG) Consume(_, val string) error {
	var m map[string]interface{}
	if err := jsoniter.Unmarshal([]byte(val), &m); err != nil {
		return err
	}

	// A payload such as `null` decodes without error but leaves m nil. Treat
	// it like a filtered-out message instead of handing a nil map to filters
	// (which may write to it and panic) or queuing an empty record.
	if m == nil {
		return nil
	}

	for _, proc := range mh.filters {
		// A filter signals "drop this message" by returning nil.
		if m = proc(m); m == nil {
			return nil
		}
	}

	// len on a string already yields its size in bytes; no []byte copy needed.
	return mh.inserter.Add(m, len(val))
}

// execute is the flush callback of the chunk executor: it converts a batch of
// queued tasks into Logs rows and inserts them with a single statement.
//
// Tasks that are not map[string]interface{} are ignored. Each row's LogTime
// comes from getTime. If the batch yields no rows nothing is written or
// printed. An insert failure is printed and the timing line is suppressed, so
// the "Insert task" line only ever reports batches that were actually stored.
func (mh *MessageChunkHandlerPG) execute(tasks []interface{}) {
	begin := time.Now()

	logs := make([]*Logs, 0, len(tasks))
	for _, task := range tasks {
		m, ok := task.(map[string]interface{})
		if !ok {
			continue
		}
		logs = append(logs, &Logs{
			Log:     m,
			LogTime: getTime(m),
		})
	}
	if len(logs) == 0 {
		return
	}

	if _, err := mh.db.Model(&logs).Insert(); err != nil {
		fmt.Println("[logstash] Insert Error:", err)
		return
	}

	fmt.Printf("[logstash] Insert task:%v cost:%v \n", len(tasks), time.Since(begin))
}

// TimerCreateLogTable ensures the log table and its partitions exist, once
// immediately and then again every six hours.
//
// It blocks for the lifetime of the process and is meant to be run in its own
// goroutine. A Ticker is used rather than a Timer: a Timer delivers a single
// value, so ranging over its channel would perform exactly one delayed refresh
// and then block forever.
func (mh *MessageChunkHandlerPG) TimerCreateLogTable() {
	ticker := time.NewTicker(time.Hour * 6)
	defer ticker.Stop()

	fmt.Printf("[logstash] Begin TimerCreateLogTable \n")
	mh.createLogTable()

	for range ticker.C {
		// Periodic refreshes go through threading.RunSafe, which is expected
		// to contain panics raised by the callback — confirm against go-zero.
		threading.RunSafe(mh.createLogTable)
	}
}

// createLogTable creates the Logs table if it does not exist yet and then
// requests a partition for the current month and each of the next three.
//
// Any failure is reported through log.Fatal, which terminates the process.
// NOTE(review): this also applies when the function runs from the periodic
// refresh; log.Fatal exits the program and cannot be recovered by a panic
// handler — confirm that exiting on a transient database error is intended.
func (mh *MessageChunkHandlerPG) createLogTable() {
	// Create the database schema for the Logs model.
	err := mh.db.Model(&Logs{}).CreateTable(&orm.CreateTableOptions{
		IfNotExists: true,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Step month by month from a mid-month anchor instead of from "now".
	// AddDate normalizes overflowing days, so starting on the 29th-31st could
	// jump over a shorter month (Jan 31 + 1 month lands in early March) and
	// leave that month without a partition. The 15th at noon never overflows
	// and stays in the same month under any time-zone conversion.
	// Assumes createNewPartition derives a monthly partition from the year
	// and month of the time it receives — TODO confirm.
	now := time.Now()
	anchor := time.Date(now.Year(), now.Month(), 15, 12, 0, 0, 0, now.Location())
	for offset := 0; offset <= 3; offset++ {
		err = createNewPartition(mh.db, anchor.AddDate(0, offset, 0))
		if err != nil {
			log.Fatal(err)
		}
	}
}

// getTime extracts the timestamp of a decoded log record.
//
// It returns the value stored under timestampKey when that value is a string
// in RFC 3339 format. If the key is absent, the value is not a string, or the
// string does not parse, the current time is returned instead.
func getTime(m map[string]interface{}) time.Time {
	// The comma-ok assertion is false for both a missing key and a non-string.
	text, isString := m[timestampKey].(string)
	if !isString {
		return time.Now()
	}

	parsed, err := time.Parse(time.RFC3339, text)
	if err != nil {
		return time.Now()
	}
	return parsed
}