123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- package dao
import (
	"context"
	"fmt"
	"io/ioutil"
	"math/rand"
	"net/http"
	"strconv"
	"strings"
	"time"

	"go-common/app/service/bbq/search/api/grpc/v1"
	"go-common/app/service/bbq/search/conf"
	"go-common/library/log"

	"github.com/json-iterator/go"
	"gopkg.in/olivere/elastic.v5"
)
- const ( // Elasticsearch identifiers and index settings shared by the video DAO methods in this file.
- _bbqEsName = "bbq" // key used to look up the cluster in d.esPool and in the Es config map
- _videoIndex = "video" // index name passed to Index() for video documents
- _videoType = "video_info" // mapping type passed to Type() for video documents
- _videoMapping = `
- {
- "settings":{
- "number_of_shards":5,
- "number_of_replicas":2,
- "index":{
- "analysis.analyzer.default.type":"ik_smart"
- }
- }
- }
- `
- ) // _videoMapping is the settings JSON handed to createESIndex: 5 shards, 2 replicas, ik_smart as the default analyzer.
- //SaveVideo 保存视频信息
- func (d *Dao) SaveVideo(c context.Context, videos *v1.SaveVideoRequest) (err error) {
- d.createESIndex(_bbqEsName, _videoIndex, _videoMapping)
- bulkRequest := d.esPool[_bbqEsName].Bulk()
- for _, v := range videos.List {
- request := elastic.NewBulkUpdateRequest().Index(_videoIndex).Type(_videoType).Id(strconv.Itoa(int(v.SVID))).Doc(v).DocAsUpsert(true)
- bulkRequest.Add(request)
- }
- if _, err = bulkRequest.Do(c); err != nil {
- log.Error("save es [%d] err(%v)", 1, err)
- }
- return
- }
- //RecVideoDataElastic 获取视频信息
- func (d *Dao) RecVideoDataElastic(c context.Context, query elastic.Query, script *elastic.ScriptSort, from, size int) (total int64, list []*v1.RecVideoInfo, err error) {
- search := d.esPool[_bbqEsName].Search().Index(_videoIndex).Type(_videoType)
- if query != nil {
- search.Query(query)
- }
- if script != nil {
- search.SortBy(script)
- }
- log.Error("start time(%d)", time.Now().UnixNano())
- res, err := search.From(from).Size(size).Timeout(d.c.Es[_bbqEsName].Timeout).Do(c)
- if err != nil {
- log.Error("video search es (%s) err(%v)", _bbqEsName, err)
- return
- }
- log.Error("do time(%d)", time.Now().UnixNano())
- total = res.TotalHits()
- list = []*v1.RecVideoInfo{}
- for _, value := range res.Hits.Hits {
- tmp := new(v1.RecVideoInfo)
- byte, _ := jsoniter.Marshal(value.Source)
- jsoniter.Unmarshal(byte, tmp)
- if value.Score != nil {
- tmp.ESScore = float64(*value.Score)
- }
- if value.Sort != nil {
- for _, v := range value.Sort {
- tmp.CustomScore = append(tmp.CustomScore, v.(float64))
- }
- }
- list = append(list, tmp)
- }
- return
- }
- //VideoData 获取视频信息
- func (d *Dao) VideoData(c context.Context, query elastic.Query, from, size int) (total int64, list []*v1.VideoESInfo, err error) {
- res, err := d.esPool[_bbqEsName].Search().Index(_videoIndex).Type(_videoType).Query(query).From(from).Size(size).Timeout(d.c.Es[_bbqEsName].Timeout).Do(c)
- if err != nil {
- log.Error("video search es (%s) err(%v)", _bbqEsName, err)
- return
- }
- total = res.TotalHits()
- list = []*v1.VideoESInfo{}
- for _, value := range res.Hits.Hits {
- tmp := new(v1.VideoESInfo)
- byte, _ := jsoniter.Marshal(value.Source)
- jsoniter.Unmarshal(byte, tmp)
- list = append(list, tmp)
- }
- return
- }
- //ESVideoData 获取视频信息
- func (d *Dao) ESVideoData(c context.Context, query string) (total int64, list []*v1.RecVideoInfo, err error) {
- i := rand.Intn(len(conf.Conf.Es["bbq"].Addr))
- req, err := http.NewRequest("POST", conf.Conf.Es["bbq"].Addr[i]+"/video/_search", strings.NewReader(query))
- if err != nil {
- log.Error("conn es err(%v)", err)
- return
- }
- req.Header.Set("Content-Type", "application/json; charset=UTF-8")
- j := rand.Intn(len(d.httpClient))
- res, err := d.httpClient[j].Do(req)
- if err != nil || res.StatusCode != 200 {
- log.Error("conn es http err(%v)", err)
- return
- }
- body, err := ioutil.ReadAll(res.Body)
- if err != nil {
- log.Error("es read body err(%v)", err)
- return
- }
- log.Infov(c, log.KV("query", query), log.KV("response", string(body)))
- videos := new(elastic.SearchResult)
- jsoniter.Unmarshal(body, &videos)
- if videos == nil {
- return
- }
- list = make([]*v1.RecVideoInfo, 0)
- total = videos.TotalHits()
- for _, value := range videos.Hits.Hits {
- tmp := new(v1.RecVideoInfo)
- byte, _ := jsoniter.Marshal(value.Source)
- jsoniter.Unmarshal(byte, tmp)
- list = append(list, tmp)
- }
- return
- }
- // DelVideoDataBySVID 根据svid删除视频
- func (d *Dao) DelVideoDataBySVID(c context.Context, svid int64) (err error) {
- i := rand.Intn(len(conf.Conf.Es["bbq"].Addr))
- url := conf.Conf.Es["bbq"].Addr[i] + "/video/video_info/" + strconv.Itoa(int(svid))
- req, err := http.NewRequest("DELETE", url, nil)
- if err != nil {
- log.Error("conn es err(%v)", err)
- return
- }
- j := rand.Intn(len(d.httpClient))
- res, err := d.httpClient[j].Do(req)
- if err != nil || res.StatusCode != 200 {
- log.Error("conn read body err(%v)", err)
- }
- return
- }
|