item_likes.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go-common/app/job/main/thumbup/model"
  6. xmdl "go-common/app/service/main/thumbup/model"
  7. "go-common/library/log"
  8. "go-common/library/queue/databus"
  9. )
  10. func newItemLikeMsg(msg *databus.Message) (res interface{}, err error) {
  11. itemLikesMsg := new(xmdl.ItemMsg)
  12. if err = json.Unmarshal(msg.Value, &itemLikesMsg); err != nil {
  13. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  14. return
  15. }
  16. log.Info("get item like msg: %+v", itemLikesMsg)
  17. res = itemLikesMsg
  18. return
  19. }
  20. func itemLikesSplit(msg *databus.Message, data interface{}) int {
  21. im, ok := data.(*xmdl.ItemMsg)
  22. if !ok {
  23. log.Error("user item msg err: message_id: 0 %s", msg.Value)
  24. return 0
  25. }
  26. return int(im.MessageID)
  27. }
  28. func (s *Service) itemLikesDo(ms []interface{}) {
  29. for _, m := range ms {
  30. im, ok := m.(*xmdl.ItemMsg)
  31. if !ok {
  32. continue
  33. }
  34. var (
  35. err error
  36. businessID int64
  37. ctx = context.Background()
  38. )
  39. if businessID, err = s.checkBusinessOrigin(im.Business, im.OriginID); err != nil {
  40. continue
  41. }
  42. var exist bool
  43. if exist, _ = s.dao.ExpireItemLikesCache(ctx, im.MessageID, businessID, im.State); exist {
  44. continue
  45. }
  46. for i := 0; i < _retryTimes; i++ {
  47. if err = s.addItemLikesCache(ctx, im); err == nil {
  48. break
  49. }
  50. }
  51. log.Info("itemLikes success params(%+v)", m)
  52. }
  53. }
  54. // addCacheItemLikes .
  55. func (s *Service) addItemLikesCache(c context.Context, p *xmdl.ItemMsg) (err error) {
  56. var businessID int64
  57. if businessID, err = s.checkBusinessOrigin(p.Business, p.OriginID); err != nil {
  58. log.Error("s.checkBusinessOrigin business(%s) originID(%s)", p.Business, p.OriginID, err)
  59. return
  60. }
  61. var items []*model.UserLikeRecord
  62. var limit = s.businessIDMap[businessID].MessageLikesLimit
  63. if items, err = s.dao.ItemLikes(c, businessID, p.OriginID, p.MessageID, p.State, limit); err != nil {
  64. log.Error("s.dao.ItemLikes businessID(%d) originID(%d) messageID(%d) type(%d) error(%v)", businessID, p.OriginID, p.MessageID, p.State, err)
  65. return
  66. }
  67. err = s.dao.AddItemLikesCache(c, businessID, p.MessageID, p.State, limit, items)
  68. return
  69. }
  70. func (s *Service) addItemlikeRecord(c context.Context, businessID, messageID int64, state int8, item *model.UserLikeRecord) (err error) {
  71. var exist bool
  72. if exist, err = s.dao.ExpireItemLikesCache(c, messageID, businessID, state); (err != nil) || !exist {
  73. return
  74. }
  75. limit := s.businessIDMap[businessID].MessageLikesLimit
  76. err = s.dao.AppendCacheItemLikeList(c, messageID, item, businessID, state, limit)
  77. return
  78. }