package main

import (
	"flag"
	"net/http"
	"time"

	"github.com/beego/beego/v2/server/web"
	"github.com/linmadan/egglib-go/log"
	"github.com/linmadan/egglib-go/log/logrus"

	"github.com/olivere/elastic/v7"
	"github.com/tal-tech/go-queue/kq"
	"github.com/tal-tech/go-stash/stash/config"
	"github.com/tal-tech/go-stash/stash/es"
	"github.com/tal-tech/go-stash/stash/filter"
	"github.com/tal-tech/go-stash/stash/handler"
	"github.com/tal-tech/go-zero/core/conf"
	"github.com/tal-tech/go-zero/core/logx"
	"github.com/tal-tech/go-zero/core/proc"
	"github.com/tal-tech/go-zero/core/service"
)

// configFile is the path of the configuration file, taken from the -f
// command-line flag. It defaults to "etc/config.yaml" and is dereferenced
// in main after flag.Parse has run.
var configFile = flag.String("f", "etc/config.yaml", "Specify the config file")

// toKqConf expands a single Kafka input section into one kq.KqConf per
// entry of c.Topics. Every returned element carries the same service,
// broker, group, offset and sizing settings copied from c and differs only
// in its Topic. The result is nil when c.Topics is empty.
func toKqConf(c config.KafkaConf) []kq.KqConf {
	// Settings shared by every topic; Topic is filled in per copy below.
	template := kq.KqConf{
		ServiceConf: c.ServiceConf,
		Brokers:     c.Brokers,
		Group:       c.Group,
		Offset:      c.Offset,
		Conns:       c.Conns,
		Consumers:   c.Consumers,
		Processors:  c.Processors,
		MinBytes:    c.MinBytes,
		MaxBytes:    c.MaxBytes,
	}

	var confs []kq.KqConf
	for i := range c.Topics {
		perTopic := template
		perTopic.Topic = c.Topics[i]
		confs = append(confs, perTopic)
	}

	return confs
}

// main loads the configuration named by the -f flag, builds one message
// handler per configured output of every cluster (Elasticsearch and/or
// PostgreSQL), attaches a Kafka queue per input topic to each handler, then
// initialises the package logger, launches the debug HTTP endpoint in a
// goroutine and starts the service group.
func main() {
	flag.Parse()

	var c config.Config
	conf.MustLoad(*configFile, &c)
	proc.SetTimeToForceQuit(c.GracePeriod)

	group := service.NewServiceGroup()
	defer group.Stop()

	for _, cluster := range c.Clusters {
		filters := filter.CreateFilters(cluster)

		// Elasticsearch output: only wired up when at least one host is configured.
		esConf := cluster.Output.ElasticSearch
		if len(esConf.Hosts) > 0 {
			client, err := elastic.NewClient(
				elastic.SetSniff(false),
				elastic.SetURL(esConf.Hosts...),
			)
			logx.Must(err)

			writer, err := es.NewWriter(esConf)
			logx.Must(err)

			// Use the configured time zone for the indexer, falling back to
			// the process-local zone when none is set.
			loc := time.Local
			if tz := esConf.TimeZone; len(tz) > 0 {
				loc, err = time.LoadLocation(tz)
				logx.Must(err)
			}

			indexer := es.NewIndex(client, esConf.Index, loc)
			esHandler := handler.NewHandler(writer, indexer)
			esHandler.AddFilters(filters...)
			esHandler.AddFilters(filter.AddUriFieldFilter("url", "uri"))
			for _, queueConf := range toKqConf(cluster.Input.Kafka) {
				group.Add(kq.MustNewQueue(queueConf, esHandler))
			}
		}

		// PostgreSQL output: only wired up when a host is configured. It is
		// independent of the Elasticsearch branch, so a cluster may feed both.
		pgConf := cluster.Output.Postgresql
		if len(pgConf.Host) > 0 {
			pgHandler := handler.NewMessageChunkHandlerPG(pgConf)
			pgHandler.AddFilters(filters...)

			for _, queueConf := range toKqConf(cluster.Input.Kafka) {
				group.Add(kq.MustNewQueue(queueConf, pgHandler))
			}
		}
	}

	InitLog()
	go BeegoServeHttp()

	group.Start()
}

// Logger is the package-level logger used by the HTTP debug handlers.
// It is nil until InitLog assigns it.
var Logger log.Logger
const (
	// kafkaHost is the comma-separated list of host:port addresses that
	// InitLog passes as the first argument to logrus.NewKafkaWriter.
	kafkaHost = "192.168.0.250:9092,192.168.0.251:9092,192.168.0.252:9092"

	// topic is the second argument InitLog passes to logrus.NewKafkaWriter
	// (presumably the Kafka topic the log hook publishes to).
	topic = "go_stash_dev"
)

// InitLog creates the package-level Logger, names the service
// "log-stash-test", sets the level to "debug" and attaches a Kafka writer
// built from kafkaHost and topic as a hook.
//
// If the Kafka writer cannot be created, the failure is reported through
// the freshly created Logger and no hook is attached, so the logger remains
// usable without the Kafka sink instead of registering an invalid writer.
func InitLog() {
	Logger = logrus.NewLogrusLogger()
	Logger.SetServiceName("log-stash-test")
	Logger.SetLevel("debug")

	w, err := logrus.NewKafkaWriter(kafkaHost, topic, false)
	if err != nil {
		// Best effort: keep running with the base logger only.
		Logger.Error("create kafka log writer failed", map[string]interface{}{
			"brokers": kafkaHost,
			"topic":   topic,
			"error":   err.Error(),
		})
		return
	}
	Logger.AddHook(w)
}

func BeegoServeHttp() {
	web.Handler("/debug", http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
		msg := r.URL.Query().Get("msg")
		if len(msg) == 0 {
			msg = "default msg"
		}
		Logger.Debug(msg, map[string]interface{}{"method": r.Method, "query": r.URL.RawQuery, "@timestamp": time.Now()})
	}))

	web.Run(":8082")
}

// ServeHttp registers a catch-all "/" handler on http.DefaultServeMux and
// serves it on ":8082", blocking until the server stops. The handler emits
// one debug log entry per request through Logger, using the "msg" query
// parameter as the message ("default msg" when it is absent or empty) and
// attaching the request method, raw query string and current time.
//
// The error returned by http.ListenAndServe (for example when the port
// cannot be bound) is reported through Logger instead of being dropped.
//
// NOTE(review): BeegoServeHttp listens on the same ":8082" address, so
// running both in one process would presumably fail here — confirm intent.
func ServeHttp() {
	http.HandleFunc("/", func(rw http.ResponseWriter, r *http.Request) {
		msg := r.URL.Query().Get("msg")
		if len(msg) == 0 {
			msg = "default msg"
		}
		Logger.Debug(msg, map[string]interface{}{"method": r.Method, "query": r.URL.RawQuery, "@timestamp": time.Now()})
	})

	// Logger is nil until InitLog has run, so guard before reporting.
	if err := http.ListenAndServe(":8082", http.DefaultServeMux); err != nil && Logger != nil {
		Logger.Error("http server on :8082 stopped", map[string]interface{}{"error": err.Error()})
	}
}