up_account.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package income
  2. import (
  3. "bytes"
  4. "context"
  5. "strconv"
  6. incomeD "go-common/app/job/main/growup/dao/income"
  7. model "go-common/app/job/main/growup/model/income"
  8. )
  9. // UpAccountSvr up account service
  10. type UpAccountSvr struct {
  11. batchSize int
  12. dao *incomeD.Dao
  13. }
  14. // NewUpAccountSvr new up account service
  15. func NewUpAccountSvr(dao *incomeD.Dao, batchSize int) (svr *UpAccountSvr) {
  16. return &UpAccountSvr{
  17. batchSize: batchSize,
  18. dao: dao,
  19. }
  20. }
  21. // UpAccount get up account
  22. func (s *UpAccountSvr) UpAccount(c context.Context, limit int64) (m map[int64]*model.UpAccount, err error) {
  23. var id int64
  24. m = make(map[int64]*model.UpAccount)
  25. for {
  26. var um map[int64]*model.UpAccount
  27. um, id, err = s.dao.UpAccounts(c, id, limit)
  28. if err != nil {
  29. return
  30. }
  31. if len(um) == 0 {
  32. break
  33. }
  34. for mid, acc := range um {
  35. m[mid] = acc
  36. }
  37. }
  38. return
  39. }
  40. // BatchInsertUpAccount batch insert up account
  41. func (s *UpAccountSvr) BatchInsertUpAccount(c context.Context, us map[int64]*model.UpAccount) (err error) {
  42. var (
  43. buff = make([]*model.UpAccount, s.batchSize)
  44. buffEnd = 0
  45. )
  46. for _, u := range us {
  47. if u.DataState != 1 {
  48. continue
  49. }
  50. buff[buffEnd] = u
  51. buffEnd++
  52. if buffEnd >= s.batchSize {
  53. values := upAccountValues(buff[:buffEnd])
  54. buffEnd = 0
  55. _, err = s.dao.InsertUpAccount(c, values)
  56. if err != nil {
  57. return
  58. }
  59. }
  60. }
  61. if buffEnd > 0 {
  62. values := upAccountValues(buff[:buffEnd])
  63. buffEnd = 0
  64. _, err = s.dao.InsertUpAccount(c, values)
  65. }
  66. return
  67. }
  68. // UpdateUpAccount update up account
  69. func (s *UpAccountSvr) UpdateUpAccount(c context.Context, us map[int64]*model.UpAccount) (err error) {
  70. for _, u := range us {
  71. if u.DataState != 2 {
  72. continue
  73. }
  74. var time int
  75. for {
  76. var rows int64
  77. rows, err = s.dao.UpdateUpAccount(c, u.MID, u.Version, u.TotalIncome, u.TotalUnwithdrawIncome)
  78. if err != nil {
  79. return
  80. }
  81. time++
  82. if rows > 0 {
  83. break
  84. }
  85. if time >= 10 {
  86. break
  87. }
  88. s.reload(c, u)
  89. }
  90. }
  91. return
  92. }
  93. func (s *UpAccountSvr) reload(c context.Context, upAccount *model.UpAccount) (err error) {
  94. result, err := s.dao.UpAccount(c, upAccount.MID)
  95. if err != nil {
  96. return
  97. }
  98. upAccount.TotalIncome = result.TotalIncome
  99. upAccount.TotalUnwithdrawIncome = result.TotalUnwithdrawIncome
  100. upAccount.Version = result.Version
  101. upAccount.WithdrawDateVersion = result.WithdrawDateVersion
  102. return
  103. }
  104. func upAccountValues(us []*model.UpAccount) (values string) {
  105. var buf bytes.Buffer
  106. for _, u := range us {
  107. buf.WriteString("(")
  108. buf.WriteString(strconv.FormatInt(u.MID, 10))
  109. buf.WriteByte(',')
  110. buf.WriteString(strconv.Itoa(u.HasSignContract))
  111. buf.WriteByte(',')
  112. buf.WriteString(strconv.FormatInt(u.TotalIncome, 10))
  113. buf.WriteByte(',')
  114. buf.WriteString(strconv.FormatInt(u.TotalUnwithdrawIncome, 10))
  115. buf.WriteByte(',')
  116. buf.WriteString("'" + u.WithdrawDateVersion + "'")
  117. buf.WriteByte(',')
  118. buf.WriteString(strconv.FormatInt(u.Version, 10))
  119. buf.WriteString(")")
  120. buf.WriteByte(',')
  121. }
  122. if buf.Len() > 0 {
  123. buf.Truncate(buf.Len() - 1)
  124. }
  125. values = buf.String()
  126. buf.Reset()
  127. return
  128. }