import.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package ugc
  2. import (
  3. "fmt"
  4. "time"
  5. appDao "go-common/app/job/main/tv/dao/app"
  6. ugcmdl "go-common/app/job/main/tv/model/ugc"
  7. arccli "go-common/app/service/main/archive/api"
  8. arcmdl "go-common/app/service/main/archive/model/archive"
  9. "go-common/library/database/sql"
  10. "go-common/library/log"
  11. )
  // Retry tuning for the remote calls made while importing uppers.
const (
	_sleep    = 100 * time.Millisecond // pause between retry attempts (used by UpArchives via Retry)
	_arcRetry = 3                      // attempt count for the archive RPC in UpArchives
	_apiRetry = 5                      // attempt count; not referenced in this part of the file — presumably used elsewhere in the package
)
  17. // upImportproc always runs to init the uppers
  18. func (s *Service) upImportproc() {
  19. var (
  20. err error
  21. uppers []*ugcmdl.Upper
  22. )
  23. defer s.waiter.Done()
  24. for {
  25. if s.daoClosed {
  26. log.Info("upImportproc DB closed!")
  27. return
  28. }
  29. // if no more data, we scan per 30s
  30. if uppers, err = s.dao.Import(ctx); err != nil && err != sql.ErrNoRows {
  31. log.Error("upperImport error %v", err)
  32. appDao.PromError("ImportMid:Err")
  33. time.Sleep(time.Duration(s.c.UgcSync.Frequency.ImportFre))
  34. continue
  35. }
  36. if len(uppers) == 0 && err == sql.ErrNoRows {
  37. log.Info("No Import Data")
  38. time.Sleep(time.Duration(s.c.UgcSync.Frequency.ImportFre))
  39. continue
  40. }
  41. if err = s.upImport(uppers); err != nil {
  42. log.Error("upImport Error %v", err)
  43. appDao.PromError("ImportMid:Err")
  44. time.Sleep(time.Duration(s.c.UgcSync.Frequency.ImportFre))
  45. continue
  46. }
  47. appDao.PromInfo("ImportMid:Succ")
  48. time.Sleep(1 * time.Second)
  49. }
  50. }
  51. // upImport loads 20 uppers to init, and load them one by one
  52. func (s *Service) upImport(uppers []*ugcmdl.Upper) (err error) {
  53. for _, v := range uppers {
  54. // import data
  55. if err = s.InitUpper(v.MID); err != nil {
  56. log.Error("initUpper MID: %v, Err: %v, Postpone the MID", v.MID, err)
  57. s.dao.PpUpper(ctx, v.MID)
  58. continue
  59. }
  60. // update the status
  61. if err = s.dao.FinishUpper(ctx, v.MID); err != nil {
  62. log.Error("FinishUpper Mid: %d, Err: %v", v.MID, err)
  63. return
  64. }
  65. time.Sleep(time.Duration(s.c.UgcSync.Frequency.UpperPause)) // pause after import each upper
  66. }
  67. return
  68. }
  69. // InitUpper takes the upper's archive & videos, load them into our DB
  70. func (s *Service) InitUpper(mid int64) (err error) {
  71. var (
  72. arcCount int
  73. ps = s.c.UgcSync.Batch.ArcPS // page size to pick archives
  74. ptn int // total page number
  75. pMatch map[int64]*arccli.Arc // the mapping of aid to archive model of one page
  76. pAids []int64 // the aids of one page
  77. videoNum int64
  78. begin = time.Now()
  79. )
  80. // count upper's archive and get the total number of pages to get
  81. if arcCount, err = s.arcCount(mid); err != nil {
  82. return
  83. }
  84. log.Info("InitUpper mid %d, Count: %d", mid, arcCount)
  85. if arcCount == 0 {
  86. log.Error("Upper %d Arc Count is 0", mid)
  87. return
  88. }
  89. if arcCount%ps == 0 {
  90. ptn = arcCount / ps
  91. } else {
  92. ptn = arcCount/ps + 1
  93. }
  94. // get the upper's archives page by page
  95. for i := 1; i <= ptn; i++ {
  96. if pMatch, pAids, err = s.UpArchives(mid, i, ps); err != nil {
  97. log.Error("Mid %d, Page %d Error %v", mid, i, err)
  98. return
  99. }
  100. if len(pMatch) == 0 { // which means this page is all existing
  101. log.Error("Mid %d, Page %d, no need to import Due to Types Hit", mid, i)
  102. continue
  103. }
  104. if err = s.dao.FilterExist(ctx, &pMatch, pAids); err != nil { // filter the existing ones
  105. log.Error("Mid %d, Page %d Error %v", mid, i, err)
  106. return
  107. }
  108. if len(pMatch) == 0 { // which means this page is all existing
  109. log.Error("Mid %d, Page %d, no need to impot Due to Existing", mid, i)
  110. continue
  111. }
  112. if err = s.arcsIn(pMatch); err != nil { // insert this page's arc & views data into our DB
  113. log.Error("Mid %d, Page %d Error %v", mid, i, err)
  114. return
  115. }
  116. videoNum = videoNum + int64(len(pMatch))
  117. time.Sleep(time.Duration(s.c.UgcSync.Frequency.UpInitFre)) // pause after import each page of upper's archive
  118. }
  119. log.Info("ImportUpper Mid %d, Page Number %d, Page Size %d, "+
  120. "Video Number %d, Time %v", mid, ptn, ps, videoNum, time.Since(begin)) // record init upper time
  121. return
  122. }
  123. // get map's keys
  124. func mapKeys(myMap map[int64]*arccli.Arc) (keys []int64) {
  125. for k := range myMap {
  126. keys = append(keys, k)
  127. }
  128. return
  129. }
  130. // UpArchives picks one page of the up's archives
  131. func (s *Service) UpArchives(mid int64, pn int, ps int) (match map[int64]*arccli.Arc, aids []int64, err error) {
  132. var res []*arccli.Arc
  133. match = make(map[int64]*arccli.Arc)
  134. if err = Retry(func() (err error) {
  135. if res, err = s.arcRPC.UpArcs3(ctx, &arcmdl.ArgUpArcs2{
  136. Mid: mid,
  137. Pn: pn,
  138. Ps: ps,
  139. }); err != nil {
  140. log.Error("%+v", err)
  141. }
  142. return
  143. }, _arcRetry, _sleep); err != nil {
  144. log.Error("upArchives Error %+v", err)
  145. return
  146. } else if len(res) == 0 {
  147. err = fmt.Errorf("result empty")
  148. return
  149. }
  150. for _, v := range res {
  151. arcAllow := &ugcmdl.ArcAllow{}
  152. arcAllow.FromArcmdl(v)
  153. if allow := s.arcAllowImport(arcAllow); !allow { // check whether the archive is allowed to import into TV db
  154. continue
  155. }
  156. match[v.Aid] = v
  157. aids = append(aids, v.Aid)
  158. }
  159. return
  160. }
  // Retry calls callback until it returns nil or it has been attempted retry
// times, sleeping for sleep between consecutive attempts. It returns nil on
// the first success, otherwise the error from the last attempt.
//
// A retry value below 1 is treated as 1, so callback always runs at least once
// and a nil result always means callback actually succeeded. No sleep happens
// after the final attempt, so an exhausted retry returns immediately.
func Retry(callback func() error, retry int, sleep time.Duration) (err error) {
	if retry < 1 {
		retry = 1
	}
	for i := 0; i < retry; i++ {
		if i > 0 {
			time.Sleep(sleep)
		}
		if err = callback(); err == nil {
			return nil
		}
	}
	return err
}
  171. // arcsIn picks one page of archive data and their views data, to import them into the DB one by one
  172. func (s *Service) arcsIn(pMatch map[int64]*arccli.Arc) (err error) {
  173. var (
  174. tx *sql.Tx
  175. pViews map[int64]*arccli.ViewReply
  176. pAids []int64
  177. )
  178. // get the filtered aids to get the views
  179. pAids = mapKeys(pMatch)
  180. if pViews, err = s.arcViews(pAids); err != nil {
  181. log.Error("arcsIn Error %v", err)
  182. return
  183. }
  184. // import the arc & its video one by one
  185. for aid, arc := range pMatch {
  186. // begin the transaction and insert the archive data
  187. if tx, err = s.dao.BeginTran(ctx); err != nil { // begin transaction
  188. return
  189. }
  190. arc.Pic = s.coverURL(arc.Pic, s.c.UgcSync.Cfg.BFSPrefix)
  191. if err = s.dao.TxImportArc(tx, arc); err != nil {
  192. tx.Rollback()
  193. return
  194. }
  195. cViews, ok := pViews[arc.Aid]
  196. if !ok {
  197. log.Error("arcIn View Data for %d not found", arc.Aid)
  198. tx.Rollback()
  199. return
  200. }
  201. if err = s.dao.TxMnlVideos(tx, cViews); err != nil {
  202. tx.Rollback()
  203. return
  204. }
  205. tx.Commit()
  206. log.Info("Succ Add Arc & View for Aid: %d", aid)
  207. }
  208. return
  209. }