initial.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package service
  2. // import (
  3. // "context"
  4. // "sync"
  5. // "time"
  6. // "go-common/library/log"
  7. // )
  8. // func (s *Service) initialproc(ctx context.Context) {
  9. // dataWg := sync.WaitGroup{}
  10. // workerWg := sync.WaitGroup{}
  11. // jobQueue := make(chan func(), 4096)
  12. // initialWorker := func() {
  13. // worker := uint64(50)
  14. // if s.c.AccountSummary.InitialWriteWorker > 0 {
  15. // worker = s.c.AccountSummary.InitialWriteWorker
  16. // }
  17. // log.Info("Start %d initial write worker", worker)
  18. // for i := uint64(0); i < worker; i++ {
  19. // workerWg.Add(1)
  20. // go func() {
  21. // defer workerWg.Done()
  22. // for job := range jobQueue {
  23. // job()
  24. // }
  25. // }()
  26. // }
  27. // }
  28. // initialWorker()
  29. // initBase := func() {
  30. // log.Info("Start to initial member base")
  31. // defer dataWg.Done()
  32. // baseCh := s.dao.AllMemberBase(ctx)
  33. // for chunk := range baseCh {
  34. // for _, b := range chunk {
  35. // b := b
  36. // jobQueue <- func() {
  37. // if err := s.SyncToHBase(ctx, b); err != nil {
  38. // log.Error("Failed to sync member base in initial process: base: %+v: %+v", b, err)
  39. // }
  40. // }
  41. // }
  42. // }
  43. // }
  44. // initExp := func() {
  45. // log.Info("Start to initial member exp")
  46. // defer dataWg.Done()
  47. // expCh := s.dao.AllMemberExp(ctx)
  48. // for chunk := range expCh {
  49. // for _, e := range chunk {
  50. // e := e
  51. // jobQueue <- func() {
  52. // if err := s.SyncToHBase(ctx, e); err != nil {
  53. // log.Error("Failed to sync member exp in initial process: exp: %+v: %+v", e, err)
  54. // }
  55. // }
  56. // }
  57. // time.Sleep(time.Second)
  58. // }
  59. // }
  60. // initOfficial := func() {
  61. // log.Info("Start to initial member official")
  62. // defer dataWg.Done()
  63. // official, err := s.dao.AllOfficial(ctx)
  64. // if err != nil {
  65. // log.Error("Failed to get all member official: %+v", err)
  66. // return
  67. // }
  68. // for _, o := range official {
  69. // o := o
  70. // jobQueue <- func() {
  71. // if err := s.SyncToHBase(ctx, o); err != nil {
  72. // log.Error("Failed to sync member official in initial process: official: %+v: %+v", o, err)
  73. // }
  74. // }
  75. // }
  76. // }
  77. // initStat := func() {
  78. // log.Info("Start to initial relation stat")
  79. // defer dataWg.Done()
  80. // statsCh := s.dao.AllRelationStat(ctx)
  81. // for chunk := range statsCh {
  82. // for _, stat := range chunk {
  83. // stat := stat
  84. // jobQueue <- func() {
  85. // if err := s.SyncToHBase(ctx, stat); err != nil {
  86. // log.Error("Failed to sync relation stat in initial process: stat: %+v: %+v", stat, err)
  87. // }
  88. // }
  89. // }
  90. // time.Sleep(time.Second)
  91. // }
  92. // }
  93. // if s.c.FeatureGate.InitialMemberBase {
  94. // dataWg.Add(1)
  95. // go initBase()
  96. // }
  97. // if s.c.FeatureGate.InitialMemberExp {
  98. // dataWg.Add(1)
  99. // go initExp()
  100. // }
  101. // if s.c.FeatureGate.InitialMemberOfficial {
  102. // dataWg.Add(1)
  103. // go initOfficial()
  104. // }
  105. // if s.c.FeatureGate.InitialRelationStat {
  106. // dataWg.Add(1)
  107. // go initStat()
  108. // }
  109. // dataWg.Wait()
  110. // close(jobQueue) // all job is enqueued
  111. // workerWg.Wait()
  112. // }