like.go 10 KB


  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "go-common/app/job/main/thumbup/model"
  10. xmdl "go-common/app/service/main/thumbup/model"
  11. "go-common/library/ecode"
  12. "go-common/library/log"
  13. "go-common/library/queue/databus"
  14. xtime "go-common/library/time"
  15. )
  16. func newLikeMsg(msg *databus.Message) (res interface{}, err error) {
  17. likeMsg := new(xmdl.LikeMsg)
  18. if err = json.Unmarshal(msg.Value, &likeMsg); err != nil {
  19. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  20. return
  21. }
  22. log.Info("get like event msg: %+v", likeMsg)
  23. res = likeMsg
  24. return
  25. }
  26. func likeSplit(msg *databus.Message, data interface{}) int {
  27. lm, ok := data.(*xmdl.LikeMsg)
  28. if !ok {
  29. log.Error("get like event msg: not ok %s", msg.Value)
  30. return 0
  31. }
  32. return int(lm.Mid)
  33. }
  34. func (s *Service) likeDo(ms []interface{}) {
  35. for _, m := range ms {
  36. lm, ok := m.(*xmdl.LikeMsg)
  37. if !ok {
  38. log.Error("get like event msg: not ok %+v", m)
  39. continue
  40. }
  41. var (
  42. oldState int8
  43. err error
  44. businessID int64
  45. ctx = context.Background()
  46. )
  47. if businessID, err = s.checkBusinessOrigin(lm.Business, lm.OriginID); err != nil {
  48. log.Warn("like event: business(%v, %v) err: +v", lm.Business, lm.OriginID)
  49. continue
  50. }
  51. if oldState, err = s.dao.LikeState(ctx, lm.Mid, businessID, lm.OriginID, lm.MessageID); err != nil {
  52. log.Warn("like event: likeState(%+v) err: +v", lm)
  53. time.Sleep(time.Millisecond * 50)
  54. continue
  55. }
  56. var newState, likeType int8
  57. if newState, likeType, err = s.checkState(ctx, oldState, lm); err != nil {
  58. log.Warn("repeat like mid(%d) likeType(%d) oldState(%d) newState(%d) bid(%d) oid(%d) messageID(%d)",
  59. lm.Mid, likeType, oldState, newState, businessID, lm.OriginID, lm.MessageID)
  60. continue
  61. }
  62. var stat model.Stats
  63. if stat, err = s.dao.UpdateLikeState(ctx, lm.Mid, businessID, lm.OriginID, lm.MessageID, newState, lm.LikeTime); err != nil {
  64. log.Warn("like event: UpdateLikeState(%+v) err: +v", lm)
  65. time.Sleep(time.Millisecond * 50)
  66. continue
  67. }
  68. // 聚合数据
  69. key := fmt.Sprintf("%d-%d-%d", businessID, lm.OriginID, lm.MessageID)
  70. s.merge.Add(ctx, key, lm)
  71. stat = calculateCount(stat, likeType)
  72. s.updateCache(ctx, lm.Mid, businessID, lm.OriginID, lm.MessageID, likeType, lm.LikeTime, &stat)
  73. s.dao.PubStatDatabus(ctx, lm.Business, lm.Mid, &stat, lm.UpMid)
  74. log.Info("like event: like success params(%+v)", m)
  75. // 拜年祭
  76. target := s.mergeTarget(lm.Business, lm.MessageID)
  77. if target <= 0 {
  78. continue
  79. }
  80. if stat, err = s.dao.Stat(ctx, businessID, 0, target); err != nil {
  81. continue
  82. }
  83. lm.MessageID = target
  84. key = fmt.Sprintf("%d-%d-%d", businessID, 0, target)
  85. s.merge.Add(ctx, key, lm)
  86. stat = calculateCount(stat, likeType)
  87. s.updateStatCache(ctx, businessID, 0, &stat)
  88. s.dao.PubStatDatabus(ctx, lm.Business, lm.Mid, &stat, 0)
  89. log.Info("like success params(%+v)", m)
  90. }
  91. }
  92. func (s *Service) countsSplit(key string) int {
  93. messageIDStr := strings.Split(key, "-")[2]
  94. messageID, _ := strconv.Atoi(messageIDStr)
  95. return messageID % s.c.Merge.Worker
  96. }
  97. func (s *Service) updateCountsDo(c context.Context, ch int, values map[string][]interface{}) {
  98. mItem := make(map[model.LikeItem]*model.LikeCounts)
  99. for _, vs := range values {
  100. for _, v := range vs {
  101. item := v.(*xmdl.LikeMsg)
  102. stat := calculateCount(model.Stats{}, item.Type)
  103. likesCount := stat.Likes
  104. dislikesCount := stat.Dislikes
  105. likeItem := model.LikeItem{
  106. Business: item.Business,
  107. OriginID: item.OriginID,
  108. MessageID: item.MessageID,
  109. }
  110. if mItem[likeItem] == nil {
  111. mItem[likeItem] = &model.LikeCounts{Like: likesCount, Dislike: dislikesCount, UpMid: item.UpMid}
  112. } else {
  113. mItem[likeItem].Like += likesCount
  114. mItem[likeItem].Dislike += dislikesCount
  115. if item.UpMid > 0 {
  116. mItem[likeItem].UpMid = item.UpMid
  117. }
  118. }
  119. }
  120. }
  121. for item, count := range mItem {
  122. for i := 0; i < _retryTimes; i++ {
  123. if err := s.dao.UpdateCounts(context.Background(), s.businessMap[item.Business].ID, item.OriginID, item.MessageID, count.Like, count.Dislike, count.UpMid); err == nil {
  124. break
  125. }
  126. }
  127. }
  128. }
  129. // checkBusiness .
  130. func (s *Service) checkBusiness(business string) (id int64, err error) {
  131. b := s.businessMap[business]
  132. if b == nil {
  133. err = ecode.ThumbupBusinessBlankErr
  134. return
  135. }
  136. id = b.ID
  137. return
  138. }
  139. // checkBusinessOrigin .
  140. func (s *Service) checkBusinessOrigin(business string, originID int64) (id int64, err error) {
  141. b := s.businessMap[business]
  142. if b == nil {
  143. err = ecode.ThumbupBusinessBlankErr
  144. return
  145. }
  146. if (b.EnableOriginID == 1 && originID == 0) || (b.EnableOriginID == 0 && originID != 0) {
  147. err = ecode.ThumbupOriginErr
  148. return
  149. }
  150. id = b.ID
  151. return
  152. }
  153. // updateCache .
  154. func (s *Service) updateCache(c context.Context, mid, businessID, originID, messageID int64, likeType int8, likeTime time.Time, stat *model.Stats) {
  155. if stat != nil {
  156. s.updateStatCache(c, businessID, originID, stat)
  157. }
  158. business := s.businessIDMap[businessID]
  159. likeRecord := &model.ItemLikeRecord{MessageID: messageID, Time: xtime.Time(likeTime.Unix())}
  160. userRecord := &model.UserLikeRecord{Mid: mid, Time: xtime.Time(likeTime.Unix())}
  161. switch likeType {
  162. case model.TypeLike:
  163. if business.EnableUserLikeList() {
  164. s.addUserlikeRecord(c, mid, businessID, model.StateLike, likeRecord)
  165. }
  166. if business.EnableItemLikeList() {
  167. s.addItemlikeRecord(c, businessID, messageID, model.StateLike, userRecord)
  168. }
  169. case model.TypeCancelLike:
  170. if business.EnableUserLikeList() {
  171. s.dao.DelUserLikeCache(c, mid, businessID, messageID, model.StateLike)
  172. }
  173. if business.EnableItemLikeList() {
  174. s.dao.DelItemLikeCache(c, messageID, businessID, mid, model.StateLike)
  175. }
  176. case model.TypeDislike:
  177. if business.EnableUserDislikeList() {
  178. s.addUserlikeRecord(c, mid, businessID, model.StateDislike, likeRecord)
  179. }
  180. if business.EnableItemDislikeList() {
  181. s.addItemlikeRecord(c, businessID, messageID, model.StateDislike, userRecord)
  182. }
  183. case model.TypeCancelDislike:
  184. if business.EnableUserDislikeList() {
  185. s.dao.DelUserLikeCache(c, mid, businessID, messageID, model.StateDislike)
  186. }
  187. if business.EnableItemDislikeList() {
  188. s.dao.DelItemLikeCache(c, messageID, businessID, mid, model.StateDislike)
  189. }
  190. case model.TypeLikeReverse:
  191. if business.EnableUserLikeList() {
  192. s.dao.DelUserLikeCache(c, mid, businessID, messageID, model.StateLike)
  193. }
  194. if business.EnableItemLikeList() {
  195. s.dao.DelItemLikeCache(c, messageID, businessID, mid, model.StateLike)
  196. }
  197. if business.EnableUserDislikeList() {
  198. s.addUserlikeRecord(c, mid, businessID, model.StateDislike, likeRecord)
  199. }
  200. if business.EnableItemDislikeList() {
  201. s.addItemlikeRecord(c, businessID, messageID, model.StateDislike, userRecord)
  202. }
  203. case model.TypeDislikeReverse:
  204. if business.EnableUserDislikeList() {
  205. s.dao.DelUserLikeCache(c, mid, businessID, messageID, model.StateDislike)
  206. }
  207. if business.EnableItemDislikeList() {
  208. s.dao.DelItemLikeCache(c, messageID, businessID, mid, model.StateDislike)
  209. }
  210. if business.EnableUserLikeList() {
  211. s.addUserlikeRecord(c, mid, businessID, model.StateLike, likeRecord)
  212. }
  213. if business.EnableItemLikeList() {
  214. s.addItemlikeRecord(c, businessID, messageID, model.StateLike, userRecord)
  215. }
  216. }
  217. }
  218. // updateStateCache .
  219. func (s *Service) updateStatCache(c context.Context, businessID, originID int64, stat *model.Stats) (err error) {
  220. if stat == nil {
  221. return
  222. }
  223. var ok bool
  224. if originID == 0 {
  225. err = s.dao.AddStatsCache(c, businessID, stat)
  226. } else {
  227. if ok, err = s.dao.ExpireHashStatsCache(c, businessID, originID); ok {
  228. err = s.dao.AddHashStatsCache(c, businessID, originID, stat)
  229. }
  230. }
  231. return
  232. }
  233. func calculateCount(stat model.Stats, typ int8) model.Stats {
  234. var likesCount, dislikeCount int64
  235. switch typ {
  236. case model.TypeLike:
  237. likesCount = 1
  238. case model.TypeCancelLike:
  239. likesCount = -1
  240. case model.TypeDislike:
  241. dislikeCount = 1
  242. case model.TypeCancelDislike:
  243. dislikeCount = -1
  244. case model.TypeLikeReverse:
  245. likesCount = -1
  246. dislikeCount = 1
  247. case model.TypeDislikeReverse:
  248. likesCount = 1
  249. dislikeCount = -1
  250. }
  251. stat.Likes += likesCount
  252. stat.Dislikes += dislikeCount
  253. return stat
  254. }
  255. // checkState .
  256. func (s *Service) checkState(c context.Context, oldState int8, lm *xmdl.LikeMsg) (newState, likeType int8, err error) {
  257. likeType = lm.Type
  258. if oldState == model.StateBlank {
  259. switch lm.Type {
  260. case model.TypeLike:
  261. newState = model.StateLike
  262. case model.TypeCancelLike:
  263. err = ecode.ThumbupCancelLikeErr
  264. case model.TypeDislike:
  265. newState = model.StateDislike
  266. case model.TypeCancelDislike:
  267. err = ecode.ThumbupCancelDislikeErr
  268. }
  269. } else if oldState == model.StateLike {
  270. switch lm.Type {
  271. case model.TypeLike:
  272. err = ecode.ThumbupDupLikeErr
  273. limit := s.businessMap[lm.Business].UserLikesLimit
  274. bid := s.businessMap[lm.Business].ID
  275. likeRecord := &model.ItemLikeRecord{MessageID: lm.MessageID, Time: xtime.Time(lm.LikeTime.Unix())}
  276. if exists, err1 := s.dao.ExpireUserLikesCache(c, lm.Mid, bid, model.StateLike); err1 == nil && exists {
  277. s.dao.AppendCacheUserLikeList(c, lm.Mid, likeRecord, bid, model.StateLike, limit)
  278. }
  279. case model.TypeCancelLike:
  280. newState = model.StateBlank
  281. case model.TypeDislike:
  282. likeType = model.TypeLikeReverse
  283. newState = model.StateDislike
  284. case model.TypeCancelDislike:
  285. err = ecode.ThumbupCancelDislikeErr
  286. }
  287. } else if oldState == model.StateDislike {
  288. switch lm.Type {
  289. case model.TypeLike:
  290. likeType = model.TypeDislikeReverse
  291. newState = model.StateLike
  292. case model.TypeCancelLike:
  293. err = ecode.ThumbupCancelLikeErr
  294. case model.TypeDislike:
  295. err = ecode.ThumbupDupDislikeErr
  296. limit := s.businessMap[lm.Business].UserLikesLimit
  297. bid := s.businessMap[lm.Business].ID
  298. likeRecord := &model.ItemLikeRecord{MessageID: lm.MessageID, Time: xtime.Time(lm.LikeTime.Unix())}
  299. if exists, err1 := s.dao.ExpireUserLikesCache(c, lm.Mid, bid, model.StateDislike); err1 == nil && exists {
  300. s.dao.AppendCacheUserLikeList(c, lm.Mid, likeRecord, bid, model.StateDislike, limit)
  301. }
  302. case model.TypeCancelDislike:
  303. newState = model.StateBlank
  304. }
  305. } else {
  306. log.Warn("oldState abnormal mid:%d business:%v oid:%d messageID:%d oldState:%d", lm.Mid, lm.Business, lm.OriginID, lm.MessageID, oldState)
  307. }
  308. return
  309. }
  310. func (s *Service) mergeTarget(business string, aid int64) int64 {
  311. if s.statMerge != nil && s.statMerge.Business == business && s.statMerge.Sources[aid] {
  312. return s.statMerge.Target
  313. }
  314. return 0
  315. }