user.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
  1. package dao
  2. import (
  3. "bufio"
  4. "context"
  5. "database/sql"
  6. "encoding/json"
  7. "fmt"
  8. "go-common/app/job/bbq/video/conf"
  9. "go-common/app/job/bbq/video/model"
  10. video "go-common/app/service/bbq/video/api/grpc/v1"
  11. acc "go-common/app/service/main/account/api"
  12. xsql "go-common/library/database/sql"
  13. "go-common/library/log"
  14. "go-common/library/net/metadata"
  15. "io"
  16. "net/http"
  17. "net/url"
  18. "os"
  19. "strconv"
  20. "strings"
  21. "time"
  22. )
  23. const (
  24. _userDmgCacheKey = "bbq:user:profile:%s"
  25. _userDmgCacheKeyBuvid = "bbq:device:profile:{buvid}:%s"
  26. //_userDmgCacheKeyBbq="bbq:user:profile"
  27. _userDmgCacheTimeout = 86400
  28. _incrUpUserDmgSQL = "insert into user_statistics_hive (`mid`,`uname`,`play_total`,`fan_total`,`av_total`,`like_total`) value (?,?,?,?,?,?)"
  29. _updateUpUserDmgSQL = "update user_statistics_hive set `uname` = ? , `play_total` = ? , `fan_total` = ? , `av_total` = ? , `like_total` = ? , `mtime` = ? where `mid` = ?"
  30. _delUpUserDmgSQL = "delete from user_statistics_hive where mtime < ?"
  31. _insertOnDupUpDmg = "insert into user_statistics_hive (`mid`,`uname`,`play_total`,`fan_total`,`av_total`,`like_total`) value (?,?,?,?,?,?) on duplicate key update `uname`=?,`play_total`=?,`fan_total`=?,`av_total`=?,`like_total`=?"
  32. _selMidFromVideo = "select distinct mid from video"
  33. _queryUsersByLast = "select id,mid,uname from user_base where id > ? order by id ASC limit ?"
  34. _selMidFromUserBase = "SELECT DISTINCT mid fROM user_base limit ?, 1000"
  35. _upUserBase = "UPDATE user_base SET face = ? where mid = ?"
  36. )
  37. //getUserDmgKey .
  38. func getUserDmgKey(mid string) (key string) {
  39. return fmt.Sprintf(_userDmgCacheKey, mid)
  40. }
  41. //getUserBuvidDmgKey .
  42. func getUserBuvidDmgKey(buvid string) (key string) {
  43. return fmt.Sprintf(_userDmgCacheKeyBuvid, buvid)
  44. }
  45. // InsertOnDup ...
  46. func (d *Dao) InsertOnDup(c context.Context, upUserDmg *model.UpUserDmg) (err error) {
  47. _, err = d.db.Exec(c, _insertOnDupUpDmg, upUserDmg.MID, upUserDmg.Uname, upUserDmg.Play, upUserDmg.Fans, upUserDmg.AVs, upUserDmg.Likes, upUserDmg.Uname, upUserDmg.Play, upUserDmg.Fans, upUserDmg.AVs, upUserDmg.Likes)
  48. return
  49. }
  50. //CacheUserDmg ...
  51. func (d *Dao) CacheUserDmg(c context.Context, userDmg *model.UserDmg) (mid string, err error) {
  52. conn := d.redis.Get(c)
  53. defer conn.Close()
  54. var b []byte
  55. if b, err = json.Marshal(userDmg); err != nil {
  56. log.Error("cache user dmg marshal err(%v)", err)
  57. return
  58. }
  59. cacheKey := getUserDmgKey(userDmg.MID)
  60. fmt.Println(cacheKey)
  61. if _, err = conn.Do("SET", cacheKey, b, "EX", _userDmgCacheTimeout); err != nil {
  62. log.Error("cache user dmg redis set err(%v)", err)
  63. return
  64. }
  65. return
  66. }
  67. //CacheUserBbqDmg ...
  68. func (d *Dao) CacheUserBbqDmg(c context.Context, userBbqDmg *model.UserBbqDmg) (mid string, err error) {
  69. conn := d.redis.Get(c)
  70. defer conn.Close()
  71. tag2 := strings.Join(userBbqDmg.Tag2, ",")
  72. tag3 := strings.Join(userBbqDmg.Tag3, ",")
  73. up := strings.Join(userBbqDmg.Up, ",")
  74. cacheKey := getUserDmgKey(userBbqDmg.MID)
  75. if err = conn.Send("HSET", cacheKey, "zone", tag2); err != nil {
  76. log.Error("cache user bbq dmg redis set tag2 err(%v)", err)
  77. return
  78. }
  79. if err = conn.Send("HSET", cacheKey, "tag", tag3); err != nil {
  80. log.Error("cache user bbq dmg redis set tag3 err(%v)", err)
  81. return
  82. }
  83. if err = conn.Send("HSET", cacheKey, "up", up); err != nil {
  84. log.Error("cache user bbq dmg redis set up err(%v)", err)
  85. return
  86. }
  87. return
  88. }
  89. //CacheUserBbqDmgBuvid ...
  90. func (d *Dao) CacheUserBbqDmgBuvid(c context.Context, userBbqDmgBuvid *model.UserBbqBuvidDmg) (Buvid string, err error) {
  91. conn := d.redis.Get(c)
  92. defer conn.Close()
  93. tag2 := strings.Join(userBbqDmgBuvid.Tag2, ",")
  94. tag3 := strings.Join(userBbqDmgBuvid.Tag3, ",")
  95. up := strings.Join(userBbqDmgBuvid.Up, ",")
  96. cacheKey := getUserBuvidDmgKey(userBbqDmgBuvid.Buvid)
  97. if err = conn.Send("HSET", cacheKey, "zone", tag2); err != nil {
  98. log.Error("cache user bbq buvid dmg redis set tag2 err(%v)", err)
  99. return
  100. }
  101. if err = conn.Send("HSET", cacheKey, "tag", tag3); err != nil {
  102. log.Error("cache user bbq buvid dmg redis set tag3 err(%v)", err)
  103. return
  104. }
  105. if err = conn.Send("HSET", cacheKey, "up", up); err != nil {
  106. log.Error("cache user bbq buvid dmg redis set up err(%v)", err)
  107. return
  108. }
  109. return
  110. }
  111. // AddUpUserDmg .
  112. func (d *Dao) AddUpUserDmg(c context.Context, upUserDmg *model.UpUserDmg) (num int64, err error) {
  113. var res sql.Result
  114. if res, err = d.db.Exec(c, _incrUpUserDmgSQL, upUserDmg.MID, upUserDmg.Uname, upUserDmg.Play, upUserDmg.Fans, upUserDmg.AVs, upUserDmg.Likes); err != nil {
  115. return 0, err
  116. }
  117. return res.LastInsertId()
  118. }
  119. // UpdateUpUserDmg .
  120. func (d *Dao) UpdateUpUserDmg(c context.Context, upUserDmg *model.UpUserDmg) (num int64, err error) {
  121. t := time.Now().AddDate(0, 0, 0).Format("2006-01-02 15:04:05")
  122. var res sql.Result
  123. if res, err = d.db.Exec(c, _updateUpUserDmgSQL, upUserDmg.Uname, upUserDmg.Play, upUserDmg.Fans, upUserDmg.AVs, upUserDmg.Likes, t, upUserDmg.MID); err != nil {
  124. return 0, err
  125. }
  126. return res.RowsAffected()
  127. }
  128. // DelUpUserDmg .
  129. func (d *Dao) DelUpUserDmg(c context.Context) (num int64, err error) {
  130. t := time.Unix(time.Now().Unix(), -int64(36*time.Hour)).Format("2006-01-02 15:04:05")
  131. var res sql.Result
  132. if res, err = d.db.Exec(c, _delUpUserDmgSQL, t); err != nil {
  133. return 0, err
  134. }
  135. return res.RowsAffected()
  136. }
  137. //Download 下载文件
  138. func (d *Dao) Download(url string, name string) (fpath string, err error) {
  139. if name == "" {
  140. u := strings.Split(url, "/")
  141. l := len(u)
  142. name = u[l-1]
  143. }
  144. t := time.Now().AddDate(0, 0, 0).Format("20060102")
  145. path := conf.Conf.Download.File + t
  146. err = d.CreateDir(path)
  147. if err != nil {
  148. log.Error("create dir(%s) err(%v)", path, err)
  149. return
  150. }
  151. fpath = path + "/" + name
  152. newFile, err := os.Create(fpath)
  153. if err != nil {
  154. log.Error("create path(%s) err(%v)", fpath, err)
  155. return
  156. }
  157. defer newFile.Close()
  158. client := http.Client{}
  159. resp, err := client.Get(url)
  160. if err != nil {
  161. log.Error("download url(%s) err(%v)", url, err)
  162. return
  163. }
  164. defer resp.Body.Close()
  165. _, err = io.Copy(newFile, resp.Body)
  166. if err != nil {
  167. log.Error("copy err(%v)", err)
  168. return
  169. }
  170. return
  171. }
  172. //CreateDir 创建文件夹
  173. func (d *Dao) CreateDir(path string) (err error) {
  174. _, err = os.Stat(path)
  175. defer func() {
  176. if os.IsExist(err) {
  177. err = nil
  178. }
  179. }()
  180. if os.IsNotExist(err) {
  181. err = os.Mkdir(path, os.ModePerm)
  182. }
  183. return
  184. }
  185. // ReadLine 按行读取文件,hander回调
  186. func (d *Dao) ReadLine(path string, handler func(string)) (err error) {
  187. f, err := os.Open(path)
  188. if err != nil {
  189. log.Error("open path(%s) err(%v)", path, err)
  190. return
  191. }
  192. defer f.Close()
  193. buf := bufio.NewReader(f)
  194. for {
  195. line, err := buf.ReadString('\n')
  196. if err != nil {
  197. if err == io.EOF {
  198. return nil
  199. }
  200. log.Error("read path(%s) err(%v)", path, err)
  201. return nil
  202. }
  203. line = strings.TrimSpace(line)
  204. handler(line)
  205. time.Sleep(time.Duration(1) * time.Second)
  206. }
  207. }
  208. // ReadLines 50条发起一次grpc请求
  209. func (d *Dao) ReadLines(path string, handler func([]int64)) (err error) {
  210. f, err := os.Open(path)
  211. if err != nil {
  212. log.Error("ReadLine open path(%s) err(%v)", path, err)
  213. return
  214. }
  215. defer f.Close()
  216. buf := bufio.NewReader(f)
  217. mids := make([]int64, 0, 50)
  218. i := 0
  219. for {
  220. line, err := buf.ReadString('\n')
  221. if err != nil {
  222. if err == io.EOF {
  223. err = nil
  224. break
  225. }
  226. log.Error("read path(%s) err(%v)", path, err)
  227. break
  228. }
  229. mid, _ := strconv.ParseInt(strings.TrimSpace(line), 10, 64)
  230. mids = append(mids, mid)
  231. i++
  232. if i == 50 {
  233. handler(mids)
  234. mids = make([]int64, 0, 50)
  235. i = 0
  236. time.Sleep(time.Duration(1) * time.Second)
  237. }
  238. }
  239. if len(mids) != 0 {
  240. handler(mids)
  241. }
  242. return
  243. }
  244. //HandlerUserDmg mid, gender, age, geo, content_tag, viewed_video, content_zone, content_count, follow_ups
  245. func (d *Dao) HandlerUserDmg(user string) {
  246. u := strings.Split(user, "\u0001")
  247. userDmg := &model.UserDmg{
  248. MID: u[0],
  249. Gender: u[1],
  250. Age: u[2],
  251. Geo: u[3],
  252. ContentTag: u[4],
  253. ViewedVideo: d.HandlerViewedVideo(u[5]),
  254. ContentZone: u[6],
  255. ContentCount: u[7],
  256. FollowUps: u[8],
  257. }
  258. d.CacheUserDmg(context.Background(), userDmg)
  259. }
  260. //HandlerUserBbqDmg ..
  261. func (d *Dao) HandlerUserBbqDmg(user string) {
  262. u := strings.Split(user, ",")
  263. userBbqDmg := &model.UserBbqDmg{
  264. MID: u[0],
  265. Tag2: strings.Split(u[1], "\u0002"),
  266. Tag3: strings.Split(u[2], "\u0002"),
  267. Up: strings.Split(u[3], "\u0002"),
  268. }
  269. d.CacheUserBbqDmg(context.Background(), userBbqDmg)
  270. }
  271. //HandlerUserBbqDmgBuvid ..
  272. func (d *Dao) HandlerUserBbqDmgBuvid(user string) {
  273. u := strings.Split(user, ",")
  274. UserBbqBuvidDmg := &model.UserBbqBuvidDmg{
  275. Buvid: u[0],
  276. Tag2: strings.Split(u[1], "\u0002"),
  277. Tag3: strings.Split(u[2], "\u0002"),
  278. Up: strings.Split(u[3], "\u0002"),
  279. }
  280. d.CacheUserBbqDmgBuvid(context.Background(), UserBbqBuvidDmg)
  281. }
  282. // HandlerMids update userbase by mids
  283. func (d *Dao) HandlerMids(mids []int64) {
  284. res, err := d.VideoClient.SyncUserStas(context.Background(), &video.SyncMidsRequset{MIDS: mids})
  285. if err != nil {
  286. log.Error("userbases update failes, mids(%v), err(%v)", mids, err)
  287. return
  288. }
  289. log.Info("userbases update success, affected %v rows", res.Affc)
  290. }
  291. // HandlerMid update userbase by mid
  292. func (d *Dao) HandlerMid(s string) {
  293. mid, _ := strconv.ParseInt(s, 10, 64)
  294. res, err := d.VideoClient.SyncUserSta(context.Background(), &video.SyncMidRequset{MID: mid})
  295. if err != nil {
  296. log.Error("userbase update failes, mid(%v), err(%v)", mid, err)
  297. return
  298. }
  299. if res.Affc == 1 {
  300. log.Info("userbase insert success ,mid(%v)", mid)
  301. } else if res.Affc == 2 {
  302. log.Info("userbase update success , mid(%v)", mid)
  303. }
  304. }
  305. //HandlerViewedVideo 处理看过的视频,保存最近看过的100个
  306. func (d *Dao) HandlerViewedVideo(v string) (res map[int64]string) {
  307. res = make(map[int64]string)
  308. var vv [][]interface{}
  309. var dd string
  310. err := json.Unmarshal([]byte(v), &vv)
  311. if err != nil {
  312. return
  313. }
  314. l := len(vv)
  315. n := 1
  316. for i := l - 1; i >= 0; i-- {
  317. for _, a := range vv[i] {
  318. switch b := a.(type) {
  319. case string:
  320. dd = b
  321. case []interface{}:
  322. ll := len(b)
  323. for j := ll - 1; j >= 0; j-- {
  324. switch c := b[j].(type) {
  325. case float64:
  326. k := int64(c)
  327. if _, ok := res[k]; !ok {
  328. res[k] = dd
  329. n++
  330. }
  331. }
  332. if n > 100 {
  333. return
  334. }
  335. }
  336. }
  337. }
  338. }
  339. return
  340. }
  341. // SelMidFromVideo get distinct mid list from table video
  342. func (d *Dao) SelMidFromVideo() (mids []int64, err error) {
  343. rows, err := d.db.Query(context.Background(), _selMidFromVideo)
  344. if err != nil {
  345. log.Error("SelMidFromVideo failed, err(%v)", err)
  346. return
  347. }
  348. defer rows.Close()
  349. for rows.Next() {
  350. var s string
  351. if err = rows.Scan(&s); err != nil {
  352. panic(err.Error())
  353. }
  354. var mid int64
  355. if mid, err = strconv.ParseInt(s, 10, 64); err != nil {
  356. log.Error("strconv.ParseInt(%s) error(%v)", s, err)
  357. return
  358. }
  359. mids = append(mids, mid)
  360. }
  361. return
  362. }
  363. //MergeUpInfo merge up info
  364. func (d *Dao) MergeUpInfo(mid int64) (err error) {
  365. var (
  366. ctx = context.Background()
  367. params = url.Values{}
  368. req = &http.Request{}
  369. id int64
  370. res struct {
  371. Code int
  372. Data model.UpUserInfoRes
  373. }
  374. )
  375. err = d.db.QueryRow(ctx, "select mid from user_base where mid = ?", mid).Scan(&id)
  376. if err == nil {
  377. log.Infow(ctx, "log", "already has mid in user_base", "mid", mid)
  378. return
  379. }
  380. if err == sql.ErrNoRows {
  381. params.Set("mid", strconv.FormatInt(mid, 10))
  382. req, err = d.HTTPClient.NewRequest("GET", d.c.URLs["account"], "", params)
  383. if err != nil {
  384. log.Error("MergeUpInfo error(%v)", err)
  385. return
  386. }
  387. if err = d.HTTPClient.Do(ctx, req, &res); err != nil {
  388. log.Error("MergeUpInfo http req failed ,err:%v", err)
  389. return
  390. }
  391. res := res.Data
  392. var sex int
  393. switch res.Sex {
  394. case "男":
  395. sex = 1
  396. case "女":
  397. sex = 2
  398. default:
  399. sex = 3
  400. }
  401. _, err = d.db.Exec(ctx,
  402. "insert into user_base (mid,uname,face,sex,user_type,complete_degree)values(?,?,?,?,?,?)",
  403. res.MID,
  404. res.Name,
  405. res.Face,
  406. sex,
  407. model.UserTypeUp,
  408. 0)
  409. if err != nil {
  410. log.Error("MergeUpInfo insert upinfo failed,err:%v", err)
  411. return
  412. }
  413. } else {
  414. log.Error("MergeUpInfo query sql failed,err:%v", err)
  415. }
  416. if err = d.db.QueryRow(ctx, "select id from user_statistics where mid = ?", mid).Scan(&id); err != nil {
  417. if err == sql.ErrNoRows {
  418. if _, err = d.db.Exec(ctx, "insert into user_statistics (mid) values (?)", mid); err != nil {
  419. log.Error("init insert user_statistics failed,err:%v", err)
  420. }
  421. } else {
  422. log.Error("init query user_statistics failed,err:%v", err)
  423. }
  424. }
  425. return
  426. }
  427. //UsersByLast 使用lastid批量获取用户
  428. func (d *Dao) UsersByLast(c context.Context, lastid int64) (r []*model.UserBaseDB, err error) {
  429. var rows *xsql.Rows
  430. rows, err = d.db.Query(c, _queryUsersByLast, lastid, _limitSize)
  431. if err != nil {
  432. log.Error("db _queryVideos err(%v)", err)
  433. return
  434. }
  435. for rows.Next() {
  436. u := new(model.UserBaseDB)
  437. if err = rows.Scan(&u.ID, &u.MID, &u.Uname); err != nil {
  438. log.Error("scan err(%v)", err)
  439. continue
  440. }
  441. r = append(r, u)
  442. }
  443. return
  444. }
  445. // SelMidFromUserBase get distinct mid list from table user_base
  446. func (d *Dao) SelMidFromUserBase(start int) (mids []int64, err error) {
  447. var mid int64
  448. rows, err := d.db.Query(context.Background(), _selMidFromUserBase, start)
  449. if err != nil {
  450. log.Error("SelMidFromUserBase failed, err(%v)", err)
  451. return
  452. }
  453. defer rows.Close()
  454. for rows.Next() {
  455. var s string
  456. if err = rows.Scan(&s); err != nil {
  457. panic(err.Error())
  458. }
  459. if mid, err = strconv.ParseInt(s, 10, 64); err != nil {
  460. log.Error("strconv.ParseInt(%s) error(%v)", s, err)
  461. return
  462. }
  463. mids = append(mids, mid)
  464. }
  465. return
  466. }
  467. // UpUserBases 根据mids更新用户基本信息
  468. func (d *Dao) UpUserBases(c context.Context, mids []int64) (err error) {
  469. var (
  470. tx *xsql.Tx
  471. )
  472. midsReq := &acc.MidsReq{
  473. Mids: mids,
  474. RealIp: metadata.String(c, metadata.RemoteIP)}
  475. infosReply, err := d.AccountClient.Infos3(c, midsReq)
  476. if infosReply == nil {
  477. log.Error("查询infos3失败,err(%v)", err)
  478. fmt.Printf("查询infos3失败,err(%v)", err)
  479. return
  480. }
  481. if tx, err = d.BeginTran(c); err != nil {
  482. log.Error("begin transaction error(%v)", err)
  483. return
  484. }
  485. for _, info := range infosReply.Infos {
  486. if info.Mid != 0 {
  487. if len(info.Face) > 255 {
  488. info.Face = "http://i0.hdslb.com/bfs/bbq/video-image/userface/1558868601542006937.png"
  489. }
  490. for try := 0; try < 3; try++ {
  491. if _, err = tx.Exec(_upUserBase, info.Face, info.Mid); err == nil {
  492. break
  493. }
  494. }
  495. if err != nil {
  496. log.Error("mid(%v) update failed", info.Mid)
  497. }
  498. }
  499. }
  500. if err = tx.Commit(); err != nil {
  501. log.Error("UpUserBases commit failed err(%v)", err)
  502. }
  503. return
  504. }