dataplatform.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/md5"
  6. "fmt"
  7. "os"
  8. "strconv"
  9. "strings"
  10. "time"
  11. pamdl "go-common/app/admin/main/push/model"
  12. "go-common/app/job/main/push/model"
  13. pushmdl "go-common/app/service/main/push/model"
  14. xsql "go-common/library/database/sql"
  15. "go-common/library/log"
  16. )
  17. // txCond get a new condition by tx.
  18. func (s *Service) txCond(oldStatus, newStatus int) (cond *pamdl.DPCondition, err error) {
  19. ctx := context.Background()
  20. var tx *xsql.Tx
  21. if tx, err = s.dao.BeginTx(ctx); err != nil {
  22. log.Error("tx.BeginTx() error(%v)", err)
  23. return
  24. }
  25. if cond, err = s.dao.TxCondByStatus(tx, oldStatus); err != nil || cond == nil {
  26. if e := tx.Rollback(); e != nil {
  27. log.Error("tx.Rollback() error(%v)", e)
  28. }
  29. return
  30. }
  31. if err = s.dao.TxUpdateCondStatus(tx, cond.ID, newStatus); err != nil {
  32. if e := tx.Rollback(); e != nil {
  33. log.Error("tx.Rollback() error(%v)", e)
  34. }
  35. return
  36. }
  37. if err = tx.Commit(); err != nil {
  38. log.Error("tx.Commit() error(%v)", err)
  39. }
  40. return
  41. }
  42. // data platform query
  43. func (s *Service) dpQueryproc() {
  44. defer s.waiter.Done()
  45. for {
  46. if s.closed {
  47. return
  48. }
  49. cond, err := s.txCond(pushmdl.DpCondStatusPrepared, pushmdl.DpCondStatusSubmitting)
  50. if err != nil || cond == nil {
  51. time.Sleep(time.Second)
  52. continue
  53. }
  54. for i := 0; i < _retry; i++ {
  55. if cond.StatusURL, err = s.dao.DpSubmitQuery(context.Background(), cond.SQL); err == nil {
  56. break
  57. }
  58. time.Sleep(time.Second)
  59. }
  60. if err != nil {
  61. log.Error("data platform add query(%+v) error(%v)", cond, err)
  62. s.dao.UpdateDpCondStatus(context.Background(), cond.ID, pushmdl.DpCondStatusFailed)
  63. s.dao.UpdateTaskStatus(context.Background(), cond.Task, pushmdl.TaskStatusFailed)
  64. continue
  65. }
  66. cond.Status = pushmdl.DpCondStatusSubmitted
  67. for i := 0; i < _retry; i++ {
  68. if err = s.dao.UpdateDpCond(context.Background(), cond); err == nil {
  69. break
  70. }
  71. time.Sleep(10 * time.Millisecond)
  72. }
  73. if err != nil {
  74. log.Error("data platform update condition(%+v) error(%v)", cond, err)
  75. }
  76. time.Sleep(time.Second)
  77. }
  78. }
  79. // data platform get file
  80. func (s *Service) dpFileproc() {
  81. defer s.waiter.Done()
  82. for {
  83. if s.closed {
  84. return
  85. }
  86. cond, err := s.txCond(pushmdl.DpCondStatusSubmitted, pushmdl.DpCondStatusPolling)
  87. if err != nil || cond == nil {
  88. time.Sleep(time.Second)
  89. continue
  90. }
  91. var (
  92. path string
  93. files []string
  94. )
  95. if files = s.dpCheckJob(cond); len(files) == 0 {
  96. continue
  97. }
  98. for i := 0; i < _retry; i++ {
  99. if path, err = s.dpDownloadFiles(cond, files); err == nil {
  100. break
  101. }
  102. time.Sleep(10 * time.Millisecond)
  103. }
  104. if err != nil || path == "" {
  105. log.Error("data platform download query(%+v) file error(%v)", cond, err)
  106. s.dao.UpdateDpCondStatus(context.Background(), cond.ID, pushmdl.DpCondStatusFailed)
  107. s.dao.UpdateTaskStatus(context.Background(), cond.Task, pushmdl.TaskStatusFailed)
  108. continue
  109. }
  110. cond.File = path
  111. cond.Status = pushmdl.DpCondStatusDone
  112. for i := 0; i < _retry; i++ {
  113. if err = s.dao.UpdateDpCond(context.Background(), cond); err == nil {
  114. break
  115. }
  116. time.Sleep(10 * time.Millisecond)
  117. }
  118. if err != nil {
  119. log.Error("data platform UpdateDpCond(%+v) error(%v)", cond, err)
  120. continue
  121. }
  122. for i := 0; i < _retry; i++ {
  123. if err = s.dao.UpdateTask(context.Background(), strconv.FormatInt(cond.Task, 10), path, pushmdl.TaskStatusPretreatmentPrepared); err == nil {
  124. break
  125. }
  126. time.Sleep(10 * time.Millisecond)
  127. }
  128. if err != nil {
  129. log.Error("s.dao.UpdateTask(%d,%s,%d) error(%v)", cond.Task, path, pushmdl.TaskStatusPretreatmentPrepared)
  130. }
  131. time.Sleep(time.Second)
  132. }
  133. }
  134. func (s *Service) dpCheckJob(cond *pamdl.DPCondition) (files []string) {
  135. now := time.Now()
  136. for {
  137. if time.Since(now) > time.Duration(s.c.Job.DpPollingTime) {
  138. log.Error("polling stoped, more over than dpPollingTime, give job up")
  139. s.dao.UpdateDpCondStatus(context.Background(), cond.ID, pushmdl.DpCondStatusFailed)
  140. break
  141. }
  142. res, err := s.dao.DpCheckJob(context.Background(), cond.StatusURL)
  143. if err != nil {
  144. log.Error("s.dao.DpCheckJob(%s) error(%v)", cond.StatusURL, err)
  145. time.Sleep(time.Second)
  146. continue
  147. }
  148. if res.StatusID == model.CheckJobStatusDoing || res.StatusID == model.CheckJobStatusPending {
  149. log.Info("polling (%s) ing..., status(%d)", cond.StatusURL, res.StatusID)
  150. time.Sleep(5 * time.Second)
  151. continue
  152. }
  153. if res.StatusID == model.CheckJobStatusOk {
  154. if len(res.Files) == 0 {
  155. log.Info("polling (%s) success, no files found", cond.StatusURL)
  156. s.dao.UpdateDpCondStatus(context.Background(), cond.ID, pushmdl.DpCondStatusNoFile)
  157. break
  158. }
  159. files = res.Files
  160. log.Info("polling (%s) success, files(%d)", cond.StatusURL, len(files))
  161. return
  162. }
  163. if res.StatusID == model.CheckJobStatusErr {
  164. log.Error("polling (%s) error, res(%+v)", cond.StatusURL, res)
  165. s.dao.UpdateDpCondStatus(context.Background(), cond.ID, pushmdl.DpCondStatusFailed)
  166. break
  167. }
  168. }
  169. log.Error("polling cond(%d) error", cond.ID)
  170. s.dao.UpdateTaskStatus(context.Background(), cond.Task, pushmdl.TaskStatusFailed)
  171. return
  172. }
  173. func (s *Service) dpDownloadFiles(cond *pamdl.DPCondition, files []string) (path string, err error) {
  174. for i := 0; i < _retry; i++ {
  175. if err = s.dao.UpdateDpCondStatus(context.Background(), cond.ID, pushmdl.DpCondStatusDownloading); err == nil {
  176. break
  177. }
  178. time.Sleep(10 * time.Millisecond)
  179. }
  180. if err != nil {
  181. return
  182. }
  183. dir := fmt.Sprintf("%s/%s", strings.TrimSuffix(s.c.Job.MountDir, "/"), time.Now().Format("20060102"))
  184. if _, err = os.Stat(dir); err != nil {
  185. if !os.IsNotExist(err) {
  186. log.Error("os.IsNotExist(%s) error(%v)", dir, err)
  187. return
  188. }
  189. if err = os.MkdirAll(dir, 0777); err != nil {
  190. log.Error("os.MkdirAll(%s) error(%v)", dir, err)
  191. return
  192. }
  193. }
  194. name := strconv.FormatInt(time.Now().UnixNano(), 10)
  195. path = fmt.Sprintf("%s/%x", dir, md5.Sum([]byte(name)))
  196. for _, f := range files {
  197. if err = s.dpDownloadFile(f, path); err != nil {
  198. return
  199. }
  200. }
  201. return
  202. }
  203. func (s *Service) dpDownloadFile(url, path string) (err error) {
  204. var (
  205. res []byte
  206. content [][]byte
  207. )
  208. for i := 0; i < _retry; i++ {
  209. if res, err = s.dao.DpDownloadFile(context.Background(), url); err == nil {
  210. break
  211. }
  212. time.Sleep(10 * time.Millisecond)
  213. }
  214. if err != nil {
  215. log.Error("s.dao.DpDownloadFile(%s) error(%v)", url, err)
  216. return
  217. }
  218. for _, bs := range bytes.Split(res, []byte("\n")) {
  219. n := bytes.Split(bs, []byte("\u0001"))
  220. content = append(content, bytes.Join(n, []byte(" ")))
  221. }
  222. for i := 0; i < _retry; i++ {
  223. if err = s.saveDpFile(path, bytes.Join(content, []byte("\n"))); err == nil {
  224. break
  225. }
  226. time.Sleep(10 * time.Millisecond)
  227. }
  228. if err != nil {
  229. log.Error("s.saveNASFile(%s) error(%v)", url, err)
  230. }
  231. return
  232. }
  233. // saveDpFile writes data platform data into NAS.
  234. func (s *Service) saveDpFile(path string, data []byte) (err error) {
  235. f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  236. if err != nil {
  237. log.Error("s.saveDpFile(%s) OpenFile() error(%v)", path, err)
  238. return
  239. }
  240. defer f.Close()
  241. if _, err = f.Write(data); err != nil {
  242. log.Error("s.saveDpFile(%s) f.Write() error(%v)", path, err)
  243. }
  244. return
  245. }