hbase.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package toview
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/md5"
  6. "encoding/binary"
  7. "fmt"
  8. "strconv"
  9. "time"
  10. "go-common/app/interface/main/history/model"
  11. "go-common/library/log"
  12. "github.com/tsuna/gohbase/hrpc"
  13. )
  // HBase coordinates shared by every to-view operation in this file.
  var (
  	// tableInfo is the table name handed to each d.info call.
  	tableInfo = "ugc:history_to_view"

  	// familyAid is the column family used for to-view entries; the methods in
  	// this file use the decimal aid as the column qualifier inside it.
  	familyAid = "aid"

  	// familyAidB is familyAid as bytes, for comparison against the Family
  	// field of returned cells.
  	familyAidB = []byte(familyAid)
  )
  // hashRowKey builds the row key for a user's to-view row.
  //
  // mid is encoded as 8 little-endian bytes and hashed with MD5; the key is the
  // first two digest bytes as four lowercase hex characters, followed by mid in
  // decimal.
  func hashRowKey(mid int64) string {
  	var raw [8]byte
  	binary.LittleEndian.PutUint64(raw[:], uint64(mid))
  	digest := md5.Sum(raw[:])
  	return fmt.Sprintf("%02x%02x%d", digest[0], digest[1], mid)
  }
  26. // Add add one toview.
  27. func (d *Dao) Add(ctx context.Context, mid, aid, now int64) (err error) {
  28. var (
  29. timeB = make([]byte, 8)
  30. key = hashRowKey(mid)
  31. column = strconv.FormatInt(aid, 10)
  32. )
  33. ctx, cancel := context.WithTimeout(ctx, time.Duration(d.conf.Info.WriteTimeout))
  34. defer cancel()
  35. binary.BigEndian.PutUint64(timeB, uint64(now))
  36. values := map[string]map[string][]byte{familyAid: map[string][]byte{column: timeB}}
  37. if _, err = d.info.PutStr(ctx, tableInfo, key, values); err != nil {
  38. log.Error("toview info.Put error(%v)", err)
  39. }
  40. return
  41. }
  42. // Adds add some toview.
  43. func (d *Dao) Adds(ctx context.Context, mid int64, aids []int64, now int64) (err error) {
  44. var (
  45. timeB = make([]byte, 8)
  46. key = hashRowKey(mid)
  47. )
  48. ctx, cancel := context.WithTimeout(ctx, time.Duration(d.conf.Info.WriteTimeout))
  49. defer cancel()
  50. binary.BigEndian.PutUint64(timeB, uint64(now))
  51. aidValues := make(map[string][]byte, len(aids))
  52. for _, aid := range aids {
  53. aidValues[strconv.FormatInt(aid, 10)] = timeB
  54. }
  55. values := map[string]map[string][]byte{familyAid: aidValues}
  56. if _, err = d.info.PutStr(ctx, tableInfo, key, values); err != nil {
  57. log.Error("toview info.Put error(%v)", err)
  58. }
  59. return
  60. }
  61. // AddMap add some toview.
  62. func (d *Dao) AddMap(ctx context.Context, mid int64, views map[int64]*model.ToView) (err error) {
  63. var (
  64. timeB = make([]byte, 8)
  65. key = hashRowKey(mid)
  66. )
  67. ctx, cancel := context.WithTimeout(ctx, time.Duration(d.conf.Info.WriteTimeout))
  68. defer cancel()
  69. aidValues := make(map[string][]byte, len(views))
  70. for _, v := range views {
  71. binary.BigEndian.PutUint64(timeB, uint64(v.Unix))
  72. aidValues[strconv.FormatInt(v.Aid, 10)] = timeB
  73. }
  74. values := map[string]map[string][]byte{familyAid: aidValues}
  75. if _, err = d.info.PutStr(ctx, tableInfo, key, values); err != nil {
  76. log.Error("toview info.Put error(%v)", err)
  77. }
  78. return
  79. }
  80. // ListInfo get all ToViews from hbase.
  81. func (d *Dao) ListInfo(ctx context.Context, mid int64, aids []int64) (res []*model.ToView, err error) {
  82. var (
  83. result *hrpc.Result
  84. key = hashRowKey(mid)
  85. )
  86. ctx, cancel := context.WithTimeout(ctx, time.Duration(d.conf.Info.ReadTimeout))
  87. defer cancel()
  88. var options []func(hrpc.Call) error
  89. if len(aids) != 0 {
  90. colunms := make([]string, 0, len(aids))
  91. for _, aid := range aids {
  92. colunms = append(colunms, fmt.Sprintf("%d", aid))
  93. }
  94. options = append(options, hrpc.Families(map[string][]string{familyAid: colunms}))
  95. }
  96. result, err = d.info.GetStr(ctx, tableInfo, key, options...)
  97. if err != nil && result == nil {
  98. log.Error("d.info.Get error(%v)", err)
  99. return
  100. }
  101. res = make([]*model.ToView, 0)
  102. for _, c := range result.Cells {
  103. if c != nil && len(c.Value) == 8 && bytes.Equal(c.Family, familyAidB) {
  104. aid, err := strconv.ParseInt(string(c.Qualifier), 10, 64)
  105. if err != nil {
  106. log.Error("strconv.ParseInt error(%v)", err)
  107. continue
  108. }
  109. t := &model.ToView{Aid: aid}
  110. t.Unix = int64(binary.BigEndian.Uint64(c.Value))
  111. res = append(res, t)
  112. }
  113. }
  114. return
  115. }
  116. // MapInfo get all ToViews from hbase.
  117. func (d *Dao) MapInfo(ctx context.Context, mid int64, aids []int64) (res map[int64]*model.ToView, err error) {
  118. var (
  119. result *hrpc.Result
  120. key = hashRowKey(mid)
  121. )
  122. ctx, cancel := context.WithTimeout(ctx, time.Duration(d.conf.Info.ReadTimeout))
  123. defer cancel()
  124. var options []func(hrpc.Call) error
  125. if len(aids) != 0 {
  126. colunms := make([]string, 0, len(aids))
  127. for _, aid := range aids {
  128. colunms = append(colunms, fmt.Sprintf("%d", aid))
  129. }
  130. options = append(options, hrpc.Families(map[string][]string{familyAid: colunms}))
  131. }
  132. result, err = d.info.GetStr(ctx, tableInfo, key, options...)
  133. if err != nil {
  134. log.Error("d.info.Get error(%v)", err)
  135. return
  136. }
  137. res = make(map[int64]*model.ToView, len(aids))
  138. if result == nil {
  139. return
  140. }
  141. for _, c := range result.Cells {
  142. if c != nil && len(c.Value) == 8 && bytes.Equal(c.Family, familyAidB) {
  143. aid, err := strconv.ParseInt(string(c.Qualifier), 10, 64)
  144. if err != nil {
  145. log.Error("strconv.ParseInt error(%v)", err)
  146. continue
  147. }
  148. t := &model.ToView{Aid: aid}
  149. t.Unix = int64(binary.BigEndian.Uint64(c.Value))
  150. res[aid] = t
  151. }
  152. }
  153. return
  154. }
  155. // Del delete more toview.
  156. func (d *Dao) Del(ctx context.Context, mid int64, aids []int64) (err error) {
  157. key := hashRowKey(mid)
  158. ctx, cancel := context.WithTimeout(ctx, time.Duration(d.conf.Info.WriteTimeout))
  159. defer cancel()
  160. columns := make(map[string][]byte, len(aids))
  161. for _, aid := range aids {
  162. columns[strconv.FormatInt(aid, 10)] = []byte{}
  163. }
  164. values := map[string]map[string][]byte{familyAid: columns}
  165. if _, err = d.info.Delete(ctx, tableInfo, key, values); err != nil {
  166. log.Error("toview info.Delete error(%v)", err)
  167. }
  168. return
  169. }
  170. // Clear clear ToView.
  171. func (d *Dao) Clear(ctx context.Context, mid int64) (err error) {
  172. key := hashRowKey(mid)
  173. ctx, cancel := context.WithTimeout(ctx, time.Duration(d.conf.Info.WriteTimeout))
  174. defer cancel()
  175. if _, err = d.info.Delete(ctx, tableInfo, key, nil); err != nil {
  176. log.Error("toview info.Delete error(%v)", err)
  177. }
  178. return
  179. }