task_report.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "sync"
  6. "go-common/app/job/main/aegis/model"
  7. "go-common/library/cache/redis"
  8. "go-common/library/log"
  9. )
  10. var rmux sync.Mutex
  11. //IncresByField .
  12. func (d *Dao) IncresByField(c context.Context, bizid, flowid, uid int64, field string, value int64) (err error) {
  13. var (
  14. conn = d.redis.Get(c)
  15. hk = model.PersonalHashKey(bizid, flowid, uid)
  16. )
  17. rmux.Lock()
  18. defer rmux.Unlock()
  19. defer conn.Close()
  20. if err = d.setSet(conn, hk); err != nil {
  21. return
  22. }
  23. if err = d.setHash(conn, hk, "ds"); err != nil {
  24. return
  25. }
  26. return d.setField(conn, hk, field, value)
  27. }
  28. //IncresTaskInOut 总进审量-出审量
  29. func (d *Dao) IncresTaskInOut(c context.Context, bizid, flowid int64, inOrOut string) (err error) {
  30. var (
  31. conn = d.redis.Get(c)
  32. hk = model.TotalHashKey(bizid, flowid)
  33. )
  34. rmux.Lock()
  35. defer rmux.Unlock()
  36. defer conn.Close()
  37. if err = d.setSet(conn, hk); err != nil {
  38. return
  39. }
  40. if err = d.setHash(conn, hk, inOrOut); err != nil {
  41. return
  42. }
  43. return d.setField(conn, hk, inOrOut, 1)
  44. }
  45. //FlushReport .
  46. func (d *Dao) FlushReport(c context.Context) (data map[string][]byte, err error) {
  47. data = make(map[string][]byte)
  48. rmux.Lock()
  49. defer rmux.Unlock()
  50. conn := d.redis.Get(c)
  51. defer conn.Close()
  52. keys, err := redis.Strings(conn.Do("SMEMBERS", model.SetKey))
  53. if err != nil {
  54. log.Error("SMEMBERS %s error(%v)", model.SetKey, err)
  55. return
  56. }
  57. if len(keys) == 0 {
  58. log.Info("FlushReport empty")
  59. return
  60. }
  61. for _, key := range keys {
  62. if err = conn.Send("HGETALL", key); err != nil {
  63. log.Error("HGETALL %s error(%v)", key, err)
  64. return
  65. }
  66. }
  67. conn.Flush()
  68. for _, key := range keys {
  69. var (
  70. bs []byte
  71. mp map[string]int64
  72. )
  73. if mp, err = redis.Int64Map(conn.Receive()); err != nil {
  74. log.Error("Receive error(%v)", err)
  75. return
  76. }
  77. if bs, err = json.Marshal(mp); err != nil {
  78. log.Error("Marshal mp(%+v) error(%v)", mp, err)
  79. }
  80. data[key] = bs
  81. }
  82. for _, key := range keys {
  83. conn.Do("DEL", key)
  84. }
  85. conn.Do("DEL", model.SetKey)
  86. return
  87. }
  88. //记录key
  89. func (d *Dao) setSet(conn redis.Conn, hk string) (err error) {
  90. if _, err := conn.Do("SADD", model.SetKey, hk); err != nil {
  91. log.Error("setSet SADD(%s,%s) error(%v)", model.SetKey, hk, err)
  92. }
  93. return
  94. }
  95. //创建hash
  96. func (d *Dao) setHash(conn redis.Conn, key string, defaultfield string) (err error) {
  97. var exist bool
  98. if exist, err = redis.Bool(conn.Do("EXISTS", key)); err != nil {
  99. log.Error("setHash EXISTS(%s) error(%v)", key, err)
  100. return
  101. }
  102. if !exist {
  103. if _, err = conn.Do("HMSET", key, defaultfield, 0); err != nil {
  104. log.Error("setHash HMSET(%s,%s,%d) error(%v)", key, defaultfield, 0, err)
  105. }
  106. }
  107. return
  108. }
  109. //每个field赋值
  110. func (d *Dao) setField(conn redis.Conn, key string, field string, value int64) (err error) {
  111. var exist bool
  112. if exist, err = redis.Bool(conn.Do("HEXISTS", key, field)); err != nil {
  113. log.Error("setField HEXISTS(%s,%s,%s) error(%v)", key, field, err)
  114. return
  115. }
  116. if !exist {
  117. if _, err = conn.Do("HMSET", key, field, 0); err != nil {
  118. log.Error("setField HMSET(%s,%s,%d) error(%v)", key, field, 0, err)
  119. }
  120. }
  121. if _, err = conn.Do("HINCRBY", key, field, value); err != nil {
  122. log.Error("setField HINCRBY(%s,%s,%d) error(%v)", key, field, 1, err)
  123. }
  124. return nil
  125. }