databus.go 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go-common/app/job/main/videoup/model/archive"
  6. "go-common/app/job/main/videoup/model/message"
  7. "go-common/app/job/main/videoup/model/redis"
  8. "go-common/library/log"
  9. "strconv"
  10. "time"
  11. )
  12. // sendBblog 发送粉丝动态databus
  13. func (s *Service) sendBblog(a *archive.Result) {
  14. var (
  15. err error
  16. c = context.TODO()
  17. noPush = int64(a.AttrVal(archive.AttrNoPushBplus))
  18. //默认不展示
  19. show = int64(0)
  20. staffs []*archive.Staff
  21. dataPoi, dataVote, dataVoteFix []byte
  22. )
  23. if noPush == 0 && a.IsNormal() {
  24. show = int64(1)
  25. }
  26. if noPush == 1 {
  27. show = int64(2)
  28. }
  29. msg := &message.BlogCardMsg{
  30. Card: &archive.BlogCard{
  31. Type: 8,
  32. Rid: a.Aid,
  33. OwnerID: a.Mid,
  34. Show: show,
  35. Ts: time.Now().Unix(),
  36. Dynamic: a.Dynamic,
  37. },
  38. }
  39. //lbs
  40. if dataPoi, err = s.arc.POI(context.TODO(), a.Aid); err != nil {
  41. log.Error("aid(%s) s.videoupPub.SendBblog(%v) POI error(%v)", a.Aid, msg, err)
  42. }
  43. //vote
  44. if dataVote, err = s.arc.Vote(context.TODO(), a.Aid); err != nil {
  45. log.Error("aid(%s) s.videoupPub.SendBblog(%v) Vote error(%v)", a.Aid, msg, err)
  46. }
  47. if dataPoi != nil || dataVote != nil {
  48. if dataVote != nil {
  49. var old *archive.VoteOld
  50. if err = json.Unmarshal(dataVote, &old); err != nil {
  51. log.Error("aid(%s) s.videoupPub.SendBblog(%+v) Vote old Unmarshal error(%v)", a.Aid, string(dataVote), err)
  52. }
  53. if dataVoteFix, err = json.Marshal(&archive.Vote{VoteID: old.VoteID, VoteTitle: old.VoteTitle}); err != nil {
  54. log.Error("aid(%s) s.videoupPub.SendBblog(%+v) Vote new Marshal error(%v)", a.Aid, old, err)
  55. }
  56. }
  57. var ext []byte
  58. if ext, err = json.Marshal(&archive.Ext{LBS: string(dataPoi), Vote: string(dataVoteFix)}); err != nil {
  59. log.Error("aid(%s) s.videoupPub.SendBblog(%+v) Unmarshal error(%v)", a.Aid, msg, err)
  60. }
  61. msg.Card.Ext = string(ext)
  62. }
  63. //staffs
  64. if a.AttrVal(archive.AttrBitSTAFF) == archive.AttrYes {
  65. if staffs, err = s.arc.Staffs(c, a.Aid); err != nil {
  66. log.Error("aid(%s) s.arc.Staffs (%+v) error(%v)", a.Aid, msg, err)
  67. }
  68. var staffBox []*archive.StaffItem
  69. if staffs != nil && len(staffs) > 0 {
  70. for _, v := range staffs {
  71. item := &archive.StaffItem{Type: 1, UID: v.StaffMID}
  72. staffBox = append(staffBox, item)
  73. }
  74. }
  75. if staffBox != nil && len(staffBox) > 0 {
  76. msg.Card.Staffs = staffBox
  77. log.Info("aid(%d) SendBblog(%+v) staffs(%+v)", a.Aid, msg, msg.Card.Staffs)
  78. }
  79. }
  80. log.Info("aid(%d) start to send SendBblog(%+v) poi(%v) vote(%v) to databus", a.Aid, msg, string(dataPoi), string(dataVoteFix))
  81. k := strconv.FormatInt(a.Aid, 10)
  82. if err = s.blogPub.Send(c, k, msg); err != nil {
  83. s.syncRetry(c, a.Aid, a.Mid, redis.ActionForSendBblog, "", "")
  84. log.Error("aid(%s) %s s.videoupPub.SendBblog(%v) error(%v)", k, msg, err)
  85. }
  86. }