task.go 6.2 KB


  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/csv"
  6. "fmt"
  7. "io/ioutil"
  8. "math/big"
  9. "net"
  10. "net/http"
  11. "regexp"
  12. "strings"
  13. "time"
  14. "go-common/app/admin/main/dm/model"
  15. "go-common/library/ecode"
  16. "go-common/library/log"
  17. "go-common/library/xstr"
  18. )
  19. var (
  20. _csvTaskTitle = []string{"dmid", "oid", "mid", "state", "msg", "ip", "ctime"}
  21. )
  22. // TaskList .
  23. func (s *Service) TaskList(c context.Context, v *model.TaskListArg) (res *model.TaskList, err error) {
  24. taskSQL := make([]string, 0)
  25. if v.Creator != "" {
  26. taskSQL = append(taskSQL, fmt.Sprintf("creator LIKE %q", "%"+v.Creator+"%"))
  27. }
  28. if v.Reviewer != "" {
  29. taskSQL = append(taskSQL, fmt.Sprintf("reviewer LIKE %q", "%"+v.Reviewer+"%"))
  30. }
  31. if v.State >= 0 {
  32. taskSQL = append(taskSQL, fmt.Sprintf("state=%d", v.State))
  33. }
  34. if v.Title != "" {
  35. taskSQL = append(taskSQL, fmt.Sprintf("title LIKE %q", "%"+v.Title+"%"))
  36. }
  37. if v.Ctime != "" {
  38. taskSQL = append(taskSQL, fmt.Sprintf("ctime>=%q", v.Ctime))
  39. }
  40. tasks, total, err := s.dao.TaskList(c, taskSQL, v.Pn, v.Ps)
  41. if err != nil {
  42. return
  43. }
  44. res = &model.TaskList{
  45. Result: tasks,
  46. Page: &model.PageInfo{
  47. Num: v.Pn,
  48. Size: v.Ps,
  49. Total: total,
  50. },
  51. }
  52. return
  53. }
  54. // AddTask .
  55. func (s *Service) AddTask(c context.Context, v *model.AddTaskArg) (err error) {
  56. var taskID int64
  57. var sub int32
  58. tx, err := s.dao.BeginBiliDMTrans(c)
  59. if err != nil {
  60. log.Error("tx.BeginBiliDMTrans error(%v)", err)
  61. return
  62. }
  63. defer func() {
  64. if err != nil {
  65. if err1 := tx.Rollback(); err1 != nil {
  66. log.Error("tx.Rollback() error(%v)", err1)
  67. }
  68. return
  69. }
  70. if err = tx.Commit(); err != nil {
  71. log.Error("tx.Commit() error(%v)", err)
  72. }
  73. }()
  74. if v.Regex != "" {
  75. if len([]rune(v.Regex)) > model.TaskRegexLen {
  76. err = ecode.DMTaskRegexTooLong
  77. return
  78. }
  79. if _, err = regexp.Compile(v.Regex); err != nil {
  80. log.Error("regexp.Compile(v.Regex:%s) error(%v)", v.Regex, err)
  81. err = ecode.DMTaskRegexIllegal
  82. return
  83. }
  84. }
  85. if v.Operation >= 0 {
  86. sub = 1
  87. }
  88. if taskID, err = s.dao.AddTask(tx, v, sub); err != nil {
  89. return
  90. }
  91. if sub > 0 {
  92. _, err = s.dao.AddSubTask(tx, taskID, v.Operation, v.OpTime, v.OpRate)
  93. }
  94. return
  95. }
  96. // ReviewTask .
  97. func (s *Service) ReviewTask(c context.Context, v *model.ReviewTaskArg) (err error) {
  98. if v.State == model.TaskReviewPass {
  99. var (
  100. task *model.TaskView
  101. sTime, eTime time.Time
  102. )
  103. if task, err = s.dao.TaskView(c, v.ID); err != nil {
  104. return
  105. }
  106. taskSQL := make([]string, 0)
  107. if task.Regex != "" {
  108. taskSQL = append(taskSQL, fmt.Sprintf("content.msg regexp %q", task.Regex))
  109. }
  110. if task.KeyWords != "" {
  111. taskSQL = append(taskSQL, fmt.Sprintf("content.msg like %q", "%"+task.KeyWords+"%"))
  112. }
  113. if task.IPs != "" {
  114. ips := xstr.JoinInts(ipsToInts(task.IPs))
  115. taskSQL = append(taskSQL, fmt.Sprintf("content.ip in (%s)", ips))
  116. }
  117. if task.Mids != "" {
  118. taskSQL = append(taskSQL, fmt.Sprintf("index.mid in (%s)", task.Mids))
  119. }
  120. if task.Cids != "" {
  121. taskSQL = append(taskSQL, fmt.Sprintf("index.oid in (%s)", task.Cids))
  122. }
  123. if task.Start != "" {
  124. if sTime, err = time.ParseInLocation("2006-01-02 15:04:05", task.Start, time.Local); err != nil {
  125. return
  126. }
  127. taskSQL = append(taskSQL, fmt.Sprintf("content.log_date>=%s", sTime.Format("20060102")))
  128. taskSQL = append(taskSQL, fmt.Sprintf("content.ctime>=%q", task.Start))
  129. }
  130. if task.End != "" {
  131. if eTime, err = time.ParseInLocation("2006-01-02 15:04:05", task.End, time.Local); err != nil {
  132. return
  133. }
  134. taskSQL = append(taskSQL, fmt.Sprintf("content.log_date<=%s", eTime.Format("20060102")))
  135. taskSQL = append(taskSQL, fmt.Sprintf("content.ctime<=%q", task.End))
  136. }
  137. if v.Topic, err = s.dao.SendTask(c, taskSQL); err != nil {
  138. // err = nil
  139. // v.State = model.TaskStateFailed
  140. return
  141. }
  142. }
  143. _, err = s.dao.ReviewTask(c, v)
  144. return
  145. }
  146. // EditTaskState .
  147. func (s *Service) EditTaskState(c context.Context, v *model.EditTasksStateArg) (err error) {
  148. if _, err = s.dao.EditTaskState(c, v); err != nil {
  149. return
  150. }
  151. if v.State == model.TaskStateRun {
  152. _, err = s.dao.EditTaskPriority(c, v.IDs, time.Now().Unix())
  153. }
  154. return
  155. }
  156. // TaskView .
  157. func (s *Service) TaskView(c context.Context, v *model.TaskViewArg) (task *model.TaskView, err error) {
  158. if task, err = s.dao.TaskView(c, v.ID); err != nil || task == nil {
  159. return
  160. }
  161. if task.SubTask, err = s.dao.SubTask(c, task.ID); err != nil || task.SubTask == nil {
  162. return
  163. }
  164. task.Tcount = task.SubTask.Tcount
  165. return
  166. }
  167. // TaskCsv .
  168. func (s *Service) TaskCsv(c context.Context, id int64) (bs []byte, err error) {
  169. var (
  170. task *model.TaskView
  171. buf *bytes.Buffer
  172. csvWriter *csv.Writer
  173. )
  174. if task, err = s.dao.TaskView(c, id); err != nil {
  175. return
  176. }
  177. if task == nil || len(task.Result) == 0 {
  178. err = ecode.NothingFound
  179. return
  180. }
  181. res, err := http.Get(task.Result)
  182. if err != nil {
  183. log.Error("s.HttpGet(%s) error(%v)", task.Result, err)
  184. return
  185. }
  186. defer res.Body.Close()
  187. if res.StatusCode != http.StatusOK {
  188. err = ecode.NothingFound
  189. return
  190. }
  191. resp, err := ioutil.ReadAll(res.Body)
  192. if err != nil {
  193. log.Error("s.ioutilRead error(%v)", err)
  194. return
  195. }
  196. buf = &bytes.Buffer{}
  197. csvWriter = csv.NewWriter(buf)
  198. if err = csvWriter.Write(_csvTaskTitle); err != nil {
  199. log.Error("csvWriter.Write(%v) erorr(%v)", _csvTaskTitle, err)
  200. return
  201. }
  202. lines := bytes.Split(resp, []byte("\n"))
  203. if len(lines) == 0 {
  204. err = ecode.NothingFound
  205. return
  206. }
  207. for _, line := range lines {
  208. var (
  209. items []string
  210. )
  211. fields := bytes.Split(line, []byte("\001"))
  212. if len(fields) < 7 {
  213. log.Error("fields lenth too small:%d", len(fields))
  214. continue
  215. }
  216. for _, field := range fields {
  217. items = append(items, string(field))
  218. }
  219. if err = csvWriter.Write(items); err != nil {
  220. log.Error("csvWriter.Write(%v) erorr(%v)", items, err)
  221. return
  222. }
  223. }
  224. csvWriter.Flush()
  225. if err = csvWriter.Error(); err != nil {
  226. log.Error("csvWriter.Error(%v)", err)
  227. return
  228. }
  229. bs = buf.Bytes()
  230. if len(bs) == 0 {
  231. err = ecode.NothingFound
  232. }
  233. return
  234. }
  235. func ipsToInts(ips string) (ipInts []int64) {
  236. ipStrs := strings.Split(ips, ",")
  237. ipInts = make([]int64, 0, len(ipStrs))
  238. for _, ipStr := range ipStrs {
  239. ipInts = append(ipInts, ipToInt(ipStr))
  240. }
  241. return
  242. }
  243. func ipToInt(ip string) (ipInt int64) {
  244. ret := big.NewInt(0)
  245. if net.ParseIP(ip) == nil {
  246. return
  247. }
  248. ret.SetBytes(net.ParseIP(ip).To4())
  249. return ret.Int64()
  250. }