package notice import ( "context" "go-common/app/service/live/xuser/api/grpc/v1" v1pb "go-common/app/service/live/xuserex/api/grpc/v1" "go-common/app/service/live/xuserex/conf" "go-common/library/cache/memcache" "bytes" "encoding/json" "fmt" "go-common/app/service/live/resource/sdk" "go-common/app/service/live/xuserex/model/roomNotice" "go-common/library/database/hbase.v2" "go-common/library/ecode" "go-common/library/log" bm "go-common/library/net/http/blademaster" "go-common/library/stat/prom" "strconv" "time" ) // Dao dao type Dao struct { c *conf.Config mc *memcache.Pool // acc rpc client *bm.Client hbase *hbase.Client xuser *v1.Client } type guardGuideConf struct { Open int64 `json:"open"` Threshold int64 `json:"threshold"` } const ( // 缓存过期时间 keyShouldNoticeExpire = 3600 keyNoticePre = "kn_v1_%d_%d_%s" // HBaseMonthlyConsumeTable . HBaseMonthlyConsumeTable = "livemonthconsume" // BuyGuardGuideTitanKey . BuyGuardGuideTitanKey = "buy_guard_guide" // TaskFinishKey . TaskFinishKey = "task_finish_%s" ) // New init mysql db func New(c *conf.Config) (dao *Dao) { dao = &Dao{ c: c, mc: memcache.NewPool(c.Memcache), client: bm.NewClient(c.BMClient), hbase: hbase.NewClient(c.HBase), } conn, err := v1.NewClient(c.Warden) if err != nil { panic(err) } dao.xuser = conn return } func keyShouldNotice(UID int64, targetID int64, date string) string { return fmt.Sprintf(keyNoticePre, UID, targetID, date) } // IsNotice returns whether should pop a purchase notice. func (dao *Dao) IsNotice(c context.Context, UID int64, targetID int64) (*v1pb.RoomNoticeBuyGuardResp, error) { term := dao.GetTermBegin() begin := term.Unix() end := dao.GetTermEnd() resp := &v1pb.RoomNoticeBuyGuardResp{ Begin: begin, End: end.Unix(), Now: time.Now().Unix(), Title: "感谢支持主播", Content: "成为船员为主播保驾护航吧~", Button: "开通大航海", } shouldNotice, err := dao.getShouldNotice(c, UID, targetID, term) if err != nil { log.Error("dao getShouldNotice uid(%v)roomid(%v)term(%v) error(%v)", UID, targetID, term.Format("2006-01-02"), err) err = nil return resp, err } resp.ShouldNotice = int64(shouldNotice) return resp, nil } //go:generate $GOPATH/src/go-common/app/tool/cache/gen type _cache interface { // cache: -sync=true -nullcache=&roomNotice.MonthConsume{Amount:-1} -check_null_code=$.Amount==-1 MonthConsume(c context.Context, UID int64, targetID int64, date string) (*roomNotice.MonthConsume, error) } //go:generate $GOPATH/src/go-common/app/tool/cache/mc type _mc interface { // 获取某个月消费 // mc: -key=keyShouldNotice CacheMonthConsume(c context.Context, UID int64, targetID int64, date string) (*roomNotice.MonthConsume, error) // 保存获取某个月消费 // mc: -key=keyShouldNotice -expire=d.keyShouldNoticeExpire -encode=json|gzip AddCacheMonthConsume(c context.Context, UID int64, targetID int64, date string, value *roomNotice.MonthConsume) error } func (dao *Dao) getThreshold() (threshold *guardGuideConf, err error) { threshold = &guardGuideConf{} guideConf, err := titansSdk.Get(BuyGuardGuideTitanKey) log.Info("getThreshold_key(%v) conf(%+v)", BuyGuardGuideTitanKey, guideConf) if err != nil { log.Error("getThreshold(%v) error(%v)", BuyGuardGuideTitanKey, err) return } if "" == guideConf { return } if err = json.Unmarshal([]byte(guideConf), threshold); err != nil { log.Error("json Unmarshal guideconf(%+v) error(%v)", guideConf, err) return } log.Info("getThreshold_unmarshal_succ key(%v) conf (%v) Threshold(%+v)", BuyGuardGuideTitanKey, guideConf, threshold) return } func (dao *Dao) getShouldNotice(ctx context.Context, UID int64, targetID int64, term time.Time) (shouldNotice int, err error) { shouldNotice = 0 taskFinish, err := dao.GetTaskFinish(ctx, term) if err != nil { return shouldNotice, err } if !taskFinish { log.Info("task_not_finish") return shouldNotice, err } // 获取配置的收入门槛 threshold, err := dao.getThreshold() if err != nil { log.Error("get_threshold_error(%v)", err) return } if nil == threshold { log.Error("get_threshold_nil") return } if 0 == threshold.Open { log.Info("guard_guide not Open (%+v)", threshold) return } monthConsume, err := dao.MonthConsume(ctx, UID, targetID, dao.termToString(term)) log.Info("get_monthConsume(%+v) Threshold (%+v)", monthConsume, threshold) if err != nil { log.Error("get_monthConsum_err uid(%d) targetid(%v) term (%v) error(%v)", UID, targetID, term, err) return } if nil == monthConsume { return } if int64(monthConsume.Amount) >= threshold.Threshold*1000 { // coin to rmb isGuard, err := dao.isGuard(ctx, UID, targetID) log.Info("show guard guide uid(%v) target (%v) guard (%v) Threshold (%v)", UID, targetID, isGuard, threshold) if err != nil { log.Error("get gaurd UID(%v) targetid (%v) error(%v)", UID, targetID, err) return shouldNotice, err } if !isGuard { shouldNotice = 1 } } return } func hbaseRowKey(UID int64, targetID int64, date string) string { return fmt.Sprintf("%s_%d_%d", date, UID, targetID) } //RawMonthConsume get month consume from hbase func (dao *Dao) RawMonthConsume(ctx context.Context, UID int64, targetID int64, date string) (res *roomNotice.MonthConsume, err error) { var ( tableName = HBaseMonthlyConsumeTable key = hbaseRowKey(UID, targetID, date) ) result, err := dao.hbase.GetStr(ctx, tableName, key) log.Info("RawMonthConsume_getstr tableName (%v) key (%v) res (%v)", tableName, key, result) if err != nil { log.Error("dao.hbase.GetStr tableName(%s)|mid(%d)|key(%v)|error(%v)", tableName, UID, key, err) err = ecode.CreativeDataErr return } if result == nil { return } res = &roomNotice.MonthConsume{} for _, c := range result.Cells { if c == nil { continue } v, _ := strconv.ParseInt(string(c.Value[:]), 10, 64) if !bytes.Equal(c.Family, []byte("info")) { log.Error("family_type_err(%v) error", c.Family) continue } switch { case bytes.Equal(c.Qualifier, []byte("uid")): res.Uid = v case bytes.Equal(c.Qualifier, []byte("ruid")): res.Ruid = v case bytes.Equal(c.Qualifier, []byte("amount")): res.Amount = v case bytes.Equal(c.Qualifier, []byte("time")): res.Date = v } } log.Info("RawMonthConsume_succ uid (%v) target (%v) date (%v) res (%+v)", UID, targetID, date, res) return } // GetTermBegin return first day of last month func (dao *Dao) GetTermBegin() time.Time { year, month, _ := time.Now().Date() thisMonth := time.Date(year, month, 1, 0, 0, 0, 0, time.Local) return thisMonth.AddDate(0, -1, 0) } // GetTermEnd returns last second of last month func (dao *Dao) GetTermEnd() time.Time { year, month, _ := time.Now().Date() thisMonth := time.Date(year, month, 1, 0, 0, 0, 0, time.Local) second, _ := time.ParseDuration("-1s") return thisMonth.Add(second) } // IsValidTerm returns whether a term is valid. func (dao *Dao) IsValidTerm(term time.Time) bool { return true } // TOOD func (dao *Dao) isGuard(c context.Context, UID int64, targetID int64) (isGuard bool, err error) { ret, err := dao.xuser.GetByUIDTargetID(c, &v1.GetByUidTargetIdReq{ Uid: UID, TargetId: targetID, }) log.Info("dao.xuser.GetByUIDTargetID uid (%v) target (%v) res (%v)", UID, targetID, ret) if err != nil { log.Error("get guard uid (%v) target (%v) error(%v)", UID, targetID, err) return } if nil == ret || nil == ret.Data || 0 == len(ret.Data) { log.Info("not_guard uid (%v) target (%v) res (%v)", UID, targetID, ret) return } isGuard = true return } // GetTaskFinish . func (dao *Dao) GetTaskFinish(c context.Context, term time.Time) (isOn bool, err error) { conn := dao.mc.Get(c) defer conn.Close() key := dao.keyTaskFinish(term) reply, err := conn.Get(key) log.Info("GetTaskFinish key (%v) term (%v)", key, term) if err != nil { if err == memcache.ErrNotFound { log.Info("GetTaskFinish_not_found key (%v) term (%v)", key, term) err = nil return } prom.BusinessErrCount.Incr("mc:GetTaskFinish") log.Error("GetTaskFinish_fail key(%v) error(%v)", key, err) return } res := &roomNotice.TaskFinish{} err = conn.Scan(reply, &res) if err != nil { prom.BusinessErrCount.Incr("mc:GetTaskFinish") log.Error("GetTaskFinish_fail_scan key(%v) error(%v)", key, err) return } log.Info("GetTaskFinish_succ key (%v) term (%v) res(%+v)", key, term, res) if res == nil { return } if 1 == res.Finish { isOn = true } return } // SetTaskFinish . func (dao *Dao) SetTaskFinish(c context.Context, term time.Time, isFinish int64) (err error) { conn := dao.mc.Get(c) defer conn.Close() key := dao.keyTaskFinish(term) value := &roomNotice.TaskFinish{ Finish: isFinish, } log.Info("SetTaskFinish key (%v) term (%v) value (%+v)", key, term, value) item := &memcache.Item{ Key: key, Object: value, Flags: memcache.FlagJSON, Expiration: 0, } if err = conn.Set(item); err != nil { prom.BusinessErrCount.Incr("mc:SetTaskFinish") log.Error("SetTaskFinish_fail key(%v) value (%+v) error(%v)", key, value, err) return } log.Info("SetTaskFinish_succ key (%v) term (%v) value (%+v)", key, term, value) return } func (dao *Dao) termToString(term time.Time) string { return term.Format("20060102") } func (dao *Dao) keyTaskFinish(term time.Time) (key string) { return fmt.Sprintf(TaskFinishKey, dao.termToString(term)) }