engine.go 30 KB


  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "reflect"
  7. "sort"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "go-common/app/admin/main/aegis/model"
  12. "go-common/app/admin/main/aegis/model/business"
  13. "go-common/app/admin/main/aegis/model/common"
  14. "go-common/app/admin/main/aegis/model/net"
  15. "go-common/app/admin/main/aegis/model/resource"
  16. taskmod "go-common/app/admin/main/aegis/model/task"
  17. uprpc "go-common/app/service/main/up/api/v1"
  18. "go-common/library/ecode"
  19. "go-common/library/log"
  20. "go-common/library/sync/errgroup"
  21. "go-common/library/xstr"
  22. )
  23. // ListBizFlow .
  24. func (s *Service) ListBizFlow(c context.Context, tp int8, bizID []int64, flowID []int64) (list []*business.BizItem, err error) {
  25. // 1. 获取每个类型的业务
  26. list = []*business.BizItem{}
  27. res, err := s.gorm.BusinessList(c, tp, bizID, true)
  28. if err != nil {
  29. err = ecode.AegisBusinessCfgErr
  30. return
  31. }
  32. if len(res) == 0 {
  33. return
  34. }
  35. bizMap := map[int64]*business.Business{}
  36. accessBiz := []int64{}
  37. for _, item := range res {
  38. bizMap[item.ID] = item
  39. accessBiz = append(accessBiz, item.ID)
  40. }
  41. //获取指定业务下的可分发流程节点
  42. var bizFlow map[int64]map[int64]string
  43. if bizFlow, err = s.dispatchFlow(c, accessBiz, flowID); err != nil {
  44. return
  45. }
  46. for bizID, item := range bizFlow {
  47. if bizMap[bizID] == nil {
  48. continue
  49. }
  50. bizItem := &business.BizItem{
  51. BizID: bizID,
  52. BizName: bizMap[bizID].Name,
  53. BizType: bizMap[bizID].TP,
  54. Flows: item,
  55. }
  56. list = append(list, bizItem)
  57. }
  58. sort.Sort(business.BizItemArr(list))
  59. return
  60. }
  61. // Next reveive next auditing task
  62. func (s *Service) Next(c context.Context, opt *taskmod.NextOptions) (infos []*model.AuditInfo, err error) {
  63. log.Info("Next opt(%+v)", opt)
  64. if opt.Debug == 1 {
  65. info := model.GetEmptyInfo()
  66. infos = append(infos, info)
  67. return
  68. }
  69. // 1. TODO: 根据业务 动态判定dispatch_count和seize_count
  70. /*
  71. 获取任务后,要根据资源状态判断释放需要审核,不用审核的直接关任务。 再次循环获取
  72. */
  73. var (
  74. tasks []*taskmod.Task
  75. )
  76. for i := 0; i < 3; i++ {
  77. tasks, _, err = s.NextTask(c, opt)
  78. if err != nil {
  79. log.Error("NextTask err(%v)", err)
  80. err = ecode.AegisTaskErr
  81. return
  82. }
  83. if len(tasks) == 0 {
  84. log.Info("NextTask empty task")
  85. return
  86. }
  87. for _, task := range tasks {
  88. log.Info("Next task(%+v)", task)
  89. info, err := s.auditInfoByTask(c, task, &opt.BaseOptions)
  90. if err != nil {
  91. err = nil
  92. continue
  93. }
  94. infos = append(infos, info)
  95. }
  96. if len(infos) > 0 {
  97. break
  98. }
  99. }
  100. return
  101. }
  102. // InfoTask .
  103. func (s *Service) InfoTask(c context.Context, opt *common.BaseOptions, taskid int64) (info *model.AuditInfo, err error) {
  104. task, err := s.Task(c, taskid)
  105. if err != nil || task == nil {
  106. err = ecode.AegisTaskErr
  107. return
  108. }
  109. return s.auditInfoByTask(c, task, opt)
  110. }
  111. // ListByTask 任务列表
  112. func (s *Service) ListByTask(c context.Context, opt *taskmod.ListOptions) (list []*model.ListTaskItem, err error) {
  113. log.Info("ListByTask opt(%+v)", opt)
  114. var (
  115. tasks []*taskmod.Task
  116. count int64
  117. )
  118. if opt.Debug == 1 {
  119. list = []*model.ListTaskItem{{}}
  120. opt.Total = 1
  121. } else {
  122. tasks, count, err = s.ListTasks(c, opt)
  123. if err != nil {
  124. log.Error("ListTasks err(%v)", err)
  125. err = ecode.AegisTaskErr
  126. return
  127. }
  128. if len(tasks) == 0 {
  129. return
  130. }
  131. opt.Total = int(count)
  132. }
  133. for _, task := range tasks {
  134. item := &model.ListTaskItem{}
  135. item.Task = task
  136. if int(time.Since(task.Gtime.Time()).Minutes()) < 10 {
  137. item.GTstr = common.WaitTime(task.Gtime.Time())
  138. }
  139. item.CTstr = task.Ctime.Time().Format("2006-01-02 15:04:05")
  140. item.MTstr = task.Mtime.Time().Format("2006-01-02 15:04:05")
  141. item.WaitTime = common.WaitTime(task.Ctime.Time())
  142. ids, _ := xstr.SplitInts(task.Group)
  143. item.UserGroup = s.getUserGroup(c, ids)
  144. list = append(list, item)
  145. }
  146. // 补充oid,content
  147. s.mulIDtoName(c, list, s.gorm.ListHelperForTask, "RID", "OID", "Content", "Metas")
  148. // 补充user_info,user_group
  149. s.mulIDtoName(c, list, s.listHelpUser, "MID", "UserGroup", "UserInfo")
  150. // 补充uname
  151. s.mulIDtoName(c, list, s.transUnames, "UID", "UserName")
  152. // 将mid替换为昵称 或者 cuser
  153. for _, item := range list {
  154. if item.MID != 0 && item.UserInfo != nil {
  155. item.MidStr = item.UserInfo.Name
  156. } else if item.Metas != nil {
  157. if val, ok := item.Metas["cuser"]; ok {
  158. item.MidStr = fmt.Sprint(val)
  159. }
  160. }
  161. }
  162. return
  163. }
  164. // InfoResource .
  165. func (s *Service) InfoResource(c context.Context, opt *common.BaseOptions) (info *model.AuditInfo, err error) {
  166. if opt.Debug == 1 {
  167. return model.GetEmptyInfo(), nil
  168. }
  169. // 根据oid查资源
  170. rsc, err := s.gorm.ResByOID(c, opt.BusinessID, opt.OID)
  171. if err != nil || rsc == nil {
  172. err = ecode.AegisResourceErr
  173. return
  174. }
  175. return s.auditInfoByRsc(c, rsc, opt.NetID)
  176. }
  177. // ListByResource 资源列表
  178. func (s *Service) ListByResource(c context.Context, arg *model.SearchParams) (columns []*model.Column, list []*model.ListRscItem, op []*net.TranOperation, err error) {
  179. if arg.Debug == 1 {
  180. arg.Total = 1
  181. return []*model.Column{}, []*model.ListRscItem{model.EmptyListItem()}, []*net.TranOperation{{}}, nil
  182. }
  183. // 搜索返回资源信息
  184. var sres *model.SearchRes
  185. if sres, err = s.http.ResourceES(c, arg); err != nil || sres == nil {
  186. err = ecode.AegisSearchErr
  187. return
  188. }
  189. arg.Total = sres.Page.Total
  190. if arg.FilterOff {
  191. list = sres.Resources
  192. } else {
  193. list = s.listParseState(c, arg.State, sres.Resources)
  194. }
  195. // 补充粉丝数和分组
  196. if len(list) > 0 {
  197. g, _ := errgroup.WithContext(c)
  198. g.Go(func() error {
  199. s.mulIDtoName(c, list, s.listHelpUser, "MID", "UserGroup", "UserInfo")
  200. return nil
  201. })
  202. g.Go(func() error {
  203. s.listHightLight(c, arg.BusinessID, list)
  204. return nil
  205. })
  206. g.Go(func() error {
  207. s.mulIDtoName(c, list, s.listMetas, "ID", "MetaData", "Metas")
  208. return nil
  209. })
  210. g.Wait()
  211. }
  212. columns = s.getColumns(c, arg.BusinessID)
  213. // 批量操作项
  214. op, err = s.fetchBatchOperations(c, arg.BusinessID, 0)
  215. return
  216. }
  217. // listMetas 补充列表里面的 metadata metas
  218. func (s *Service) listMetas(c context.Context, ids []int64) (res map[int64][]interface{}, err error) {
  219. res = make(map[int64][]interface{})
  220. var metas map[int64]string
  221. if metas, err = s.gorm.MetaByRID(c, ids); err != nil {
  222. return
  223. }
  224. for id, meta := range metas {
  225. mmeta := make(map[string]interface{})
  226. if len(meta) > 0 {
  227. if err = json.Unmarshal([]byte(meta), &mmeta); err != nil {
  228. log.Error("listMetas json.Unmarshal error(%v)", err)
  229. err = nil
  230. }
  231. }
  232. res[id] = []interface{}{meta, mmeta}
  233. }
  234. return
  235. }
  236. //状态筛选,防止搜索列表更新不及时
  237. func (s *Service) listParseState(c context.Context, state int64, list []*model.ListRscItem) (hitlist []*model.ListRscItem) {
  238. var arrids []int64
  239. for _, item := range list {
  240. arrids = append(arrids, item.ID)
  241. }
  242. hitids, err := s.gorm.ResourceHit(c, arrids)
  243. if err != nil {
  244. return list
  245. }
  246. //搜索待审列表时,过滤掉已被领取的任务,避免提交冲突
  247. taskhitids, _ := s.gorm.TaskHitAuditing(c, arrids)
  248. for _, item := range list {
  249. if _, ok := taskhitids[item.ID]; ok {
  250. continue
  251. }
  252. if st, ok := hitids[item.ID]; ok {
  253. if state == -12345 || (state == st) {
  254. item.State = st
  255. hitlist = append(hitlist, item)
  256. }
  257. }
  258. }
  259. return
  260. }
  261. // listHelpUser 补充列表里面的 user_info user_group
  262. func (s *Service) listHelpUser(c context.Context, mids []int64) (res map[int64][]interface{}, err error) {
  263. res = make(map[int64][]interface{})
  264. //mids去零
  265. for i, v := range mids {
  266. if v > 0 {
  267. continue
  268. }
  269. if i == len(mids)-1 {
  270. mids = mids[:i]
  271. } else {
  272. mids = append(mids[:i], mids[i+1:]...)
  273. }
  274. }
  275. if len(mids) == 0 {
  276. return
  277. }
  278. infos, err := s.rpc.UserInfos(c, mids)
  279. if err != nil {
  280. infos = make(map[int64]*model.UserInfo)
  281. }
  282. upspecials, err := s.rpc.UpsSpecial(c, mids)
  283. if err != nil {
  284. upspecials = make(map[int64]*uprpc.UpSpecial)
  285. }
  286. for _, mid := range mids {
  287. gids := []int64{}
  288. if gs, ok := upspecials[mid]; ok {
  289. gids = gs.GroupIDs
  290. }
  291. res[mid] = []interface{}{s.getUserGroup(c, gids), infos[mid]}
  292. }
  293. log.Info("listRscHelper res(%+v)", res)
  294. return
  295. }
  296. /*
  297. 避免超时,控制文本长度小于3000
  298. 过滤相同的文本,减少不必要请求
  299. */
  300. func (s *Service) listHightLight(c context.Context, bizid int64, list []*model.ListRscItem) {
  301. var (
  302. area string
  303. err error
  304. )
  305. if area = s.getConfig(c, bizid, business.TypeFiler); len(area) == 0 {
  306. log.Warn("sigleHightLight(%d) 没有文本高亮配置(%v)", bizid, err)
  307. return
  308. }
  309. arrcontent := []string{}
  310. for _, item := range list {
  311. arrcontent = append(arrcontent, item.Content)
  312. }
  313. arrset := stringset(arrcontent)
  314. hits, err := s.concurrentHightList(c, area, arrset)
  315. if err != nil {
  316. log.Error("listHightLight error(%v)", err)
  317. return
  318. }
  319. hitset := stringset(hits)
  320. for _, item := range list {
  321. item.Hit = hitset
  322. }
  323. }
  324. func (s *Service) concurrentHightList(c context.Context, area string, mapset []string) (hits []string, err error) {
  325. var eg errgroup.Group
  326. msgs := joinstr(mapset, "msg=", 3000)
  327. for _, msg := range msgs {
  328. var m = msg
  329. eg.Go(func() error {
  330. var (
  331. e error
  332. hit []string
  333. )
  334. hit, e = s.http.FilterMulti(context.Background(), area, m)
  335. hits = append(hits, hit...)
  336. return e
  337. })
  338. }
  339. err = eg.Wait()
  340. return
  341. }
  342. func (s *Service) sigleHightLight(c context.Context, bizid int64, content string) (hit []string) {
  343. var (
  344. area string
  345. err error
  346. )
  347. if area = s.getConfig(c, bizid, business.TypeFiler); len(area) == 0 {
  348. log.Warn("sigleHightLight(%d) 没有文本高亮配置(%v)", bizid, err)
  349. return
  350. }
  351. hits, err := s.http.FilterMulti(c, area, "msg="+content)
  352. if err != nil {
  353. log.Error("sigleHightLight error(%v)", err)
  354. return
  355. }
  356. return hits
  357. }
  358. func (s *Service) getColumns(c context.Context, bizid int64) (columns []*model.Column) {
  359. cfg := s.getConfig(c, bizid, business.TypeRscListAdapter)
  360. if len(cfg) == 0 {
  361. log.Warn("getColumns empty config")
  362. return
  363. }
  364. if err := json.Unmarshal([]byte(cfg), &columns); err != nil {
  365. log.Error("getColumns err(%v)", err)
  366. }
  367. return
  368. }
  369. //Submit .
  370. func (s *Service) Submit(c context.Context, opt *model.SubmitOptions) (err error) {
  371. log.Info("Pre submit(%+v)", opt)
  372. // 1. 获取flow流转结果
  373. result, err := s.computeTriggerResult(c, opt.RID, opt.FlowID, opt.Binds)
  374. if err != nil {
  375. return err
  376. }
  377. esupsert, err := s.submit(c, "submit", opt, result)
  378. if err != nil {
  379. return err
  380. }
  381. //更新es
  382. s.http.UpsertES(c, []*model.UpsertItem{esupsert})
  383. return
  384. }
  385. func (s *Service) submit(c context.Context, action string, opt *model.SubmitOptions, result *net.TriggerResult) (esupsert *model.UpsertItem, err error) {
  386. ormTx, err := s.gorm.BeginTx(c)
  387. if err != nil {
  388. log.Error("tx error(%v)", err)
  389. return nil, ecode.ServerErr
  390. }
  391. var (
  392. rsc *resource.Resource
  393. mid, ouid, otaskid int64
  394. ostate int8
  395. taskstate int8
  396. )
  397. if rsc, err = s.gorm.ResourceByOID(c, opt.OID, opt.BusinessID); err != nil || rsc == nil {
  398. ormTx.Rollback()
  399. err = ecode.AegisResourceErr
  400. return
  401. }
  402. if rsc.ID != opt.RID {
  403. ormTx.Rollback()
  404. err = ecode.AegisResourceErr
  405. return
  406. }
  407. mid = rsc.MID
  408. // 2. 更新resource 和 resource_result
  409. if err = s.TxUpdateResource(ormTx, opt.RID, result.ResultToken.Values, opt.Result); err != nil {
  410. log.Error("submit TxUpdateResource(%v)", err)
  411. ormTx.Rollback()
  412. err = ecode.AegisResourceErr
  413. return
  414. }
  415. log.Info("submit TxUpdateResource success")
  416. // 3. 更新task
  417. if result.ResultToken.HitAudit && opt.TaskID > 0 {
  418. taskstate = taskmod.TaskStateSubmit
  419. if action == "batch" { // 目前的接口我们区分不了 资源提交还是任务提交
  420. taskstate = taskmod.TaskStateRscSb
  421. }
  422. // TODO 看来还是要把资源提交和任务提交分开。 资源提交不能确定要关的是哪个任务
  423. if ostate, otaskid, ouid, err = s.TxSubmitTask(c, ormTx, &opt.BaseOptions, taskstate); err != nil {
  424. log.Error("submit TxSubmitTask(%v)", err)
  425. ormTx.Rollback()
  426. err = ecode.AegisTaskErr
  427. return
  428. }
  429. log.Info("submit TxSubmitTask success")
  430. if otaskid != opt.TaskID {
  431. log.Warn("submit different taskid(%d-->%d)", opt.TaskID, otaskid)
  432. opt.TaskID = otaskid
  433. }
  434. }
  435. if result.ResultToken.HitAudit {
  436. // 4. 更新flow
  437. if err = s.reachNewFlowDB(c, ormTx, result); err != nil {
  438. log.Error("submit reachNewFlowDB(%v)", err)
  439. ormTx.Rollback()
  440. return
  441. }
  442. log.Info("submit reachNewFlowDB success")
  443. opt.Result.State, _ = strconv.Atoi(fmt.Sprint(result.ResultToken.Values["state"]))
  444. }
  445. // 5. 更新业务
  446. if err = s.syncResource(c, opt, mid, result.ResultToken); err != nil {
  447. log.Error("submit syncResource(%v)", err)
  448. ormTx.Rollback()
  449. return
  450. }
  451. log.Info("submit syncResource success")
  452. if err = ormTx.Commit().Error; err != nil {
  453. ormTx.Rollback()
  454. return
  455. }
  456. // 6. 任务缓存更新
  457. if result.ResultToken.HitAudit && opt.TaskID > 0 {
  458. s.submitTaskCache(c, &opt.BaseOptions, ostate, otaskid, ouid)
  459. }
  460. // 7. 任务流转
  461. if err = s.afterReachNewFlow(c, result, opt.BusinessID); err != nil {
  462. log.Error("submit afterReachNewFlow(%v)", err)
  463. return
  464. }
  465. log.Info("submit afterReachNewFlow success")
  466. // 8. 查询下最终数据库的结果,记录
  467. res, err := s.gorm.ResourceRes(c, opt.RID)
  468. if err != nil || res == nil {
  469. return
  470. }
  471. //8. 记录日志
  472. if opt.Result != nil {
  473. opt.Result.State = int(res.State)
  474. }
  475. s.logSubmit(c, action, opt, result)
  476. //9.更新es
  477. esupsert = &model.UpsertItem{
  478. ID: res.ID,
  479. State: int(res.State),
  480. Extra1: res.Extra1,
  481. Extra2: res.Extra2,
  482. Extra3: res.Extra3,
  483. Extra4: res.Extra4,
  484. }
  485. return
  486. }
  487. func (s *Service) logSubmit(c context.Context, action string, opt *model.SubmitOptions, res interface{}) {
  488. s.async.Do(c, func(ctx context.Context) {
  489. // 1. 操作日志
  490. s.sendAuditLog(ctx, action, opt, res, model.LogTypeAuditSubmit)
  491. // 2. resource日志
  492. s.sendRscSubmitLog(ctx, action, opt, res)
  493. })
  494. }
  495. // JumpFlow 跳流程提交
  496. func (s *Service) JumpFlow(c context.Context, opt *model.SubmitOptions) (err error) {
  497. var (
  498. ostate int8
  499. ouid, otaskid int64
  500. esupsert *model.UpsertItem
  501. )
  502. ormTx, err := s.gorm.BeginTx(c)
  503. if err != nil {
  504. log.Error("tx error(%v)", err)
  505. return ecode.ServerErr
  506. }
  507. // 1. 提交结果到flow
  508. result, err := s.jumpFlow(c, ormTx, opt.RID, opt.FlowID, opt.NewFlowID, opt.Binds)
  509. if err != nil {
  510. ormTx.Rollback()
  511. return
  512. }
  513. // 2. 更新task
  514. if result.ResultToken.HitAudit && opt.TaskID > 0 { //资源列表直接提交可能没有任务信息
  515. if ostate, otaskid, ouid, err = s.TxSubmitTask(c, ormTx, &opt.BaseOptions, taskmod.TaskStateClosed); err != nil {
  516. ormTx.Rollback()
  517. err = ecode.AegisTaskErr
  518. return
  519. }
  520. if otaskid != opt.TaskID {
  521. log.Warn("submit different taskid(%d-->%d)", opt.TaskID, otaskid)
  522. opt.TaskID = otaskid
  523. }
  524. }
  525. if result.SubmitToken != nil {
  526. var (
  527. rsc *resource.Resource
  528. mid int64
  529. )
  530. if rsc, err = s.gorm.ResourceByOID(c, opt.OID, opt.BusinessID); err != nil || rsc == nil {
  531. ormTx.Rollback()
  532. err = ecode.AegisResourceErr
  533. return
  534. }
  535. if rsc.ID != opt.RID {
  536. ormTx.Rollback()
  537. err = ecode.AegisResourceErr
  538. return
  539. }
  540. mid = rsc.MID
  541. // 3. 更新resource 和 resource_result
  542. if err = s.TxUpdateResource(ormTx, opt.RID, result.ResultToken.Values, opt.Result); err != nil {
  543. ormTx.Rollback()
  544. err = ecode.AegisResourceErr
  545. return
  546. }
  547. // 4. 更新业务
  548. if err = s.syncResource(c, opt, mid, result.ResultToken); err != nil {
  549. ormTx.Rollback()
  550. err = ecode.AegisBusinessSyncErr
  551. return
  552. }
  553. }
  554. if err = ormTx.Commit().Error; err != nil {
  555. return
  556. }
  557. // 5. 任务缓存更新
  558. if result.ResultToken.HitAudit && opt.TaskID > 0 {
  559. s.submitTaskCache(c, &opt.BaseOptions, ostate, otaskid, ouid)
  560. }
  561. // 6. 任务流转
  562. if result.ResultToken.HitAudit {
  563. s.afterJumpFlow(c, result, opt.BusinessID)
  564. }
  565. rscr, err := s.gorm.ResourceRes(c, opt.RID)
  566. if err != nil || rscr != nil {
  567. return
  568. }
  569. //7. 记录日志
  570. if opt.Result != nil {
  571. opt.Result.State = int(rscr.State)
  572. }
  573. s.logSubmit(c, "jump", opt, result)
  574. //8.更新es
  575. esupsert = &model.UpsertItem{
  576. ID: rscr.ID,
  577. State: int(rscr.State),
  578. Extra1: rscr.Extra1,
  579. Extra2: rscr.Extra2,
  580. Extra3: rscr.Extra3,
  581. Extra4: rscr.Extra4,
  582. }
  583. s.http.UpsertES(c, []*model.UpsertItem{esupsert})
  584. return
  585. }
  586. // BatchSubmit 批量提交, 超过10个的做异步
  587. func (s *Service) BatchSubmit(c context.Context, opt *model.BatchOption) (tip *model.Tip, err error) {
  588. tip = &model.Tip{Fail: make(map[int64]string)}
  589. if len(opt.RIDs) > 10 {
  590. go s.processBatch(context.Background(), opt.RIDs[10:], opt, nil)
  591. rids := opt.RIDs[:10]
  592. s.processBatch(c, rids, opt, tip)
  593. tip.Async = append(tip.Async, opt.RIDs[10:]...)
  594. } else {
  595. s.processBatch(c, opt.RIDs, opt, tip)
  596. }
  597. return
  598. }
  599. func (s *Service) processBatch(c context.Context, rids []int64, opt *model.BatchOption, tip *model.Tip) {
  600. var (
  601. err error
  602. esupdate *model.UpsertItem
  603. esupdates = []*model.UpsertItem{}
  604. )
  605. for _, rid := range rids {
  606. var (
  607. taskid int64
  608. oid string
  609. )
  610. if oid, err = s.gorm.OidByRID(c, rid); err != nil {
  611. if tip != nil {
  612. tip.Fail[rid] = err.Error()
  613. }
  614. continue
  615. }
  616. res, err := s.computeBatchTriggerResult(c, opt.BusinessID, rid, opt.Binds)
  617. if err != nil {
  618. if tip != nil {
  619. tip.Fail[rid] = ecode.Cause(err).Message()
  620. }
  621. continue
  622. }
  623. if task, _ := s.gorm.TaskByRID(c, rid, 0); task != nil {
  624. taskid = task.ID
  625. }
  626. smtOpt := &model.SubmitOptions{
  627. EngineOption: model.EngineOption{
  628. BaseOptions: common.BaseOptions{
  629. BusinessID: opt.BusinessID,
  630. OID: oid,
  631. UID: opt.UID,
  632. RID: rid,
  633. Uname: opt.Uname,
  634. },
  635. Result: &resource.Result{
  636. Attribute: -1, // -1表示不更新
  637. ReasonID: opt.ReasonID,
  638. RejectReason: opt.RejectReason,
  639. },
  640. TaskID: taskid,
  641. ExtraData: map[string]interface{}{
  642. "notify": opt.Notify,
  643. },
  644. },
  645. Binds: opt.Binds,
  646. }
  647. if esupdate, err = s.submit(c, "batch", smtOpt, res); err != nil {
  648. if tip != nil {
  649. tip.Fail[rid] = ecode.Cause(err).Message()
  650. }
  651. } else {
  652. if tip != nil {
  653. tip.Success = append(tip.Success, rid)
  654. }
  655. esupdates = append(esupdates, esupdate)
  656. }
  657. }
  658. //更新es
  659. s.http.UpsertES(c, esupdates)
  660. }
  661. // Add 业务方添加资源
  662. func (s *Service) Add(c context.Context, opt *model.AddOption) (err error) {
  663. var res *net.TriggerResult
  664. defer func() {
  665. // 5. 记录资源添加日志
  666. s.sendRscLog(c, "add", opt, res, nil, err)
  667. }()
  668. business.AdaptAddOpt(opt, s.getAdapter(c, opt.BusinessID))
  669. if b, _ := s.gorm.Business(c, opt.BusinessID); b == nil || b.State != 0 {
  670. err = ecode.AegisBusinessSyncErr
  671. return
  672. }
  673. var rid int64
  674. ormTx, err := s.gorm.BeginTx(c)
  675. if err != nil {
  676. log.Error("tx error(%v)", err)
  677. return ecode.ServerErr
  678. }
  679. // 2. 根据net_id,计算是否可添加
  680. res, err = s.startNet(c, opt.BusinessID, opt.NetID)
  681. if err != nil {
  682. ormTx.Rollback()
  683. return
  684. }
  685. // 3. 事务新增资源
  686. rsc := &resource.Result{State: opt.State}
  687. if res.ResultToken != nil {
  688. if state, ok := res.ResultToken.Values["state"]; ok {
  689. rsc.State, _ = strconv.Atoi(fmt.Sprint(state))
  690. }
  691. }
  692. rid, err = s.TxAddResource(ormTx, &opt.Resource, rsc)
  693. if err != nil {
  694. ormTx.Rollback()
  695. err = ecode.AegisResourceErr
  696. return
  697. }
  698. res.RID = rid
  699. // 4. 事务新增flow,根据创建rid
  700. if err = s.reachNewFlowDB(c, ormTx, res); err != nil {
  701. ormTx.Rollback()
  702. return
  703. }
  704. if err = ormTx.Commit().Error; err != nil {
  705. log.Error("Commit opt(%+v) error(%v)", opt, err)
  706. return
  707. }
  708. go func() {
  709. s.afterReachNewFlow(context.TODO(), res, opt.BusinessID)
  710. // 1. 操作日志
  711. s.sendAuditLog(context.TODO(), "add", &model.SubmitOptions{
  712. EngineOption: model.EngineOption{
  713. BaseOptions: common.BaseOptions{
  714. BusinessID: opt.BusinessID,
  715. FlowID: res.NewFlowID,
  716. RID: rid,
  717. UID: 399,
  718. Uname: "业务方",
  719. },
  720. Result: rsc,
  721. },
  722. }, res, model.LogTypeAuditAdd)
  723. }()
  724. return
  725. }
  726. // Update 业务方修改资源参数
  727. func (s *Service) Update(c context.Context, opt *model.UpdateOption) (err error) {
  728. var rows int64
  729. business.AdaptUpdateOpt(opt, s.getAdapter(c, opt.BusinessID))
  730. for key, val := range opt.Update {
  731. if _, ok := model.UpdateKeys[key]; !ok {
  732. delete(opt.Update, key)
  733. continue
  734. }
  735. switch key {
  736. case "extra1", "extra2", "extra3", "extra4", "extra5", "extra6":
  737. if reflect.TypeOf(val).Kind() != reflect.Float64 && reflect.TypeOf(val).Kind() != reflect.Int {
  738. return ecode.RequestErr
  739. }
  740. case "extra1s", "extra2s", "extra3s", "extra4s":
  741. if reflect.TypeOf(val).Kind() != reflect.String {
  742. return ecode.RequestErr
  743. }
  744. case "extratime1", "octime", "ptime": //时间类型不校验了
  745. case "metadata": //如果是map,json转化为字符串
  746. if reflect.TypeOf(val).Kind() == reflect.Map {
  747. var bs []byte
  748. if bs, err = json.Marshal(val); err != nil {
  749. return ecode.RequestErr
  750. }
  751. opt.Update["metadata"] = string(bs)
  752. }
  753. }
  754. }
  755. if len(opt.Update) == 0 {
  756. err = ecode.RequestErr
  757. } else {
  758. rows, err = s.gorm.UpdateResource(c, opt.BusinessID, opt.OID, opt.Update)
  759. }
  760. if rows > 0 || err != nil {
  761. s.sendRscLog(c, "update", &model.AddOption{
  762. Resource: resource.Resource{
  763. BusinessID: opt.BusinessID,
  764. OID: opt.OID,
  765. }, NetID: opt.NetID,
  766. }, nil, opt.Update, err)
  767. }
  768. return
  769. }
  770. //CancelByOper 手动删除任务
  771. func (s *Service) CancelByOper(c context.Context, businessID int64, oids []string, uid int64, username string) (err error) {
  772. if err = s.Cancel(c, businessID, oids, uid, username); err != nil {
  773. return
  774. }
  775. //upsert es
  776. go func(businessID int64, oids []string) {
  777. var (
  778. esupserts []*model.UpsertItem
  779. ctx = context.TODO()
  780. )
  781. if esupserts, err = s.gorm.UpsertByOIDs(ctx, businessID, oids); err != nil || len(esupserts) == 0 {
  782. return
  783. }
  784. s.http.UpsertES(ctx, esupserts)
  785. }(businessID, oids)
  786. return
  787. }
  788. // Cancel 取消相关资源的所有流程
  789. func (s *Service) Cancel(c context.Context, businessID int64, oids []string, uid int64, username string) (err error) {
  790. var (
  791. dstate int
  792. ridFlow map[int64]string
  793. )
  794. defer func() {
  795. // 5. 记录资源注销日志
  796. s.sendRscCancleLog(c, businessID, oids, uid, username, err)
  797. }()
  798. ridstr, err := s.gorm.RidsByOids(c, businessID, oids)
  799. if err != nil {
  800. err = nil
  801. return
  802. }
  803. rids, err := xstr.SplitInts(ridstr)
  804. if err != nil || len(rids) == 0 {
  805. err = nil
  806. return
  807. }
  808. ormTx, err := s.gorm.BeginTx(c)
  809. if err != nil {
  810. log.Error("tx error(%v)", err)
  811. return ecode.ServerErr
  812. }
  813. defer ormTx.Commit()
  814. // 1. 资源修改为已删除状态
  815. if dstate, err = s.TxDelResource(c, ormTx, businessID, rids); err != nil {
  816. ormTx.Rollback()
  817. err = ecode.AegisResourceErr
  818. return
  819. }
  820. // 2. 取消相关flow流程
  821. if ridFlow, err = s.cancelNet(c, ormTx, rids); err != nil {
  822. ormTx.Rollback()
  823. return
  824. }
  825. // 3. 取消相关task
  826. if err = s.gorm.TxCloseTasks(ormTx, rids, uid); err != nil {
  827. ormTx.Rollback()
  828. err = ecode.AegisTaskErr
  829. return
  830. }
  831. // 操作日志
  832. for _, rid := range rids {
  833. s.sendAuditLog(c, "cancel", &model.SubmitOptions{
  834. EngineOption: model.EngineOption{
  835. BaseOptions: common.BaseOptions{
  836. BusinessID: businessID,
  837. RID: rid,
  838. UID: uid,
  839. Uname: username,
  840. },
  841. Result: &resource.Result{
  842. State: dstate,
  843. },
  844. },
  845. }, &net.TriggerResult{
  846. OldFlowID: ridFlow[rid],
  847. }, model.LogTypeAuditCancel)
  848. }
  849. return
  850. }
  851. func (s *Service) closeTask(c context.Context, task *taskmod.Task) (err error) {
  852. if err = s.gorm.CloseTask(c, task.ID); err != nil {
  853. return
  854. }
  855. if task.UID > 0 {
  856. s.submitTaskCache(c, &common.BaseOptions{
  857. BusinessID: task.BusinessID,
  858. FlowID: task.FlowID,
  859. }, task.State, task.ID, task.UID)
  860. }
  861. return
  862. }
  863. // Upload to bfs
  864. func (s *Service) Upload(c context.Context, fileName string, fileType string, timing int64, body []byte) (location string, err error) {
  865. if len(body) == 0 {
  866. err = ecode.FileNotExists
  867. return
  868. }
  869. if location, err = s.http.Upload(c, fileName, fileType, timing, body); err != nil {
  870. log.Error("s.upload.Upload() error(%v)", err)
  871. }
  872. return
  873. }
  874. func (s *Service) auditInfoByTask(c context.Context, task *taskmod.Task, opt *common.BaseOptions) (info *model.AuditInfo, err error) {
  875. var (
  876. userinfo *model.UserInfo
  877. operhistorys, hit []string
  878. iframeurl string
  879. actions []*model.Action
  880. )
  881. // 1. 未完成的任务信息
  882. undoStat, _ := s.UnDoStat(c, opt)
  883. // 2. 获取业务信息 resource
  884. resource, err := s.ResourceRes(c, &resource.Args{RID: task.RID})
  885. if err != nil || resource == nil || !s.checkaudit(task.FlowID, resource.State) {
  886. log.Error("资源查找失败,删除任务 ResourceRes err(%v)", err)
  887. s.closeTask(c, task)
  888. err = ecode.AegisResourceErr
  889. return
  890. }
  891. // 3. 获取操作项 flow
  892. flow, err := s.fetchTaskTranInfo(c, task.RID, task.FlowID, opt.NetID)
  893. if err != nil {
  894. log.Error("fetchTransitionInfo(%d,%d) err(%v)", task.RID, task.FlowID, err)
  895. return
  896. }
  897. g, _ := errgroup.WithContext(c)
  898. // 4. 获取用户信息 account
  899. g.Go(func() error {
  900. userinfo, err = s.rpc.Profile(c, task.MID)
  901. if err != nil {
  902. log.Error("Profile(%d) err(%v)", task.MID, err)
  903. }
  904. return nil
  905. })
  906. // 5. 审核历史
  907. g.Go(func() error {
  908. operhistorys, err = s.auditLogByRID(c, task.RID)
  909. if err != nil {
  910. log.Error("AuditLog err(%v)", err)
  911. }
  912. return nil
  913. })
  914. // 6. iframe url
  915. g.Go(func() error {
  916. iframeurl = s.getConfig(c, task.BusinessID, business.TypeIframe)
  917. actions = s.getActions(c, task.BusinessID)
  918. var attrcfg map[string]uint
  919. if attrcfg, err = s.AttributeCFG(c, task.BusinessID); len(attrcfg) > 0 {
  920. resource.AttrParse(attrcfg)
  921. }
  922. resource.MetaParse()
  923. return nil
  924. })
  925. // 7. filter
  926. g.Go(func() error {
  927. hit = s.sigleHightLight(c, task.BusinessID, resource.Content)
  928. return nil
  929. })
  930. g.Wait()
  931. info = &model.AuditInfo{
  932. UnDoStat: undoStat,
  933. Task: task,
  934. Resource: resource,
  935. UserInfo: userinfo,
  936. Flow: flow,
  937. OperHistorys: operhistorys,
  938. IFrame: iframeurl,
  939. Actions: actions,
  940. Hit: hit,
  941. }
  942. if task.MID > 0 {
  943. s.getUserGroup(c, []int64{task.MID})
  944. }
  945. return
  946. }
  947. func (s *Service) auditInfoByRsc(c context.Context, rsc *resource.Res, netid int64) (info *model.AuditInfo, err error) {
  948. var (
  949. task *taskmod.Task
  950. userinfo *model.UserInfo
  951. flow *net.TransitionInfo
  952. iframeurl string
  953. attrcfg map[string]uint
  954. operhistorys, hit []string
  955. actions []*model.Action
  956. )
  957. g, _ := errgroup.WithContext(c)
  958. // 搜索未指定flow, 则检索task
  959. g.Go(func() error {
  960. task, err = s.gorm.TaskByRID(c, rsc.ID, 0)
  961. if err != nil {
  962. err = nil
  963. task = nil
  964. }
  965. return nil
  966. })
  967. // 1. 获取用户信息 account
  968. g.Go(func() error {
  969. if userinfo, err = s.rpc.Profile(c, rsc.MID); err != nil {
  970. log.Error("Profile(%d) err(%v)", rsc.MID, err)
  971. }
  972. return nil
  973. })
  974. // 2. 获取操作项 flow
  975. g.Go(func() error {
  976. flow, err = s.fetchResourceTranInfo(c, rsc.ID, rsc.BusinessID, netid)
  977. if err != nil {
  978. log.Error("fetchTransitionInfo(%d,%d,%d) err(%v)", rsc.ID, rsc.BusinessID, netid, err)
  979. }
  980. return nil
  981. })
  982. // 3. 审核历史
  983. g.Go(func() error {
  984. operhistorys, err = s.auditLogByRID(c, rsc.ID)
  985. if err != nil {
  986. log.Error("AuditLog(%d) err(%v)", rsc.ID, err)
  987. }
  988. return nil
  989. })
  990. // 4. iframe url
  991. g.Go(func() error {
  992. iframeurl = s.getConfig(c, rsc.BusinessID, business.TypeIframe)
  993. actions = s.getActions(c, rsc.BusinessID)
  994. if attrcfg, err = s.AttributeCFG(c, rsc.BusinessID); len(attrcfg) > 0 {
  995. rsc.AttrParse(attrcfg)
  996. }
  997. rsc.MetaParse()
  998. return nil
  999. })
  1000. // 5. filter
  1001. g.Go(func() error {
  1002. hit = s.sigleHightLight(c, rsc.BusinessID, rsc.Content)
  1003. return nil
  1004. })
  1005. g.Wait()
  1006. info = &model.AuditInfo{
  1007. Task: task,
  1008. Resource: rsc,
  1009. UserInfo: userinfo,
  1010. Flow: flow,
  1011. OperHistorys: operhistorys,
  1012. IFrame: iframeurl,
  1013. Actions: actions,
  1014. Hit: hit,
  1015. }
  1016. if rsc.MID > 0 {
  1017. if upspecial, _ := s.rpc.UpSpecial(c, rsc.MID); upspecial != nil {
  1018. info.UserGroup = s.getUserGroup(c, upspecial.GroupIDs)
  1019. }
  1020. }
  1021. return
  1022. }
  1023. func (s *Service) getActions(c context.Context, bizid int64) (actions []*model.Action) {
  1024. cfg := s.getConfig(c, bizid, business.TypeAction)
  1025. if len(cfg) == 0 {
  1026. log.Error("getActions(%d) empty", bizid)
  1027. return
  1028. }
  1029. if err := json.Unmarshal([]byte(cfg), &actions); err != nil {
  1030. log.Error("getActions(%d) error(%v)", bizid, err)
  1031. }
  1032. return
  1033. }
  1034. func (s *Service) getAdapter(c context.Context, bizid int64) (adps []*business.Adapter) {
  1035. cfg := s.getConfig(c, bizid, business.TypeAdapter)
  1036. if len(cfg) == 0 {
  1037. return
  1038. }
  1039. if err := json.Unmarshal([]byte(cfg), &adps); err != nil {
  1040. log.Error("getAdapter cfg(%s) err(%v)", cfg, err)
  1041. }
  1042. return
  1043. }
  1044. // Gray 灰度
  1045. func (s *Service) Gray(opt *model.AddOption) (next bool) {
  1046. if opt == nil {
  1047. return
  1048. }
  1049. //未配置,则默认全量
  1050. if len(s.gray[opt.BusinessID]) == 0 {
  1051. next = true
  1052. return
  1053. }
  1054. optval := reflect.ValueOf(opt).Elem()
  1055. opttp := optval.Type()
  1056. for _, opts := range s.gray[opt.BusinessID] {
  1057. //策略与策略之间or, 策略的fields之间and
  1058. okcnt := 0
  1059. for _, item := range opts {
  1060. f, exist := opttp.FieldByName(item.Name)
  1061. if !exist {
  1062. break
  1063. }
  1064. v := optval.FieldByIndex(f.Index)
  1065. if strings.Contains(item.Value, fmt.Sprintf(",%v,", v.Interface())) {
  1066. okcnt++
  1067. continue
  1068. }
  1069. }
  1070. if okcnt > 0 && okcnt == len(opts) {
  1071. next = true
  1072. return
  1073. }
  1074. }
  1075. return
  1076. }
  1077. //checkaudit 检查任务对应的资源是否需要审核,不需要的不下发
  1078. func (s *Service) checkaudit(flowid int64, state int64) bool {
  1079. if len(s.c.Auditstate) > 0 {
  1080. if states, ok := s.c.Auditstate[fmt.Sprint(flowid)]; ok {
  1081. if !strings.Contains(","+states+",", fmt.Sprint(state)) {
  1082. return false
  1083. }
  1084. }
  1085. }
  1086. return true
  1087. }
  1088. //Auth 用户权限查询
  1089. func (s *Service) Auth(c context.Context, uid int64) (a *model.Auth, err error) {
  1090. var (
  1091. roles []*taskmod.Role
  1092. )
  1093. a = &model.Auth{}
  1094. if s.Debug() == "local" {
  1095. a.OK = true
  1096. return
  1097. }
  1098. if s.IsAdmin(uid) {
  1099. a.OK = true
  1100. a.Admin = true
  1101. return
  1102. }
  1103. //查看业务级别&任务级别的绑定权限
  1104. bidBiz := map[int64]int64{}
  1105. //任务级别权限
  1106. for biz, bidFlows := range s.taskRoleCache {
  1107. for bid := range bidFlows {
  1108. bidBiz[bid] = biz
  1109. }
  1110. }
  1111. //业务级别权限
  1112. for biz, cfgs := range s.bizRoleCache {
  1113. if bid, exist := cfgs[business.BizBIDMngID]; exist && bid > 0 {
  1114. bidBiz[bid] = biz
  1115. }
  1116. }
  1117. log.Info("Auth bidbiz(%+v) \r\n taskrole(%+v) \r\n bizrole(%+v)", bidBiz, s.taskRoleCache, s.bizRoleCache)
  1118. //用户角色
  1119. if roles, err = s.http.GetUserRoles(c, uid); err != nil {
  1120. log.Error("Auth s.http.GetUserRoles(%d) error(%v)", uid, err)
  1121. return
  1122. }
  1123. a.Business = map[int64]int64{}
  1124. for _, item := range roles {
  1125. if bidBiz[item.BID] > 0 {
  1126. a.Business[item.BID] = bidBiz[item.BID]
  1127. }
  1128. }
  1129. a.OK = len(a.Business) > 0
  1130. return
  1131. }