databus.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "time"
  7. "go-common/app/job/main/growup/model"
  8. "go-common/library/log"
  9. )
  10. var (
  11. _archiveTable = "archive"
  12. _musicTable = "music"
  13. _actionUpdate = "update"
  14. _actionInsert = "insert"
  15. )
  16. func (s *Service) archiveConsume(ctx context.Context) {
  17. var (
  18. msgs = s.arcSub.Messages()
  19. err error
  20. )
  21. log.Info("archiveConsume start")
  22. for {
  23. msg, ok := <-msgs
  24. if !ok {
  25. log.Error("s.arcSub.Messages closed", err)
  26. return
  27. }
  28. msg.Commit()
  29. archive := &model.ArchiveMsg{}
  30. if err = json.Unmarshal(msg.Value, archive); err != nil {
  31. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  32. continue
  33. }
  34. if archive.Table == _musicTable {
  35. go s.addBgmWhiteList(archive.Action, archive.New, archive.Old)
  36. }
  37. if archive.Table == _archiveTable && archive.Action == _actionUpdate {
  38. go s.checkArchiveState(archive.New, archive.Old)
  39. }
  40. }
  41. }
  42. // (action == insert && new.state = 0) || (action = update && new.state = 0 && old.state < 0)
  43. func (s *Service) addBgmWhiteList(action string, newMsg, oldMsg []byte) {
  44. nw := &model.BgmSub{}
  45. if err := json.Unmarshal(newMsg, nw); err != nil {
  46. log.Error("json.Unmarshal(%s) error(%v)", newMsg, err)
  47. return
  48. }
  49. old := &model.BgmSub{}
  50. if action == _actionUpdate {
  51. if err := json.Unmarshal(oldMsg, old); err != nil {
  52. log.Error("json.Unmarshal(%s) error(%v)", oldMsg, err)
  53. return
  54. }
  55. }
  56. if (action == _actionInsert && nw.State == 0) || (action == _actionUpdate && nw.State == 0 && old.State < 0) {
  57. log.Info("addBgmWhiteList mid(%d)", nw.MID)
  58. _, err := s.dao.InsertBgmWhiteList(context.Background(), nw.MID)
  59. if err != nil {
  60. log.Error(" s.dao.InsertBgmWhiteList(%d) error(%v)", nw.MID, err)
  61. }
  62. }
  63. }
  64. // new.state>=0 && new.Copyright =2 && old.Copyright == 1 原创变转载
  65. // new.state>=0 && new.Copyright =1 && old.Copyright == 2 转载变原创
  66. func (s *Service) checkArchiveState(newMsg, oldMsg []byte) {
  67. nw := &model.ArchiveSub{}
  68. if err := json.Unmarshal(newMsg, nw); err != nil {
  69. log.Error("json.Unmarshal(%s) error(%v)", newMsg, err)
  70. return
  71. }
  72. old := &model.ArchiveSub{}
  73. if err := json.Unmarshal(oldMsg, old); err != nil {
  74. log.Error("json.Unmarshal(%s) error(%v)", oldMsg, err)
  75. return
  76. }
  77. // 原创变转载
  78. if nw.State >= 0 && nw.Copyright == 2 && old.Copyright == 1 {
  79. log.Info("checkArchiveState get 1 avid(%d) mid(%d)", nw.ID, nw.MID)
  80. s.avBreachPre(context.Background(), nw.ID, nw.MID, 1)
  81. }
  82. // 转载变原创
  83. if nw.State >= 0 && nw.Copyright == 1 && old.Copyright == 2 {
  84. log.Info("checkArchiveState get 0 avid(%d) mid(%d)", nw.ID, nw.MID)
  85. s.avBreachPre(context.Background(), nw.ID, nw.MID, 0)
  86. }
  87. }
  88. func (s *Service) avBreachPre(c context.Context, aid, mid int64, state int) (err error) {
  89. if aid == 0 || mid == 0 {
  90. return
  91. }
  92. accState, err := s.dao.GetUpStateByMID(c, mid)
  93. if err != nil {
  94. log.Error(" s.dao.GetUpStateByMID(%d) error(%v)", mid, err)
  95. return
  96. }
  97. if accState != 3 {
  98. return
  99. }
  100. // status == 1 insert av_breach_pre, status == 0 update state if exist
  101. if state == 1 {
  102. val := fmt.Sprintf("%d,%d,'%s',0,1", aid, mid, time.Now().Format(_layout))
  103. _, err = s.dao.InsertAvBreachPre(c, val)
  104. if err != nil {
  105. log.Error("s.dao.InsertAvBreachPre error(%v)", err)
  106. }
  107. } else if state == 0 {
  108. _, err = s.dao.UpdateAvBreachPre(c, aid, 0, time.Now().AddDate(0, 0, -1).Format(_layout), state)
  109. if err != nil {
  110. log.Error("s.dao.UpdateAvBreachPre error(%v)", err)
  111. }
  112. }
  113. return
  114. }