hbase.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. package dao
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/md5"
  6. "encoding/binary"
  7. "encoding/json"
  8. "fmt"
  9. "strconv"
  10. "sync"
  11. "go-common/app/interface/main/push-archive/model"
  12. "go-common/library/log"
  13. "go-common/library/sync/errgroup"
  14. "github.com/tsuna/gohbase/hrpc"
  15. )
  // _hbaseShard is the number of HBase rows an upper's fan relations are
  // spread over; _rowKey selects the row from fans % _hbaseShard.
  const _hbaseShard = 200

  // hbaseTable and hbaseFamily locate the upper -> fans relation data: the table
  // holding the sharded rows and the column family whose cell stores the
  // JSON-encoded relation map.
  var hbaseTable, hbaseFamily = "ugc:PushArchive", "relation"

  // hbaseFamilyB is hbaseFamily pre-converted to bytes, for comparing against
  // hrpc cell families without converting on every lookup.
  var hbaseFamilyB = []byte(hbaseFamily)
  22. func _rowKey(upper, fans int64) string {
  23. k := fmt.Sprintf("%d_%d", upper, fans%_hbaseShard)
  24. key := fmt.Sprintf("%x", md5.Sum([]byte(k)))
  25. return key
  26. }
  27. // Fans gets the upper's fans.
  28. func (d *Dao) Fans(c context.Context, upper int64, isPGC bool) (res map[int64]int, err error) {
  29. var mutex sync.Mutex
  30. res = make(map[int64]int)
  31. group := errgroup.Group{}
  32. for i := 0; i < _hbaseShard; i++ {
  33. shard := int64(i)
  34. group.Go(func() (e error) {
  35. key := _rowKey(upper, shard)
  36. relations, e := d.fansByKey(context.TODO(), key)
  37. if e != nil {
  38. return
  39. }
  40. mutex.Lock()
  41. for fans, tp := range relations {
  42. // pgc稿件,屏蔽非特殊关注粉丝
  43. if isPGC && tp != model.RelationSpecial {
  44. continue
  45. }
  46. res[fans] = tp
  47. }
  48. mutex.Unlock()
  49. return
  50. })
  51. }
  52. group.Wait()
  53. return
  54. }
  55. // AddFans add upper's fans.
  56. func (d *Dao) AddFans(c context.Context, upper, fans int64, tp int) (err error) {
  57. key := _rowKey(upper, fans)
  58. relations, err := d.fansByKey(c, key)
  59. if err != nil {
  60. return
  61. }
  62. relations[fans] = tp
  63. err = d.saveRelation(c, key, upper, relations)
  64. return
  65. }
  66. // DelFans del fans.
  67. func (d *Dao) DelFans(c context.Context, upper, fans int64) (err error) {
  68. key := _rowKey(upper, fans)
  69. relations, err := d.fansByKey(c, key)
  70. if err != nil {
  71. return
  72. }
  73. delete(relations, fans)
  74. err = d.saveRelation(c, key, upper, relations)
  75. return
  76. }
  77. // DelSpecialAttention del special attention.
  78. func (d *Dao) DelSpecialAttention(c context.Context, upper, fans int64) (err error) {
  79. key := _rowKey(upper, fans)
  80. relations, err := d.fansByKey(c, key)
  81. if err != nil {
  82. return
  83. }
  84. if relations[fans] != model.RelationSpecial {
  85. return
  86. }
  87. relations[fans] = model.RelationAttention
  88. err = d.saveRelation(c, key, upper, relations)
  89. return
  90. }
  91. func (d *Dao) fansByKey(c context.Context, key string) (relations map[int64]int, err error) {
  92. var (
  93. result *hrpc.Result
  94. ctx, cancel = context.WithTimeout(c, d.relationHBaseReadTimeout)
  95. )
  96. defer cancel()
  97. relations = make(map[int64]int)
  98. if result, err = d.relationHBase.Get(ctx, []byte(hbaseTable), []byte(key)); err != nil {
  99. log.Error("d.relationHBase.Get error(%v) querytable(%v)", err, hbaseTable)
  100. PromError("hbase:Get")
  101. return
  102. } else if result == nil {
  103. return
  104. }
  105. for _, c := range result.Cells {
  106. if c != nil && bytes.Equal(c.Family, hbaseFamilyB) {
  107. if err = json.Unmarshal(c.Value, &relations); err != nil {
  108. log.Error("json.Unmarshal() error(%v)", err)
  109. return
  110. }
  111. break
  112. }
  113. }
  114. return
  115. }
  116. func (d *Dao) saveRelation(c context.Context, key string, upper int64, relations map[int64]int) (err error) {
  117. var (
  118. column = strconv.FormatInt(upper, 10)
  119. ctx, cancel = context.WithTimeout(c, d.relationHBaseWriteTimeout)
  120. )
  121. defer cancel()
  122. value, err := json.Marshal(relations)
  123. if err != nil {
  124. return
  125. }
  126. values := map[string]map[string][]byte{hbaseFamily: {column: value}}
  127. if _, err = d.relationHBase.PutStr(ctx, hbaseTable, key, values); err != nil {
  128. log.Error("d.relationHBase.PutStr error(%v), table(%s), values(%+v)", err, hbaseTable, values)
  129. PromError("hbase:Put")
  130. }
  131. return
  132. }
  133. // filterFanByUpper 根据fans在hbase存储的up主列表,筛选出upper主在up主列表中的粉丝
  134. func (d *Dao) filterFanByUpper(c context.Context, fan int64, up interface{}, table string, family []string) (included bool, err error) {
  135. var (
  136. res *hrpc.Result
  137. key string
  138. ctx, cancel = context.WithTimeout(c, d.fanHBaseReadTimeout)
  139. )
  140. defer cancel()
  141. upper := up.(int64)
  142. rowKeyMD := md5.Sum([]byte(strconv.FormatInt(fan, 10)))
  143. key = fmt.Sprintf("%x", rowKeyMD)
  144. if res, err = d.fanHBase.Get(ctx, []byte(table), []byte(key)); err != nil {
  145. log.Error("d.fanHBase.Get error(%v) querytable(%v) key(%s), fan(%d), upper(%d)", err, table, key, fan, upper)
  146. PromError("hbase:Get")
  147. return
  148. } else if res == nil {
  149. return
  150. }
  151. for _, c := range res.Cells {
  152. if c == nil || !existFamily(c.Family, family) {
  153. continue
  154. }
  155. upID := int64(binary.BigEndian.Uint32(c.Value))
  156. if upID != upper || upID <= 0 {
  157. continue
  158. }
  159. included = true
  160. log.Info("filter fan: included by hbase, fan(%d) upper(%d) table(%s)", fan, upper, table)
  161. return
  162. }
  163. if !included {
  164. log.Info("filter fan: excluded by hbase, fan(%d) upper(%d) table(%s)", fan, upper, table)
  165. }
  166. return
  167. }
  168. // FilterFans 批量筛选
  169. func (d *Dao) FilterFans(fans *[]int64, params map[string]interface{}) (err error) {
  170. base := params["base"]
  171. table := params["table"].(string)
  172. family := params["family"].([]string)
  173. result := params["result"].(*[]int64)
  174. excluded := params["excluded"].(*[]int64)
  175. handler := params["handler"].(func(context.Context, int64, interface{}, string, []string) (bool, error))
  176. mutex := sync.Mutex{}
  177. group := errgroup.Group{}
  178. l := len(*fans)
  179. for i := 0; i < l; i++ {
  180. shared := (*fans)[i]
  181. group.Go(func() (e error) {
  182. included, e := handler(context.TODO(), shared, base, table, family)
  183. if e != nil {
  184. log.Error("FilterFans error(%v) fan(%d) base(%d) table(%s) family(%v)", e, shared, base, table, family)
  185. }
  186. mutex.Lock()
  187. if included {
  188. *result = append(*result, shared)
  189. } else {
  190. *excluded = append(*excluded, shared)
  191. }
  192. mutex.Unlock()
  193. return
  194. })
  195. }
  196. group.Wait()
  197. return
  198. }
  // existFamily reports whether the HBase column family name actual equals one
  // of the names in family. An empty or nil family list never matches.
  func existFamily(actual []byte, family []string) bool {
  	found := false
  	for _, name := range family {
  		if found = bytes.Equal(actual, []byte(name)); found {
  			break
  		}
  	}
  	return found
  }
  208. // filterFanByActive 根据用户的活跃时间段,过滤不在活跃期内更新的粉丝; 若无活跃列表,从默认活跃时间内过滤
  209. func (d *Dao) filterFanByActive(ctx context.Context, fan int64, oneHour interface{}, table string, family []string) (included bool, err error) {
  210. var (
  211. b []byte
  212. result *hrpc.Result
  213. c, cancel = context.WithTimeout(ctx, d.fanHBaseReadTimeout)
  214. activeHour int
  215. )
  216. defer cancel()
  217. hour := oneHour.(int)
  218. if _, included = d.ActiveDefaultTime[hour]; included {
  219. return
  220. }
  221. rowKey := md5.Sum(strconv.AppendInt(b, fan, 10))
  222. key := fmt.Sprintf("%x", rowKey)
  223. if result, err = d.fanHBase.Get(c, []byte(table), []byte(key)); err != nil {
  224. log.Error("filterFanByActive d.fanHBase.Get error(%v) table(%s) key(%s) fan(%d)", err, table, key, fan)
  225. PromError("hbase:Get")
  226. return
  227. } else if result == nil {
  228. return
  229. }
  230. included = false
  231. for _, cell := range result.Cells {
  232. if cell != nil && existFamily(cell.Family, family) {
  233. activeHour, err = strconv.Atoi(string(cell.Value))
  234. if err != nil {
  235. log.Error("filterFanByActive strconv.Atoi error(%v) fan(%d) value(%s)", err, fan, string(cell.Value))
  236. break
  237. }
  238. if activeHour == hour {
  239. included = true
  240. break
  241. }
  242. }
  243. }
  244. if !included {
  245. log.Info("filter fan:excluded by active time from table, fan(%d)", fan)
  246. }
  247. return
  248. }
  249. // ExistsInBlacklist 按黑名单过滤用户
  250. func (d *Dao) ExistsInBlacklist(ctx context.Context, upper int64, mids []int64) (exists, notExists []int64) {
  251. var (
  252. mutex sync.Mutex
  253. group = errgroup.Group{}
  254. )
  255. for _, mid := range mids {
  256. mid := mid
  257. group.Go(func() error {
  258. include, _ := d.filterFanByUpper(context.Background(), mid, upper, d.c.Abtest.HbaseBlacklistTable, d.c.Abtest.HbaseBlacklistFamily)
  259. mutex.Lock()
  260. if include {
  261. exists = append(exists, mid)
  262. } else {
  263. notExists = append(notExists, mid)
  264. }
  265. mutex.Unlock()
  266. return nil
  267. })
  268. }
  269. group.Wait()
  270. return
  271. }
  272. // ExistsInWhitelist 按白名单过滤用户
  273. func (d *Dao) ExistsInWhitelist(ctx context.Context, upper int64, mids []int64) (exists, notExists []int64) {
  274. var (
  275. mutex sync.Mutex
  276. group = errgroup.Group{}
  277. )
  278. for _, mid := range mids {
  279. mid := mid
  280. group.Go(func() error {
  281. include, _ := d.filterFanByUpper(context.Background(), mid, upper, d.c.Abtest.HbaseeWhitelistTable, d.c.Abtest.HbaseWhitelistFamily)
  282. mutex.Lock()
  283. if include {
  284. exists = append(exists, mid)
  285. } else {
  286. notExists = append(notExists, mid)
  287. }
  288. mutex.Unlock()
  289. return nil
  290. })
  291. }
  292. group.Wait()
  293. return
  294. }