action.go 12 KB


  1. package like
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "strconv"
  7. "time"
  8. l "go-common/app/interface/main/activity/model/like"
  9. "go-common/library/cache/memcache"
  10. "go-common/library/cache/redis"
  11. xsql "go-common/library/database/sql"
  12. "go-common/library/log"
  13. "go-common/library/stat/prom"
  14. "go-common/library/xstr"
  15. "github.com/pkg/errors"
  16. )
  17. // like_action sql and like state
  18. const (
  19. _likeActInfosSQL = "select id,lid from like_action where lid in (%s) and mid = ?"
  20. _likeActAddSQL = "INSERT INTO like_action(lid,mid,action,sid,ipv6,ctime,mtime) VALUES(?,?,?,?,?,?,?)"
  21. _likeActSumSQL = "select sum(action) likes, lid from like_action where sid = ? and lid in (%s) group by lid ORDER BY likes desc"
  22. _storyActSumSQL = "select sum(action) likes from like_action where sid = ? and mid = ? and ctime >= ? and ctime <= ?"
  23. _storyEachActSumSQL = "select sum(action) likes from like_action where sid = ? and mid = ? and lid = ? and ctime >= ? and ctime <= ?"
  24. HasLike = 1
  25. NoLike = -1
  26. //Total number of activities set the old is bilibili-activity:like:%d
  27. _likeActScoreKeyFmt = "go:bl-a:l:%d"
  28. //Total number of comments for different types of manuscripts
  29. _likeActScoreTyoeKeyFmt = "go:bl:a:l:%d:%d"
  30. //liked key the old is bilibili-activity:like:%d:%d:%d
  31. _likeActKeyFmt = "go:bl-act:l:%d:%d:%d"
  32. //Total number of like the old is likes:oid:%d
  33. _likeLidKeyFmt = "go:ls:oid:%d"
  34. //Total number of activities like the old is sb:likes:count:%d
  35. _likeCountKeyFmt = "go:sb:ls:count:%d"
  36. )
  37. // likeActScoreKey .
  38. func likeActScoreKey(sid int64) string {
  39. return fmt.Sprintf(_likeActScoreKeyFmt, sid)
  40. }
  41. // likeActScoreTypeKey .
  42. func likeActScoreTypeKey(sid int64, ltype int) string {
  43. return fmt.Sprintf(_likeActScoreTyoeKeyFmt, ltype, sid)
  44. }
  45. func likeActKey(sid, lid, mid int64) string {
  46. return fmt.Sprintf(_likeActKeyFmt, sid, lid, mid)
  47. }
  48. // likeLidKey .
  49. func likeLidKey(oid int64) string {
  50. return fmt.Sprintf(_likeLidKeyFmt, oid)
  51. }
  52. // likeCountKey .
  53. func likeCountKey(sid int64) string {
  54. return fmt.Sprintf(_likeCountKeyFmt, sid)
  55. }
  56. // LikeActInfos get likesaction logs.
  57. func (dao *Dao) LikeActInfos(c context.Context, lids []int64, mid int64) (likeActs map[int64]*l.Action, err error) {
  58. var rows *xsql.Rows
  59. if rows, err = dao.db.Query(c, fmt.Sprintf(_likeActInfosSQL, xstr.JoinInts(lids)), mid); err != nil {
  60. if err == xsql.ErrNoRows {
  61. err = nil
  62. } else {
  63. err = errors.Wrapf(err, "LikeActInfos:Query(%s)", _likeActInfosSQL)
  64. return
  65. }
  66. }
  67. defer rows.Close()
  68. likeActs = make(map[int64]*l.Action, len(lids))
  69. for rows.Next() {
  70. a := &l.Action{}
  71. if err = rows.Scan(&a.ID, &a.Lid); err != nil {
  72. err = errors.Wrap(err, "LikeActInfos:scan()")
  73. return
  74. }
  75. likeActs[a.Lid] = a
  76. }
  77. if err = rows.Err(); err != nil {
  78. err = errors.Wrap(err, "LikeActInfos:rows.Err()")
  79. }
  80. return
  81. }
  82. // LikeActSums get like_action likes sum data .
  83. func (dao *Dao) LikeActSums(c context.Context, sid int64, lids []int64) (res []*l.LidLikeSum, err error) {
  84. var rows *xsql.Rows
  85. if rows, err = dao.db.Query(c, fmt.Sprintf(_likeActSumSQL, xstr.JoinInts(lids)), sid); err != nil {
  86. if err == xsql.ErrNoRows {
  87. err = nil
  88. } else {
  89. err = errors.Wrapf(err, "LikeActSums:Query(%s)", _likeActSumSQL)
  90. return
  91. }
  92. }
  93. defer rows.Close()
  94. res = make([]*l.LidLikeSum, 0, len(lids))
  95. for rows.Next() {
  96. a := &l.LidLikeSum{}
  97. if err = rows.Scan(&a.Likes, &a.Lid); err != nil {
  98. err = errors.Wrapf(err, "LikeActSums:Scan(%s)", _likeActSumSQL)
  99. return
  100. }
  101. res = append(res, a)
  102. }
  103. if err = rows.Err(); err != nil {
  104. err = errors.Wrapf(err, "LikeActSums:rows.Err(%s)", _likeActSumSQL)
  105. }
  106. return
  107. }
  108. // StoryLikeActSum .
  109. func (dao *Dao) StoryLikeActSum(c context.Context, sid, mid int64, stime, etime string) (res int64, err error) {
  110. var tt sql.NullInt64
  111. row := dao.db.QueryRow(c, _storyActSumSQL, sid, mid, stime, etime)
  112. if err = row.Scan(&tt); err != nil {
  113. if err == sql.ErrNoRows {
  114. err = nil
  115. } else {
  116. err = errors.Wrap(err, "row.Scan()")
  117. }
  118. }
  119. res = tt.Int64
  120. return
  121. }
  122. // StoryEachLikeAct .
  123. func (dao *Dao) StoryEachLikeAct(c context.Context, sid, mid, lid int64, stime, etime string) (res int64, err error) {
  124. var tt sql.NullInt64
  125. row := dao.db.QueryRow(c, _storyEachActSumSQL, sid, mid, lid, stime, etime)
  126. if err = row.Scan(&tt); err != nil {
  127. if err == sql.ErrNoRows {
  128. err = nil
  129. } else {
  130. err = errors.Wrap(err, "row.Scan()")
  131. }
  132. }
  133. res = tt.Int64
  134. return
  135. }
  136. // SetRedisCache .
  137. func (dao *Dao) SetRedisCache(c context.Context, sid, lid, score int64, likeType int) (err error) {
  138. var (
  139. conn = dao.redis.Get(c)
  140. key = likeActScoreKey(sid)
  141. lidKey = likeLidKey(lid)
  142. lidCountKey = likeCountKey(sid)
  143. max = 3
  144. )
  145. defer conn.Close()
  146. if err = conn.Send("ZINCRBY", key, score, lid); err != nil {
  147. err = errors.Wrap(err, "conn.Send(ZINCRBY) likeActScoreKey")
  148. return
  149. }
  150. if err = conn.Send("INCRBY", lidKey, score); err != nil {
  151. err = errors.Wrap(err, "conn.Send(INCR) likeLidKey")
  152. return
  153. }
  154. if likeType != 0 {
  155. max++
  156. typeKey := likeActScoreTypeKey(sid, likeType)
  157. if err = conn.Send("ZINCRBY", typeKey, score, lid); err != nil {
  158. err = errors.Wrap(err, "conn.Send(ZINCRBY) likeActScoreTypeKey")
  159. return
  160. }
  161. }
  162. if err = conn.Send("INCRBY", lidCountKey, score); err != nil {
  163. err = errors.Wrap(err, "conn.Send(INCR) likeLidKey")
  164. return
  165. }
  166. if err = conn.Flush(); err != nil {
  167. err = errors.Wrap(err, " conn.Set()")
  168. return
  169. }
  170. for i := 0; i < max; i++ {
  171. if _, err = conn.Receive(); err != nil {
  172. err = errors.Wrap(err, fmt.Sprintf("conn.Receive()%d", i+1))
  173. return
  174. }
  175. }
  176. return
  177. }
  178. // RedisCache get cache order by like .
  179. func (dao *Dao) RedisCache(c context.Context, sid int64, start, end int) (res []*l.LidLikeRes, err error) {
  180. var (
  181. conn = dao.redis.Get(c)
  182. key = likeActScoreKey(sid)
  183. )
  184. defer conn.Close()
  185. values, err := redis.Values(conn.Do("ZREVRANGE", key, start, end, "WITHSCORES"))
  186. if err != nil {
  187. err = errors.Wrap(err, "conn.Do(ZREVRANGE)")
  188. return
  189. }
  190. if len(values) == 0 {
  191. return
  192. }
  193. res = make([]*l.LidLikeRes, 0, len(values))
  194. for len(values) > 0 {
  195. t := &l.LidLikeRes{}
  196. if values, err = redis.Scan(values, &t.Lid, &t.Score); err != nil {
  197. err = errors.Wrap(err, "redis.Scan")
  198. return
  199. }
  200. res = append(res, t)
  201. }
  202. return
  203. }
  204. // LikeActZscore .
  205. func (dao *Dao) LikeActZscore(c context.Context, sid, lid int64) (res int64, err error) {
  206. var (
  207. conn = dao.redis.Get(c)
  208. key = likeActScoreKey(sid)
  209. )
  210. defer conn.Close()
  211. if res, err = redis.Int64(conn.Do("ZSCORE", key, lid)); err != nil {
  212. if err == redis.ErrNil {
  213. err = nil
  214. } else {
  215. err = errors.Wrap(err, "conn.Do(ZSCORE)")
  216. }
  217. }
  218. return
  219. }
  220. // SetInitializeLikeCache initialize like_action like data .
  221. func (dao *Dao) SetInitializeLikeCache(c context.Context, sid int64, lidLikeAct map[int64]int64, typeLike map[int64]int) (err error) {
  222. var (
  223. conn = dao.redis.Get(c)
  224. max = 0
  225. key = likeActScoreKey(sid)
  226. args = redis.Args{}.Add(key)
  227. )
  228. defer conn.Close()
  229. for k, val := range lidLikeAct {
  230. args = args.Add(val).Add(k)
  231. if typeLike[k] != 0 {
  232. keyType := likeActScoreTypeKey(sid, typeLike[k])
  233. argsType := redis.Args{}.Add(keyType).Add(val).Add(k)
  234. if err = conn.Send("ZADD", argsType...); err != nil {
  235. log.Error("SetInitializeLikeCache:conn.Send(zadd) args(%v) error(%v)", argsType, err)
  236. return
  237. }
  238. max++
  239. }
  240. }
  241. if err = conn.Send("ZADD", args...); err != nil {
  242. log.Error("SetInitializeLikeCache:conn.Send(zadd) args(%v) error(%v)", args, err)
  243. return
  244. }
  245. max++
  246. if err = conn.Flush(); err != nil {
  247. err = errors.Wrap(err, "SetInitializeLikeCache:conn.Set()")
  248. return
  249. }
  250. for i := 0; i < max; i++ {
  251. if _, err = conn.Receive(); err != nil {
  252. err = errors.Wrapf(err, "SetInitializeLikeCache:conn.Receive()%d", i+1)
  253. }
  254. }
  255. return
  256. }
  257. // LikeActAdd add like_action .
  258. func (dao *Dao) LikeActAdd(c context.Context, likeAct *l.Action) (id int64, err error) {
  259. var res sql.Result
  260. if res, err = dao.db.Exec(c, _likeActAddSQL, likeAct.Lid, likeAct.Mid, likeAct.Action, likeAct.Sid, likeAct.IPv6, time.Now(), time.Now()); err != nil {
  261. err = errors.Wrapf(err, "d.db.Exec(%s)", _likeActAddSQL)
  262. return
  263. }
  264. return res.LastInsertId()
  265. }
  266. // LikeActLidCounts get lid score.
  267. func (dao *Dao) LikeActLidCounts(c context.Context, lids []int64) (res map[int64]int64, err error) {
  268. var (
  269. conn = dao.redis.Get(c)
  270. args = redis.Args{}
  271. ss []int64
  272. )
  273. defer conn.Close()
  274. for _, lid := range lids {
  275. args = args.Add(likeLidKey(lid))
  276. }
  277. if ss, err = redis.Int64s(conn.Do("MGET", args...)); err != nil {
  278. if err == redis.ErrNil {
  279. err = nil
  280. } else {
  281. err = errors.Wrapf(err, "redis.Ints(conn.Do(HMGET,%v)", args)
  282. }
  283. return
  284. }
  285. res = make(map[int64]int64, len(lids))
  286. for key, val := range ss {
  287. res[lids[key]] = val
  288. }
  289. return
  290. }
  291. // LikeActs get data from cache if miss will call source method, then add to cache.
  292. func (dao *Dao) LikeActs(c context.Context, sid, mid int64, lids []int64) (res map[int64]int, err error) {
  293. var (
  294. miss []int64
  295. likeActInfos map[int64]*l.Action
  296. missVal map[int64]int
  297. )
  298. if len(lids) == 0 {
  299. return
  300. }
  301. addCache := true
  302. res, err = dao.CacheLikeActs(c, sid, mid, lids)
  303. if err != nil {
  304. addCache = false
  305. res = nil
  306. err = nil
  307. }
  308. for _, key := range lids {
  309. if (res == nil) || (res[key] == 0) {
  310. miss = append(miss, key)
  311. }
  312. }
  313. prom.CacheHit.Add("LikeActs", int64(len(lids)-len(miss)))
  314. if len(miss) == 0 {
  315. return
  316. }
  317. if likeActInfos, err = dao.LikeActInfos(c, miss, mid); err != nil {
  318. err = errors.Wrapf(err, "dao.LikeActInfos(%v) error(%v)", miss, err)
  319. return
  320. }
  321. if res == nil {
  322. res = make(map[int64]int)
  323. }
  324. missVal = make(map[int64]int, len(miss))
  325. for _, mcLid := range miss {
  326. if _, ok := likeActInfos[mcLid]; ok {
  327. res[mcLid] = HasLike
  328. } else {
  329. res[mcLid] = NoLike
  330. }
  331. missVal[mcLid] = res[mcLid]
  332. }
  333. if !addCache {
  334. return
  335. }
  336. dao.AddCacheLikeActs(c, sid, mid, missVal)
  337. return
  338. }
  339. // CacheLikeActs res value val -1:no like 1:has like 0:no value.
  340. func (dao *Dao) CacheLikeActs(c context.Context, sid, mid int64, lids []int64) (res map[int64]int, err error) {
  341. l := len(lids)
  342. if l == 0 {
  343. return
  344. }
  345. keysMap := make(map[string]int64, l)
  346. keys := make([]string, 0, l)
  347. for _, id := range lids {
  348. key := likeActKey(sid, id, mid)
  349. keysMap[key] = id
  350. keys = append(keys, key)
  351. }
  352. conn := dao.mc.Get(c)
  353. defer conn.Close()
  354. replies, err := conn.GetMulti(keys)
  355. if err != nil {
  356. prom.BusinessErrCount.Incr("mc:CacheLikeActs")
  357. log.Errorv(c, log.KV("CacheLikeActs", fmt.Sprintf("%+v", err)), log.KV("keys", keys))
  358. return
  359. }
  360. for key, reply := range replies {
  361. var v string
  362. err = conn.Scan(reply, &v)
  363. if err != nil {
  364. prom.BusinessErrCount.Incr("mc:CacheLikeActs")
  365. log.Errorv(c, log.KV("CacheLikeActs", fmt.Sprintf("%+v", err)), log.KV("key", key))
  366. return
  367. }
  368. r, err := strconv.ParseInt(v, 10, 64)
  369. if err != nil {
  370. prom.BusinessErrCount.Incr("mc:CacheLikeActs")
  371. log.Errorv(c, log.KV("CacheLikeActs", fmt.Sprintf("%+v", err)), log.KV("key", key))
  372. return res, err
  373. }
  374. if res == nil {
  375. res = make(map[int64]int, len(keys))
  376. }
  377. res[keysMap[key]] = int(r)
  378. }
  379. return
  380. }
  381. // AddCacheLikeActs Set data to mc
  382. func (dao *Dao) AddCacheLikeActs(c context.Context, sid, mid int64, values map[int64]int) (err error) {
  383. if len(values) == 0 {
  384. return
  385. }
  386. conn := dao.mc.Get(c)
  387. defer conn.Close()
  388. for id, val := range values {
  389. key := likeActKey(sid, id, mid)
  390. bs := []byte(strconv.FormatInt(int64(val), 10))
  391. item := &memcache.Item{Key: key, Value: bs, Expiration: dao.mcPerpetualExpire, Flags: memcache.FlagRAW}
  392. if err = conn.Set(item); err != nil {
  393. prom.BusinessErrCount.Incr("mc:AddCacheLikeActs")
  394. log.Errorv(c, log.KV("AddCacheLikeActs", fmt.Sprintf("%+v", err)), log.KV("key", key))
  395. return
  396. }
  397. }
  398. return
  399. }