hbase.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package dao
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/md5"
  6. "encoding/json"
  7. "fmt"
  8. "github.com/tsuna/gohbase/hrpc"
  9. "go-common/app/interface/live/push-live/model"
  10. "go-common/library/log"
  11. "go-common/library/sync/errgroup"
  12. "sync"
  13. )
  14. const _hbaseShard = 200
  15. var (
  16. hbaseTable = "ugc:PushArchive"
  17. hbaseFamily = "relation"
  18. hbaseFamilyB = []byte(hbaseFamily)
  19. )
  20. // Fans gets the upper's fans.
  21. func (d *Dao) Fans(c context.Context, upper int64, types int) (fans map[int64]bool, fansSP map[int64]bool, err error) {
  22. var mutex sync.Mutex
  23. fans = make(map[int64]bool)
  24. fansSP = make(map[int64]bool)
  25. group := errgroup.Group{}
  26. for i := 0; i < _hbaseShard; i++ {
  27. shard := int64(i)
  28. group.Go(func() (e error) {
  29. key := _rowKey(upper, shard)
  30. relations, e := d.fansByKey(context.TODO(), key)
  31. if e != nil {
  32. return
  33. }
  34. mutex.Lock()
  35. for fansID, fansType := range relations {
  36. switch types {
  37. // 返回普通关注
  38. case model.RelationAttention:
  39. if fansType == types {
  40. fans[fansID] = true
  41. }
  42. // 返回特别关注
  43. case model.RelationSpecial:
  44. if fansType == types {
  45. fansSP[fansID] = true
  46. }
  47. // 同时返回普通关注与特别关注
  48. case model.RelationAll:
  49. if fansType == model.RelationSpecial {
  50. fansSP[fansID] = true
  51. } else if fansType == model.RelationAttention {
  52. fans[fansID] = true
  53. }
  54. default:
  55. return
  56. }
  57. }
  58. mutex.Unlock()
  59. return
  60. })
  61. }
  62. group.Wait()
  63. return
  64. }
  65. // SeparateFans Separate the upper's fans by 1 or 2.
  66. func (d *Dao) SeparateFans(c context.Context, upper int64, fansIn map[int64]bool) (fans map[int64]bool, fansSP map[int64]bool, err error) {
  67. var mutex sync.Mutex
  68. special := make(map[int64]bool)
  69. fans = make(map[int64]bool)
  70. fansSP = make(map[int64]bool)
  71. group := errgroup.Group{}
  72. for i := 0; i < _hbaseShard; i++ {
  73. shard := int64(i)
  74. group.Go(func() (e error) {
  75. key := _rowKey(upper, shard)
  76. relations, e := d.fansByKey(context.TODO(), key)
  77. if e != nil {
  78. return
  79. }
  80. mutex.Lock()
  81. // 获取所有特别关注
  82. for fansID, fansType := range relations {
  83. if fansType == model.RelationSpecial {
  84. special[fansID] = true
  85. }
  86. }
  87. mutex.Unlock()
  88. return
  89. })
  90. }
  91. group.Wait()
  92. for id := range fansIn {
  93. if _, ok := special[id]; ok {
  94. // 特别关注
  95. fansSP[id] = true
  96. } else {
  97. // 不是特别关注就是普通关注
  98. fans[id] = true
  99. }
  100. }
  101. return
  102. }
  103. func _rowKey(upper, fans int64) string {
  104. k := fmt.Sprintf("%d_%d", upper, fans%_hbaseShard)
  105. key := fmt.Sprintf("%x", md5.Sum([]byte(k)))
  106. return key
  107. }
  108. func (d *Dao) fansByKey(c context.Context, key string) (relations map[int64]int, err error) {
  109. relations = make(map[int64]int)
  110. var (
  111. query *hrpc.Get
  112. result *hrpc.Result
  113. ctx, cancel = context.WithTimeout(c, d.relationHBaseReadTimeout)
  114. )
  115. defer cancel()
  116. if result, err = d.relationHBase.GetStr(ctx, hbaseTable, key); err != nil {
  117. log.Error("d.relationHBase.Get error(%v) querytable(%v)", err, string(query.Table()))
  118. // PromError("hbase:Get")
  119. return
  120. } else if result == nil {
  121. return
  122. }
  123. for _, c := range result.Cells {
  124. if c != nil && bytes.Equal(c.Family, hbaseFamilyB) {
  125. if err = json.Unmarshal(c.Value, &relations); err != nil {
  126. log.Error("json.Unmarshal() error(%v)", err)
  127. return
  128. }
  129. break
  130. }
  131. }
  132. return
  133. }