hbase.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. package dao
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "encoding/binary"
  6. "encoding/json"
  7. "fmt"
  8. "strconv"
  9. "time"
  10. "go-common/app/interface/main/history/model"
  11. "go-common/library/log"
  12. )
  // Storage coordinates used by the history write path in this file.
  var (
  	// family is the column family under which history cells are grouped
  	// when the value map for a put is assembled.
  	family = "info"
  	// tableInfo is the table name handed to d.info.PutStr.
  	tableInfo = "ugc:history"
  )
  // hashRowKey builds the history row key for a member id.
  //
  // The key is the lowercase hex of the first two bytes of
  // md5(little-endian uint64(mid)), i.e. four hex characters, immediately
  // followed by mid in decimal.
  func hashRowKey(mid int64) string {
  	var raw [8]byte
  	binary.LittleEndian.PutUint64(raw[:], uint64(mid))
  	digest := md5.Sum(raw[:])
  	prefix := fmt.Sprintf("%x", digest[:2])
  	return prefix + fmt.Sprint(mid)
  }
  24. func (d *Dao) column(aid int64, typ int8) string {
  25. if typ < model.TypeArticle {
  26. return strconv.FormatInt(aid, 10)
  27. }
  28. return fmt.Sprintf("%d_%d", aid, typ)
  29. }
  30. // Add add history list.
  31. func (d *Dao) Add(ctx context.Context, h *model.History) error {
  32. valueByte, err := json.Marshal(h)
  33. if err != nil {
  34. log.Error("json.Marshal(%v) error(%v)", h, err)
  35. return err
  36. }
  37. fValues := make(map[string][]byte)
  38. column := d.column(h.Aid, h.TP)
  39. fValues[column] = valueByte
  40. key := hashRowKey(h.Mid)
  41. values := map[string]map[string][]byte{family: fValues}
  42. ctx, cancel := context.WithTimeout(ctx, time.Duration(d.conf.Info.WriteTimeout))
  43. defer cancel()
  44. if _, err = d.info.PutStr(ctx, tableInfo, key, values); err != nil {
  45. log.Error("info.PutStr error(%v)", err)
  46. }
  47. return nil
  48. }