forward_index.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package service
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "os"
  6. "os/exec"
  7. "go-common/app/job/bbq/recall/proto"
  8. "go-common/library/log"
  9. "github.com/golang/snappy"
  10. )
  11. // GenForwardIndex 生产正排索引
  12. func (s *Service) GenForwardIndex() {
  13. log.Info("run [%s]", "GenForwardIndex")
  14. c := context.Background()
  15. vInfo, err := s.videoBasicInfo(c)
  16. if err != nil {
  17. log.Error("video info: %v", err)
  18. return
  19. }
  20. outputFile, err := os.OpenFile(s.c.Job.ForwardIndex.Output, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
  21. if err != nil {
  22. log.Error("open file: %v", err)
  23. return
  24. }
  25. shadowFile, _ := os.OpenFile(s.c.Job.ForwardIndex.Output+".bak", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
  26. defer outputFile.Close()
  27. defer shadowFile.Close()
  28. for _, v := range vInfo {
  29. qu, _ := s.dao.FetchVideoQuality(c, v.SVID)
  30. tmp := &proto.ForwardIndex{
  31. SVID: v.SVID,
  32. BasicInfo: v,
  33. VideoQuality: qu,
  34. }
  35. raw, err := tmp.Marshal()
  36. if err != nil {
  37. log.Error("json marshal: %v", err)
  38. continue
  39. }
  40. _, err = outputFile.WriteString(hex.EncodeToString(snappy.Encode(nil, raw)))
  41. if err != nil {
  42. log.Error("output: %v", err)
  43. }
  44. outputFile.Write([]byte("\n"))
  45. if err != nil {
  46. log.Error("output endline: %v", err)
  47. }
  48. shadowFile.WriteString(tmp.String())
  49. if err != nil {
  50. log.Error("shadow: %v", err)
  51. }
  52. shadowFile.Write([]byte("\n"))
  53. if err != nil {
  54. log.Error("shadow endline: %v", err)
  55. }
  56. }
  57. exec.Command(s.c.Job.ForwardIndex.Output + ".sh").Run()
  58. log.Info("finish [GenForwardIndex]")
  59. s.GenRealTimeInvertedIndex()
  60. }
  61. func (s *Service) videoBasicInfo(c context.Context) (result []*proto.VideoInfo, err error) {
  62. // fetch tag info from db
  63. tags, err := s.dao.FetchVideoTagAll(c)
  64. if err != nil {
  65. return
  66. }
  67. tagIDMap := make(map[int32]*proto.Tag)
  68. tagNameMap := make(map[string]*proto.Tag)
  69. for _, v := range tags {
  70. tagIDMap[v.TagID] = v
  71. tagNameMap[v.TagName] = v
  72. }
  73. // fetch video info from db
  74. offset := 0
  75. size := 1000
  76. basic, err := s.dao.FetchVideoInfo(c, offset, size)
  77. if err != nil {
  78. log.Error("FetchVideoInfo: %v", err)
  79. return
  80. }
  81. for len(basic) > 0 && err == nil {
  82. log.Info("FetchVideoInfo: %v", len(result))
  83. for _, v := range basic {
  84. vInfo := &proto.VideoInfo{
  85. SVID: uint64(v.SVID),
  86. Title: v.Title,
  87. Content: v.Content,
  88. MID: uint64(v.MID),
  89. AVID: uint64(v.AVID),
  90. CID: uint64(v.CID),
  91. PubTime: v.PubTime.Time().Unix(),
  92. CTime: v.CTime.Time().Unix(),
  93. MTime: v.MTime.Time().Unix(),
  94. Duration: uint32(v.Duration),
  95. State: int32(v.State),
  96. }
  97. vTags := make([]*proto.Tag, 0)
  98. // 一级标签
  99. if tag, ok := tagIDMap[v.TID]; ok {
  100. vTags = append(vTags, tag)
  101. }
  102. // 二级标签
  103. if subTag, ok := tagIDMap[v.SubTID]; ok {
  104. vTags = append(vTags, subTag)
  105. }
  106. // 三级标签
  107. if textTags, e := s.dao.FetchVideoTextTag(c, v.SVID); e == nil {
  108. for _, v := range textTags {
  109. if tmp, ok := tagNameMap[v]; ok {
  110. vTags = append(vTags, tmp)
  111. }
  112. }
  113. }
  114. vInfo.Tags = vTags
  115. result = append(result, vInfo)
  116. }
  117. offset += size
  118. basic, err = s.dao.FetchVideoInfo(c, offset, size)
  119. if err != nil {
  120. log.Error("FetchVideoInfo: %v", err)
  121. }
  122. }
  123. return
  124. }