package dao

import (
	"bytes"
	"context"
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"io/ioutil"
	xhttp "net/http"
	"net/url"
	"os"
	"sort"
	"strings"
	"time"

	"go-common/app/job/bbq/video/conf"
	"go-common/app/job/bbq/video/model"
	"go-common/library/conf/env"
	"go-common/library/ecode"
	"go-common/library/log"

	pkgerr "github.com/pkg/errors"
)

const (
	// Job status ids compared against the statusId field in QueryJobStatus.
	_jobStatusSuccess = 1
	_jobStatusFailed  = 2
	_jobStatusDoing   = 3
	_jobStatusWaiting = 4

	//_httpHeaderUser      = "x1-bilispy-user"
	//_httpHeaderColor     = "x1-bilispy-color"
	//_httpHeaderTimeout   = "x1-bilispy-timeout"

	// HTTP header names set on outgoing requests.
	_httpHeaderRemoteIP = "x-backend-bili-real-ip"
	_userAgent          = "User-Agent"

	// User-Agent values, one per berserker key owner.
	_noKickUserAgent = "yangyucheng@bilibili.com"
	_hscUserAgent    = "huangshancheng@bilibili.com"
	_lzqUserAgent    = "liuzhiquan@bilibili.com"
	_chmUserAgent    = "caiheming@bilibili.com"
	_ljUserAgent     = "liujin@bilibili.com"

	// Query templates; the placeholders are filled with fmt.Sprintf.
	_queryJSON     = `{"select":[],"where":{"log_date":{"in":["%s"]}},"page":{"limit":1000},"sort":{"play":-1}}`
	_queryJSONOper = `{"select":[],"where":{"log_date":{"in":["%s"]},"cid":{"gt":%d}},"page":{"limit":5000},"sort":{"cid":1}}`
	//_userDmgQueryJSON   = `{"select":[],"where":{"log_date":{"in":["%s"]},"mid":{"gt":"%s"}},"sort":{"mid":1},"page":{"limit":200}}`
	_upUserDmgQueryJSON = `{"select":[],"where":{"mid":{"gt":%d}},"sort":{"mid":1},"page":{"limit":200}}`
	_userDmgQueryHive   = `select mid, gender, age, geo, content_tag, viewed_video, content_zone, content_count, follow_ups from sycpb.hbase_dmp_tag where last_active_date >= %s and length(viewed_video) > 0`
	_upMidQueryHive     = `select mid from ods.ods_member_relation_stat where log_date = %s and follower>= 10000 limit 100`
	//_upMidQueryHive = `{"select":["name":"mid"],"where":{"log_date":{"in":["%s"]},"follower":{"gte":10000}, "pages":{"limit":10}}`

	// Directories where downloaded profile parts are staged before parsing.
	_basePathUserProfile      = "/tmp/"
	_basePathUserProfileBuvid = "/data/"
)

var (
	// signParams lists the parameter names that take part in the request signature.
	signParams = []string{"appKey", "timestamp", "version"}
)

// QueryPlayDaily gets the video play rank list of the given date from berserker.
//
// It returns ecode.NothingFound when the response code is not 200 or the
// result set is empty.
func (d *Dao) QueryPlayDaily(c context.Context, date string) (vlist []*model.VideoHiveInfo, err error) {
	var res struct {
		Code   int                   `json:"code"`
		Result []model.VideoHiveInfo `json:"result"`
	}
	v := make(url.Values, 8)
	v.Set("query", fmt.Sprintf(_queryJSON, date))
	if err = d.doHTTPGet(c, d.c.Berserker.API.Rankdaily, "", v, d.c.Berserker.Key.YYC, _noKickUserAgent, &res); err != nil {
		log.Error("d.doHTTPGet err[%v]", err)
		return
	}
	if res.Code != 200 || len(res.Result) == 0 {
		err = ecode.NothingFound
		log.Warn("Berserker return err, url:%s;res:%d", d.c.Berserker.API.Rankdaily+"?"+v.Encode(), res.Code)
		return
	}
	// Point at the slice elements directly: each element has its own address
	// and res is not reused after this function returns.
	vlist = make([]*model.VideoHiveInfo, 0, len(res.Result))
	for i := range res.Result {
		vlist = append(vlist, &res.Result[i])
	}
	return
}

// QueryOperaVideo pages through the operation videos of the given date and
// sends every record to ch.
//
// Pages are requested in ascending cid order, using the last cid of the
// previous page as the "cid > cursor" filter. The function returns nil once an
// empty page with code 200 is received, ecode.NothingFound on any other
// response code, and c.Err() if the context ends while a send on ch is blocked.
// ch is never closed here.
func (d *Dao) QueryOperaVideo(c context.Context, date string, ch chan<- *model.VideoHiveInfo) (err error) {
	var cursor int64
	for {
		// A fresh response struct per page, so pointers handed to the
		// consumer are never overwritten by a later decode.
		var res struct {
			Code   int                   `json:"code"`
			Result []model.VideoHiveInfo `json:"result"`
		}
		v := make(url.Values, 8)
		v.Set("query", fmt.Sprintf(_queryJSONOper, date, cursor))
		if err = d.doHTTPGet(c, d.c.Berserker.API.Operaonce, "", v, d.c.Berserker.Key.LZQ, _lzqUserAgent, &res); err != nil {
			log.Error("d.doHTTPGet err[%v]", err)
			return
		}
		if res.Code != 200 {
			err = ecode.NothingFound
			log.Warn("Berserker return err, url:%s;res:%d", d.c.Berserker.API.Operaonce+"?"+v.Encode(), res.Code)
			return
		}
		if len(res.Result) == 0 {
			return
		}
		for i := range res.Result {
			// Send the address of the slice element, not of a shared loop
			// variable: before Go 1.22 the range variable is a single
			// location, so every pointer sent would alias the same record.
			item := &res.Result[i]
			select {
			case ch <- item:
			case <-c.Done():
				err = c.Err()
				return
			}
			cursor = item.CID
		}
	}
}

// QueryUserBasic lists the user basic profile files on berserker and saves
// each of them as /data/basic_profile/part_<index>.
func (d *Dao) QueryUserBasic(c context.Context) (jobURL string, err error) { v := make(url.Values, 8) var res struct { Code int `json:"code"` Msg string `json:"msg"` Result []string `json:"result"` } query := "{}" v.Set("query", query) if err = d.doHTTPGet(c, d.c.Berserker.API.Userbasic, "", v, d.c.Berserker.Key.LZQ, _lzqUserAgent, &res); err != nil { log.Error("d.doHTTPGet err[%v]", err) return } for i, file := range res.Result { query = fmt.Sprintf("{\"fileSuffix\": \"%s\"}", file) v.Set("query", query) bs, err := d.doHTTPGetRaw(c, d.c.Berserker.API.Userbasic, "", v, d.c.Berserker.Key.LZQ, _lzqUserAgent, &res) if err != nil { log.Error("d.doHTTPGet err[%v]", err) } else { fileName := fmt.Sprintf("/data/basic_profile/part_%d", i) if ioutil.WriteFile(fileName, bs, 0644) == nil { log.Info("write file success") } else { log.Error("write file error(%v)", err) } } } return } //UserProfileGet ... func (d *Dao) UserProfileGet(c context.Context) (jobURL []string, err error) { // v := make(url.Values, 8) var res struct { Code int `json:"code"` Msg string `json:"msg"` Result []string `json:"result"` } query := "{}" v.Set("query", query) if err = d.doHTTPGet(c, d.c.Berserker.API.UserProfile, "", v, d.c.Berserker.Key.HM, _chmUserAgent, &res); err != nil { log.Error("d.doHTTPGet err[%v]", err) return } for i, file := range res.Result { query = fmt.Sprintf("{\"fileSuffix\": \"/%s\"}", file) //fmt.Printf("query: %v\n", query) v.Set("query", query) time.Sleep(3 * time.Second) var bs []byte bs, err = d.doHTTPGetRaw(c, d.c.Berserker.API.UserProfile, "", v, d.c.Berserker.Key.HM, _chmUserAgent, &res) if err != nil { log.Error("d.doHTTPGet err[%v]", err) } else { fileName := fmt.Sprintf(_basePathUserProfile+"part_%d", i) if ioutil.WriteFile(fileName, bs, 0644) == nil { log.Info("write file success") } else { log.Error("write file error(%v)", err) } d.ReadLine(fmt.Sprintf(_basePathUserProfile+"part_%d", i), d.HandlerUserBbqDmg) os.RemoveAll(fmt.Sprintf(_basePathUserProfile+"part_%d", 
i)) } } time.Sleep(3 * time.Second) v2 := make(url.Values, 8) var res2 struct { Code int `json:"code"` Msg string `json:"msg"` Result []string `json:"result"` } query2 := "{}" v2.Set("query2", query2) if err = d.doHTTPGet(c, d.c.Berserker.API.UserProfileBuvid, "", v2, d.c.Berserker.Key.HM, _chmUserAgent, &res2); err != nil { log.Error("d.doHTTPGet err[%v]", err) return } for i, file := range res2.Result { query2 = fmt.Sprintf("{\"fileSuffix\": \"/%s\"}", file) //fmt.Printf("query: %v\n", query) v2.Set("query", query2) time.Sleep(3 * time.Second) bs, err := d.doHTTPGetRaw(c, d.c.Berserker.API.UserProfileBuvid, "", v2, d.c.Berserker.Key.HM, _chmUserAgent, &res2) if err != nil { log.Error("d.doHTTPGet err[%v]", err) } else { fileName := fmt.Sprintf(_basePathUserProfileBuvid+"part_%d", i) if ioutil.WriteFile(fileName, bs, 0644) == nil { log.Info("write file success") } else { log.Error("write file error(%v)", err) } d.ReadLine(fmt.Sprintf(_basePathUserProfileBuvid+"part_%d", i), d.HandlerUserBbqDmgBuvid) os.RemoveAll(fmt.Sprintf(_basePathUserProfileBuvid+"part_%d", i)) } } return } // doHttpRequest make a http request for data platform api func (d *Dao) doHTTPGet(c context.Context, uri, realIP string, params url.Values, key *conf.BerSerkerKey, userAgent string, res interface{}) (err error) { enc, err := d.berserkeSign(params, key) if err != nil { err = pkgerr.Wrapf(err, "uri:%s,params:%v", uri, params) return } if enc != "" { uri = uri + "?" 
+ enc } req, err := xhttp.NewRequest(xhttp.MethodGet, uri, nil) fmt.Printf("Req: %s ", req.URL) if err != nil { err = pkgerr.Wrapf(err, "method:%s,uri:%s", xhttp.MethodGet, uri) return } req.Header.Set(_userAgent, userAgent+" "+env.AppID) if err != nil { return } if realIP != "" { req.Header.Set(_httpHeaderRemoteIP, realIP) } return d.HTTPClient.Do(c, req, res) } // doHTTPGetRaw make a http request for data platform api func (d *Dao) doHTTPGetRaw(c context.Context, uri, realIP string, params url.Values, key *conf.BerSerkerKey, userAgent string, res interface{}) (bs []byte, err error) { enc, err := d.berserkeSign(params, key) if err != nil { err = pkgerr.Wrapf(err, "uri:%s,params:%v", uri, params) return } if enc != "" { uri = uri + "?" + enc } req, err := xhttp.NewRequest(xhttp.MethodGet, uri, nil) if err != nil { err = pkgerr.Wrapf(err, "method:%s,uri:%s", xhttp.MethodGet, uri) return } req.Header.Set(_userAgent, userAgent+" "+env.AppID) if err != nil { return } if realIP != "" { req.Header.Set(_httpHeaderRemoteIP, realIP) } return d.HTTPClient.Raw(c, req) } // Sign calc appkey and appsecret sign. func (d *Dao) berserkeSign(params url.Values, key *conf.BerSerkerKey) (query string, err error) { params.Set("appKey", key.Appkey) params.Set("signMethod", "md5") params.Set("timestamp", time.Now().Format("2006-01-02 15:04:05")) params.Set("version", "1.0") tmp := params.Encode() signTmp := d.encode(params) if strings.IndexByte(tmp, '+') > -1 { tmp = strings.Replace(tmp, "+", "%20", -1) } var b bytes.Buffer b.WriteString(key.Secret) b.WriteString(signTmp) b.WriteString(key.Secret) mh := md5.Sum(b.Bytes()) // query var qb bytes.Buffer qb.WriteString(tmp) qb.WriteString("&sign=") qb.WriteString(strings.ToUpper(hex.EncodeToString(mh[:]))) query = qb.String() return } // Encode encodes the values into ``URL encoded'' form // ("bar=baz&foo=quux") sorted by key. 
func (d *Dao) encode(v url.Values) string { if v == nil { return "" } var buf bytes.Buffer keys := make([]string, 0, len(v)) for k := range v { keys = append(keys, k) } sort.Strings(keys) for _, k := range keys { found := false for _, p := range signParams { if p == k { found = true break } } if !found { continue } vs := v[k] prefix := k for _, v := range vs { buf.WriteString(prefix) buf.WriteString(v) } } return buf.String() } // QueryUserDmg . func (d *Dao) QueryUserDmg(c context.Context) (jobURL string, err error) { logDay := time.Now().AddDate(0, 0, -1).Format("20060102") params := url.Values{} params.Set("query", fmt.Sprintf(_userDmgQueryHive, logDay)) var res struct { Code int `json:"code"` Msg string `json:"msg"` JobStatusURL string `json:"jobStatusUrl"` } if err = d.doHTTPGet(c, d.c.Berserker.API.Userdmg, "", params, d.c.Berserker.Key.HSC, _hscUserAgent, &res); err != nil { return } if res.Code != 200 { log.Error("Berserker user_dmg err(%v)", err) return } jobURL = res.JobStatusURL return } // QueryJobStatus 查询hive脚本执行结果 func (d *Dao) QueryJobStatus(c context.Context, jobURL string) (urls []string, err error) { var res struct { Code int `json:"code"` Msg string `json:"msg"` StatusID int `json:"statusId"` StatusMsg string `json:"statusMsg"` HdfsPath []string `json:"hdfsPath"` } req, err := xhttp.NewRequest(xhttp.MethodGet, jobURL, nil) if err != nil { log.Error("QueryJobStatus NewRequest, err(%v)", err) return } for { if err = d.HTTPClient.Do(c, req, &res); err != nil { log.Error("QueryJobStatus do get failed, joburl(%v), err(%v)", jobURL, err) return } if res.Code != 200 { log.Error("QueryJobStatus http code error, joburl(%v), err(%v)", jobURL, err) return } if res.StatusID == _jobStatusDoing || res.StatusID == _jobStatusWaiting { //等待1min log.Info("QueryJobStatus got job status %v, joburl(%v)", res.StatusID, jobURL) time.Sleep(60 * time.Second) continue } if res.StatusID == _jobStatusFailed { log.Error("QueryJobStatus got job status failed joburl(%v), 
err(%v)", jobURL, err) return } if res.StatusID == _jobStatusSuccess { log.Info("QueryJobStatus got job status success joburl(%v), err(%v)", jobURL, err) urls = res.HdfsPath return } if res.StatusID != _jobStatusSuccess && res.StatusID != _jobStatusFailed && res.StatusID != _jobStatusDoing && res.StatusID != _jobStatusWaiting { log.Error("QueryJobStatus got wrong job status status(%v), joburl(%v)", res.StatusID, jobURL) return } } } //QueryUpUserDmg . func (d *Dao) QueryUpUserDmg(c context.Context, mid int64) (upUserDmg []*model.UpUserDmg, err error) { params := url.Values{} params.Set("query", fmt.Sprintf(_upUserDmgQueryJSON, mid)) var res struct { Code int `json:"code"` Result []*model.UpUserDmg `json:"result"` } if err = d.doHTTPGet(c, d.c.Berserker.API.Upuserdmg, "", params, d.c.Berserker.Key.HSC, _hscUserAgent, &res); err != nil { return } if res.Code != 200 { log.Error("Berserker up_user_dmg err(%v)", err) return } upUserDmg = res.Result return } //QueryUpMid .发起hive查询,取粉丝数大于1万的up mid func (d *Dao) QueryUpMid(c context.Context, date string) (jobURL string, err error) { params := url.Values{} params.Set("query", fmt.Sprintf(_upMidQueryHive, date)) var res struct { Code int `json:"code"` Msg string `json:"msg"` JobStatusURL string `json:"jobStatusUrl"` } if err = d.doHTTPGet(c, d.c.Berserker.API.Upmid, "", params, d.c.Berserker.Key.LJ, _ljUserAgent, &res); err != nil { log.Error("hive QueryUpMid failed, err(%v)", err) return } if res.Code != 200 { fmt.Println(res.Code) log.Error("hive QueryUpMid failed, err(%v), httpcode(%v)", err, res.Code) return } jobURL = res.JobStatusURL fmt.Println(jobURL) return }