danmaku.go 963 B

12345678910111213141516171819202122232425262728293031
  1. package dao
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/binary"
  6. "time"
  7. "go-common/app/job/main/figure/conf"
  8. "go-common/app/job/main/figure/model"
  9. "go-common/library/log"
  10. "github.com/pkg/errors"
  11. )
  12. // DanmakuReport .
  13. func (d *Dao) DanmakuReport(c context.Context, mid int64, column string, incr int64) (err error) {
  14. var (
  15. key = d.rowVerKey(mid, time.Now())
  16. ctx, cancel = context.WithTimeout(c, time.Duration(conf.Conf.HBase.WriteTimeout))
  17. )
  18. defer cancel()
  19. log.Info("Put danmaku act key [%s] c [%s] incr [%d]", key, column, incr)
  20. bytesBuffer := bytes.NewBuffer([]byte{})
  21. binary.Write(bytesBuffer, binary.BigEndian, incr)
  22. values := map[string]map[string][]byte{model.ACFamilyUser: map[string][]byte{column: bytesBuffer.Bytes()}}
  23. if _, err = d.hbase.Increment(ctx, model.ActionCounterTable, key, values); err != nil {
  24. err = errors.Wrapf(err, "msg(%d,%s,%d), hbase.Increment(key: %s values: %v)", mid, column, incr, key, values)
  25. }
  26. return
  27. }