task.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package service
  2. import (
  3. "context"
  4. "go-common/app/job/main/videoup-report/model/archive"
  5. "go-common/library/log"
  6. "sort"
  7. "time"
  8. )
  9. func (s *Service) loadTask() {
  10. var (
  11. err error
  12. took *archive.TaskTook
  13. tooks []*archive.TaskTook
  14. tasks []*archive.Task
  15. )
  16. s.taskCache.Lock()
  17. defer s.taskCache.Unlock()
  18. if len(s.taskCache.Took) == 0 && len(s.taskCache.Task) == 0 {
  19. if took, err = s.arc.TaskTookByHalfHour(context.TODO()); err != nil {
  20. log.Error("s.arc.TaskTookByHalfHour error(%v)", err)
  21. return
  22. }
  23. if took != nil {
  24. if tooks, err = s.arc.TaskTooks(context.TODO(), took.Ctime); err != nil {
  25. log.Error("s.arc.TaskTooks(%v) error(%v)", took.Ctime, err)
  26. return
  27. }
  28. s.taskCache.Took = tooks
  29. }
  30. if tasks, err = s.arc.TaskByUntreated(context.TODO()); err != nil {
  31. log.Error("s.arc.TaskByUntreated() error(%v)", err)
  32. return
  33. }
  34. } else {
  35. var tasksOrig, tasksDone []*archive.Task
  36. if tasksOrig, err = s.arc.TaskByMtime(context.TODO(), s.taskCache.Mtime.Add(-time.Minute*1)); err != nil {
  37. log.Error("s.arc.TaskByMtime(%v) error(%v)", s.taskCache.Mtime, err)
  38. return
  39. }
  40. if tasksDone, err = s.arc.TaskDoneByMtime(context.TODO(), s.taskCache.Mtime.Add(-time.Minute*1)); err != nil {
  41. log.Error("s.arc.TaskDoneByMtime(%v) error(%v)", s.taskCache.Mtime, err)
  42. return
  43. }
  44. tasks = make([]*archive.Task, len(tasksOrig)+len(tasksDone))
  45. copy(tasks, tasksOrig)
  46. copy(tasks[len(tasksOrig):], tasksDone)
  47. }
  48. for _, task := range tasks {
  49. _, ok := s.taskCache.Task[task.ID]
  50. if ok && (task.State != archive.TaskStateUnclaimed && task.State != archive.TaskStateUntreated) {
  51. delete(s.taskCache.Task, task.ID)
  52. } else if task.State == archive.TaskStateUnclaimed || task.State == archive.TaskStateUntreated {
  53. s.taskCache.Task[task.ID] = task
  54. }
  55. }
  56. }
  57. func (s *Service) loadTaskTookSort() {
  58. var (
  59. took int
  60. tooks []int
  61. taskMinCtime *archive.Task
  62. )
  63. s.taskCache.Lock()
  64. defer s.taskCache.Unlock()
  65. for _, task := range s.taskCache.Task {
  66. if (s.taskCache.Mtime == time.Time{} || s.taskCache.Mtime.Unix() < task.Mtime.Unix()) {
  67. s.taskCache.Mtime = task.Mtime
  68. }
  69. if taskMinCtime == nil || taskMinCtime.Ctime.Unix() > task.Ctime.Unix() {
  70. taskMinCtime = task
  71. }
  72. took = int(time.Now().Unix() - task.Ctime.Unix())
  73. tooks = append(tooks, took)
  74. }
  75. if len(tooks) == 0 {
  76. return
  77. }
  78. sort.Ints(tooks)
  79. s.taskCache.Sort = tooks
  80. log.Info("s.loadTaskTookSort() 本轮统计: 耗时最久id(%d) ctime(%v)", taskMinCtime.ID, taskMinCtime.Ctime)
  81. }
  82. func (s *Service) hdlTaskTook() (lastID int64, err error) {
  83. s.taskCache.Lock()
  84. defer s.taskCache.Unlock()
  85. var (
  86. spacing float32
  87. m50 float32
  88. m50Index float32
  89. m50IndexPoint float32
  90. m50Value int
  91. m60 float32
  92. m60Index float32
  93. m60IndexPoint float32
  94. m60Value int
  95. m80 float32
  96. m80Index float32
  97. m80IndexPoint float32
  98. m80Value int
  99. m90 float32
  100. m90Index float32
  101. m90IndexPoint float32
  102. m90Value int
  103. took *archive.TaskTook
  104. taskTookSortLen = len(s.taskCache.Sort)
  105. )
  106. if taskTookSortLen > 1 {
  107. spacing = float32(taskTookSortLen-1) / 10
  108. m50Index = 1 + spacing*5
  109. m50IndexPoint = m50Index - float32(int(m50Index))
  110. m50Value = s.taskCache.Sort[int(m50Index)-1]
  111. m50 = float32(s.taskCache.Sort[int(m50Index)]-m50Value)*m50IndexPoint + float32(m50Value)
  112. m60Index = 1 + spacing*6
  113. m60IndexPoint = m60Index - float32(int(m60Index))
  114. m60Value = s.taskCache.Sort[int(m60Index)-1]
  115. m60 = float32(s.taskCache.Sort[int(m60Index)]-m60Value)*m60IndexPoint + float32(m60Value)
  116. m80Index = 1 + spacing*8
  117. m80IndexPoint = m80Index - float32(int(m80Index))
  118. m80Value = s.taskCache.Sort[int(m80Index)-1]
  119. m80 = float32(s.taskCache.Sort[int(m80Index)]-m80Value)*m80IndexPoint + float32(m80Value)
  120. m90Index = 1 + spacing*9
  121. m90IndexPoint = m90Index - float32(int(m90Index))
  122. m90Value = s.taskCache.Sort[int(m90Index)-1]
  123. m90 = float32(s.taskCache.Sort[int(m90Index)]-m90Value)*m90IndexPoint + float32(m90Value)
  124. took = &archive.TaskTook{}
  125. took.M50 = int(m50 + 0.5)
  126. took.M60 = int(m60 + 0.5)
  127. took.M80 = int(m80 + 0.5)
  128. took.M90 = int(m90 + 0.5)
  129. took.TypeID = archive.TookTypeMinute
  130. took.Ctime = time.Now()
  131. took.Mtime = took.Ctime
  132. s.taskCache.Took = append(s.taskCache.Took, took)
  133. lastID, err = s.arc.AddTaskTook(context.TODO(), took)
  134. }
  135. return
  136. }
  137. func (s *Service) hdlTaskTookByHourHalf() (lastID int64, err error) {
  138. s.taskCache.Lock()
  139. defer s.taskCache.Unlock()
  140. var (
  141. m50 int
  142. m60 int
  143. m80 int
  144. m90 int
  145. took *archive.TaskTook
  146. tookLen = len(s.taskCache.Took)
  147. )
  148. for _, v := range s.taskCache.Took {
  149. m50 += v.M50
  150. m60 += v.M60
  151. m80 += v.M80
  152. m90 += v.M90
  153. }
  154. if tookLen >= 30 {
  155. took = &archive.TaskTook{}
  156. m50 /= int(float32(tookLen) + 0.5)
  157. m60 /= int(float32(tookLen) + 0.5)
  158. m80 /= int(float32(tookLen) + 0.5)
  159. m90 /= int(float32(tookLen) + 0.5)
  160. took.M50 = m50
  161. took.M60 = m60
  162. took.M80 = m80
  163. took.M90 = m90
  164. took.Ctime = time.Now()
  165. took.Mtime = took.Ctime
  166. took.TypeID = archive.TookTypeHalfHour
  167. lastID, err = s.arc.AddTaskTook(context.TODO(), took)
  168. s.taskCache.Took = nil
  169. }
  170. return
  171. }
  172. // TaskTooksByHalfHour get task books by ctime
  173. func (s *Service) TaskTooksByHalfHour(c context.Context, stime, etime time.Time) (tooks []*archive.TaskTook, err error) {
  174. if tooks, err = s.arc.TaskTooksByHalfHour(c, stime, etime); err != nil {
  175. log.Error("s.arc.TaskTooksByHalfHour(%v,%v)", stime, etime)
  176. return
  177. }
  178. return
  179. }