video.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package dao
import (
	"context"
	"fmt"
	"io/ioutil"
	"math/rand"
	"net/http"
	"strconv"
	"strings"
	"time"

	"go-common/app/service/bbq/search/api/grpc/v1"
	"go-common/app/service/bbq/search/conf"
	"go-common/library/log"

	"github.com/json-iterator/go"
	"gopkg.in/olivere/elastic.v5"
)
const (
	// _bbqEsName is the key used in this file to look up the Elasticsearch
	// client (d.esPool) and its configuration (d.c.Es).
	_bbqEsName = "bbq"
	// _videoIndex is the name of the index that video documents are written
	// to and searched in.
	_videoIndex = "video"
	// _videoType is the document type used for video documents in _videoIndex.
	_videoType = "video_info"
	// _videoMapping is the index body handed to createESIndex by SaveVideo.
	// It requests 5 shards, 2 replicas and "ik_smart" as the default analyzer type.
	_videoMapping = `
{
"settings":{
"number_of_shards":5,
"number_of_replicas":2,
"index":{
"analysis.analyzer.default.type":"ik_smart"
}
}
}
`
)
  32. //SaveVideo 保存视频信息
  33. func (d *Dao) SaveVideo(c context.Context, videos *v1.SaveVideoRequest) (err error) {
  34. d.createESIndex(_bbqEsName, _videoIndex, _videoMapping)
  35. bulkRequest := d.esPool[_bbqEsName].Bulk()
  36. for _, v := range videos.List {
  37. request := elastic.NewBulkUpdateRequest().Index(_videoIndex).Type(_videoType).Id(strconv.Itoa(int(v.SVID))).Doc(v).DocAsUpsert(true)
  38. bulkRequest.Add(request)
  39. }
  40. if _, err = bulkRequest.Do(c); err != nil {
  41. log.Error("save es [%d] err(%v)", 1, err)
  42. }
  43. return
  44. }
  45. //RecVideoDataElastic 获取视频信息
  46. func (d *Dao) RecVideoDataElastic(c context.Context, query elastic.Query, script *elastic.ScriptSort, from, size int) (total int64, list []*v1.RecVideoInfo, err error) {
  47. search := d.esPool[_bbqEsName].Search().Index(_videoIndex).Type(_videoType)
  48. if query != nil {
  49. search.Query(query)
  50. }
  51. if script != nil {
  52. search.SortBy(script)
  53. }
  54. log.Error("start time(%d)", time.Now().UnixNano())
  55. res, err := search.From(from).Size(size).Timeout(d.c.Es[_bbqEsName].Timeout).Do(c)
  56. if err != nil {
  57. log.Error("video search es (%s) err(%v)", _bbqEsName, err)
  58. return
  59. }
  60. log.Error("do time(%d)", time.Now().UnixNano())
  61. total = res.TotalHits()
  62. list = []*v1.RecVideoInfo{}
  63. for _, value := range res.Hits.Hits {
  64. tmp := new(v1.RecVideoInfo)
  65. byte, _ := jsoniter.Marshal(value.Source)
  66. jsoniter.Unmarshal(byte, tmp)
  67. if value.Score != nil {
  68. tmp.ESScore = float64(*value.Score)
  69. }
  70. if value.Sort != nil {
  71. for _, v := range value.Sort {
  72. tmp.CustomScore = append(tmp.CustomScore, v.(float64))
  73. }
  74. }
  75. list = append(list, tmp)
  76. }
  77. return
  78. }
  79. //VideoData 获取视频信息
  80. func (d *Dao) VideoData(c context.Context, query elastic.Query, from, size int) (total int64, list []*v1.VideoESInfo, err error) {
  81. res, err := d.esPool[_bbqEsName].Search().Index(_videoIndex).Type(_videoType).Query(query).From(from).Size(size).Timeout(d.c.Es[_bbqEsName].Timeout).Do(c)
  82. if err != nil {
  83. log.Error("video search es (%s) err(%v)", _bbqEsName, err)
  84. return
  85. }
  86. total = res.TotalHits()
  87. list = []*v1.VideoESInfo{}
  88. for _, value := range res.Hits.Hits {
  89. tmp := new(v1.VideoESInfo)
  90. byte, _ := jsoniter.Marshal(value.Source)
  91. jsoniter.Unmarshal(byte, tmp)
  92. list = append(list, tmp)
  93. }
  94. return
  95. }
  96. //ESVideoData 获取视频信息
  97. func (d *Dao) ESVideoData(c context.Context, query string) (total int64, list []*v1.RecVideoInfo, err error) {
  98. i := rand.Intn(len(conf.Conf.Es["bbq"].Addr))
  99. req, err := http.NewRequest("POST", conf.Conf.Es["bbq"].Addr[i]+"/video/_search", strings.NewReader(query))
  100. if err != nil {
  101. log.Error("conn es err(%v)", err)
  102. return
  103. }
  104. req.Header.Set("Content-Type", "application/json; charset=UTF-8")
  105. j := rand.Intn(len(d.httpClient))
  106. res, err := d.httpClient[j].Do(req)
  107. if err != nil || res.StatusCode != 200 {
  108. log.Error("conn es http err(%v)", err)
  109. return
  110. }
  111. body, err := ioutil.ReadAll(res.Body)
  112. if err != nil {
  113. log.Error("es read body err(%v)", err)
  114. return
  115. }
  116. log.Infov(c, log.KV("query", query), log.KV("response", string(body)))
  117. videos := new(elastic.SearchResult)
  118. jsoniter.Unmarshal(body, &videos)
  119. if videos == nil {
  120. return
  121. }
  122. list = make([]*v1.RecVideoInfo, 0)
  123. total = videos.TotalHits()
  124. for _, value := range videos.Hits.Hits {
  125. tmp := new(v1.RecVideoInfo)
  126. byte, _ := jsoniter.Marshal(value.Source)
  127. jsoniter.Unmarshal(byte, tmp)
  128. list = append(list, tmp)
  129. }
  130. return
  131. }
  132. // DelVideoDataBySVID 根据svid删除视频
  133. func (d *Dao) DelVideoDataBySVID(c context.Context, svid int64) (err error) {
  134. i := rand.Intn(len(conf.Conf.Es["bbq"].Addr))
  135. url := conf.Conf.Es["bbq"].Addr[i] + "/video/video_info/" + strconv.Itoa(int(svid))
  136. req, err := http.NewRequest("DELETE", url, nil)
  137. if err != nil {
  138. log.Error("conn es err(%v)", err)
  139. return
  140. }
  141. j := rand.Intn(len(d.httpClient))
  142. res, err := d.httpClient[j].Do(req)
  143. if err != nil || res.StatusCode != 200 {
  144. log.Error("conn read body err(%v)", err)
  145. }
  146. return
  147. }