hbase.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package dao
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/binary"
  6. "fmt"
  7. "time"
  8. "go-common/app/job/main/figure/conf"
  9. "go-common/app/job/main/figure/model"
  10. "go-common/library/log"
  11. "github.com/pkg/errors"
  12. )
  13. func (d *Dao) rowKey(mid int64) (res string) {
  14. res = fmt.Sprintf("%d", mid)
  15. return
  16. }
  17. func (d *Dao) rowVerKey(mid int64, now time.Time) (res string) {
  18. res = fmt.Sprintf("%d_%d", mid, d.Version(now))
  19. return
  20. }
  21. // PutSpyScore add spy score info.
  22. func (d *Dao) PutSpyScore(c context.Context, mid int64, score int8) (err error) {
  23. var (
  24. key = d.rowKey(mid)
  25. scoreB = make([]byte, 8)
  26. ctx, cancel = context.WithTimeout(c, time.Duration(conf.Conf.HBase.WriteTimeout))
  27. )
  28. defer cancel()
  29. log.Info("Put spy score key [%s] score [%d]", key, score)
  30. binary.BigEndian.PutUint64(scoreB, uint64(score))
  31. values := map[string]map[string][]byte{model.USFamilyUser: map[string][]byte{model.USColumnSpyScore: scoreB}}
  32. if _, err = d.hbase.PutStr(ctx, model.UserInfoTable, key, values); err != nil {
  33. log.Error("hbase.Put error(%v)", err)
  34. }
  35. return
  36. }
  37. // PutReplyAct add spy score info.
  38. func (d *Dao) PutReplyAct(c context.Context, mid int64, column string, incr int64) (err error) {
  39. var (
  40. key = d.rowVerKey(mid, time.Now())
  41. ctx, cancel = context.WithTimeout(c, time.Duration(conf.Conf.HBase.WriteTimeout))
  42. )
  43. defer cancel()
  44. log.Info("Put reply act key [%s] c [%s] incr [%d]", key, column, incr)
  45. bytesBuffer := bytes.NewBuffer([]byte{})
  46. binary.Write(bytesBuffer, binary.BigEndian, incr)
  47. values := map[string]map[string][]byte{model.ACFamilyUser: map[string][]byte{column: bytesBuffer.Bytes()}}
  48. if _, err = d.hbase.Increment(ctx, model.ActionCounterTable, key, values); err != nil {
  49. err = errors.Wrapf(err, "msg(%d,%s,%d), hbase.Increment(key: %s values: %v)", mid, column, incr, key, values)
  50. }
  51. return
  52. }
  53. // PutCoinUnusual coin unusual.
  54. func (d *Dao) PutCoinUnusual(c context.Context, mid int64, column string) (err error) {
  55. var (
  56. key = d.rowVerKey(mid, time.Now())
  57. incrBytes = make([]byte, 8)
  58. ctx, cancel = context.WithTimeout(c, time.Duration(conf.Conf.HBase.WriteTimeout))
  59. )
  60. defer cancel()
  61. log.Info("Put coin unusual key [%s] c [%s]", key, column)
  62. binary.BigEndian.PutUint64(incrBytes, uint64(1))
  63. values := map[string]map[string][]byte{model.ACFamilyUser: map[string][]byte{column: incrBytes}}
  64. if _, err = d.hbase.Increment(ctx, model.ActionCounterTable, key, values); err != nil {
  65. err = errors.Wrapf(err, "msg(%d,%s), hbase.Increment(key: %s values: %v)", mid, column, key, values)
  66. }
  67. return
  68. }
  69. // PutCoinCount coin count.
  70. func (d *Dao) PutCoinCount(c context.Context, mid int64) (err error) {
  71. var (
  72. key = d.rowVerKey(mid, time.Now())
  73. incrBytes = make([]byte, 8)
  74. ctx, cancel = context.WithTimeout(c, time.Duration(conf.Conf.HBase.WriteTimeout))
  75. )
  76. defer cancel()
  77. log.Info("Put coin count key [%s]", key)
  78. binary.BigEndian.PutUint64(incrBytes, uint64(1))
  79. values := map[string]map[string][]byte{model.ACFamilyUser: map[string][]byte{model.ACColumnCoins: incrBytes}}
  80. if _, err = d.hbase.Increment(ctx, model.ActionCounterTable, key, values); err != nil {
  81. err = errors.Wrapf(err, "msg(%d), hbase.Increment(key: %s values: %v)", mid, key, values)
  82. }
  83. return
  84. }
  85. // PayOrderInfo user pay order info.
  86. func (d *Dao) PayOrderInfo(c context.Context, column string, mid, money int64) (err error) {
  87. var (
  88. key = d.rowVerKey(mid, time.Now())
  89. incrBytes = make([]byte, 8)
  90. ctx, cancel = context.WithTimeout(c, time.Duration(conf.Conf.HBase.WriteTimeout))
  91. )
  92. defer cancel()
  93. log.Info("Pay Order key [%s]", key)
  94. binary.BigEndian.PutUint64(incrBytes, uint64(money))
  95. values := map[string]map[string][]byte{model.ACFamilyUser: map[string][]byte{column: incrBytes}}
  96. if _, err = d.hbase.Increment(ctx, model.ActionCounterTable, key, values); err != nil {
  97. err = errors.Wrapf(err, "msg(%d,%s,%d), hbase.Increment(key: %s values: %v)", mid, column, key, values)
  98. }
  99. return
  100. }