manual.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package ugc
  2. import (
  3. "context"
  4. "strings"
  5. "time"
  6. appDao "go-common/app/job/main/tv/dao/app"
  7. ugcmdl "go-common/app/job/main/tv/model/ugc"
  8. arccli "go-common/app/service/main/archive/api"
  9. "go-common/library/database/sql"
  10. "go-common/library/log"
  11. )
  12. func (s *Service) manualproc() {
  13. var (
  14. err error
  15. arcs []*ugcmdl.Archive
  16. )
  17. defer s.waiter.Done()
  18. for {
  19. if s.daoClosed {
  20. log.Info("manualproc DB closed!")
  21. return
  22. }
  23. if arcs, err = s.dao.Manual(ctx); err != nil && err != sql.ErrNoRows {
  24. log.Error("manualproc Error %v", err)
  25. appDao.PromError("Manual:Err")
  26. time.Sleep(time.Duration(s.c.UgcSync.Frequency.ManualFre))
  27. continue
  28. }
  29. if err == sql.ErrNoRows || len(arcs) == 0 {
  30. log.Info("No Manual Data")
  31. time.Sleep(time.Duration(s.c.UgcSync.Frequency.ManualFre))
  32. continue
  33. }
  34. if err = s.manual(arcs); err != nil {
  35. log.Error("manualproc Error %v", err)
  36. appDao.PromError("Manual:Err")
  37. time.Sleep(time.Duration(s.c.UgcSync.Frequency.ManualFre))
  38. continue
  39. }
  40. appDao.PromInfo("Manual:Succ")
  41. time.Sleep(1 * time.Second)
  42. }
  43. }
  44. func (s *Service) manual(arcs []*ugcmdl.Archive) (err error) {
  45. for _, v := range arcs { // locate each archive
  46. if err = s.importArc(context.Background(), v.AID, true); err != nil {
  47. log.Error("importArc Error %v", err)
  48. s.dao.Ppmnl(ctx, v.AID) // postpone the next retry
  49. continue
  50. }
  51. }
  52. return
  53. }
  54. //coverURL is used for completing url start with bfs,example: /bfs/archive/diuren.png
  55. func (s *Service) coverURL(uri string, prefix string) string {
  56. if strings.HasPrefix(uri, "/bfs") {
  57. return prefix + uri
  58. }
  59. return uri
  60. }
  61. // importArc imports an brand new archive data, isManual means whether we need update the arc's status
  62. func (s *Service) importArc(ctx context.Context, aid int64, isManual bool) (err error) {
  63. var (
  64. tx *sql.Tx
  65. arcGrpc *arccli.Arc
  66. view *arccli.ViewReply
  67. arcAllow = &ugcmdl.ArcAllow{}
  68. arc = &ugcmdl.Archive{}
  69. )
  70. if arcGrpc, err = s.arcPick(ctx, aid); err != nil { // pick archive api data
  71. return
  72. }
  73. arcAllow.FromArcReply(arcGrpc)
  74. if allow := s.arcAllowImport(arcAllow); !allow { // check whether the archive is allowed to import into TV db
  75. if isManual {
  76. err = s.delArc(aid)
  77. }
  78. return
  79. }
  80. arc.FromArcReply(arcGrpc)
  81. arc.Cover = s.coverURL(arc.Cover, s.c.UgcSync.Cfg.BFSPrefix)
  82. if view, err = s.videoPick(ctx, aid); err != nil { // pick video api data
  83. return
  84. }
  85. if tx, err = s.dao.BeginTran(ctx); err != nil { // begin transaction
  86. return
  87. }
  88. if isManual {
  89. if err = s.dao.TxMnlArc(tx, arc); err != nil { // manual import archive data, update
  90. tx.Rollback()
  91. return
  92. }
  93. if arc.MID != 0 {
  94. s.manualUp(ctx, arc.MID)
  95. }
  96. } else {
  97. if err = s.dao.TxAutoArc(tx, arc); err != nil { // databus import archive data, insert
  98. tx.Rollback()
  99. return
  100. }
  101. }
  102. if err = s.dao.TxMnlVideos(tx, view); err != nil { // import video data
  103. tx.Rollback()
  104. return
  105. }
  106. if isManual {
  107. if err = s.dao.TxMnlStatus(tx, aid); err != nil { // update the manual to 0, finish the operation
  108. tx.Rollback()
  109. return
  110. }
  111. }
  112. log.Info("ImportArc Aid %d Succ", aid)
  113. tx.Commit()
  114. return
  115. }
  116. // manualUp imports the manual submit archive's upper
  117. func (s *Service) manualUp(ctx context.Context, mid int64) {
  118. if cmid, _ := s.dao.UpInList(ctx, mid); cmid != 0 { // mid already in list, no need to import
  119. return
  120. }
  121. upRPC, err := s.upDao.Card3(ctx, mid)
  122. if err != nil { // load remote upper data
  123. log.Warn("[manualUp] Card3 Mid %d, Err %d", mid, err)
  124. return
  125. }
  126. s.upDao.ImportUp(ctx, &ugcmdl.EasyUp{ // import this upper into DB and cache
  127. MID: mid,
  128. Face: upRPC.Face,
  129. Name: upRPC.Name,
  130. })
  131. }