roomNotice.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. package notice
  2. import (
  3. "context"
  4. "go-common/app/service/live/xuser/api/grpc/v1"
  5. v1pb "go-common/app/service/live/xuserex/api/grpc/v1"
  6. "go-common/app/service/live/xuserex/conf"
  7. "go-common/library/cache/memcache"
  8. "bytes"
  9. "encoding/json"
  10. "fmt"
  11. "go-common/app/service/live/resource/sdk"
  12. "go-common/app/service/live/xuserex/model/roomNotice"
  13. "go-common/library/database/hbase.v2"
  14. "go-common/library/ecode"
  15. "go-common/library/log"
  16. bm "go-common/library/net/http/blademaster"
  17. "go-common/library/stat/prom"
  18. "strconv"
  19. "time"
  20. )
  // Dao is the data-access object of the notice package. It holds the service
  // configuration together with the memcache pool, the blademaster HTTP
  // client, the HBase client and the xuser client that New constructs.
  type Dao struct {
  	// c is the configuration passed to New.
  	c *conf.Config
  	// mc is the memcache pool used for the task-finish flag.
  	mc *memcache.Pool
  	// acc rpc
  	client *bm.Client
  	// hbase is the client used to read the monthly-consume table.
  	hbase *hbase.Client
  	// xuser is the xuser service client used by isGuard.
  	xuser *v1.Client
  }
  // guardGuideConf is the JSON shape that getThreshold decodes from the
  // BuyGuardGuideTitanKey configuration value.
  type guardGuideConf struct {
  	// Open is compared against 0 in getShouldNotice; 0 disables the guide.
  	Open int64 `json:"open"`
  	// Threshold is multiplied by 1000 and compared with the monthly consume
  	// amount in getShouldNotice.
  	Threshold int64 `json:"threshold"`
  }
  const (
  	// keyShouldNoticeExpire is the cache expiration time.
  	// NOTE(review): presumably expressed in seconds — confirm against the
  	// memcache client.
  	keyShouldNoticeExpire = 3600
  	// keyNoticePre is the key format used by keyShouldNotice; its verbs are
  	// filled with UID, targetID and date, in that order.
  	keyNoticePre = "kn_v1_%d_%d_%s"
  	// HBaseMonthlyConsumeTable is the HBase table read by RawMonthConsume.
  	HBaseMonthlyConsumeTable = "livemonthconsume"
  	// BuyGuardGuideTitanKey is the configuration key looked up by getThreshold.
  	BuyGuardGuideTitanKey = "buy_guard_guide"
  	// TaskFinishKey is the memcache key format for the task-finish flag; the
  	// %s verb receives the term formatted by termToString.
  	TaskFinishKey = "task_finish_%s"
  )
  45. // New init mysql db
  46. func New(c *conf.Config) (dao *Dao) {
  47. dao = &Dao{
  48. c: c,
  49. mc: memcache.NewPool(c.Memcache),
  50. client: bm.NewClient(c.BMClient),
  51. hbase: hbase.NewClient(c.HBase),
  52. }
  53. conn, err := v1.NewClient(c.Warden)
  54. if err != nil {
  55. panic(err)
  56. }
  57. dao.xuser = conn
  58. return
  59. }
  60. func keyShouldNotice(UID int64, targetID int64, date string) string {
  61. return fmt.Sprintf(keyNoticePre, UID, targetID, date)
  62. }
  63. // IsNotice returns whether should pop a purchase notice.
  64. func (dao *Dao) IsNotice(c context.Context, UID int64, targetID int64) (*v1pb.RoomNoticeBuyGuardResp, error) {
  65. term := dao.GetTermBegin()
  66. begin := term.Unix()
  67. end := dao.GetTermEnd()
  68. resp := &v1pb.RoomNoticeBuyGuardResp{
  69. Begin: begin,
  70. End: end.Unix(),
  71. Now: time.Now().Unix(),
  72. Title: "感谢支持主播",
  73. Content: "成为船员为主播保驾护航吧~",
  74. Button: "开通大航海",
  75. }
  76. shouldNotice, err := dao.getShouldNotice(c, UID, targetID, term)
  77. if err != nil {
  78. log.Error("dao getShouldNotice uid(%v)roomid(%v)term(%v) error(%v)", UID, targetID, term.Format("2006-01-02"), err)
  79. err = nil
  80. return resp, err
  81. }
  82. resp.ShouldNotice = int64(shouldNotice)
  83. return resp, nil
  84. }
  // _cache is the input interface for the cache code generator invoked by the
  // go:generate directive below; the "cache:" comment on each method carries
  // the generator options.
  // NOTE(review): the generated MonthConsume presumably combines
  // CacheMonthConsume, RawMonthConsume and AddCacheMonthConsume — confirm
  // against the generated file.
  //
  //go:generate $GOPATH/src/go-common/app/tool/cache/gen
  type _cache interface {
  	// cache: -sync=true -nullcache=&roomNotice.MonthConsume{Amount:-1} -check_null_code=$.Amount==-1
  	MonthConsume(c context.Context, UID int64, targetID int64, date string) (*roomNotice.MonthConsume, error)
  }
  // _mc is the input interface for the memcache code generator invoked by the
  // go:generate directive below; the "mc:" comment on each method carries the
  // generator options (key function, expiry, encoding).
  //
  //go:generate $GOPATH/src/go-common/app/tool/cache/mc
  type _mc interface {
  	// Get the consumption of a given month.
  	// mc: -key=keyShouldNotice
  	CacheMonthConsume(c context.Context, UID int64, targetID int64, date string) (*roomNotice.MonthConsume, error)
  	// Save the consumption of a given month.
  	// mc: -key=keyShouldNotice -expire=d.keyShouldNoticeExpire -encode=json|gzip
  	AddCacheMonthConsume(c context.Context, UID int64, targetID int64, date string, value *roomNotice.MonthConsume) error
  }
  99. func (dao *Dao) getThreshold() (threshold *guardGuideConf, err error) {
  100. threshold = &guardGuideConf{}
  101. guideConf, err := titansSdk.Get(BuyGuardGuideTitanKey)
  102. log.Info("getThreshold_key(%v) conf(%+v)", BuyGuardGuideTitanKey, guideConf)
  103. if err != nil {
  104. log.Error("getThreshold(%v) error(%v)", BuyGuardGuideTitanKey, err)
  105. return
  106. }
  107. if "" == guideConf {
  108. return
  109. }
  110. if err = json.Unmarshal([]byte(guideConf), threshold); err != nil {
  111. log.Error("json Unmarshal guideconf(%+v) error(%v)", guideConf, err)
  112. return
  113. }
  114. log.Info("getThreshold_unmarshal_succ key(%v) conf (%v) Threshold(%+v)", BuyGuardGuideTitanKey, guideConf, threshold)
  115. return
  116. }
  117. func (dao *Dao) getShouldNotice(ctx context.Context, UID int64, targetID int64, term time.Time) (shouldNotice int, err error) {
  118. shouldNotice = 0
  119. taskFinish, err := dao.GetTaskFinish(ctx, term)
  120. if err != nil {
  121. return shouldNotice, err
  122. }
  123. if !taskFinish {
  124. log.Info("task_not_finish")
  125. return shouldNotice, err
  126. }
  127. // 获取配置的收入门槛
  128. threshold, err := dao.getThreshold()
  129. if err != nil {
  130. log.Error("get_threshold_error(%v)", err)
  131. return
  132. }
  133. if nil == threshold {
  134. log.Error("get_threshold_nil")
  135. return
  136. }
  137. if 0 == threshold.Open {
  138. log.Info("guard_guide not Open (%+v)", threshold)
  139. return
  140. }
  141. monthConsume, err := dao.MonthConsume(ctx, UID, targetID, dao.termToString(term))
  142. log.Info("get_monthConsume(%+v) Threshold (%+v)", monthConsume, threshold)
  143. if err != nil {
  144. log.Error("get_monthConsum_err uid(%d) targetid(%v) term (%v) error(%v)", UID, targetID, term, err)
  145. return
  146. }
  147. if nil == monthConsume {
  148. return
  149. }
  150. if int64(monthConsume.Amount) >= threshold.Threshold*1000 { // coin to rmb
  151. isGuard, err := dao.isGuard(ctx, UID, targetID)
  152. log.Info("show guard guide uid(%v) target (%v) guard (%v) Threshold (%v)", UID, targetID, isGuard, threshold)
  153. if err != nil {
  154. log.Error("get gaurd UID(%v) targetid (%v) error(%v)", UID, targetID, err)
  155. return shouldNotice, err
  156. }
  157. if !isGuard {
  158. shouldNotice = 1
  159. }
  160. }
  161. return
  162. }
  // hbaseRowKey builds the HBase row key "<date>_<UID>_<targetID>".
  func hbaseRowKey(UID int64, targetID int64, date string) string {
  	const layout = "%s_%d_%d"
  	return fmt.Sprintf(layout, date, UID, targetID)
  }
  166. //RawMonthConsume get month consume from hbase
  167. func (dao *Dao) RawMonthConsume(ctx context.Context, UID int64, targetID int64, date string) (res *roomNotice.MonthConsume, err error) {
  168. var (
  169. tableName = HBaseMonthlyConsumeTable
  170. key = hbaseRowKey(UID, targetID, date)
  171. )
  172. result, err := dao.hbase.GetStr(ctx, tableName, key)
  173. log.Info("RawMonthConsume_getstr tableName (%v) key (%v) res (%v)", tableName, key, result)
  174. if err != nil {
  175. log.Error("dao.hbase.GetStr tableName(%s)|mid(%d)|key(%v)|error(%v)", tableName, UID, key, err)
  176. err = ecode.CreativeDataErr
  177. return
  178. }
  179. if result == nil {
  180. return
  181. }
  182. res = &roomNotice.MonthConsume{}
  183. for _, c := range result.Cells {
  184. if c == nil {
  185. continue
  186. }
  187. v, _ := strconv.ParseInt(string(c.Value[:]), 10, 64)
  188. if !bytes.Equal(c.Family, []byte("info")) {
  189. log.Error("family_type_err(%v) error", c.Family)
  190. continue
  191. }
  192. switch {
  193. case bytes.Equal(c.Qualifier, []byte("uid")):
  194. res.Uid = v
  195. case bytes.Equal(c.Qualifier, []byte("ruid")):
  196. res.Ruid = v
  197. case bytes.Equal(c.Qualifier, []byte("amount")):
  198. res.Amount = v
  199. case bytes.Equal(c.Qualifier, []byte("time")):
  200. res.Date = v
  201. }
  202. }
  203. log.Info("RawMonthConsume_succ uid (%v) target (%v) date (%v) res (%+v)", UID, targetID, date, res)
  204. return
  205. }
  206. // GetTermBegin return first day of last month
  207. func (dao *Dao) GetTermBegin() time.Time {
  208. year, month, _ := time.Now().Date()
  209. thisMonth := time.Date(year, month, 1, 0, 0, 0, 0, time.Local)
  210. return thisMonth.AddDate(0, -1, 0)
  211. }
  212. // GetTermEnd returns last second of last month
  213. func (dao *Dao) GetTermEnd() time.Time {
  214. year, month, _ := time.Now().Date()
  215. thisMonth := time.Date(year, month, 1, 0, 0, 0, 0, time.Local)
  216. second, _ := time.ParseDuration("-1s")
  217. return thisMonth.Add(second)
  218. }
  // IsValidTerm reports whether term is a valid term. The current
  // implementation performs no check on term and returns true for every input.
  func (dao *Dao) IsValidTerm(term time.Time) bool {
  	return true
  }
  223. // TOOD
  224. func (dao *Dao) isGuard(c context.Context, UID int64, targetID int64) (isGuard bool, err error) {
  225. ret, err := dao.xuser.GetByUIDTargetID(c, &v1.GetByUidTargetIdReq{
  226. Uid: UID,
  227. TargetId: targetID,
  228. })
  229. log.Info("dao.xuser.GetByUIDTargetID uid (%v) target (%v) res (%v)", UID, targetID, ret)
  230. if err != nil {
  231. log.Error("get guard uid (%v) target (%v) error(%v)", UID, targetID, err)
  232. return
  233. }
  234. if nil == ret || nil == ret.Data || 0 == len(ret.Data) {
  235. log.Info("not_guard uid (%v) target (%v) res (%v)", UID, targetID, ret)
  236. return
  237. }
  238. isGuard = true
  239. return
  240. }
  241. // GetTaskFinish .
  242. func (dao *Dao) GetTaskFinish(c context.Context, term time.Time) (isOn bool, err error) {
  243. conn := dao.mc.Get(c)
  244. defer conn.Close()
  245. key := dao.keyTaskFinish(term)
  246. reply, err := conn.Get(key)
  247. log.Info("GetTaskFinish key (%v) term (%v)", key, term)
  248. if err != nil {
  249. if err == memcache.ErrNotFound {
  250. log.Info("GetTaskFinish_not_found key (%v) term (%v)", key, term)
  251. err = nil
  252. return
  253. }
  254. prom.BusinessErrCount.Incr("mc:GetTaskFinish")
  255. log.Error("GetTaskFinish_fail key(%v) error(%v)", key, err)
  256. return
  257. }
  258. res := &roomNotice.TaskFinish{}
  259. err = conn.Scan(reply, &res)
  260. if err != nil {
  261. prom.BusinessErrCount.Incr("mc:GetTaskFinish")
  262. log.Error("GetTaskFinish_fail_scan key(%v) error(%v)", key, err)
  263. return
  264. }
  265. log.Info("GetTaskFinish_succ key (%v) term (%v) res(%+v)", key, term, res)
  266. if res == nil {
  267. return
  268. }
  269. if 1 == res.Finish {
  270. isOn = true
  271. }
  272. return
  273. }
  274. // SetTaskFinish .
  275. func (dao *Dao) SetTaskFinish(c context.Context, term time.Time, isFinish int64) (err error) {
  276. conn := dao.mc.Get(c)
  277. defer conn.Close()
  278. key := dao.keyTaskFinish(term)
  279. value := &roomNotice.TaskFinish{
  280. Finish: isFinish,
  281. }
  282. log.Info("SetTaskFinish key (%v) term (%v) value (%+v)", key, term, value)
  283. item := &memcache.Item{
  284. Key: key,
  285. Object: value,
  286. Flags: memcache.FlagJSON,
  287. Expiration: 0,
  288. }
  289. if err = conn.Set(item); err != nil {
  290. prom.BusinessErrCount.Incr("mc:SetTaskFinish")
  291. log.Error("SetTaskFinish_fail key(%v) value (%+v) error(%v)", key, value, err)
  292. return
  293. }
  294. log.Info("SetTaskFinish_succ key (%v) term (%v) value (%+v)", key, term, value)
  295. return
  296. }
  297. func (dao *Dao) termToString(term time.Time) string {
  298. return term.Format("20060102")
  299. }
  300. func (dao *Dao) keyTaskFinish(term time.Time) (key string) {
  301. return fmt.Sprintf(TaskFinishKey, dao.termToString(term))
  302. }