hbase.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package archive
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/md5"
  6. "encoding/binary"
  7. "encoding/json"
  8. "fmt"
  9. "time"
  10. "github.com/tsuna/gohbase/hrpc"
  11. "go-common/app/admin/main/videoup/model/archive"
  12. "go-common/library/log"
  13. )
  14. var (
  15. tableInfo = "ugc:ArchiveTaskWeight"
  16. family = "weightlog"
  17. familyB = []byte(family)
  18. )
  19. // hashRowKey create rowkey(md5(tid)[:2]+tid) for track by tid.
  20. func hashRowKey(tid int64) string {
  21. var bs = make([]byte, 8)
  22. binary.LittleEndian.PutUint64(bs, uint64(tid))
  23. rk := md5.Sum(bs)
  24. return fmt.Sprintf("%x%d", rk[:2], tid)
  25. }
  26. // WeightLog get weight log.
  27. func (d *Dao) WeightLog(c context.Context, taskid int64) (ls []*archive.TaskWeightLog, err error) {
  28. var (
  29. result *hrpc.Result
  30. key = hashRowKey(taskid)
  31. ctx, cancel = context.WithTimeout(c, time.Duration(d.c.HBase.ReadTimeout))
  32. )
  33. defer cancel()
  34. if result, err = d.hbase.Get(ctx, []byte(tableInfo), []byte(key)); err != nil {
  35. log.Error("d.hbase.Get error(%v)", err)
  36. return
  37. }
  38. for _, c := range result.Cells {
  39. if c == nil || !bytes.Equal(c.Family, familyB) {
  40. return
  41. }
  42. aLog := &archive.TaskWeightLog{}
  43. if err = json.Unmarshal(c.Value, aLog); err != nil {
  44. log.Warn("json.Unmarshal(%s) error(%v)", string(c.Value), err)
  45. err = nil
  46. continue
  47. }
  48. ls = append(ls, aLog)
  49. }
  50. return
  51. }