dataplatform.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/md5"
  6. "encoding/hex"
  7. "encoding/json"
  8. "fmt"
  9. "net/http"
  10. "net/url"
  11. "sort"
  12. "strconv"
  13. "strings"
  14. "time"
  15. "go-common/app/admin/main/push/model"
  16. pushmdl "go-common/app/service/main/push/model"
  17. "go-common/library/ecode"
  18. "go-common/library/log"
  19. "go-common/library/sync/errgroup"
  20. )
// Constants used when translating data-platform push conditions into SQL
// and when talking to the data platform's HTTP API.
const (
	// Area ids that denote a country rather than a province; parseQuery
	// filters these on the "country" column instead of "province".
	_countryForeign = 1
	_countryChina   = 2
	// Values of the sex condition that produce a final_sex filter.
	_sexMale   = 1
	_sexFemale = 2
	_platformAll = "4" // all platforms: iOS, Xiaomi, Huawei, ...
	_attentionTypeUnion = 1 // multiple self-selected attention conditions are combined with a union query (the alternative is a join query)
	// Layout of the push/expire times supplied with each active period.
	_timeLayout = "2006-01-02 15:04:05"
	// Maximum number of attempts for each DAO write in AddDPTask.
	_retry = 3
	// Checks whether the data platform's computation job for the current day has succeeded.
	checkDpDataURL = "http://berserker.bilibili.co/api/archer/project/%d/status"
)
var (
	// areas maps an area id from the push condition to the area name that
	// parseQuery places in the country/province filters. Ids 1 and 2 are
	// countries (overseas, China); the remaining ids are filtered as provinces.
	areas = map[int]string{1: "海外", 2: "中国", 3: "青海", 4: "上海", 5: "安徽", 6: "广西", 7: "贵州", 8: "吉林", 9: "福建", 10: "黑龙江",
		11: "江西", 12: "甘肃", 13: "云南", 14: "湖南", 15: "河北", 16: "山东", 17: "湖北", 18: "广东", 19: "宁夏", 20: "重庆",
		21: "辽宁", 22: "内蒙古", 23: "山西", 24: "澳门", 25: "陕西", 26: "江苏", 27: "四川", 28: "浙江", 29: "海南", 30: "河南",
		31: "北京", 32: "香港", 33: "台湾", 34: "天津", 35: "西藏", 36: "新疆", 37: "其他",
	}
	// dpProjects lists the data-platform project ids whose status CheckDpData
	// queries; each id is substituted into checkDpDataURL. The values are
	// human-readable descriptions of the project.
	dpProjects = map[int]string{
		355170: "mid维度数据",   // mid-dimension data
		357214: "buvid维度数据", // buvid-dimension data
		355628: "自选关注数据",    // self-selected attention data
	}
)
  45. // DpTaskInfo .
  46. func (s *Service) DpTaskInfo(ctx context.Context, id int64, job string) (res *model.DPTask, err error) {
  47. var (
  48. t *model.Task
  49. cond *model.DPCondition
  50. group = errgroup.Group{}
  51. )
  52. group.Go(func() error {
  53. t, err = s.dao.TaskInfo(ctx, id)
  54. return err
  55. })
  56. group.Go(func() error {
  57. cond, err = s.dao.DPCondition(ctx, job)
  58. return err
  59. })
  60. if err = group.Wait(); err != nil {
  61. return
  62. }
  63. if t == nil {
  64. return
  65. }
  66. res = new(model.DPTask)
  67. res.Task = *t
  68. p := new(model.DPParams)
  69. if cond != nil {
  70. if err = json.Unmarshal([]byte(cond.Condition), &p); err != nil {
  71. return
  72. }
  73. }
  74. if len(p.VipExpires) == 0 {
  75. p.VipExpires = make([]*model.VipExpire, 0)
  76. }
  77. if len(p.Attentions) == 0 {
  78. p.Attentions = make([]*model.SelfAttention, 0)
  79. }
  80. if len(p.ActivePeriods) == 0 {
  81. p.ActivePeriods = make([]*model.ActivePeriod, 0)
  82. }
  83. res.DPParams = *p
  84. return
  85. }
  86. // AddDPTask add data platform task
  87. func (s *Service) AddDPTask(ctx context.Context, task *model.DPTask) (err error) {
  88. var (
  89. tasks []model.DPTask
  90. condTyp = task.Type
  91. )
  92. if len(task.DPParams.ActivePeriods) == 0 {
  93. tasks = append(tasks, *task)
  94. } else {
  95. // 多个推送需要添加多条任务
  96. for _, p := range task.DPParams.ActivePeriods {
  97. var (
  98. ptime time.Time
  99. etime time.Time
  100. t = *task
  101. )
  102. if ptime, err = time.ParseInLocation(_timeLayout, p.PushTime, time.Local); err != nil {
  103. return
  104. }
  105. if etime, err = time.ParseInLocation(_timeLayout, p.ExpireTime, time.Local); err != nil {
  106. return
  107. }
  108. t.Job = strconv.FormatInt(pushmdl.JobName(ptime.Unix(), t.Summary, t.LinkValue, t.Group), 10)
  109. t.PushTime = ptime
  110. t.ExpireTime = etime
  111. t.ActivePeriod = p.Period
  112. tasks = append(tasks, t)
  113. }
  114. }
  115. for _, t := range tasks {
  116. go func(t model.DPTask) {
  117. var (
  118. id int64
  119. e error
  120. )
  121. for i := 0; i < _retry; i++ {
  122. if id, e = s.dao.AddTask(context.Background(), &t.Task); err == nil {
  123. break
  124. }
  125. time.Sleep(10 * time.Millisecond)
  126. }
  127. if e != nil {
  128. log.Error("s.AddDPTask(%+v) add task error(%v)", t.Task, e)
  129. return
  130. }
  131. t.DPParams.LevelStr = pushmdl.JoinInts(t.DPParams.Level)
  132. t.DPParams.AreaStr = pushmdl.JoinInts(t.DPParams.Area)
  133. t.DPParams.PlatformStr = pushmdl.JoinInts(t.DPParams.Platforms)
  134. t.DPParams.LikeStr = pushmdl.JoinInts(t.DPParams.Like)
  135. t.DPParams.ChannelStr = strings.Join(t.DPParams.Channel, ",")
  136. params, _ := json.Marshal(t.DPParams)
  137. cond := &model.DPCondition{
  138. Task: id,
  139. Job: t.Job,
  140. Type: condTyp,
  141. Condition: string(params),
  142. SQL: s.parseQuery(task.Type, &t.DPParams),
  143. Status: pushmdl.DpCondStatusPending,
  144. }
  145. for i := 0; i < _retry; i++ {
  146. if _, e = s.dao.AddDPCondition(context.Background(), cond); e == nil {
  147. break
  148. }
  149. time.Sleep(10 * time.Millisecond)
  150. }
  151. if e != nil {
  152. log.Error("s.AddDPTask(%+v) add condition error(%v)", cond, e)
  153. }
  154. }(t)
  155. }
  156. return
  157. }
  158. // 把查询结构体解析成sql
  159. func (s *Service) parseQuery(typ int, p *model.DPParams) (sql string) {
  160. log.Info("data platform parse query start(%+v)", p)
  161. if typ != pushmdl.TaskTypeDataPlatformToken && typ != pushmdl.TaskTypeDataPlatformMid {
  162. log.Error("data platform parse query task type(%d) error", typ)
  163. return
  164. }
  165. logDate := time.Now().Add(-24 * time.Hour).Format("20060102")
  166. switch typ {
  167. case pushmdl.TaskTypeDataPlatformToken:
  168. sql = fmt.Sprintf("select platform_id,device_token from basic.dws_push_buvid where log_date='%s' ", logDate)
  169. if p.UserActiveDay > 0 {
  170. sql += fmt.Sprintf(" and is_visit_%d=1 ", p.UserActiveDay)
  171. }
  172. if p.UserNewDay > 0 {
  173. sql += fmt.Sprintf(" and is_new_%d=1 ", p.UserNewDay)
  174. }
  175. if p.UserSilentDay > 0 {
  176. sql += fmt.Sprintf(" and is_visit_%d=0 ", p.UserSilentDay)
  177. }
  178. case pushmdl.TaskTypeDataPlatformMid:
  179. if len(p.Attentions) > 0 {
  180. sql = fmt.Sprintf("select t1.platform_id,t1.device_token from (select mid,platform_id,device_token from basic.dws_push_mid where log_date='%s' ", logDate)
  181. } else {
  182. sql = fmt.Sprintf("select platform_id,device_token from basic.dws_push_mid where log_date='%s' ", logDate)
  183. }
  184. if p.Sex == _sexMale || p.Sex == _sexFemale {
  185. sql += fmt.Sprintf(" and final_sex=%d ", p.Sex)
  186. }
  187. // 年龄段。0:0-17, 1:18-24, 2:25-30, 3:31+, 9:全部
  188. if p.Age >= 0 && p.Age <= 3 {
  189. sql += fmt.Sprintf(" and final_age_range=%d ", p.Age)
  190. }
  191. if len(p.Level) > 0 {
  192. sql += fmt.Sprintf(" and level in (%s) ", pushmdl.JoinInts(p.Level))
  193. }
  194. // up主 0: 不是 1:是 2:全部
  195. if p.IsUp == 0 || p.IsUp == 1 {
  196. sql += fmt.Sprintf(" and is_up=%d ", p.IsUp)
  197. }
  198. // 正式会员 0:不是 1:是 2:全部
  199. if p.IsFormalMember == 0 || p.IsFormalMember == 1 {
  200. sql += fmt.Sprintf(" and is_formal_member=%d ", p.IsFormalMember)
  201. }
  202. if len(p.VipExpires) > 0 {
  203. var vips []string
  204. for _, v := range p.VipExpires {
  205. if v.Begin == "" && v.End == "" {
  206. continue
  207. }
  208. if v.Begin == "" {
  209. vips = append(vips, fmt.Sprintf(" vip_overdue_time <= '%s'", v.End))
  210. continue
  211. }
  212. if v.End == "" {
  213. vips = append(vips, fmt.Sprintf(" vip_overdue_time >= '%s'", v.Begin))
  214. continue
  215. }
  216. vips = append(vips, fmt.Sprintf(" vip_overdue_time between '%s' and '%s' ", v.Begin, v.End))
  217. }
  218. if len(vips) > 0 {
  219. sql += fmt.Sprintf(" and (%s) ", strings.Join(vips, "or"))
  220. }
  221. }
  222. }
  223. if p.PlatformStr != "" && p.PlatformStr != _platformAll {
  224. for _, v := range p.Platforms {
  225. if v == pushmdl.PlatformAndroid {
  226. for _, i := range pushmdl.Platforms {
  227. if i == pushmdl.PlatformIPhone || i == pushmdl.PlatformIPad {
  228. continue
  229. }
  230. p.Platforms = append(p.Platforms, i)
  231. }
  232. break
  233. }
  234. }
  235. sql += fmt.Sprintf(" and platform_id in (%s) ", pushmdl.JoinInts(p.Platforms))
  236. }
  237. if len(p.Channel) > 0 {
  238. var brands []string
  239. for _, v := range p.Channel {
  240. if v == "" {
  241. continue
  242. }
  243. brands = append(brands, "'"+v+"'")
  244. }
  245. if len(brands) > 0 {
  246. sql += fmt.Sprintf(" and device_brand in (%s) ", strings.Join(brands, ","))
  247. }
  248. }
  249. if len(p.Area) > 0 {
  250. var countries, provinces []string
  251. for _, v := range p.Area {
  252. if areas[v] == "" {
  253. continue
  254. }
  255. if v == _countryForeign || v == _countryChina {
  256. countries = append(countries, "'"+areas[v]+"'")
  257. } else {
  258. provinces = append(provinces, "'"+areas[v]+"'")
  259. }
  260. }
  261. if len(countries) > 0 {
  262. sql += fmt.Sprintf(" and country in(%s) ", strings.Join(countries, ","))
  263. }
  264. if len(provinces) > 0 {
  265. sql += fmt.Sprintf(" and province in(%s) ", strings.Join(provinces, ","))
  266. }
  267. }
  268. if len(p.Like) > 0 {
  269. var likes []string
  270. for _, id := range p.Like {
  271. if s.partitions[id] == "" {
  272. continue
  273. }
  274. likes = append(likes, "'"+s.partitions[id]+"'")
  275. }
  276. if len(likes) > 0 {
  277. sql += fmt.Sprintf(" and like_tid in(%s) ", strings.Join(likes, ","))
  278. }
  279. }
  280. if p.ActivePeriod > 0 {
  281. sql += fmt.Sprintf(" and active_hour_period=%d ", p.ActivePeriod)
  282. }
  283. if typ == pushmdl.TaskTypeDataPlatformMid && len(p.Attentions) > 0 {
  284. var attentions []string
  285. if p.AttentionsType == _attentionTypeUnion {
  286. // 并集
  287. sql += ") t1 join ("
  288. for _, v := range p.Attentions {
  289. s := fmt.Sprintf("select mid from basic.dwd_oid_mid_info where log_date='%s' and follow_type=%d ", logDate, v.Type)
  290. if v.Include != "" {
  291. s += fmt.Sprintf(" and name='%s' ", strings.Replace(v.Include, "'", "\\'", -1))
  292. }
  293. if v.Exclude != "" {
  294. s += fmt.Sprintf("and name!='%s'", strings.Replace(v.Exclude, "'", "\\'", -1))
  295. }
  296. attentions = append(attentions, s)
  297. }
  298. sql += strings.Join(attentions, " union ") + ") t2 on t1.mid=t2.mid"
  299. } else {
  300. // 交集
  301. sql += ") t1 join "
  302. for i, v := range p.Attentions {
  303. s := fmt.Sprintf("(select mid from basic.dwd_oid_mid_info where log_date='%s' and follow_type=%d ", logDate, v.Type)
  304. if v.Include != "" {
  305. s += fmt.Sprintf(" and name='%s' ", strings.Replace(v.Include, "'", "\\'", -1))
  306. }
  307. if v.Exclude != "" {
  308. s += fmt.Sprintf("and name!='%s'", strings.Replace(v.Exclude, "'", "\\'", -1))
  309. }
  310. s += fmt.Sprintf(") t%d on t1.mid=t%d.mid ", i+5, i+5)
  311. attentions = append(attentions, s)
  312. }
  313. sql += strings.Join(attentions, " join ")
  314. }
  315. }
  316. log.Info("data platform parse query end(%s)", sql)
  317. return
  318. }
  319. // UpdateDpCondtionStatus .
  320. func (s *Service) UpdateDpCondtionStatus(ctx context.Context, job string, status int) (err error) {
  321. s.dao.UpdateDpCondtionStatus(ctx, job, status)
  322. return
  323. }
// checkDpDataRes is the JSON body decoded from the data platform's project
// status endpoint (checkDpDataURL).
type checkDpDataRes struct {
	// Code is compared against http.StatusOK by CheckDpData; any other value
	// is treated as "data not ready".
	Code int    `json:"code"`
	Msg  string `json:"msg"`
	// ProjectID and ProjectName are decoded for logging only.
	ProjectID   int    `json:"projectId"`
	ProjectName string `json:"projectName"`
}
  330. // CheckDpData check whether data platform data is ready
  331. func (s *Service) CheckDpData(ctx context.Context) (err error) {
  332. group := errgroup.Group{}
  333. for id := range dpProjects {
  334. id := id
  335. now := time.Now()
  336. bts := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix() * 1000 // 13位时间戳
  337. ets := time.Date(now.Year(), now.Month(), now.Day(), 23, 59, 59, 0, time.Local).Unix() * 1000
  338. params := dpparams(s.c.DPClient.Key)
  339. params.Set("runStartTime", strconv.FormatInt(bts, 10))
  340. params.Set("runEndTime", strconv.FormatInt(ets, 10))
  341. group.Go(func() (err error) {
  342. u := fmt.Sprintf(checkDpDataURL, id)
  343. if enc := dpsign(s.c.DPClient.Secret, params); enc != "" {
  344. u = u + "?" + enc
  345. }
  346. req, err := http.NewRequest(http.MethodGet, u, nil)
  347. if err != nil {
  348. log.Error("CheckDpData url(%s) error(%v)", u, err)
  349. return
  350. }
  351. res := new(checkDpDataRes)
  352. if err = s.dpClient.Do(ctx, req, res); err != nil {
  353. log.Error("CheckDpData url(%s) error(%v)", u+"?"+params.Encode(), err)
  354. return
  355. }
  356. if res.Code != http.StatusOK {
  357. err = ecode.PushAdminDPNoDataErr
  358. }
  359. log.Info("check data platform url(%s) param(%s) res(%+v)", u, params.Encode(), res)
  360. return
  361. })
  362. }
  363. err = group.Wait()
  364. return
  365. }
  366. // dpsign calc appkey and appsecret sign.
  367. func dpsign(secret string, params url.Values) (query string) {
  368. tmp := params.Encode()
  369. signTmp := dpencode(params)
  370. if strings.IndexByte(tmp, '+') > -1 {
  371. tmp = strings.Replace(tmp, "+", "%20", -1)
  372. }
  373. var b bytes.Buffer
  374. b.WriteString(secret)
  375. b.WriteString(signTmp)
  376. b.WriteString(secret)
  377. mh := md5.Sum(b.Bytes())
  378. var qb bytes.Buffer
  379. qb.WriteString(tmp)
  380. qb.WriteString("&sign=")
  381. qb.WriteString(strings.ToUpper(hex.EncodeToString(mh[:])))
  382. query = qb.String()
  383. return
  384. }
  385. func dpparams(appkey string) url.Values {
  386. params := url.Values{}
  387. params.Set("appKey", appkey)
  388. params.Set("timestamp", time.Now().Format("2006-01-02 15:04:05"))
  389. params.Set("version", "1.0")
  390. params.Set("signMethod", "md5")
  391. return params
  392. }
  393. var dpSignParams = []string{"appKey", "timestamp", "version"}
  394. // encode data platform encodes the values into ``URL encoded'' form
  395. // ("bar=baz&foo=quux") sorted by key.
  396. func dpencode(v url.Values) string {
  397. if v == nil {
  398. return ""
  399. }
  400. var buf bytes.Buffer
  401. keys := make([]string, 0, len(v))
  402. for k := range v {
  403. keys = append(keys, k)
  404. }
  405. sort.Strings(keys)
  406. for _, k := range keys {
  407. found := false
  408. for _, p := range dpSignParams {
  409. if p == k {
  410. found = true
  411. break
  412. }
  413. }
  414. if !found {
  415. continue
  416. }
  417. vs := v[k]
  418. prefix := k
  419. for _, v := range vs {
  420. buf.WriteString(prefix)
  421. buf.WriteString(v)
  422. }
  423. }
  424. return buf.String()
  425. }