hbase.go 13 KB


  1. package dao
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/binary"
  6. "fmt"
  7. "io"
  8. "math"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "go-common/app/job/main/figure-timer/model"
  13. "go-common/library/log"
  14. "github.com/pkg/errors"
  15. "github.com/tsuna/gohbase/hrpc"
  16. )
  17. var (
  18. // record
  19. _hbaseRecordTable = "ugc:figurecalcrecord"
  20. _hbaseRecordFC = "x"
  21. _hbaseRecordQLawfulPosX = "pos_lawful"
  22. _hbaseRecordQLawfulNegX = "neg_lawful"
  23. _hbaseRecordQWidePosX = "pos_wide"
  24. _hbaseRecordQWideNegX = "neg_wide"
  25. _hbaseRecordQFriendlyPosX = "pos_friendly"
  26. _hbaseRecordQFriendlyNegX = "neg_friendly"
  27. _hbaseRecordQCreativityPosX = "pos_creativity"
  28. _hbaseRecordQCreativityNegX = "neg_creativity"
  29. _hbaseRecordQBountyPosX = "pos_bounty"
  30. _hbaseRecordQBountyNegX = "neg_bounty"
  31. // UserInfo
  32. _hbaseUserTable = "ugc:figureuserstatus"
  33. _hbaseUserFC = "user"
  34. _hbaseUserQExp = "exp"
  35. _hbaseUserQSpy = "spy_score"
  36. _hbaseUserQArchiveViews = "archive_views"
  37. _hbaseUserQVip = "vip_status"
  38. _hbaseUserQDisciplineCommittee = "discipline_committee"
  39. // ActionCounter
  40. _hbaseActionTable = "ugc:figureactioncounter"
  41. _hbaseActionFC = "user"
  42. _hbaseActionQCoinCount = "coins"
  43. _hbaseActionQReplyCount = "replies"
  44. _hbaseActionQDanmakuCount = "danmaku"
  45. _hbaseActionQCoinLowRisk = "coin_low_risk"
  46. _hbaseActionQCoinHighRisk = "coin_high_risk"
  47. _hbaseActionQReplyLowRisk = "reply_low_risk"
  48. _hbaseActionQReplyHighRisk = "reply_high_risk"
  49. _hbaseActionQReplyLiked = "reply_liked"
  50. _hbaseActionQReplyUnLiked = "reply_hate"
  51. _hbaseActionQReportReplyPassed = "report_reply_passed"
  52. _hbaseActionQReportDanmakuPassed = "report_danmaku_passed"
  53. _hbaseActionQPublishReplyDeleted = "publish_reply_deleted"
  54. _hbaseActionQPublishDanmakuDeleted = "publish_danmaku_deleted"
  55. _hbaseActionQPayMoney = "pay_money"
  56. _hbaseActionQPayLiveMoney = "pay_live_money"
  57. )
  58. func rowKeyFigureRecord(mid int64, weekTS int64) (key string) {
  59. return fmt.Sprintf("%d_%d", mid, weekTS)
  60. }
  61. func rowKeyUserInfo(mid int64) (key string) {
  62. return fmt.Sprintf("%d", mid)
  63. }
  64. func rowKeyActionCounter(mid int64, dayTS int64) (key string) {
  65. return fmt.Sprintf("%d_%d", mid, dayTS)
  66. }
  67. // UserInfo get it from hbase
  68. func (d *Dao) UserInfo(c context.Context, mid int64, weekVer int64) (userInfo *model.UserInfo, err error) {
  69. var (
  70. result *hrpc.Result
  71. key = rowKeyUserInfo(mid)
  72. ctx, cancel = context.WithTimeout(c, time.Duration(d.c.Hbase.ReadTimeout))
  73. )
  74. defer cancel()
  75. if result, err = d.hbase.GetStr(ctx, _hbaseUserTable, key); err != nil {
  76. err = errors.Wrapf(err, "hbase.GetStr(%s,%s)", _hbaseUserTable, key)
  77. return
  78. }
  79. userInfo = &model.UserInfo{Mid: mid, SpyScore: math.MaxUint64}
  80. for _, c := range result.Cells {
  81. if c == nil {
  82. continue
  83. }
  84. if bytes.Equal([]byte(_hbaseUserFC), c.Family) {
  85. switch string(c.Qualifier) {
  86. case _hbaseUserQExp:
  87. userInfo.Exp = binary.BigEndian.Uint64(c.Value)
  88. case _hbaseUserQSpy:
  89. userInfo.SpyScore = binary.BigEndian.Uint64(c.Value)
  90. case _hbaseUserQArchiveViews:
  91. userInfo.ArchiveViews = binary.BigEndian.Uint64(c.Value)
  92. case _hbaseUserQVip:
  93. userInfo.VIPStatus = binary.BigEndian.Uint64(c.Value)
  94. case _hbaseUserQDisciplineCommittee:
  95. userInfo.DisciplineCommittee = binary.BigEndian.Uint16(c.Value)
  96. }
  97. }
  98. }
  99. // 未获取到spy分值特殊处理
  100. if userInfo.SpyScore == math.MaxUint64 {
  101. userInfo.SpyScore = 100
  102. }
  103. log.Info("User Info [%+v]", userInfo)
  104. return
  105. }
  106. // PutCalcRecord put record all the params x.
  107. func (d *Dao) PutCalcRecord(c context.Context, record *model.FigureRecord, weekTS int64) (err error) {
  108. var (
  109. key = rowKeyFigureRecord(record.Mid, weekTS)
  110. lawfulPosX = make([]byte, 8)
  111. lawfulNegX = make([]byte, 8)
  112. widePosX = make([]byte, 8)
  113. wideNegX = make([]byte, 8)
  114. friendlyPosX = make([]byte, 8)
  115. friendlyNegX = make([]byte, 8)
  116. creativityPosX = make([]byte, 8)
  117. creativityNegX = make([]byte, 8)
  118. bountyPosX = make([]byte, 8)
  119. bountyNegX = make([]byte, 8)
  120. ctx, cancel = context.WithTimeout(c, time.Duration(d.c.Hbase.WriteTimeout))
  121. )
  122. defer cancel()
  123. binary.BigEndian.PutUint64(lawfulPosX, uint64(record.XPosLawful))
  124. binary.BigEndian.PutUint64(lawfulNegX, uint64(record.XNegLawful))
  125. binary.BigEndian.PutUint64(widePosX, uint64(record.XPosWide))
  126. binary.BigEndian.PutUint64(wideNegX, uint64(record.XNegWide))
  127. binary.BigEndian.PutUint64(friendlyPosX, uint64(record.XPosFriendly))
  128. binary.BigEndian.PutUint64(friendlyNegX, uint64(record.XNegFriendly))
  129. binary.BigEndian.PutUint64(creativityPosX, uint64(record.XPosCreativity))
  130. binary.BigEndian.PutUint64(creativityNegX, uint64(record.XNegCreativity))
  131. binary.BigEndian.PutUint64(bountyPosX, uint64(record.XPosBounty))
  132. binary.BigEndian.PutUint64(bountyNegX, uint64(record.XNegBounty))
  133. values := map[string]map[string][]byte{_hbaseRecordFC: {
  134. _hbaseRecordQLawfulPosX: lawfulPosX,
  135. _hbaseRecordQLawfulNegX: lawfulNegX,
  136. _hbaseRecordQWideNegX: wideNegX,
  137. _hbaseRecordQFriendlyPosX: friendlyPosX,
  138. _hbaseRecordQFriendlyNegX: friendlyNegX,
  139. _hbaseRecordQCreativityPosX: creativityPosX,
  140. _hbaseRecordQCreativityNegX: creativityNegX,
  141. _hbaseRecordQBountyPosX: bountyPosX,
  142. _hbaseRecordQBountyNegX: bountyNegX,
  143. }}
  144. if _, err = d.hbase.PutStr(ctx, _hbaseRecordTable, key, values); err != nil {
  145. err = errors.Wrapf(err, "hbase.Put(%s,%s,%+v) error(%v)", _hbaseRecordTable, key, values, err)
  146. }
  147. return
  148. }
  149. // CalcRecords get it from hbase
  150. func (d *Dao) CalcRecords(c context.Context, mid int64, weekTSFrom, weekTSTo int64) (figureRecords []*model.FigureRecord, err error) {
  151. var (
  152. scanner hrpc.Scanner
  153. keyFrom = rowKeyFigureRecord(mid, weekTSFrom)
  154. keyTo = rowKeyFigureRecord(mid, weekTSTo)
  155. ctx, cancel = context.WithTimeout(c, time.Duration(d.c.Hbase.ReadTimeout))
  156. )
  157. defer cancel()
  158. if scanner, err = d.hbase.ScanRangeStr(ctx, _hbaseRecordTable, keyFrom, keyTo); err != nil {
  159. err = errors.Wrapf(err, "hbase.ScanRangeStr(%s,%s,%s)", _hbaseRecordTable, keyFrom, keyTo)
  160. return
  161. }
  162. for {
  163. res, err := scanner.Next()
  164. if err != nil {
  165. if err != io.EOF {
  166. err = errors.WithStack(err)
  167. return nil, err
  168. }
  169. break
  170. }
  171. figureRecord := &model.FigureRecord{Mid: mid}
  172. for i, c := range res.Cells {
  173. if c == nil {
  174. continue
  175. }
  176. if bytes.Equal([]byte(_hbaseRecordFC), c.Family) {
  177. switch string(c.Qualifier) {
  178. case _hbaseRecordQLawfulPosX:
  179. figureRecord.XPosLawful = int64(binary.BigEndian.Uint64(c.Value))
  180. case _hbaseRecordQLawfulNegX:
  181. figureRecord.XNegLawful = int64(binary.BigEndian.Uint64(c.Value))
  182. case _hbaseRecordQWidePosX:
  183. figureRecord.XPosWide = int64(binary.BigEndian.Uint64(c.Value))
  184. case _hbaseRecordQWideNegX:
  185. figureRecord.XNegWide = int64(binary.BigEndian.Uint64(c.Value))
  186. case _hbaseRecordQFriendlyPosX:
  187. figureRecord.XPosFriendly = int64(binary.BigEndian.Uint64(c.Value))
  188. case _hbaseRecordQFriendlyNegX:
  189. figureRecord.XNegFriendly = int64(binary.BigEndian.Uint64(c.Value))
  190. case _hbaseRecordQCreativityPosX:
  191. figureRecord.XPosCreativity = int64(binary.BigEndian.Uint64(c.Value))
  192. case _hbaseRecordQCreativityNegX:
  193. figureRecord.XNegCreativity = int64(binary.BigEndian.Uint64(c.Value))
  194. case _hbaseRecordQBountyPosX:
  195. figureRecord.XPosBounty = int64(binary.BigEndian.Uint64(c.Value))
  196. case _hbaseRecordQBountyNegX:
  197. figureRecord.XNegBounty = int64(binary.BigEndian.Uint64(c.Value))
  198. }
  199. }
  200. if i == 0 {
  201. figureRecord.Version = time.Unix(parseVersion(string(c.Row), c.Timestamp), 0)
  202. }
  203. }
  204. log.Info("User figure record [%+v]", figureRecord)
  205. figureRecords = append(figureRecords, figureRecord)
  206. }
  207. return
  208. }
  209. // ActionCounter .
  210. func (d *Dao) ActionCounter(c context.Context, mid int64, ts int64) (actionCounter *model.ActionCounter, err error) {
  211. var (
  212. result *hrpc.Result
  213. key = rowKeyActionCounter(mid, ts)
  214. ctx, cancel = context.WithTimeout(c, time.Duration(d.c.Hbase.ReadTimeout))
  215. )
  216. defer cancel()
  217. log.Info("Get mid [%d] key [%s]", mid, key)
  218. if result, err = d.hbase.GetStr(ctx, _hbaseActionTable, key); err != nil {
  219. err = errors.Wrapf(err, "hbase.GetStr(%s,%s)", _hbaseActionTable, key)
  220. return
  221. }
  222. actionCounter = &model.ActionCounter{Mid: mid}
  223. for i, c := range result.Cells {
  224. if c == nil {
  225. continue
  226. }
  227. if bytes.Equal([]byte(_hbaseActionFC), c.Family) {
  228. switch string(c.Qualifier) {
  229. case _hbaseActionQCoinCount:
  230. actionCounter.CoinCount = binary.BigEndian.Uint64(c.Value)
  231. case _hbaseActionQReplyCount:
  232. actionCounter.ReplyCount = int64(binary.BigEndian.Uint64(c.Value))
  233. case _hbaseActionQDanmakuCount:
  234. actionCounter.DanmakuCount = int64(binary.BigEndian.Uint64(c.Value))
  235. case _hbaseActionQCoinLowRisk:
  236. actionCounter.CoinLowRisk = binary.BigEndian.Uint64(c.Value)
  237. case _hbaseActionQCoinHighRisk:
  238. actionCounter.CoinHighRisk = binary.BigEndian.Uint64(c.Value)
  239. case _hbaseActionQReplyLowRisk:
  240. actionCounter.ReplyLowRisk = binary.BigEndian.Uint64(c.Value)
  241. case _hbaseActionQReplyHighRisk:
  242. actionCounter.ReplyHighRisk = binary.BigEndian.Uint64(c.Value)
  243. case _hbaseActionQReplyLiked:
  244. actionCounter.ReplyLiked = int64(binary.BigEndian.Uint64(c.Value))
  245. case _hbaseActionQReplyUnLiked:
  246. actionCounter.ReplyUnliked = int64(binary.BigEndian.Uint64(c.Value))
  247. case _hbaseActionQReportReplyPassed:
  248. actionCounter.ReportReplyPassed = int64(binary.BigEndian.Uint64(c.Value))
  249. case _hbaseActionQReportDanmakuPassed:
  250. actionCounter.ReportDanmakuPassed = int64(binary.BigEndian.Uint64(c.Value))
  251. case _hbaseActionQPublishReplyDeleted:
  252. actionCounter.PublishReplyDeleted = int64(binary.BigEndian.Uint64(c.Value))
  253. case _hbaseActionQPublishDanmakuDeleted:
  254. actionCounter.PublishDanmakuDeleted = int64(binary.BigEndian.Uint64(c.Value))
  255. case _hbaseActionQPayMoney:
  256. actionCounter.PayMoney = int64(binary.BigEndian.Uint64(c.Value))
  257. case _hbaseActionQPayLiveMoney:
  258. actionCounter.PayLiveMoney = int64(binary.BigEndian.Uint64(c.Value))
  259. }
  260. }
  261. if i == 0 {
  262. actionCounter.Version = time.Unix(parseVersion(string(c.Row), c.Timestamp), 0)
  263. }
  264. }
  265. log.Info("User action counter [%+v]", actionCounter)
  266. return
  267. }
  268. // // ActionCounters .
  269. // func (d *Dao) ActionCounters(c context.Context, mid int64, tsfrom int64, tsto int64) (actionCounters []*model.ActionCounter, err error) {
  270. // var (
  271. // scan *hrpc.Scan
  272. // results []*hrpc.Result
  273. // keyFrom = rowKeyActionCounter(mid, tsfrom)
  274. // keyTo = rowKeyActionCounter(mid, tsto)
  275. // ctx, cancel = context.WithTimeout(c, time.Duration(d.c.Hbase.ReadTimeout))
  276. // )
  277. // defer cancel()
  278. // log.Info("Scan mid [%d] keyFrom [%s] keyTo [%s]", mid, keyFrom, keyTo)
  279. // if scan, err = hrpc.NewScanRangeStr(ctx, _hbaseActionTable, keyFrom, keyTo); err != nil {
  280. // err = errors.Wrapf(err, "hrcp.NewScanRangeStr(%s,%s,%s)", _hbaseActionTable, keyFrom, keyTo)
  281. // return
  282. // }
  283. // if results, err = d.hbase.Scan(ctx, scan); err != nil {
  284. // err = errors.Wrapf(err, "hbase.Scan(%s,%s,%s)", _hbaseActionTable, keyFrom, keyTo)
  285. // return
  286. // }
  287. // if len(results) == 0 {
  288. // return
  289. // }
  290. // for _, res := range results {
  291. // if res == nil {
  292. // continue
  293. // }
  294. // actionCounter := &model.ActionCounter{Mid: mid}
  295. // for i, c := range res.Cells {
  296. // if c == nil {
  297. // continue
  298. // }
  299. // if bytes.Equal([]byte(_hbaseActionFC), c.Family) {
  300. // switch string(c.Qualifier) {
  301. // case _hbaseActionQCoinCount:
  302. // actionCounter.CoinCount = binary.BigEndian.Uint64(c.Value)
  303. // case _hbaseActionQReplyCount:
  304. // actionCounter.ReplyCount = int64(binary.BigEndian.Uint64(c.Value))
  305. // case _hbaseActionQCoinLowRisk:
  306. // actionCounter.CoinLowRisk = binary.BigEndian.Uint64(c.Value)
  307. // case _hbaseActionQCoinHighRisk:
  308. // actionCounter.CoinHighRisk = binary.BigEndian.Uint64(c.Value)
  309. // case _hbaseActionQReplyLowRisk:
  310. // actionCounter.ReplyLowRisk = binary.BigEndian.Uint64(c.Value)
  311. // case _hbaseActionQReplyHighRisk:
  312. // actionCounter.ReplyHighRisk = binary.BigEndian.Uint64(c.Value)
  313. // case _hbaseActionQReplyLiked:
  314. // actionCounter.ReplyLiked = int64(binary.BigEndian.Uint64(c.Value))
  315. // case _hbaseActionQReplyUnLiked:
  316. // actionCounter.ReplyUnliked = int64(binary.BigEndian.Uint64(c.Value))
  317. // }
  318. // }
  319. // if i == 0 {
  320. // actionCounter.Version = time.Unix(parseVersion(string(c.Row), c.Timestamp), 0)
  321. // }
  322. // }
  323. // log.Info("User action counter [%+v]", actionCounter)
  324. // actionCounters = append(actionCounters, actionCounter)
  325. // }
  326. // return
  327. // }
  328. func parseVersion(rowKey string, timestamp *uint64) (ts int64) {
  329. strs := strings.Split(rowKey, "_")
  330. if len(strs) < 2 {
  331. return int64(*timestamp)
  332. }
  333. var err error
  334. if ts, err = strconv.ParseInt(strs[1], 10, 64); err != nil {
  335. return int64(*timestamp)
  336. }
  337. return ts
  338. }