berserker.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. package dao
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/md5"
  6. "encoding/hex"
  7. "fmt"
  8. "go-common/app/job/bbq/video/conf"
  9. "go-common/app/job/bbq/video/model"
  10. "go-common/library/conf/env"
  11. "go-common/library/ecode"
  12. "go-common/library/log"
  13. xhttp "net/http"
  14. "net/url"
  15. "os"
  16. "sort"
  17. "strings"
  18. "time"
  19. "io/ioutil"
  20. pkgerr "github.com/pkg/errors"
  21. )
  // Job states compared against the statusId field in QueryJobStatus.
  const (
  	_jobStatusSuccess = iota + 1
  	_jobStatusFailed
  	_jobStatusDoing
  	_jobStatusWaiting
  )

  // HTTP header names set on outgoing berserker requests.
  const (
  	//_httpHeaderUser = "x1-bilispy-user"
  	//_httpHeaderColor = "x1-bilispy-color"
  	//_httpHeaderTimeout = "x1-bilispy-timeout"
  	_httpHeaderRemoteIP = "x-backend-bili-real-ip"
  	_userAgent          = "User-Agent"
  )

  // User-Agent values; each request passes one of these together with a key
  // from d.c.Berserker.Key.
  const (
  	_noKickUserAgent = "yangyucheng@bilibili.com"
  	_hscUserAgent    = "huangshancheng@bilibili.com"
  	_lzqUserAgent    = "liuzhiquan@bilibili.com"
  	_chmUserAgent    = "caiheming@bilibili.com"
  	_ljUserAgent     = "liujin@bilibili.com"
  )

  // Query templates. The placeholders are filled with fmt.Sprintf by the
  // Query* methods before the text is sent as the "query" parameter.
  const (
  	_queryJSON     = `{"select":[],"where":{"log_date":{"in":["%s"]}},"page":{"limit":1000},"sort":{"play":-1}}`
  	_queryJSONOper = `{"select":[],"where":{"log_date":{"in":["%s"]},"cid":{"gt":%d}},"page":{"limit":5000},"sort":{"cid":1}}`
  	//_userDmgQueryJSON = `{"select":[],"where":{"log_date":{"in":["%s"]},"mid":{"gt":"%s"}},"sort":{"mid":1},"page":{"limit":200}}`
  	_upUserDmgQueryJSON = `{"select":[],"where":{"mid":{"gt":%d}},"sort":{"mid":1},"page":{"limit":200}}`
  	_userDmgQueryHive   = `select mid, gender, age, geo, content_tag, viewed_video, content_zone, content_count, follow_ups from sycpb.hbase_dmp_tag where last_active_date >= %s and length(viewed_video) > 0`
  	_upMidQueryHive     = `select mid from ods.ods_member_relation_stat where log_date = %s and follower>= 10000 limit 100`
  	//_upMidQueryHive = `{"select":["name":"mid"],"where":{"log_date":{"in":["%s"]},"follower":{"gte":10000}, "pages":{"limit":10}}`
  )

  // Directory prefixes under which UserProfileGet stores downloaded part files
  // while they are being processed.
  const (
  	_basePathUserProfile      = "/tmp/"
  	_basePathUserProfileBuvid = "/data/"
  )
  // signParams names the request parameters that encode includes in the string
  // that berserkeSign hashes; every other parameter is left out of the signature.
  var signParams = []string{"appKey", "timestamp", "version"}
  50. // QueryPlayDaily get video play rank list from berserker
  51. func (d *Dao) QueryPlayDaily(c context.Context, date string) (vlist []*model.VideoHiveInfo, err error) {
  52. v := make(url.Values, 8)
  53. query := fmt.Sprintf(_queryJSON, date)
  54. v.Set("query", query)
  55. var res struct {
  56. Code int `json:"code"`
  57. Result []model.VideoHiveInfo `json:"result"`
  58. }
  59. if err = d.doHTTPGet(c, d.c.Berserker.API.Rankdaily, "", v, d.c.Berserker.Key.YYC, _noKickUserAgent, &res); err != nil {
  60. log.Error("d.doHTTPGet err[%v]", err)
  61. return
  62. }
  63. if res.Code != 200 || len(res.Result) == 0 {
  64. err = ecode.NothingFound
  65. log.Warn("Berserker return err, url:%s;res:%d", d.c.Berserker.API.Rankdaily+"?"+v.Encode(), res.Code)
  66. return
  67. }
  68. for _, info := range res.Result {
  69. i := info
  70. vlist = append(vlist, &i)
  71. }
  72. return
  73. }
  74. //QueryOperaVideo query operation video once
  75. func (d *Dao) QueryOperaVideo(c context.Context, date string, ch chan<- *model.VideoHiveInfo) (err error) {
  76. i := int64(0)
  77. var mid int64
  78. for {
  79. v := make(url.Values, 8)
  80. var res struct {
  81. Code int `json:"code"`
  82. Result []model.VideoHiveInfo `json:"result"`
  83. }
  84. query := fmt.Sprintf(_queryJSONOper, date, i)
  85. v.Set("query", query)
  86. if err = d.doHTTPGet(c, d.c.Berserker.API.Operaonce, "", v, d.c.Berserker.Key.LZQ, _lzqUserAgent, &res); err != nil {
  87. log.Error("d.doHTTPGet err[%v]", err)
  88. return
  89. }
  90. if res.Code == 200 && len(res.Result) == 0 {
  91. return
  92. }
  93. if res.Code != 200 {
  94. err = ecode.NothingFound
  95. log.Warn("Berserker return err, url:%s;res:%d", d.c.Berserker.API.Operaonce+"?"+v.Encode(), res.Code)
  96. return
  97. }
  98. for _, info := range res.Result {
  99. ch <- &info
  100. mid = info.CID
  101. }
  102. i = mid
  103. }
  104. }
  105. //QueryUserBasic ...
  106. func (d *Dao) QueryUserBasic(c context.Context) (jobURL string, err error) {
  107. v := make(url.Values, 8)
  108. var res struct {
  109. Code int `json:"code"`
  110. Msg string `json:"msg"`
  111. Result []string `json:"result"`
  112. }
  113. query := "{}"
  114. v.Set("query", query)
  115. if err = d.doHTTPGet(c, d.c.Berserker.API.Userbasic, "", v, d.c.Berserker.Key.LZQ, _lzqUserAgent, &res); err != nil {
  116. log.Error("d.doHTTPGet err[%v]", err)
  117. return
  118. }
  119. for i, file := range res.Result {
  120. query = fmt.Sprintf("{\"fileSuffix\": \"%s\"}", file)
  121. v.Set("query", query)
  122. bs, err := d.doHTTPGetRaw(c, d.c.Berserker.API.Userbasic, "", v, d.c.Berserker.Key.LZQ, _lzqUserAgent, &res)
  123. if err != nil {
  124. log.Error("d.doHTTPGet err[%v]", err)
  125. } else {
  126. fileName := fmt.Sprintf("/data/basic_profile/part_%d", i)
  127. if ioutil.WriteFile(fileName, bs, 0644) == nil {
  128. log.Info("write file success")
  129. } else {
  130. log.Error("write file error(%v)", err)
  131. }
  132. }
  133. }
  134. return
  135. }
  136. //UserProfileGet ...
  137. func (d *Dao) UserProfileGet(c context.Context) (jobURL []string, err error) {
  138. //
  139. v := make(url.Values, 8)
  140. var res struct {
  141. Code int `json:"code"`
  142. Msg string `json:"msg"`
  143. Result []string `json:"result"`
  144. }
  145. query := "{}"
  146. v.Set("query", query)
  147. if err = d.doHTTPGet(c, d.c.Berserker.API.UserProfile, "", v, d.c.Berserker.Key.HM, _chmUserAgent, &res); err != nil {
  148. log.Error("d.doHTTPGet err[%v]", err)
  149. return
  150. }
  151. for i, file := range res.Result {
  152. query = fmt.Sprintf("{\"fileSuffix\": \"/%s\"}", file)
  153. //fmt.Printf("query: %v\n", query)
  154. v.Set("query", query)
  155. time.Sleep(3 * time.Second)
  156. var bs []byte
  157. bs, err = d.doHTTPGetRaw(c, d.c.Berserker.API.UserProfile, "", v, d.c.Berserker.Key.HM, _chmUserAgent, &res)
  158. if err != nil {
  159. log.Error("d.doHTTPGet err[%v]", err)
  160. } else {
  161. fileName := fmt.Sprintf(_basePathUserProfile+"part_%d", i)
  162. if ioutil.WriteFile(fileName, bs, 0644) == nil {
  163. log.Info("write file success")
  164. } else {
  165. log.Error("write file error(%v)", err)
  166. }
  167. d.ReadLine(fmt.Sprintf(_basePathUserProfile+"part_%d", i), d.HandlerUserBbqDmg)
  168. os.RemoveAll(fmt.Sprintf(_basePathUserProfile+"part_%d", i))
  169. }
  170. }
  171. time.Sleep(3 * time.Second)
  172. v2 := make(url.Values, 8)
  173. var res2 struct {
  174. Code int `json:"code"`
  175. Msg string `json:"msg"`
  176. Result []string `json:"result"`
  177. }
  178. query2 := "{}"
  179. v2.Set("query2", query2)
  180. if err = d.doHTTPGet(c, d.c.Berserker.API.UserProfileBuvid, "", v2, d.c.Berserker.Key.HM, _chmUserAgent, &res2); err != nil {
  181. log.Error("d.doHTTPGet err[%v]", err)
  182. return
  183. }
  184. for i, file := range res2.Result {
  185. query2 = fmt.Sprintf("{\"fileSuffix\": \"/%s\"}", file)
  186. //fmt.Printf("query: %v\n", query)
  187. v2.Set("query", query2)
  188. time.Sleep(3 * time.Second)
  189. bs, err := d.doHTTPGetRaw(c, d.c.Berserker.API.UserProfileBuvid, "", v2, d.c.Berserker.Key.HM, _chmUserAgent, &res2)
  190. if err != nil {
  191. log.Error("d.doHTTPGet err[%v]", err)
  192. } else {
  193. fileName := fmt.Sprintf(_basePathUserProfileBuvid+"part_%d", i)
  194. if ioutil.WriteFile(fileName, bs, 0644) == nil {
  195. log.Info("write file success")
  196. } else {
  197. log.Error("write file error(%v)", err)
  198. }
  199. d.ReadLine(fmt.Sprintf(_basePathUserProfileBuvid+"part_%d", i), d.HandlerUserBbqDmgBuvid)
  200. os.RemoveAll(fmt.Sprintf(_basePathUserProfileBuvid+"part_%d", i))
  201. }
  202. }
  203. return
  204. }
  205. // doHttpRequest make a http request for data platform api
  206. func (d *Dao) doHTTPGet(c context.Context, uri, realIP string, params url.Values, key *conf.BerSerkerKey, userAgent string, res interface{}) (err error) {
  207. enc, err := d.berserkeSign(params, key)
  208. if err != nil {
  209. err = pkgerr.Wrapf(err, "uri:%s,params:%v", uri, params)
  210. return
  211. }
  212. if enc != "" {
  213. uri = uri + "?" + enc
  214. }
  215. req, err := xhttp.NewRequest(xhttp.MethodGet, uri, nil)
  216. fmt.Printf("Req: %s ", req.URL)
  217. if err != nil {
  218. err = pkgerr.Wrapf(err, "method:%s,uri:%s", xhttp.MethodGet, uri)
  219. return
  220. }
  221. req.Header.Set(_userAgent, userAgent+" "+env.AppID)
  222. if err != nil {
  223. return
  224. }
  225. if realIP != "" {
  226. req.Header.Set(_httpHeaderRemoteIP, realIP)
  227. }
  228. return d.HTTPClient.Do(c, req, res)
  229. }
  230. // doHTTPGetRaw make a http request for data platform api
  231. func (d *Dao) doHTTPGetRaw(c context.Context, uri, realIP string, params url.Values, key *conf.BerSerkerKey, userAgent string, res interface{}) (bs []byte, err error) {
  232. enc, err := d.berserkeSign(params, key)
  233. if err != nil {
  234. err = pkgerr.Wrapf(err, "uri:%s,params:%v", uri, params)
  235. return
  236. }
  237. if enc != "" {
  238. uri = uri + "?" + enc
  239. }
  240. req, err := xhttp.NewRequest(xhttp.MethodGet, uri, nil)
  241. if err != nil {
  242. err = pkgerr.Wrapf(err, "method:%s,uri:%s", xhttp.MethodGet, uri)
  243. return
  244. }
  245. req.Header.Set(_userAgent, userAgent+" "+env.AppID)
  246. if err != nil {
  247. return
  248. }
  249. if realIP != "" {
  250. req.Header.Set(_httpHeaderRemoteIP, realIP)
  251. }
  252. return d.HTTPClient.Raw(c, req)
  253. }
  254. // Sign calc appkey and appsecret sign.
  255. func (d *Dao) berserkeSign(params url.Values, key *conf.BerSerkerKey) (query string, err error) {
  256. params.Set("appKey", key.Appkey)
  257. params.Set("signMethod", "md5")
  258. params.Set("timestamp", time.Now().Format("2006-01-02 15:04:05"))
  259. params.Set("version", "1.0")
  260. tmp := params.Encode()
  261. signTmp := d.encode(params)
  262. if strings.IndexByte(tmp, '+') > -1 {
  263. tmp = strings.Replace(tmp, "+", "%20", -1)
  264. }
  265. var b bytes.Buffer
  266. b.WriteString(key.Secret)
  267. b.WriteString(signTmp)
  268. b.WriteString(key.Secret)
  269. mh := md5.Sum(b.Bytes())
  270. // query
  271. var qb bytes.Buffer
  272. qb.WriteString(tmp)
  273. qb.WriteString("&sign=")
  274. qb.WriteString(strings.ToUpper(hex.EncodeToString(mh[:])))
  275. query = qb.String()
  276. return
  277. }
  278. // Encode encodes the values into ``URL encoded'' form
  279. // ("bar=baz&foo=quux") sorted by key.
  280. func (d *Dao) encode(v url.Values) string {
  281. if v == nil {
  282. return ""
  283. }
  284. var buf bytes.Buffer
  285. keys := make([]string, 0, len(v))
  286. for k := range v {
  287. keys = append(keys, k)
  288. }
  289. sort.Strings(keys)
  290. for _, k := range keys {
  291. found := false
  292. for _, p := range signParams {
  293. if p == k {
  294. found = true
  295. break
  296. }
  297. }
  298. if !found {
  299. continue
  300. }
  301. vs := v[k]
  302. prefix := k
  303. for _, v := range vs {
  304. buf.WriteString(prefix)
  305. buf.WriteString(v)
  306. }
  307. }
  308. return buf.String()
  309. }
  310. // QueryUserDmg .
  311. func (d *Dao) QueryUserDmg(c context.Context) (jobURL string, err error) {
  312. logDay := time.Now().AddDate(0, 0, -1).Format("20060102")
  313. params := url.Values{}
  314. params.Set("query", fmt.Sprintf(_userDmgQueryHive, logDay))
  315. var res struct {
  316. Code int `json:"code"`
  317. Msg string `json:"msg"`
  318. JobStatusURL string `json:"jobStatusUrl"`
  319. }
  320. if err = d.doHTTPGet(c, d.c.Berserker.API.Userdmg, "", params, d.c.Berserker.Key.HSC, _hscUserAgent, &res); err != nil {
  321. return
  322. }
  323. if res.Code != 200 {
  324. log.Error("Berserker user_dmg err(%v)", err)
  325. return
  326. }
  327. jobURL = res.JobStatusURL
  328. return
  329. }
  330. // QueryJobStatus 查询hive脚本执行结果
  331. func (d *Dao) QueryJobStatus(c context.Context, jobURL string) (urls []string, err error) {
  332. var res struct {
  333. Code int `json:"code"`
  334. Msg string `json:"msg"`
  335. StatusID int `json:"statusId"`
  336. StatusMsg string `json:"statusMsg"`
  337. HdfsPath []string `json:"hdfsPath"`
  338. }
  339. req, err := xhttp.NewRequest(xhttp.MethodGet, jobURL, nil)
  340. if err != nil {
  341. log.Error("QueryJobStatus NewRequest, err(%v)", err)
  342. return
  343. }
  344. for {
  345. if err = d.HTTPClient.Do(c, req, &res); err != nil {
  346. log.Error("QueryJobStatus do get failed, joburl(%v), err(%v)", jobURL, err)
  347. return
  348. }
  349. if res.Code != 200 {
  350. log.Error("QueryJobStatus http code error, joburl(%v), err(%v)", jobURL, err)
  351. return
  352. }
  353. if res.StatusID == _jobStatusDoing || res.StatusID == _jobStatusWaiting {
  354. //等待1min
  355. log.Info("QueryJobStatus got job status %v, joburl(%v)", res.StatusID, jobURL)
  356. time.Sleep(60 * time.Second)
  357. continue
  358. }
  359. if res.StatusID == _jobStatusFailed {
  360. log.Error("QueryJobStatus got job status failed joburl(%v), err(%v)", jobURL, err)
  361. return
  362. }
  363. if res.StatusID == _jobStatusSuccess {
  364. log.Info("QueryJobStatus got job status success joburl(%v), err(%v)", jobURL, err)
  365. urls = res.HdfsPath
  366. return
  367. }
  368. if res.StatusID != _jobStatusSuccess && res.StatusID != _jobStatusFailed && res.StatusID != _jobStatusDoing && res.StatusID != _jobStatusWaiting {
  369. log.Error("QueryJobStatus got wrong job status status(%v), joburl(%v)", res.StatusID, jobURL)
  370. return
  371. }
  372. }
  373. }
  374. //QueryUpUserDmg .
  375. func (d *Dao) QueryUpUserDmg(c context.Context, mid int64) (upUserDmg []*model.UpUserDmg, err error) {
  376. params := url.Values{}
  377. params.Set("query", fmt.Sprintf(_upUserDmgQueryJSON, mid))
  378. var res struct {
  379. Code int `json:"code"`
  380. Result []*model.UpUserDmg `json:"result"`
  381. }
  382. if err = d.doHTTPGet(c, d.c.Berserker.API.Upuserdmg, "", params, d.c.Berserker.Key.HSC, _hscUserAgent, &res); err != nil {
  383. return
  384. }
  385. if res.Code != 200 {
  386. log.Error("Berserker up_user_dmg err(%v)", err)
  387. return
  388. }
  389. upUserDmg = res.Result
  390. return
  391. }
  392. //QueryUpMid .发起hive查询,取粉丝数大于1万的up mid
  393. func (d *Dao) QueryUpMid(c context.Context, date string) (jobURL string, err error) {
  394. params := url.Values{}
  395. params.Set("query", fmt.Sprintf(_upMidQueryHive, date))
  396. var res struct {
  397. Code int `json:"code"`
  398. Msg string `json:"msg"`
  399. JobStatusURL string `json:"jobStatusUrl"`
  400. }
  401. if err = d.doHTTPGet(c, d.c.Berserker.API.Upmid, "", params, d.c.Berserker.Key.LJ, _ljUserAgent, &res); err != nil {
  402. log.Error("hive QueryUpMid failed, err(%v)", err)
  403. return
  404. }
  405. if res.Code != 200 {
  406. fmt.Println(res.Code)
  407. log.Error("hive QueryUpMid failed, err(%v), httpcode(%v)", err, res.Code)
  408. return
  409. }
  410. jobURL = res.JobStatusURL
  411. fmt.Println(jobURL)
  412. return
  413. }