activity.go 7.0 KB


  1. package service
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "encoding/csv"
  7. "fmt"
  8. "io/ioutil"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "go-common/app/admin/main/coupon/model"
  14. "go-common/library/database/sql"
  15. "go-common/library/ecode"
  16. "go-common/library/log"
  17. "go-common/library/net/metadata"
  18. xtime "go-common/library/time"
  19. "go-common/library/xstr"
  20. "github.com/pkg/errors"
  21. )
  22. // ActivitySalaryCoupon activity salary coupon.
  23. func (s *Service) ActivitySalaryCoupon(c context.Context, req *model.ArgBatchSalaryCoupon) (err error) {
  24. var (
  25. ok bool
  26. r *model.CouponBatchInfo
  27. )
  28. if r, err = s.dao.BatchInfo(c, req.BranchToken); err != nil {
  29. return
  30. }
  31. if r == nil {
  32. return ecode.CouPonBatchNotExistErr
  33. }
  34. if r.State != model.BatchStateNormal {
  35. return ecode.CouPonHadBlockErr
  36. }
  37. if r.ExpireDay != -1 {
  38. return ecode.CouponTypeNotSupportErr
  39. }
  40. if ok = s.dao.AddGrantUniqueLock(c, req.BranchToken, _lockseconds); !ok {
  41. return ecode.CouponSalaryHadRunErr
  42. }
  43. // mids index
  44. s.runSalary(metadata.String(c, metadata.RemoteIP), r, req)
  45. return
  46. }
  47. // runSalary run batch salary.
  48. func (s *Service) runSalary(ip string, b *model.CouponBatchInfo, req *model.ArgBatchSalaryCoupon) (err error) {
  49. go func() {
  50. var (
  51. err error
  52. fails []int64
  53. tmp []int64
  54. mids = []int64{}
  55. total int64
  56. midmap map[int64][]int64
  57. c = context.Background()
  58. )
  59. defer func() {
  60. if x := recover(); x != nil {
  61. x = errors.WithStack(x.(error))
  62. log.Error("runtime error caught: %+v", x)
  63. }
  64. //Write fail mids
  65. if len(fails) > 0 {
  66. s.OutFile(c, []byte(xstr.JoinInts(fails)), req.BranchToken+"_failmids.csv")
  67. }
  68. }()
  69. log.Info("batch salary analysis slary file[req(%v)]", req)
  70. // analysis slary file
  71. if midmap, total, err = s.AnalysisFile(c, req.FileURL); err != nil {
  72. return
  73. }
  74. if total != req.Count {
  75. log.Error("[batch salary count err req(%v),filetotal(%d)]", req, total)
  76. s.dao.DelGrantUniqueLock(c, b.BatchToken)
  77. return
  78. }
  79. log.Info("batch salary start[count(%d),token(%s)]", total, req.BranchToken)
  80. var crrent int
  81. for _, allmids := range midmap {
  82. for i, v := range allmids {
  83. mids = append(mids, v)
  84. if len(mids)%req.SliceSize != 0 && i != len(allmids)-1 {
  85. continue
  86. }
  87. crrent = crrent + len(mids)
  88. log.Info("batch salary ing[mid(%d),i(%d),token(%s),crrent(%d)]", v, i, req.BranchToken, crrent)
  89. if tmp, err = s.batchSalary(c, mids, ip, b); err != nil && len(tmp) > 0 {
  90. fails = append(fails, tmp...)
  91. log.Error("batch salary err[mids(%v),i(%d),token(%s)] err(%v)", mids, i, req.BranchToken, err)
  92. }
  93. mids = []int64{}
  94. }
  95. }
  96. log.Info("batch salary end[count(%d),token(%s),faillen(%d)]", total, req.BranchToken, len(fails))
  97. }()
  98. return
  99. }
  100. // batchSalary batch salary.
  101. func (s *Service) batchSalary(c context.Context, mids []int64, ip string, b *model.CouponBatchInfo) (falls []int64, err error) {
  102. var (
  103. tx *sql.Tx
  104. aff int64
  105. )
  106. if tx, err = s.dao.BeginTran(c); err != nil {
  107. return
  108. }
  109. defer func() {
  110. if err != nil {
  111. falls = mids
  112. if err1 := tx.Rollback(); err1 != nil {
  113. log.Error("batch salary rollback: %+v", err1)
  114. }
  115. return
  116. }
  117. if err = tx.Commit(); err != nil {
  118. falls = mids
  119. log.Error("batch salary commit: %+v", err)
  120. tx.Rollback()
  121. return
  122. }
  123. //del cache
  124. s.cache.Do(c, func(c context.Context) {
  125. for _, v := range mids {
  126. if err = s.dao.DelCouponAllowancesKey(context.Background(), v, model.NotUsed); err != nil {
  127. log.Error("batch salary cache del(%d) err: %+v", v, err)
  128. }
  129. }
  130. })
  131. //send msg
  132. if s.c.Prop.SalaryMsgOpen {
  133. s.sendMsg(mids, ip, 1, true, _msgMeng)
  134. }
  135. time.Sleep(time.Duration(s.c.Prop.SalarySleepTime))
  136. }()
  137. cps := make([]*model.CouponAllowanceInfo, len(mids))
  138. for i, v := range mids {
  139. cps[i] = &model.CouponAllowanceInfo{
  140. CouponToken: s.tokeni(i),
  141. Mid: v,
  142. State: model.NotUsed,
  143. StartTime: b.StartTime,
  144. ExpireTime: b.ExpireTime,
  145. Origin: model.AllowanceSystemAdmin,
  146. CTime: xtime.Time(time.Now().Unix()),
  147. BatchToken: b.BatchToken,
  148. Amount: b.Amount,
  149. FullAmount: b.FullAmount,
  150. AppID: b.AppID,
  151. }
  152. }
  153. // mid should be same index.
  154. if aff, err = s.dao.BatchAddAllowanceCoupon(c, tx, cps); err != nil {
  155. return
  156. }
  157. if len(mids) != int(aff) {
  158. err = fmt.Errorf("batch salary midslen(%d) != aff(%d)", len(mids), aff)
  159. return
  160. }
  161. if _, err = s.dao.UpdateBatchInfo(c, tx, b.BatchToken, len(mids)); err != nil {
  162. return
  163. }
  164. return
  165. }
  166. // AnalysisFile analysis slary file.
  167. func (s *Service) AnalysisFile(c context.Context, fileURL string) (midmap map[int64][]int64, total int64, err error) {
  168. cntb, err := ioutil.ReadFile(fileURL)
  169. if err != nil {
  170. return
  171. }
  172. var (
  173. mid int64
  174. records [][]string
  175. )
  176. r := csv.NewReader(strings.NewReader(string(cntb)))
  177. records, err = r.ReadAll()
  178. if err != nil {
  179. err = errors.WithStack(err)
  180. return
  181. }
  182. midmap = make(map[int64][]int64, s.c.Prop.AllowanceTableCount)
  183. for i, v := range records {
  184. if len(v) <= 0 {
  185. continue
  186. }
  187. if mid, err = strconv.ParseInt(v[0], 10, 64); err != nil {
  188. err = errors.Wrapf(err, "read csv line err(%d,%v)", i, v)
  189. break
  190. }
  191. index := mid % s.c.Prop.AllowanceTableCount
  192. if midmap[index] == nil {
  193. midmap[index] = []int64{}
  194. }
  195. midmap[index] = append(midmap[index], mid)
  196. total++
  197. }
  198. return
  199. }
  200. // OutFile out file.
  201. func (s *Service) OutFile(c context.Context, bs []byte, url string) (err error) {
  202. var outFile *os.File
  203. outFile, err = os.Create(url)
  204. if err != nil {
  205. os.Exit(1)
  206. }
  207. defer outFile.Close()
  208. b := bufio.NewWriter(outFile)
  209. _, err = b.Write(bs)
  210. if err != nil {
  211. os.Exit(1)
  212. }
  213. err = b.Flush()
  214. if err != nil {
  215. os.Exit(1)
  216. }
  217. return
  218. }
  219. // get coupon tokeni
  220. func (s *Service) tokeni(i int) string {
  221. var b bytes.Buffer
  222. b.WriteString(fmt.Sprintf("%05d", i))
  223. b.WriteString(fmt.Sprintf("%02d", s.r.Int63n(99)))
  224. b.WriteString(fmt.Sprintf("%03d", time.Now().UnixNano()/1e6%1000))
  225. b.WriteString(time.Now().Format("20060102150405"))
  226. return b.String()
  227. }
  228. func (s *Service) sendMsg(mids []int64, ip string, times int, b bool, msgType string) (err error) {
  229. msg, ok := msgMap[msgType]
  230. if !ok {
  231. log.Warn("sendMsg not support msgType(%s)", msgType)
  232. return
  233. }
  234. for i := 1; i <= times; i++ {
  235. if err = s.dao.SendMessage(context.Background(), xstr.JoinInts(mids), msg["title"], msg["content"], ip); err != nil {
  236. if i != times {
  237. continue
  238. }
  239. if b {
  240. log.Error("batch salary msg send mids(%v) err: %+v", mids, err)
  241. }
  242. }
  243. break
  244. }
  245. return
  246. }
  247. const (
  248. _msgMeng = "meng"
  249. )
  250. var (
  251. msgMap = map[string]map[string]string{
  252. "vip": {
  253. "title": "大会员代金券到账通知",
  254. "content": "大会员代金券已到账,快到“我的代金券”看看吧!IOS端需要在网页使用。#{\"传送门→\"}{\"https://account.bilibili.com/account/big/voucher\"}",
  255. },
  256. "meng": {
  257. "title": "大会员代金券到账提醒",
  258. "content": "您的专属大会员代金券即将在10月10日0点到账,年费半价基础上再享折上折,请注意查收!代金券有效期1天,暂不支持苹果支付 #{\"点击查看详情\"}{\"https://www.bilibili.com/read/cv1291213\"}",
  259. },
  260. }
  261. )