task.go 13 KB


  1. package service
  2. import (
  3. "bufio"
  4. "context"
  5. "crypto/md5"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "go-common/app/job/main/push/dao"
  15. pb "go-common/app/service/main/push/api/grpc/v1"
  16. pushmdl "go-common/app/service/main/push/model"
  17. xsql "go-common/library/database/sql"
  18. "go-common/library/log"
  19. "go-common/library/sync/errgroup"
  20. )
  21. const (
  22. _delTaskLimit = 5000
  23. )
  24. var (
  25. errEmptyLine = errors.New("empty line")
  26. errInvalidMid = errors.New("invalid mid format")
  27. errInvalidToken = errors.New("invalid token format")
  28. )
  29. func (s *Service) addTaskproc() {
  30. defer s.addTaskWg.Done()
  31. var err error
  32. for {
  33. task, ok := <-s.addTaskCh
  34. if !ok {
  35. log.Info("add task channel exit")
  36. return
  37. }
  38. if task == nil {
  39. continue
  40. }
  41. task.Status = pushmdl.TaskStatusPrepared
  42. for i := 0; i < _retry; i++ {
  43. if err = s.dao.AddTask(context.Background(), task); err == nil {
  44. break
  45. }
  46. }
  47. if err != nil {
  48. log.Error("add task(%+v) error(%v)", task, err)
  49. s.cache.Save(func() {
  50. s.dao.SendWechat(fmt.Sprintf("add task(%d)", task.Job))
  51. })
  52. continue
  53. }
  54. dao.PromInfo("add task")
  55. time.Sleep(time.Millisecond)
  56. }
  57. }
  58. func (s *Service) delTasksproc() {
  59. for {
  60. now := time.Now()
  61. // 每天2点时删除一个月前的task数据
  62. if now.Hour() != 2 {
  63. time.Sleep(time.Minute)
  64. continue
  65. }
  66. var (
  67. err error
  68. deleted int64
  69. b = now.Add(time.Duration(-s.c.Job.DelTaskInterval*24) * time.Hour)
  70. loc, _ = time.LoadLocation("Local")
  71. t = time.Date(b.Year(), b.Month(), b.Day(), 23, 59, 59, 0, loc)
  72. )
  73. for {
  74. if deleted, err = s.dao.DelTasks(context.TODO(), t, _delTaskLimit); err != nil {
  75. log.Error("s.delTasks(%v) error(%v)", t, err)
  76. s.dao.SendWechat("DB操作失败:push-job删除task数据错误")
  77. time.Sleep(time.Second)
  78. continue
  79. }
  80. if deleted < _delTaskLimit {
  81. break
  82. }
  83. time.Sleep(time.Second)
  84. }
  85. time.Sleep(time.Hour)
  86. }
  87. }
  88. func (s *Service) pretreatTaskproc() {
  89. defer s.waiter.Done()
  90. for {
  91. if s.closed {
  92. return
  93. }
  94. task, err := s.pickPretreatmentTask()
  95. if err != nil {
  96. time.Sleep(5 * time.Second)
  97. continue
  98. }
  99. if task != nil {
  100. log.Info("pretreat task job(%d) id(%s)", task.Job, task.ID)
  101. if err = s.pretreatTask(task); err != nil {
  102. log.Error("pretreat task(%+v) error(%v)", task, err)
  103. s.cache.Save(func() { s.dao.SendWechat(fmt.Sprintf("pretreat task(%s) error", task.ID)) })
  104. }
  105. }
  106. time.Sleep(time.Duration(s.c.Job.LoadTaskInteval))
  107. }
  108. }
  109. func (s *Service) pickPretreatmentTask() (t *pushmdl.Task, err error) {
  110. c := context.Background()
  111. var tx *xsql.Tx
  112. if tx, err = s.dao.BeginTx(c); err != nil {
  113. log.Error("tx.BeginTx() error(%v)", err)
  114. return
  115. }
  116. if t, err = s.dao.TxTaskByStatus(tx, pushmdl.TaskStatusPretreatmentPrepared); err != nil {
  117. if e := tx.Rollback(); e != nil {
  118. dao.PromError("task:获取新任务")
  119. log.Error("tx.Rollback() error(%v)", e)
  120. }
  121. return
  122. }
  123. if t == nil {
  124. if e := tx.Rollback(); e != nil {
  125. dao.PromError("task:获取新任务")
  126. log.Error("tx.Rollback() error(%v)", e)
  127. }
  128. return
  129. }
  130. if err = s.dao.TxUpdateTaskStatus(tx, t.ID, pushmdl.TaskStatusPretreatmentDoing); err != nil {
  131. if e := tx.Rollback(); e != nil {
  132. dao.PromError("task:更新任务状态")
  133. log.Error("tx.Rollback() error(%v)", e)
  134. }
  135. return
  136. }
  137. if err = tx.Commit(); err != nil {
  138. dao.PromError("task:获取新任务commit")
  139. log.Error("tx.Commit() error(%v)", err)
  140. }
  141. return
  142. }
  143. func (s *Service) pretreatTask(t *pushmdl.Task) (err error) {
  144. id, _ := strconv.ParseInt(t.ID, 10, 64)
  145. switch t.Type {
  146. case pushmdl.TaskTypeAll:
  147. err = s.pretreatTaskAll(t)
  148. case pushmdl.TaskTypeMngToken, pushmdl.TaskTypeDataPlatformToken, pushmdl.TaskTypeDataPlatformMid:
  149. err = s.pretreatTaskToken(t)
  150. case pushmdl.TaskTypeStrategyMid, pushmdl.TaskTypeMngMid:
  151. err = s.pretreatTaskMid(t)
  152. default:
  153. log.Error("invalid task type, (%+v)", t)
  154. }
  155. if err != nil {
  156. err = s.dao.UpdateTaskStatus(context.Background(), id, pushmdl.TaskStatusPretreatmentFailed)
  157. return
  158. }
  159. err = s.dao.UpdateTaskStatus(context.Background(), id, pushmdl.TaskStatusPretreatmentDone)
  160. return
  161. }
  162. func (s *Service) pretreatTaskAll(t *pushmdl.Task) (err error) {
  163. log.Info("AddTaskAll start, task(%+v)", t)
  164. var (
  165. maxID int64
  166. group = errgroup.Group{}
  167. )
  168. maxID, err = s.dao.ReportLastID(context.Background())
  169. if err != nil || maxID <= 0 {
  170. log.Error("s.pretreatTaskAll() error(%v)", err)
  171. s.cache.Save(func() {
  172. s.dao.SendWechat(fmt.Sprintf("pretreatTaskAll(%v) ReportLastID(%d) error", t.ID, maxID))
  173. })
  174. return
  175. }
  176. log.Info("AddTaskAll get last report ID(%d)", maxID)
  177. buildCount := len(t.Build)
  178. batch := maxID / int64(s.c.Job.TaskGoroutines)
  179. for j := 0; j < s.c.Job.TaskGoroutines; j++ {
  180. begin := int64(j) * batch
  181. end := begin + batch
  182. group.Go(func() (e error) {
  183. var (
  184. path string
  185. rows *xsql.Rows
  186. tokens = make(map[int][]string)
  187. )
  188. for {
  189. if begin >= end {
  190. break
  191. }
  192. l := begin + int64(_dbBatch)
  193. if l >= end {
  194. l = end
  195. }
  196. log.Info("AddTaskAll load reports start(%d) end(%d)", begin, l)
  197. if rows, e = s.dao.ReportsTaskAll(context.Background(), begin, l, t.APPID); e != nil {
  198. log.Error("s.dao.ReportsTaskAll(%d,%d,%d) error(%v)", begin, l, t.APPID)
  199. s.cache.Save(func() {
  200. s.dao.SendWechat(fmt.Sprintf("pretreatTaskAll(%v) ReportsTaskAll(%d,%d,%d) error", t.ID, begin, l, t.APPID))
  201. })
  202. return
  203. }
  204. for rows.Next() {
  205. var (
  206. platformID int
  207. build int
  208. token string
  209. )
  210. if e = rows.Scan(&platformID, &token, &build); e != nil {
  211. log.Error("AddTaskAll rows.Scan() error(%v)", e)
  212. s.cache.Save(func() {
  213. s.dao.SendWechat(fmt.Sprintf("pretreatTaskAll(%v) ReportsTaskAll(%d,%d,%d) error", t.ID, begin, l, t.APPID))
  214. })
  215. return
  216. }
  217. if buildCount > 0 && !pushmdl.ValidateBuild(platformID, build, t.Build) {
  218. continue
  219. }
  220. tokens[platformID] = append(tokens[platformID], token)
  221. if len(tokens[platformID]) >= s.c.Job.LimitPerTask {
  222. if path, e = s.saveFile(tokens[platformID]); e != nil {
  223. log.Error("AddTaskAll s.saveTokens error(%v)", e)
  224. s.cache.Save(func() {
  225. s.dao.SendWechat(fmt.Sprintf("pretreatTaskAll(%v) saveTokens error(%v)", t.ID, e))
  226. })
  227. return
  228. }
  229. tokens[platformID] = []string{}
  230. task := *t
  231. task.MidFile = path
  232. task.PlatformID = platformID
  233. s.addTaskCh <- &task
  234. }
  235. }
  236. begin = l
  237. }
  238. for p, v := range tokens {
  239. if len(v) == 0 {
  240. continue
  241. }
  242. if path, e = s.saveFile(v); e == nil {
  243. task := *t
  244. task.MidFile = path
  245. task.PlatformID = p
  246. s.addTaskCh <- &task
  247. }
  248. }
  249. return
  250. })
  251. }
  252. if err = group.Wait(); err != nil {
  253. log.Error("add task all, task(%+v) error(%v)", t, err)
  254. s.cache.Save(func() {
  255. s.dao.SendWechat(fmt.Sprintf("pretreatTaskAll(%v) error(%v)", t.ID, err))
  256. })
  257. return
  258. }
  259. log.Info("AddTaskAll end, task(%+v)", t)
  260. s.cache.Save(func() {
  261. s.dao.SendWechat(fmt.Sprintf("add task all success, job(%d)", t.Job))
  262. })
  263. return
  264. }
  265. func (s *Service) pretreatTaskMid(t *pushmdl.Task) (err error) {
  266. f, err := os.Open(t.MidFile)
  267. if err != nil {
  268. log.Error("pretreatTaskMid(%+v) open file error(%v)", t, err)
  269. return
  270. }
  271. defer f.Close()
  272. var (
  273. exit bool
  274. line string
  275. path string
  276. mid int64
  277. counter int
  278. midTotal int64
  279. midValid int64
  280. mu sync.Mutex
  281. mids []int64
  282. tokens = make(map[int][]string)
  283. group = errgroup.Group{}
  284. reader = bufio.NewReader(f)
  285. )
  286. for {
  287. if exit {
  288. break
  289. }
  290. if line, err = reader.ReadString('\n'); err != nil {
  291. if err == io.EOF {
  292. exit = true
  293. } else {
  294. log.Error("read file error(%v)", err)
  295. continue
  296. }
  297. }
  298. if mid, err = parseMidLine(line); err != nil {
  299. log.Error("parse mid line(%s) error(%v)", line, err)
  300. continue
  301. }
  302. midTotal++
  303. mids = append(mids, mid)
  304. if len(mids) >= s.c.Job.PushPartSize {
  305. midsCp := make([]int64, len(mids))
  306. copy(midsCp, mids)
  307. mids = []int64{}
  308. group.Go(func() (e error) {
  309. ts, valid, e := s.tokensByMids(t, midsCp)
  310. if e != nil {
  311. log.Error("s.tokensByMids(%v) error(%v)", t.ID, e)
  312. return
  313. }
  314. tcopy := make(map[int][]string)
  315. mu.Lock()
  316. midValid += valid
  317. for p, v := range ts {
  318. tokens[p] = append(tokens[p], v...)
  319. if len(tokens[p]) >= s.c.Job.LimitPerTask {
  320. tcopy[p] = append(tcopy[p], tokens[p]...)
  321. tokens[p] = []string{}
  322. }
  323. }
  324. mu.Unlock()
  325. for p, v := range tcopy {
  326. if path, err = s.saveFile(v); err != nil {
  327. log.Error("pretreatTaskMid s.saveFild error(%v)", err)
  328. s.cache.Save(func() {
  329. s.dao.SendWechat(fmt.Sprintf("pretreatTaskMid(%v) saveTokens error(%v)", t.ID, err))
  330. })
  331. return
  332. }
  333. task := *t
  334. task.MidFile = path
  335. task.PlatformID = p
  336. s.addTaskCh <- &task
  337. }
  338. return
  339. })
  340. counter++
  341. if counter == s.c.Job.PushPartChanSize {
  342. group.Wait()
  343. counter = 0
  344. }
  345. }
  346. }
  347. if counter > 0 {
  348. group.Wait()
  349. }
  350. if len(mids) > 0 {
  351. var (
  352. valid int64
  353. ts map[int][]string
  354. )
  355. if ts, valid, err = s.tokensByMids(t, mids); err == nil {
  356. midValid += valid
  357. for p, v := range ts {
  358. tokens[p] = append(tokens[p], v...)
  359. }
  360. } else {
  361. log.Error("s.tokensByMids(%+v) error(%v)", t, err)
  362. }
  363. }
  364. s.cache.Save(func() {
  365. arg := &pb.AddMidProgressRequest{Task: t.ID, MidTotal: midTotal, MidValid: midValid}
  366. if _, e := s.pushRPC.AddMidProgress(context.Background(), arg); e != nil {
  367. log.Error("s.pushRPC.AddMidProgress(%+v) error(%v)", arg, e)
  368. }
  369. })
  370. for p, v := range tokens {
  371. if len(v) == 0 {
  372. continue
  373. }
  374. if path, err = s.saveFile(v); err != nil {
  375. log.Error("pretreatTaskMid s.saveFild error(%v)", err)
  376. return
  377. }
  378. task := *t
  379. task.MidFile = path
  380. task.PlatformID = p
  381. s.addTaskCh <- &task
  382. }
  383. log.Info("pretreatTaskMid task(%+v)", t)
  384. return
  385. }
  386. func (s *Service) pretreatTaskToken(t *pushmdl.Task) (err error) {
  387. f, err := os.Open(t.MidFile)
  388. if err != nil {
  389. log.Error("pretreatTaskToken(%+v) open file error(%v)", t, err)
  390. return
  391. }
  392. defer f.Close()
  393. var (
  394. exit bool
  395. plat int
  396. line string
  397. token string
  398. path string
  399. tokens = make(map[int][]string)
  400. reader = bufio.NewReader(f)
  401. )
  402. for {
  403. if exit {
  404. break
  405. }
  406. if line, err = reader.ReadString('\n'); err != nil {
  407. if err == io.EOF {
  408. exit = true // no 'continue', solve the last line whitout '\n'
  409. } else {
  410. log.Error("read file error(%v)", err)
  411. continue
  412. }
  413. }
  414. if plat, token, err = parseTokenLine(line); err != nil {
  415. log.Error("parse token line(%s) error(%v)", line, err)
  416. continue
  417. }
  418. tokens[plat] = append(tokens[plat], token)
  419. if len(tokens[plat]) >= s.c.Job.LimitPerTask {
  420. if path, err = s.saveFile(tokens[plat]); err != nil {
  421. log.Error("pretreatTaskToken s.saveFile error(%v)", err)
  422. s.cache.Save(func() {
  423. s.dao.SendWechat(fmt.Sprintf("pretreatTaskToken(%v) saveTokens error(%v)", t.ID, err))
  424. })
  425. return
  426. }
  427. tokens[plat] = []string{}
  428. task := *t
  429. task.MidFile = path
  430. task.PlatformID = plat
  431. s.addTaskCh <- &task
  432. }
  433. }
  434. for p, v := range tokens {
  435. if len(v) == 0 {
  436. continue
  437. }
  438. if path, err = s.saveFile(v); err == nil {
  439. task := *t
  440. task.MidFile = path
  441. task.PlatformID = p
  442. s.addTaskCh <- &task
  443. }
  444. }
  445. log.Info("pretreatTaskToken task(%+v)", t)
  446. return
  447. }
  448. func parseTokenLine(line string) (plat int, token string, err error) {
  449. line = strings.Trim(line, " \r\n")
  450. if line == "" {
  451. err = errEmptyLine
  452. return
  453. }
  454. res := strings.Split(line, "\t")
  455. if len(res) != 2 {
  456. err = errInvalidToken
  457. return
  458. }
  459. if res[0] == "" || res[1] == "" {
  460. err = errInvalidToken
  461. return
  462. }
  463. if plat, err = strconv.Atoi(res[0]); err != nil || plat <= 0 {
  464. err = errInvalidToken
  465. return
  466. }
  467. token = res[1]
  468. return
  469. }
  470. func parseMidLine(line string) (mid int64, err error) {
  471. line = strings.Trim(line, " \r\t\n")
  472. if line == "" {
  473. err = errEmptyLine
  474. return
  475. }
  476. if mid, err = strconv.ParseInt(line, 10, 64); err != nil || mid <= 0 {
  477. err = errInvalidMid
  478. }
  479. return
  480. }
  481. func (s *Service) saveFile(tokens []string) (path string, err error) {
  482. name := strconv.FormatInt(time.Now().UnixNano(), 10) + tokens[0]
  483. data := []byte(strings.Join(tokens, "\n"))
  484. for i := 0; i < _retry; i++ {
  485. if path, err = s.saveNASFile(name, data); err == nil {
  486. break
  487. }
  488. time.Sleep(100 * time.Millisecond)
  489. }
  490. return
  491. }
  492. // saveNASFile writes data into NAS.
  493. func (s *Service) saveNASFile(name string, data []byte) (path string, err error) {
  494. name = fmt.Sprintf("%x", md5.Sum([]byte(name)))
  495. dir := fmt.Sprintf("%s/%s/%s", strings.TrimSuffix(s.c.Job.MountDir, "/"), time.Now().Format("20060102"), name[:2])
  496. if _, err = os.Stat(dir); err != nil {
  497. if !os.IsNotExist(err) {
  498. log.Error("os.IsNotExist(%s) error(%v)", dir, err)
  499. return
  500. }
  501. if err = os.MkdirAll(dir, 0777); err != nil {
  502. log.Error("os.MkdirAll(%s) error(%v)", dir, err)
  503. return
  504. }
  505. }
  506. path = fmt.Sprintf("%s/%s", dir, name)
  507. f, err := os.OpenFile(path, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
  508. if err != nil {
  509. log.Error("s.saveNASFile(%s) OpenFile() error(%v)", path, err)
  510. return
  511. }
  512. defer f.Close()
  513. if _, err = f.Write(data); err != nil {
  514. log.Error("s.saveNASFile(%s) f.Write() error(%v)", err)
  515. }
  516. return
  517. }