init_cloud.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "io/ioutil"
  6. "os"
  7. "strconv"
  8. "time"
  9. "go-common/app/job/main/passport-game-data/conf"
  10. "go-common/app/job/main/passport-game-data/dao"
  11. "go-common/app/job/main/passport-game-data/model"
  12. "go-common/library/log"
  13. )
  // Fallback settings for the init-cloud job; initCloudConfig.fix applies them
  // when the configured value is unusable.
  const (
  	// _defaultInitCloudOffsetFilePath is the offset file used when the
  	// configuration leaves OffsetFilePath empty.
  	_defaultInitCloudOffsetFilePath = "/data/passport-game-data-job.initcloud.offset"

  	// _defaultInitCloudSleep is the pause used when the configured Sleep is
  	// negative.
  	_defaultInitCloudSleep = time.Second
  )
  // initCloudConfig holds the normalized settings that drive Service.InitCloud.
  type initCloudConfig struct {
  	// OffsetFilePath is the file in which InitCloud records its progress and
  	// from which newInitCloudConfigFrom reads a previously saved offset.
  	OffsetFilePath string
  	// UseOldOffset makes newInitCloudConfigFrom replace Start with the
  	// positive offset stored in OffsetFilePath, when one can be read.
  	UseOldOffset bool
  	// Start is the first mid InitCloud processes.
  	Start int64
  	// End is the last mid InitCloud processes (the loop bound is inclusive).
  	End int64
  	// Batch is the step InitCloud uses to walk the range; fix replaces a
  	// non-positive value with _defaultBatchSize.
  	Batch int
  	// Sleep is the pause InitCloud takes before every batch; fix replaces a
  	// negative value with _defaultInitCloudSleep.
  	Sleep time.Duration
  }
  25. func newInitCloudConfigFrom(c *conf.Config) (ic *initCloudConfig) {
  26. ic = &initCloudConfig{
  27. OffsetFilePath: c.InitCloud.OffsetFilePath,
  28. UseOldOffset: c.InitCloud.UseOldOffset,
  29. Start: c.InitCloud.Start,
  30. End: c.InitCloud.End,
  31. Batch: c.InitCloud.Batch,
  32. Sleep: time.Duration(c.InitCloud.Sleep),
  33. }
  34. ic.fix()
  35. if ic.UseOldOffset {
  36. data, err := ioutil.ReadFile(ic.OffsetFilePath)
  37. if err != nil {
  38. log.Error("failed to read old offset, skip")
  39. return
  40. }
  41. oldOffset, err := strconv.ParseInt(string(data), 10, 64)
  42. if err != nil {
  43. log.Error("failed to parse offset, strconv.ParseInt(%s, 10, 64)", string(data), err)
  44. return
  45. }
  46. if oldOffset > 0 {
  47. ic.Start = oldOffset
  48. }
  49. }
  50. return
  51. }
  52. func (ic *initCloudConfig) fix() {
  53. if len(ic.OffsetFilePath) == 0 {
  54. ic.OffsetFilePath = _defaultInitCloudOffsetFilePath
  55. }
  56. if ic.Start < 0 {
  57. ic.Start = 0
  58. }
  59. if ic.End < 0 {
  60. ic.End = 0
  61. }
  62. if ic.Batch <= 0 {
  63. ic.Batch = _defaultBatchSize
  64. }
  65. if int64(ic.Sleep) < 0 {
  66. ic.Sleep = _defaultInitCloudSleep
  67. }
  68. }
  69. // NewInitCloud new a service for initiating cloud.
  70. func NewInitCloud(c *conf.Config) (s *Service) {
  71. s = &Service{
  72. c: c,
  73. d: dao.New(c),
  74. ic: newInitCloudConfigFrom(c),
  75. }
  76. return
  77. }
  78. // InitCloud init cloud.
  79. func (s *Service) InitCloud(c context.Context) {
  80. var err error
  81. ic := s.ic
  82. dstFile, err := os.Create(ic.OffsetFilePath)
  83. if err != nil {
  84. log.Error("failed to open file %s, error(%v)", ic.OffsetFilePath, err)
  85. return
  86. }
  87. defer dstFile.Close()
  88. for i := ic.Start; i <= ic.End; {
  89. time.Sleep(ic.Sleep)
  90. if err = ioutil.WriteFile(ic.OffsetFilePath, []byte(strconv.FormatInt(i, 10)), os.ModeAppend); err != nil {
  91. log.Error("failed to record offset, offsetFilePath: %s, offset: %d, error(%v)", ic.OffsetFilePath, i, err)
  92. continue
  93. }
  94. st := i
  95. ed := i + int64(ic.Batch)
  96. if ed > ic.End {
  97. ed = ic.End
  98. }
  99. mids := make([]int64, 0)
  100. for j := st; j <= ed; j++ {
  101. mids = append(mids, j)
  102. }
  103. var as []*model.OriginAsoAccount
  104. if as, err = s.d.AsoAccountsLocal(c, mids); err != nil {
  105. log.Error("failed to get local aso accounts by mids, service.dao.AsoAccountsLocal(%v) error(%v)", mids, err)
  106. continue
  107. }
  108. cloudAs := make([]*model.AsoAccount, 0)
  109. for _, a := range as {
  110. cloudAs = append(cloudAs, model.Default(a))
  111. }
  112. if err = s.d.AddAsoAccountsCloud(c, cloudAs); err != nil {
  113. str, _ := json.Marshal(cloudAs)
  114. log.Error("failed to add aso accounts to cloud, service.dao.AddAsoAccountsCloud(%v) error(%v)", str, err)
  115. continue
  116. }
  117. i += int64(ic.Batch)
  118. }
  119. }