作者 yangfu

补充消息修改,榜单榜作业多线程

@@ -248,7 +248,7 @@ from user_msg where receive_user_id=? and source_type=1 and msg_type=? and send @@ -248,7 +248,7 @@ from user_msg where receive_user_id=? and source_type=1 and msg_type=? and send
248 func GetChanceReviseMsg(uid, lastId int64, pageSize int, msgType int, v interface{}) (total int, err error) { 248 func GetChanceReviseMsg(uid, lastId int64, pageSize int, msgType int, v interface{}) (total int, err error) {
249 sql := `select a.*,b.images,b.speechs,b.videos from ( 249 sql := `select a.*,b.images,b.speechs,b.videos from (
250 select a.*,b.source_content,b.enable_status,b.user_id chance_user_id,b.create_at,b.review_status,b.status from ( 250 select a.*,b.source_content,b.enable_status,b.user_id chance_user_id,b.create_at,b.review_status,b.status from (
251 -select id msg_id,message content,source_type,source_id,is_read,create_at msg_time,chance_id,receive_user_id,sender_user_id from user_msg 251 +select id msg_id,message,source_type,source_id,is_read,create_at msg_time,chance_id,receive_user_id,sender_user_id from user_msg
252 where receive_user_id =? and (?=0 or id<?) and msg_type=? 252 where receive_user_id =? and (?=0 or id<?) and msg_type=?
253 )a left outer join chance b on a.chance_id = b.id 253 )a left outer join chance b on a.chance_id = b.id
254 )a left outer join chance_data b on a.chance_id = b.chance_id 254 )a left outer join chance_data b on a.chance_id = b.chance_id
@@ -343,6 +343,7 @@ type ChanceReviseItemOrm struct { @@ -343,6 +343,7 @@ type ChanceReviseItemOrm struct {
343 type MsgItemOrm struct { 343 type MsgItemOrm struct {
344 MsgId int64 `orm:"column(msg_id)"` 344 MsgId int64 `orm:"column(msg_id)"`
345 MsgTime time.Time `orm:"column(msg_time)"` //收藏时间 345 MsgTime time.Time `orm:"column(msg_time)"` //收藏时间
  346 + Message string `orm:"column(message)"`
346 //评论对象类型 347 //评论对象类型
347 SourceType int `orm:"column(source_type)"` 348 SourceType int `orm:"column(source_type)"`
348 SourceId int64 `orm:"column(source_id)"` 349 SourceId int64 `orm:"column(source_id)"`
@@ -10,6 +10,7 @@ import ( @@ -10,6 +10,7 @@ import (
10 "opp/models" 10 "opp/models"
11 "opp/protocol" 11 "opp/protocol"
12 "reflect" 12 "reflect"
  13 + "sync"
13 "sync/atomic" 14 "sync/atomic"
14 "time" 15 "time"
15 ) 16 )
@@ -63,22 +64,49 @@ func ComputeRankScore() (err error) { @@ -63,22 +64,49 @@ func ComputeRankScore() (err error) {
63 defer func() { 64 defer func() {
64 atomic.CompareAndSwapInt32(&ComputeRankScoreFlag, 1, 0) 65 atomic.CompareAndSwapInt32(&ComputeRankScoreFlag, 1, 0)
65 }() 66 }()
66 - //TODO:启用多个协程执行  
67 - if err = computeRankScore(); err != nil {  
68 - log.Error(err) 67 + //启用多个协程执行
  68 + var wg sync.WaitGroup
  69 + for i := 0; i < RankGoroutineNum; i++ {
  70 + index := i
  71 + work := func() {
  72 + if err = computeRankScore(index, wg); err != nil {
  73 + log.Error(err)
  74 + }
  75 + }
  76 + go work()
69 } 77 }
  78 + wg.Wait()
70 79
71 //更新状态 80 //更新状态
72 - //结束过期的到期的榜单 81 + //结束进行中 已到期的榜单
  82 + updateRankPeriodStatus(protocol.RankPeriodBegin, protocol.RankPeriodEnd)
73 //开始等待的榜单 83 //开始等待的榜单
  84 + updateRankPeriodStatus(protocol.RankPeriodWaiting, protocol.RankPeriodBegin)
74 return 85 return
75 } 86 }
76 -func computeRankScore() (err error) { 87 +func updateRankPeriodStatus(fromStatus int, toStatus int) {
  88 + var (
  89 + sql = `update rank_period set status=?,update_at=NOW() where status=? and UNIX_TIMESTAMP(end_time)<? `
  90 + )
  91 + if toStatus == protocol.RankPeriodBegin {
  92 + sql = `update rank_period set status=?,update_at=NOW() where status=? and UNIX_TIMESTAMP(begin_time)>? `
  93 + }
  94 + orm := orm.NewOrm()
  95 + if err := utils.ExecuteSQLWithOrmer(orm, sql, toStatus, fromStatus, time.Now().Unix()); err != nil {
  96 + log.Error(err)
  97 + return
  98 + }
  99 +}
  100 +
  101 +func computeRankScore(index int, wg sync.WaitGroup) (err error) {
77 var ( 102 var (
78 periods []*models.RankPeriod 103 periods []*models.RankPeriod
79 rankRanges []*models.RankRange 104 rankRanges []*models.RankRange
80 rankRangeDatas []*models.RankRangeData 105 rankRangeDatas []*models.RankRangeData
81 ) 106 )
  107 + wg.Add(1)
  108 + defer wg.Done()
  109 +
82 defer func() { 110 defer func() {
83 if p := recover(); p != nil { 111 if p := recover(); p != nil {
84 log.Error(p) 112 log.Error(p)
@@ -91,6 +119,9 @@ func computeRankScore() (err error) { @@ -91,6 +119,9 @@ func computeRankScore() (err error) {
91 //2.查询对应 rank_type_id 的rank_data 119 //2.查询对应 rank_type_id 的rank_data
92 for i := range periods { 120 for i := range periods {
93 period := periods[i] 121 period := periods[i]
  122 + if (period.Id % RankGoroutineNum) != index {
  123 + return
  124 + }
94 if rankRanges, err = models.GetRankRanges(period.CompanyId, period.RankTypeId); err == orm.ErrNoRows { 125 if rankRanges, err = models.GetRankRanges(period.CompanyId, period.RankTypeId); err == orm.ErrNoRows {
95 continue 126 continue
96 } 127 }
@@ -125,6 +156,7 @@ func computeRankScore() (err error) { @@ -125,6 +156,7 @@ func computeRankScore() (err error) {
125 } 156 }
126 updateRankByRelationIds(relationIds, period, rankRange) 157 updateRankByRelationIds(relationIds, period, rankRange)
127 } 158 }
  159 + logProcessInfo(period)
128 } 160 }
129 161
130 //4.调用统计接口列表 162 //4.调用统计接口列表
@@ -134,6 +166,10 @@ func computeRankScore() (err error) { @@ -134,6 +166,10 @@ func computeRankScore() (err error) {
134 return nil 166 return nil
135 } 167 }
136 168
  169 +func logProcessInfo(period *models.RankPeriod) {
  170 + log.Debug(fmt.Sprintf("【排行榜统计】 周期编号:%v 赛季名称:%v (榜单类型:%v) ", period.Id, period.SeasonName, period.RankTypeId))
  171 +}
  172 +
137 //更新排行榜按关联id 173 //更新排行榜按关联id
138 func updateRankByRelationIds(relationIds []int64, period *models.RankPeriod, rankRange *models.RankRange) { 174 func updateRankByRelationIds(relationIds []int64, period *models.RankPeriod, rankRange *models.RankRange) {
139 var option RankOption 175 var option RankOption
@@ -31,6 +31,9 @@ func MessageCenter(header *protocol.RequestHeader, request *protocol.MessageCent @@ -31,6 +31,9 @@ func MessageCenter(header *protocol.RequestHeader, request *protocol.MessageCent
31 if request.MsgType&protocol.MsgTypeAuditBy == 0 { 31 if request.MsgType&protocol.MsgTypeAuditBy == 0 {
32 request.MsgType |= protocol.MsgTypeAuditBy 32 request.MsgType |= protocol.MsgTypeAuditBy
33 } 33 }
  34 + if request.MsgType&protocol.MsgTypeChanceRevise == 0 {
  35 + request.MsgType |= protocol.MsgTypeChanceRevise
  36 + }
34 } 37 }
35 err = models.GetUserMsgTotals(header.UserId, header.CompanyId, request.MsgType, &rsp.Totals) 38 err = models.GetUserMsgTotals(header.UserId, header.CompanyId, request.MsgType, &rsp.Totals)
36 if rsp.Totals == nil || len(rsp.Totals) == 0 { 39 if rsp.Totals == nil || len(rsp.Totals) == 0 {
@@ -620,6 +623,7 @@ func MsgChanceRevise(header *protocol.RequestHeader, request *protocol.MsgChance @@ -620,6 +623,7 @@ func MsgChanceRevise(header *protocol.RequestHeader, request *protocol.MsgChance
620 commItem.Chance.Provider = provider 623 commItem.Chance.Provider = provider
621 commItem.ChanceId = chance.ChanceId 624 commItem.ChanceId = chance.ChanceId
622 commItem.ReviewStatus = chance.ReviewStatus 625 commItem.ReviewStatus = chance.ReviewStatus
  626 + commItem.Message = chance.Message
623 rsp.List = append(rsp.List, commItem) 627 rsp.List = append(rsp.List, commItem)
624 } 628 }
625 return 629 return