123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- package dao
- import (
- "context"
- "encoding/json"
- "sync"
- "go-common/app/job/main/aegis/model"
- "go-common/library/cache/redis"
- "go-common/library/log"
- )
- var rmux sync.Mutex
- //IncresByField .
- func (d *Dao) IncresByField(c context.Context, bizid, flowid, uid int64, field string, value int64) (err error) {
- var (
- conn = d.redis.Get(c)
- hk = model.PersonalHashKey(bizid, flowid, uid)
- )
- rmux.Lock()
- defer rmux.Unlock()
- defer conn.Close()
- if err = d.setSet(conn, hk); err != nil {
- return
- }
- if err = d.setHash(conn, hk, "ds"); err != nil {
- return
- }
- return d.setField(conn, hk, field, value)
- }
- //IncresTaskInOut 总进审量-出审量
- func (d *Dao) IncresTaskInOut(c context.Context, bizid, flowid int64, inOrOut string) (err error) {
- var (
- conn = d.redis.Get(c)
- hk = model.TotalHashKey(bizid, flowid)
- )
- rmux.Lock()
- defer rmux.Unlock()
- defer conn.Close()
- if err = d.setSet(conn, hk); err != nil {
- return
- }
- if err = d.setHash(conn, hk, inOrOut); err != nil {
- return
- }
- return d.setField(conn, hk, inOrOut, 1)
- }
- //FlushReport .
- func (d *Dao) FlushReport(c context.Context) (data map[string][]byte, err error) {
- data = make(map[string][]byte)
- rmux.Lock()
- defer rmux.Unlock()
- conn := d.redis.Get(c)
- defer conn.Close()
- keys, err := redis.Strings(conn.Do("SMEMBERS", model.SetKey))
- if err != nil {
- log.Error("SMEMBERS %s error(%v)", model.SetKey, err)
- return
- }
- if len(keys) == 0 {
- log.Info("FlushReport empty")
- return
- }
- for _, key := range keys {
- if err = conn.Send("HGETALL", key); err != nil {
- log.Error("HGETALL %s error(%v)", key, err)
- return
- }
- }
- conn.Flush()
- for _, key := range keys {
- var (
- bs []byte
- mp map[string]int64
- )
- if mp, err = redis.Int64Map(conn.Receive()); err != nil {
- log.Error("Receive error(%v)", err)
- return
- }
- if bs, err = json.Marshal(mp); err != nil {
- log.Error("Marshal mp(%+v) error(%v)", mp, err)
- }
- data[key] = bs
- }
- for _, key := range keys {
- conn.Do("DEL", key)
- }
- conn.Do("DEL", model.SetKey)
- return
- }
- //记录key
- func (d *Dao) setSet(conn redis.Conn, hk string) (err error) {
- if _, err := conn.Do("SADD", model.SetKey, hk); err != nil {
- log.Error("setSet SADD(%s,%s) error(%v)", model.SetKey, hk, err)
- }
- return
- }
- //创建hash
- func (d *Dao) setHash(conn redis.Conn, key string, defaultfield string) (err error) {
- var exist bool
- if exist, err = redis.Bool(conn.Do("EXISTS", key)); err != nil {
- log.Error("setHash EXISTS(%s) error(%v)", key, err)
- return
- }
- if !exist {
- if _, err = conn.Do("HMSET", key, defaultfield, 0); err != nil {
- log.Error("setHash HMSET(%s,%s,%d) error(%v)", key, defaultfield, 0, err)
- }
- }
- return
- }
- //每个field赋值
- func (d *Dao) setField(conn redis.Conn, key string, field string, value int64) (err error) {
- var exist bool
- if exist, err = redis.Bool(conn.Do("HEXISTS", key, field)); err != nil {
- log.Error("setField HEXISTS(%s,%s,%s) error(%v)", key, field, err)
- return
- }
- if !exist {
- if _, err = conn.Do("HMSET", key, field, 0); err != nil {
- log.Error("setField HMSET(%s,%s,%d) error(%v)", key, field, 0, err)
- }
- }
- if _, err = conn.Do("HINCRBY", key, field, value); err != nil {
- log.Error("setField HINCRBY(%s,%s,%d) error(%v)", key, field, 1, err)
- }
- return nil
- }
|