report.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. package report
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/md5"
  6. "encoding/hex"
  7. "errors"
  8. "fmt"
  9. xhttp "net/http"
  10. "net/url"
  11. "sort"
  12. "strings"
  13. "time"
  14. mdlrep "go-common/app/job/main/tv/model/report"
  15. "go-common/library/conf/env"
  16. "go-common/library/log"
  17. "go-common/library/net/metadata"
  18. )
// Constants used to build the report queries and to interpret responses.
const (
	// _httpOK is the response code string that PostRequest treats as success.
	_httpOK = "0"

	_pid  = "73" // platform: 73 is tv
	_plat = 5    // platform: 5 is tv

	// _tv is the mobi_app value substituted into the _playDur query.
	_tv = "android_tv_yst"

	// _actDur is the query template for ods.app_active_duration. Its
	// placeholders are, in order: start time, end time (both compared against
	// unix_timestamp(ctime)), log_date, and pid.
	_actDur = `select request_uri,time_iso,ip,version,buvid,fts,proid,chid,pid,brand,deviceid,model,osver,ctime,mid,ver,net,oid,eid,start_time,end_time,duration,openudid,idfa,mac,is_coldstart,session_id,buvid_ext from ods.app_active_duration where unix_timestamp(ctime, 'yyyyMMddHHmmss')>=%d and unix_timestamp(ctime, 'yyyyMMddHHmmss')<%d and log_date=%s and pid=%s`

	// _playDur is the query template for ods.app_play_duration. Its
	// placeholders are, in order: start time, end time (both compared against
	// stime), log_date, and mobi_app.
	_playDur = `select stime,build,buvid,mobi_app,platform,session,mid,aid,cid,sid,epid,type,sub_type,quality,total_time,paused_time,played_time,video_duration,play_type,network_type,last_play_progress_time,max_play_progress_time,device,epid_status,play_status,user_status,actual_played_time,auto_play,detail_play_time,list_play_time from ods.app_play_duration where stime>=%d and stime<%d and log_date=%s and mobi_app=%s`

	// _visitEvent is the query template for ods.app_visit_event. Its
	// placeholders are, in order: start time, end time (both compared against
	// unix_timestamp(ctime)), log_date, and pid.
	_visitEvent = `select request_uri,time_iso,ip,version,buvid,fts,proid,chid,pid,brand,deviceid,model,osver,mid,ctime,ver,net,oid,page_name,page_arg,ua,h5_chid,unix_timestamp(ctime, 'yyyyMMddHHmmss') from ods.app_visit_event where unix_timestamp(ctime, 'yyyyMMddHHmmss')>=%d and unix_timestamp(ctime, 'yyyyMMddHHmmss')<%d and log_date=%s and pid=%s`

	// _arcClick is the query template for ods.ods_archive_click. Its
	// placeholders are, in order: start time, end time (both compared against
	// stime), log_date, and the numeric plat.
	_arcClick = `select r_type,avid,cid,part,mid,stime,did,ip,ua,buvid,cookie_sid,refer,type,sub_type,sid,epid,platform,device from ods.ods_archive_click where stime>=%d and stime<%d and log_date=%s and plat=%d`
)
var (
	// signParams lists the request parameter names that encode includes in
	// the signature payload; every other parameter is skipped there.
	signParams = []string{"appKey", "timestamp", "version"}

	// _userAgent is the header name newRequest sets on outgoing requests.
	_userAgent = "User-Agent"
)
  33. // Report .
  34. func (d *Dao) Report(ctx context.Context, table string) (info string, err error) {
  35. var (
  36. v = url.Values{}
  37. ip = metadata.String(ctx, metadata.RemoteIP)
  38. logdata = d.queryfmt()
  39. start, end = d.dealDate()
  40. query string
  41. res struct {
  42. Code int `json:"code"`
  43. Msg string `json:"msg"`
  44. StatusURL string `json:"jobStatusUrl"`
  45. }
  46. )
  47. logdata = "'" + logdata + "'"
  48. switch table {
  49. case mdlrep.ArchiveClick:
  50. query = fmt.Sprintf(_arcClick, start, end, logdata, _plat)
  51. case mdlrep.ActiveDuration:
  52. query = fmt.Sprintf(_actDur, start, end, logdata, `"`+_pid+`"`)
  53. case mdlrep.PlayDuration:
  54. query = fmt.Sprintf(_playDur, start, end, logdata, "'"+_tv+"'")
  55. case mdlrep.VisitEvent:
  56. query = fmt.Sprintf(_visitEvent, start, end, logdata, `"`+_pid+`"`)
  57. default:
  58. err = errors.New("table is nill")
  59. return
  60. }
  61. v.Set("appKey", d.conf.DpClient.Key)
  62. v.Set("signMethod", "md5")
  63. v.Set("timestamp", time.Now().Format("2006-01-02 15:04:05"))
  64. v.Set("version", "1.0")
  65. v.Set("query", query)
  66. if err = d.newRequest(ctx, d.conf.Report.ReportURI, ip, v, &res); err != nil {
  67. log.Error("newRequest url(%v), err(%v)", d.conf.Report.ReportURI+"?"+v.Encode(), err)
  68. return
  69. }
  70. if res.Code == 200 && res.StatusURL != "" {
  71. info = res.StatusURL
  72. }
  73. return
  74. }
  75. // CheckJob .
  76. func (d *Dao) CheckJob(ctx context.Context, urls string) (res *mdlrep.DpCheckJobResult, err error) {
  77. var (
  78. v = url.Values{}
  79. ip = metadata.String(ctx, metadata.RemoteIP)
  80. )
  81. res = &mdlrep.DpCheckJobResult{}
  82. v.Set("appKey", d.conf.DpClient.Key)
  83. v.Set("signMethod", "md5")
  84. v.Set("timestamp", time.Now().Format("2006-01-02 15:04:05"))
  85. v.Set("version", "1.0")
  86. if err = d.newRequest(ctx, urls, ip, v, &res); err != nil {
  87. log.Error("d.newRequest error(%v)", err)
  88. return
  89. }
  90. if res.Code != xhttp.StatusOK {
  91. log.Error("d.CheckJob newRequest error code:%d ; url(%s) ", res.Code, urls+"?"+v.Encode())
  92. err = fmt.Errorf("code(%d) msg(%s) statusID(%d) statusID(%s)", res.Code, res.Msg, res.StatusID, res.StatusMsg)
  93. }
  94. return
  95. }
  96. // PostRequest .
  97. func (d *Dao) PostRequest(ctx context.Context, body string) (err error) {
  98. var (
  99. req *xhttp.Request
  100. res struct {
  101. Code string `json:"code"`
  102. Message string `json:"message"`
  103. }
  104. )
  105. if req, err = xhttp.NewRequest(xhttp.MethodPost, d.conf.Report.UpDataURI, strings.NewReader(body)); err != nil {
  106. log.Error("xhttp.NewRequest url(%s) error (%v)", d.conf.Report.UpDataURI, err)
  107. return
  108. }
  109. req.Header.Add("Content-Type", "text/plain; charset=utf-8")
  110. if err = d.httpR.Do(ctx, req, &res); err != nil {
  111. log.Error("d.httpReq.Do error(%v) url(%s)", err, d.conf.Report.UpDataURI)
  112. return
  113. }
  114. if res.Code != _httpOK {
  115. log.Error("PostRequest error code:%s ; url(%s) ", res.Code, d.conf.Report.UpDataURI)
  116. err = fmt.Errorf("code(%s)", res.Code)
  117. }
  118. return
  119. }
  120. // NewRequest new http request with method, url, ip, values and headers .
  121. func (d *Dao) newRequest(c context.Context, url, realIP string, params url.Values, res interface{}) (err error) {
  122. enc, err := d.sign(params)
  123. if err != nil {
  124. log.Error("url:%s,params:%v", url, params)
  125. return
  126. }
  127. if enc != "" {
  128. url = url + "?" + enc
  129. }
  130. req, err := xhttp.NewRequest(xhttp.MethodGet, url, nil)
  131. if err != nil {
  132. log.Error("xhttp.NewRequest method:%s,url:%s", xhttp.MethodGet, url)
  133. return
  134. }
  135. req.Header.Set(_userAgent, "haoguanwei@bilibili.com "+env.AppID)
  136. if err != nil {
  137. return
  138. }
  139. return d.httpR.Do(c, req, res)
  140. }
  141. // Sign calc appkey and appsecret sign .
  142. func (d *Dao) sign(params url.Values) (query string, err error) {
  143. tmp := params.Encode()
  144. signTmp := d.encode(params)
  145. if strings.IndexByte(tmp, '+') > -1 {
  146. tmp = strings.Replace(tmp, "+", "%20", -1)
  147. }
  148. var b bytes.Buffer
  149. b.WriteString(d.conf.DpClient.Secret)
  150. b.WriteString(signTmp)
  151. b.WriteString(d.conf.DpClient.Secret)
  152. mh := md5.Sum(b.Bytes())
  153. // query
  154. var qb bytes.Buffer
  155. qb.WriteString(tmp)
  156. qb.WriteString("&sign=")
  157. qb.WriteString(strings.ToUpper(hex.EncodeToString(mh[:])))
  158. query = qb.String()
  159. return
  160. }
  161. // Encode encodes the values into ``URL encoded'' form .
  162. // ("bar=baz&foo=quux") sorted by key .
  163. func (d *Dao) encode(v url.Values) string {
  164. if v == nil {
  165. return ""
  166. }
  167. var buf bytes.Buffer
  168. keys := make([]string, 0, len(v))
  169. for k := range v {
  170. keys = append(keys, k)
  171. }
  172. sort.Strings(keys)
  173. for _, k := range keys {
  174. found := false
  175. for _, p := range signParams {
  176. if p == k {
  177. found = true
  178. break
  179. }
  180. }
  181. if !found {
  182. continue
  183. }
  184. vs := v[k]
  185. prefix := k
  186. for _, v := range vs {
  187. buf.WriteString(prefix)
  188. buf.WriteString(v)
  189. }
  190. }
  191. return buf.String()
  192. }
  193. // delDate create start and end time .
  194. func (d *Dao) dealDate() (start, end int64) {
  195. var (
  196. timeDelay = "-" + d.conf.Report.TimeDelay
  197. timeSpan = "-" + time.Duration(d.conf.Report.SeTimeSpan).String()
  198. )
  199. endTime := time.Now()
  200. et, _ := time.ParseDuration(timeDelay)
  201. endTmp := endTime.Add(et)
  202. end = endTime.Add(et).Unix()
  203. st, _ := time.ParseDuration(timeSpan)
  204. start = endTmp.Add(st).Unix()
  205. return
  206. }
  207. func (d *Dao) queryfmt() (logdata string) {
  208. var (
  209. dtime = time.Now()
  210. timeData = "-" + d.conf.Report.TimeDelay
  211. )
  212. et, _ := time.ParseDuration(timeData)
  213. logdata = dtime.Add(et).Format("20060102")
  214. return
  215. }