123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- package dao
- import (
- "bytes"
- "context"
- "crypto/md5"
- "encoding/json"
- "fmt"
- "github.com/tsuna/gohbase/hrpc"
- "go-common/app/interface/live/push-live/model"
- "go-common/library/log"
- "go-common/library/sync/errgroup"
- "sync"
- )
// _hbaseShard is the number of shards an upper's relations are spread over;
// it is the modulus applied to the shard index when a row key is built and
// the number of rows read per upper.
const _hbaseShard = 200

// HBase coordinates of the relation data.
var (
	// hbaseTable is the table the relation rows are read from.
	hbaseTable = "ugc:PushArchive"
	// hbaseFamily is the column family name holding the relation payload.
	hbaseFamily = "relation"
	// hbaseFamilyB is hbaseFamily as bytes, ready to compare against the
	// family of a returned cell.
	hbaseFamilyB = []byte(hbaseFamily)
)
- func (d *Dao) Fans(c context.Context, upper int64, types int) (fans map[int64]bool, fansSP map[int64]bool, err error) {
- var mutex sync.Mutex
- fans = make(map[int64]bool)
- fansSP = make(map[int64]bool)
- group := errgroup.Group{}
- for i := 0; i < _hbaseShard; i++ {
- shard := int64(i)
- group.Go(func() (e error) {
- key := _rowKey(upper, shard)
- relations, e := d.fansByKey(context.TODO(), key)
- if e != nil {
- return
- }
- mutex.Lock()
- for fansID, fansType := range relations {
- switch types {
-
- case model.RelationAttention:
- if fansType == types {
- fans[fansID] = true
- }
-
- case model.RelationSpecial:
- if fansType == types {
- fansSP[fansID] = true
- }
-
- case model.RelationAll:
- if fansType == model.RelationSpecial {
- fansSP[fansID] = true
- } else if fansType == model.RelationAttention {
- fans[fansID] = true
- }
- default:
- return
- }
- }
- mutex.Unlock()
- return
- })
- }
- group.Wait()
- return
- }
- func (d *Dao) SeparateFans(c context.Context, upper int64, fansIn map[int64]bool) (fans map[int64]bool, fansSP map[int64]bool, err error) {
- var mutex sync.Mutex
- special := make(map[int64]bool)
- fans = make(map[int64]bool)
- fansSP = make(map[int64]bool)
- group := errgroup.Group{}
- for i := 0; i < _hbaseShard; i++ {
- shard := int64(i)
- group.Go(func() (e error) {
- key := _rowKey(upper, shard)
- relations, e := d.fansByKey(context.TODO(), key)
- if e != nil {
- return
- }
- mutex.Lock()
-
- for fansID, fansType := range relations {
- if fansType == model.RelationSpecial {
- special[fansID] = true
- }
- }
- mutex.Unlock()
- return
- })
- }
- group.Wait()
- for id := range fansIn {
- if _, ok := special[id]; ok {
-
- fansSP[id] = true
- } else {
-
- fans[id] = true
- }
- }
- return
- }
- func _rowKey(upper, fans int64) string {
- k := fmt.Sprintf("%d_%d", upper, fans%_hbaseShard)
- key := fmt.Sprintf("%x", md5.Sum([]byte(k)))
- return key
- }
- func (d *Dao) fansByKey(c context.Context, key string) (relations map[int64]int, err error) {
- relations = make(map[int64]int)
- var (
- query *hrpc.Get
- result *hrpc.Result
- ctx, cancel = context.WithTimeout(c, d.relationHBaseReadTimeout)
- )
- defer cancel()
- if result, err = d.relationHBase.GetStr(ctx, hbaseTable, key); err != nil {
- log.Error("d.relationHBase.Get error(%v) querytable(%v)", err, string(query.Table()))
-
- return
- } else if result == nil {
- return
- }
- for _, c := range result.Cells {
- if c != nil && bytes.Equal(c.Family, hbaseFamilyB) {
- if err = json.Unmarshal(c.Value, &relations); err != nil {
- log.Error("json.Unmarshal() error(%v)", err)
- return
- }
- break
- }
- }
- return
- }
|