作者 庄敏学

fixed

... ... @@ -58,17 +58,16 @@ func main() {
db.Migrate(ctx.DB)
//启动消费队列
startConsume(c)
startConsume(ctx)
logx.Infof("Starting server at %s:%d... \n", c.Host, c.Port)
server.Start()
}
func startConsume(c config.Config) {
func startConsume(svcCtx *svc.ServiceContext) {
//kafka消费队列 处理字库推送事件
go func() {
svcCtx := svc.NewServiceContext(c)
queue, err := kq.NewQueue(c.KqConsumerConf, consumer.NewByteNoticeLogic(svcCtx))
queue, err := kq.NewQueue(svcCtx.Config.KqConsumerConf, consumer.NewByteNoticeLogic(svcCtx))
if err != nil {
panic(err)
} else {
... ... @@ -78,8 +77,7 @@ func startConsume(c config.Config) {
//redis消费队列 处理表数据存储到本地
go func() {
for {
svcCtx := svc.NewServiceContext(c)
str, err := svcCtx.Redis.Rpop(c.Name + ":table_data")
str, err := svcCtx.Redis.Rpop(svcCtx.Config.Name + ":table_data")
if err == nil {
_ = consumer.NewByteTableDataLogic(svcCtx).Sync(str)
}
... ...