weightlog.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. package hbase
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "encoding/binary"
  6. "encoding/json"
  7. "fmt"
  8. "strconv"
  9. "time"
  10. "go-common/app/job/main/videoup-report/model/task"
  11. "go-common/library/log"
  12. )
  // HBase coordinates used by the weight-log writer in this file.
  var (
  	// family is the outer key of the cell map that AddLog hands to PutStr
  	// (presumably the HBase column family; confirm against the table schema).
  	family = "weightlog"
  	// tableInfo is the table name that AddLog passes to d.hbase.PutStr.
  	tableInfo = "ugc:ArchiveTaskWeight"
  )
  // hashRowKey builds the row key used to track a task by its id.
  //
  // The key is the first two bytes of md5(tid encoded as 8 little-endian
  // bytes), rendered as four lowercase hex characters, immediately followed by
  // tid in decimal.
  func hashRowKey(tid int64) string {
  	var raw [8]byte
  	binary.LittleEndian.PutUint64(raw[:], uint64(tid))
  	digest := md5.Sum(raw[:])
  	prefix := digest[:2]
  	return fmt.Sprintf("%x%d", prefix, tid)
  }
  24. // AddLog task weight log.
  25. func (d *Dao) AddLog(c context.Context, alog *task.WeightLog) (err error) {
  26. var (
  27. value []byte
  28. fvalues = make(map[string][]byte)
  29. column string
  30. key = hashRowKey(alog.TaskID)
  31. )
  32. column = strconv.FormatInt(int64(alog.Uptime.TimeValue().Unix()), 10)
  33. if value, err = json.Marshal(alog); err != nil {
  34. log.Error("json.Marshal(%v) error(%v)", value, err)
  35. return
  36. }
  37. fvalues[column] = value
  38. values := map[string]map[string][]byte{family: fvalues}
  39. ctx, cancel := context.WithTimeout(c, time.Duration(d.c.Hbase.WriteTimeout))
  40. defer cancel()
  41. // hbase info
  42. if _, err = d.hbase.PutStr(ctx, tableInfo, key, values); err != nil {
  43. log.Error("d.hbase.PutStr(%s,%s,%+v) error(%v)", tableInfo, key, values, err)
  44. }
  45. return
  46. }