net_control.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "sort"
  6. "strconv"
  7. "strings"
  8. "github.com/jinzhu/gorm"
  9. "go-common/app/admin/main/aegis/model"
  10. "go-common/app/admin/main/aegis/model/net"
  11. "go-common/library/ecode"
  12. "go-common/library/log"
  13. "go-common/library/xstr"
  14. )
  15. //启动流程网之前的检查
  16. func (s *Service) startNet(c context.Context, businessID int64, netID int64) (result *net.TriggerResult, err error) {
  17. var (
  18. n *net.Net
  19. resultTokenPackage *net.TokenPackage
  20. )
  21. //网可用性
  22. if n, err = s.netByID(c, netID); err != nil {
  23. log.Error("startNet s.netByID(%d) error(%v)", netID, err)
  24. return
  25. }
  26. if n == nil || !n.IsAvailable() || n.StartFlowID <= 0 {
  27. log.Error("startNet s.netByID(%d) not found/disabled/start_flow_id=0", netID)
  28. err = ecode.RequestErr
  29. return
  30. }
  31. if n.BusinessID != businessID {
  32. log.Error("startNet s.netByID(%d) business(%d) != param business(%d)", netID, n.BusinessID, businessID)
  33. err = ecode.RequestErr
  34. return
  35. }
  36. //初始节点的token集合
  37. if resultTokenPackage, err = s.computeNewFlowPackage(c, n.StartFlowID, nil, nil, true); err != nil {
  38. log.Error("startNet s.computeNewFlowPackage(%d) error(%+v) netid(%d)", n.StartFlowID, err, netID)
  39. return
  40. }
  41. result = &net.TriggerResult{
  42. NetID: netID,
  43. ResultToken: resultTokenPackage,
  44. NewFlowID: n.StartFlowID,
  45. From: model.LogFromStart,
  46. }
  47. return
  48. }
  49. /*FetchJumpFlowInfo ..
  50. * 跳流程详情页, 跳流程是单独db层面的更新
  51. * 参数:flowid(可选)
  52. * 获取网下所有flow和变迁的所有操作项
  53. */
  54. func (s *Service) FetchJumpFlowInfo(c context.Context, flowID int64) (res *net.JumpInfo, err error) {
  55. var (
  56. flow *net.Flow
  57. runningNet int64
  58. flowList []*net.Flow
  59. )
  60. res = &net.JumpInfo{
  61. Flows: []*net.SimpleInfo{},
  62. Operations: []*net.TranOperation{},
  63. }
  64. if flow, err = s.flowByID(c, flowID); err != nil {
  65. log.Error("FetchJumpFlowInfo s.flowByID(%d) error(%v)", flowID, err)
  66. return
  67. }
  68. if flow == nil {
  69. log.Error("FetchJumpFlowInfo flow(%d) not found", flowID)
  70. err = ecode.AegisFlowNotFound
  71. return
  72. }
  73. runningNet = flow.NetID
  74. if flowList, err = s.flowsByNet(c, runningNet); err != nil {
  75. log.Error("FetchJumpFlowInfo s.flowsByNet(%d) error(%v) flowid(%d)", runningNet, err, flowID)
  76. return
  77. }
  78. flowInfo := []*net.SimpleInfo{}
  79. for _, item := range flowList {
  80. flowInfo = append(flowInfo, &net.SimpleInfo{
  81. ID: item.ID,
  82. ChName: item.ChName,
  83. })
  84. }
  85. //网下的所有变迁可操作项
  86. if res.Operations, err = s.tranOpersByNet(c, []int64{runningNet}, []int8{net.BindTypeTransition}); err != nil {
  87. log.Error("FetchJumpFlowInfo s.tranOpersByNet(%d) error(%v) flowid(%d)", runningNet, err, flowID)
  88. return
  89. }
  90. res.Flows = flowInfo
  91. return
  92. }
  93. /**
  94. * 跳流程提交:rid, oldflowid, newflowid, binds
  95. * 可以单独提供flowid,单独提供binds
  96. * 单独提供flowid时,不做流程流转
  97. */
  98. func (s *Service) jumpFlow(c context.Context, tx *gorm.DB, rid int64, oldFlowID int64, newFlowID int64, binds []int64) (result *net.JumpFlowResult, err error) {
  99. var (
  100. frs []*net.FlowResource
  101. oldFlow, newFlow *net.Flow
  102. triggerResult *net.TriggerResult
  103. bindList []*net.TokenBind
  104. submitPackage *net.TokenPackage
  105. )
  106. log.Info("jumpFlow before rid(%d) oldflowid(%d) newflowid(%d) binds(%v)", rid, oldFlowID, newFlowID, binds)
  107. if rid <= 0 || oldFlowID <= 0 || (len(binds) == 0 && newFlowID <= 0) {
  108. err = ecode.RequestErr
  109. return
  110. }
  111. //检查资源是否运行在该节点
  112. if frs, err = s.gorm.FRByUniques(c, []int64{rid}, []int64{oldFlowID}, true); err != nil {
  113. log.Error("jumpFlow s.gorm.FRByUniques(%d,%d) error(%v)", rid, oldFlowID, err)
  114. return
  115. }
  116. if len(frs) == 0 {
  117. log.Error("jumpFlow rid(%d) not running at oldflowid(%d)", rid, oldFlowID)
  118. err = ecode.AegisNotRunInFlow
  119. return
  120. }
  121. if oldFlow, err = s.flowByID(c, oldFlowID); err != nil {
  122. log.Error("jumpFlow s.flowByID(%d) error(%v) rid(%d)", oldFlowID, err, rid)
  123. return
  124. }
  125. if oldFlow == nil {
  126. log.Error("jumpFlow oldflowid(%d) not found, rid(%d)", oldFlowID, rid)
  127. err = ecode.AegisFlowNotFound
  128. return
  129. }
  130. /**
  131. * jumpFlow 指定一批资源,更新为相同现状
  132. * 不需要前端指定现状flowid
  133. * 资源现状从单线跳到多线程的某个节点时----由运营决定是到sync-split节点还是并发分支上的某个节点------fr数目从1变成1/还是1变成n
  134. * 资源现状从多线程跳到某个节点时----节点在并发分支上,如何确定现状flow和新节点都在并发分支上----fr数目从1变成1
  135. * ----节点在单线上,结束所有运行中都并发现状,加一个单线分支---fr数目从n变成1
  136. */
  137. if len(binds) > 0 {
  138. if bindList, err = s.tokenBinds(c, binds, true); err != nil {
  139. log.Error("jumpFlow s.tokenBinds(%v) error(%v) rid(%d)", binds, err, rid)
  140. return
  141. }
  142. if len(bindList) == 0 {
  143. log.Error("jumpFlow binds(%v) not found, rid(%d)", binds, rid)
  144. err = ecode.RequestErr
  145. return
  146. }
  147. if submitPackage, err = s.newTokenPackage(c, bindList, true); err != nil {
  148. log.Error("jumpFlow s.newTokenPackage(%v) error(%v) rid(%d)", binds, err, rid)
  149. return
  150. }
  151. }
  152. if newFlowID > 0 && newFlowID != oldFlowID {
  153. if newFlow, err = s.flowByID(c, newFlowID); err != nil {
  154. log.Error("jumpFlow newflow(%d) error(%v) rid(%d)", newFlowID, err, rid)
  155. return
  156. }
  157. if newFlow == nil {
  158. log.Error("jumpFlow newflow(%d) not found, rid(%d)", newFlowID, rid)
  159. err = ecode.AegisFlowNotFound
  160. return
  161. }
  162. if oldFlow.NetID != newFlow.NetID {
  163. log.Error("jumpFlow rid(%d) run at oldflowid(%d)/net(%d), can't jump to newflowid(%d)/net(%d)", rid, oldFlowID, oldFlow.NetID, newFlowID, newFlow.NetID)
  164. err = ecode.RequestErr
  165. return
  166. }
  167. triggerResult = &net.TriggerResult{
  168. RID: rid,
  169. NetID: oldFlow.NetID,
  170. NewFlowID: newFlowID,
  171. OldFlowID: strconv.FormatInt(oldFlowID, 10),
  172. SubmitToken: submitPackage,
  173. ResultToken: submitPackage,
  174. }
  175. if err = s.reachNewFlowDB(c, tx, triggerResult); err != nil {
  176. log.Error("jumpFlow s.reachNewFlowDB error(%v) triggerresult(%+v)", err, triggerResult)
  177. return
  178. }
  179. } else {
  180. newFlowID = oldFlowID
  181. }
  182. result = &net.JumpFlowResult{
  183. RID: rid,
  184. NetID: oldFlow.NetID,
  185. SubmitToken: submitPackage,
  186. ResultToken: submitPackage,
  187. NewFlowID: newFlowID,
  188. OldFlowID: strconv.FormatInt(oldFlowID, 10),
  189. }
  190. return
  191. }
  192. func (s *Service) afterJumpFlow(c context.Context, res *net.JumpFlowResult, bizid int64) (err error) {
  193. trigger := &net.TriggerResult{
  194. RID: res.RID,
  195. NetID: res.NetID,
  196. SubmitToken: res.SubmitToken,
  197. ResultToken: res.ResultToken,
  198. NewFlowID: res.NewFlowID,
  199. OldFlowID: res.OldFlowID,
  200. From: model.LogFromJump,
  201. }
  202. s.afterReachNewFlow(c, trigger, bizid)
  203. return
  204. }
  205. /**
  206. * 批量审核提交--单个遍历处理
  207. * 参数:business, rid, binds
  208. * flowid(由rid反查现状)
  209. * transition_id(由flow_id查到的下游transition & token_bind集合 与 token_id的交集决定),非叶子节点若找不到则报错
  210. * 找到了,则计算新flow & result_token
  211. * 计算result_token规则:
  212. * 变迁trigger=人工,必须提供binds;
  213. * 变迁trigger=其他,binds可以为空 & (新flow要么有绑定token,要么有向线上output不为空)
  214. */
  215. func (s *Service) computeBatchTriggerResult(c context.Context, businessID int64, rid int64, bindID []int64) (result *net.TriggerResult, err error) {
  216. var (
  217. oldFlow *net.Flow
  218. )
  219. log.Info("computeBatchTriggerResult start business(%d) rid(%d) bindID(%+v)", businessID, rid, bindID)
  220. //反查指定资源所运行的节点
  221. if oldFlow, _, err = s.flowsByRunning(c, rid, businessID, 0, true); err != nil {
  222. return
  223. }
  224. if result, err = s.computeResult(c, rid, oldFlow, bindID, true); err != nil {
  225. log.Error("computeBatchTriggerResult s.computeResult error(%v) rid(%d) businessid(%d) bindID(%v) oldflow(%+v)", err, rid, businessID, bindID, oldFlow)
  226. return
  227. }
  228. result.From = model.LogFromBatch
  229. log.Info("computeBatchTriggerResult end business(%d) rid(%d) bindID(%+v) result(%+v)", businessID, rid, bindID, result)
  230. return
  231. }
  232. /**
  233. * 详情页审核提交
  234. * 参数:rid, flowid, binds
  235. * 当flowid=最后一个,不流转--binds可能为多个; 其余情况,均正常流转到下一个flow--binds只有一个
  236. * 计算result_token规则:
  237. * 变迁trigger=人工,必须提供binds;
  238. * 变迁trigger=其他,binds可以为空 & (新flow要么有绑定token,要么有向线上output不为空)
  239. */
  240. func (s *Service) computeTriggerResult(c context.Context, rid int64, oldFlowID int64, binds []int64) (result *net.TriggerResult, err error) {
  241. var (
  242. oldFlow *net.Flow
  243. frs []*net.FlowResource
  244. )
  245. log.Info("computeTriggerResult start rid(%d) oldflow(%d) binds(%+v)", rid, oldFlowID, binds)
  246. if rid <= 0 || oldFlowID <= 0 || len(binds) == 0 {
  247. err = ecode.RequestErr
  248. return
  249. }
  250. if frs, err = s.gorm.FRByUniques(c, []int64{rid}, []int64{oldFlowID}, true); err != nil {
  251. log.Error("computeTriggerResult s.gorm.FRByUniques(%d) error(%v) rid(%d) binds(%v)", oldFlowID, err, rid, binds)
  252. return
  253. }
  254. if len(frs) == 0 {
  255. log.Error("computeTriggerResult rid(%d) not running at oldflow(%d) binds(%v)", rid, oldFlowID, binds)
  256. err = ecode.AegisNotRunInFlow
  257. return
  258. }
  259. if oldFlow, err = s.flowByID(c, oldFlowID); err != nil {
  260. log.Error("computeTriggerResult s.flowByID(%d) error(%v) rid(%d) binds(%v)", oldFlowID, err, rid, binds)
  261. return
  262. }
  263. if oldFlow == nil {
  264. err = ecode.AegisFlowNotFound
  265. return
  266. }
  267. if result, err = s.computeResult(c, rid, oldFlow, binds, false); err != nil {
  268. log.Error("computeTriggerResult s.computeResult error(%v) binds(%v) flow(%+v) rid(%d)", err, binds, oldFlow, rid)
  269. return
  270. }
  271. result.From = model.LogFromSingle
  272. log.Info("computeTriggerResult end rid(%d) oldflow(%d) binds(%+v) result(%+v)", rid, oldFlowID, binds, result)
  273. return
  274. }
  275. func (s *Service) computeResult(c context.Context, rid int64, flow *net.Flow, bindID []int64, fromBatch bool) (result *net.TriggerResult, err error) {
  276. var (
  277. hitAudit, hitHelper bool //审核流转和非流转操作是互斥的,不能同时提交
  278. dirs []*net.Direction
  279. isLast bool
  280. triggerBind = []*net.TokenBind{}
  281. binds []*net.TokenBind
  282. trans []*net.Transition
  283. triggerTranID, newFlowID int64
  284. submitTokenPackage, resultTokenPackage *net.TokenPackage
  285. resultDir *net.Direction
  286. )
  287. //非叶子节点只能有一个bind
  288. if rid == 0 || flow == nil {
  289. log.Error("computeResult rid(%d)=0/flow(%+v)=ni", rid, flow)
  290. err = ecode.RequestErr
  291. return
  292. }
  293. if binds, err = s.tokenBinds(c, bindID, true); err != nil {
  294. log.Error("computeBatchTriggerResult s.tokenBinds(%+v) error(%v)", bindID, err)
  295. return
  296. }
  297. tranID := []int64{}
  298. for _, item := range binds {
  299. if item.IsBatch() != fromBatch {
  300. continue
  301. }
  302. tranID = append(tranID, item.ElementID)
  303. if (fromBatch && item.Type == net.BindTypeTranBatch) || (!fromBatch && item.Type == net.BindTypeTransition) {
  304. hitAudit = true
  305. }
  306. if (fromBatch && item.Type == net.BindTypeTranHelpBatch) || (!fromBatch && item.Type == net.BindTypeTranHelp) {
  307. hitHelper = true
  308. }
  309. }
  310. if len(binds) == 0 || (hitAudit && hitHelper) {
  311. log.Error("computeBatchTriggerResult binds(%v) not found/both hit audit && helper are not allowed", binds)
  312. err = ecode.RequestErr
  313. return
  314. }
  315. if hitAudit {
  316. //同网下、同flow下游的变迁过滤, 非叶子节点必须命中一个变迁
  317. if dirs, err = s.dirByFlow(c, []int64{flow.ID}, net.DirInput); err != nil {
  318. log.Error("computeResult s.dirByFlow(%d) error(%v)", flow.ID, err)
  319. return
  320. }
  321. isLast = len(dirs) == 0 //是否为叶子节点
  322. if !isLast {
  323. tranID = []int64{}
  324. for _, item := range dirs {
  325. tranID = append(tranID, item.TransitionID)
  326. }
  327. }
  328. }
  329. if trans, err = s.transitions(c, tranID, true); err != nil {
  330. log.Error("computeResult s.transitions(%v) error(%v) rid(%d) flow(%v)", tranID, err, rid, flow)
  331. return
  332. }
  333. tranMap := map[int64]*net.Transition{}
  334. for _, item := range trans {
  335. tranMap[item.ID] = item
  336. }
  337. triggerBindID := []int64{}
  338. triggerTrans := []int64{}
  339. for _, item := range binds {
  340. if tranMap[item.ElementID] == nil || tranMap[item.ElementID].Trigger != net.TriggerManual {
  341. continue
  342. }
  343. //叶子节点=同网过滤|非叶子节点=同flow下游变迁过滤
  344. if (isLast && tranMap[item.ElementID].NetID == flow.NetID) || !isLast {
  345. triggerBind = append(triggerBind, item)
  346. triggerTrans = append(triggerTrans, item.ElementID)
  347. triggerBindID = append(triggerBindID, item.ID)
  348. }
  349. }
  350. if len(triggerBind) == 0 || (!isLast && len(triggerBind) > 1) {
  351. log.Error("computeResult no triggered(%+v)/non-leaf-flow triggered >2, rid(%d) flow(%+v)", triggerBindID, rid, flow)
  352. err = ecode.AegisNotTriggerFlow
  353. return
  354. }
  355. //计算提交的变迁 & 提交令牌集合
  356. if submitTokenPackage, err = s.newTokenPackage(c, triggerBind, true); err != nil {
  357. log.Error("computeResult s.newTokenPackage error(%v)", err)
  358. return
  359. }
  360. if hitHelper || isLast {
  361. //不流转的情况
  362. newFlowID = flow.ID
  363. resultTokenPackage = submitTokenPackage
  364. } else {
  365. //中间节点找到下游新节点
  366. triggerTranID = triggerTrans[0]
  367. if resultDir, err = s.fetchTranNextEnableDirs(c, triggerTranID); err != nil {
  368. log.Error("computeResult s.fetchTranNextEnableDirs(%d) error(%v)", triggerTranID, err)
  369. return
  370. }
  371. newFlowID = resultDir.FlowID
  372. if resultTokenPackage, err = s.computeNewFlowPackage(c, newFlowID, resultDir, submitTokenPackage, false); err != nil {
  373. log.Error("computeResult s.computeNewFlowPackage error(%v)", err)
  374. return
  375. }
  376. }
  377. log.Info("submitpk(%+v) result(%+v)", submitTokenPackage, resultTokenPackage)
  378. result = &net.TriggerResult{
  379. RID: rid,
  380. NetID: flow.NetID,
  381. SubmitToken: submitTokenPackage,
  382. ResultToken: resultTokenPackage,
  383. NewFlowID: newFlowID,
  384. OldFlowID: strconv.FormatInt(flow.ID, 10),
  385. TransitionID: triggerTrans,
  386. }
  387. log.Info("computeResult end islast(%v) frombatch(%v) result(%+v)", isLast, fromBatch, result)
  388. return
  389. }
  390. /**
  391. * 资源审核详情页,获取运行资源所在网的节点下的所有可允许下游可操作项,去掉批量可操作项:
  392. * 若为网中的最后一个flow, 获取网中所有变迁的可操作项(bind级别去重)--提交时只改token,不改flow
  393. * 若为中间节点:只有一个(非并发),直接获取变迁的可操作项; 并发,需指定哪个变迁(先抛错)
  394. * 前提:同一rid只在同一业务下的一个net下运行
  395. * 任务列表进入详情页参数:rid,businessid,netid(可选)
  396. */
  397. func (s *Service) fetchResourceTranInfo(c context.Context, rid int64, businessID int64, netID int64) (result *net.TransitionInfo, err error) {
  398. var (
  399. runningFlow *net.Flow
  400. runningNet int64
  401. )
  402. if rid <= 0 || businessID <= 0 {
  403. err = ecode.RequestErr
  404. return
  405. }
  406. if runningFlow, runningNet, err = s.flowsByRunning(c, rid, businessID, netID, false); err != nil {
  407. return
  408. }
  409. flowID := runningFlow.ID
  410. if result, err = s.fetchTaskTranInfo(c, rid, flowID, runningNet); err != nil {
  411. log.Error("fetchResourceTranInfo s.fetchTaskTranInfo error(%v) rid(%d) businessid(%d) netid(%d) runningnet(%d)", err, rid, businessID, netID, runningNet)
  412. }
  413. return
  414. }
  415. /**
  416. * 任务详情页,获取flowid下的所有可允许变迁的可操作项,去掉批量可操作项:
  417. * 若为网中的最后一个flow, 获取网中所有变迁的可操作项(bind级别去重)--提交时只改token,不改flow
  418. * 若为中间节点:只有一个(非并发),直接获取变迁的可操作项;并发,需指定哪个变迁(先抛错)
  419. * 前提:同一rid只在同一业务下的一个net下运行
  420. * 资源列表进入详情页参数:rid, flowid, netid(可选)
  421. */
  422. func (s *Service) fetchTaskTranInfo(c context.Context, rid int64, flowID int64, netID int64) (result *net.TransitionInfo, err error) {
  423. var (
  424. transitionID []int64
  425. flow *net.Flow
  426. enableDir []*net.Direction
  427. trans []*net.Transition
  428. )
  429. if rid <= 0 || flowID <= 0 {
  430. err = ecode.RequestErr
  431. return
  432. }
  433. if enableDir, err = s.fetchFlowNextEnableDirs(c, flowID); err != nil {
  434. log.Error("fetchTaskTranInfo s.fetchFlowNextEnableDirs(%d) error(%v)", flowID, err)
  435. return
  436. }
  437. //作为分支的叶子节点,获取整个网的所有可操作项
  438. if len(enableDir) == 0 {
  439. if netID <= 0 {
  440. if flow, err = s.flowByID(c, flowID); err != nil {
  441. log.Error("fetchTaskTranInfo s.flowByID error(%v) rid(%d) flowid(%d)", err, rid, flowID)
  442. return
  443. }
  444. if flow == nil {
  445. log.Error("fetchTaskTranInfo flow(%d) not found rid(%d) ", flowID, rid)
  446. err = ecode.AegisFlowNotFound
  447. return
  448. }
  449. netID = flow.NetID
  450. }
  451. if transitionID, err = s.tranIDByNet(c, []int64{netID}, true, true); err != nil {
  452. log.Error("fetchTaskTranInfo s.tranIDByNet(%d) error(%v) rid(%d) flowid(%d)", netID, err, rid, flowID)
  453. return
  454. }
  455. } else {
  456. transitionID = []int64{}
  457. for _, item := range enableDir {
  458. transitionID = append(transitionID, item.TransitionID)
  459. }
  460. if trans, err = s.transitions(c, transitionID, false); err != nil {
  461. log.Error("fetchTaskTranInfo s.transitions(%v) error(%v) rid(%d) flowid(%d)", transitionID, err, rid, flowID)
  462. return
  463. }
  464. transitionID = []int64{}
  465. for _, item := range trans {
  466. if item.Trigger == net.TriggerManual {
  467. transitionID = append(transitionID, item.ID)
  468. }
  469. }
  470. }
  471. result = &net.TransitionInfo{
  472. RID: rid,
  473. FlowID: flowID,
  474. }
  475. if len(transitionID) == 0 {
  476. return
  477. }
  478. if result.Operations, err = s.tranOpers(c, transitionID, []int8{net.BindTypeTransition, net.BindTypeTranHelp}); err != nil {
  479. log.Error("fetchTaskTranInfo s.tranOpers(%v) error(%v) rid(%d) flowid(%d)", transitionID, err, rid, flowID)
  480. return
  481. }
  482. return
  483. }
  484. /**
  485. * 全部资源列表页,获取所有变迁的所有批量操作项
  486. * 参数:businessid,netid(选填)
  487. */
  488. func (s *Service) fetchBatchOperations(c context.Context, businessID int64, netID int64) (operations []*net.TranOperation, err error) {
  489. var (
  490. netIDList []int64
  491. )
  492. operations = []*net.TranOperation{}
  493. if netID > 0 {
  494. netIDList = []int64{netID}
  495. } else {
  496. //查询business_id下的所有net
  497. if netIDList, err = s.netIDByBusiness(c, businessID); err != nil {
  498. log.Error("fetchBatchOperations s.netIDByBusiness error(%v) businessid(%d) netid(%d)", err, businessID, netID)
  499. return
  500. }
  501. if len(netIDList) == 0 {
  502. log.Error("fetchBatchOperations business(%d) no net", businessID)
  503. return
  504. }
  505. }
  506. //查询所有net下的所有变迁可操作项
  507. if operations, err = s.tranOpersByNet(c, netIDList, []int8{net.BindTypeTranBatch, net.BindTypeTranHelpBatch}); err != nil {
  508. log.Error("fetchBatchOperations s.tranOpersByNet(%v) error(%v) businessid(%d) netid(%d)", netIDList, err, businessID, netID)
  509. }
  510. return
  511. }
  512. func (s *Service) tranOpersByNet(c context.Context, netIDs []int64, bindTp []int8) (opers []*net.TranOperation, err error) {
  513. var (
  514. transitionID = []int64{}
  515. )
  516. opers = []*net.TranOperation{}
  517. if len(netIDs) == 0 {
  518. return
  519. }
  520. if transitionID, err = s.tranIDByNet(c, netIDs, true, true); err != nil {
  521. log.Error("tranOpersByNet s.tranIDByNet(%v) error(%v) isBatch(%v)", netIDs, err, bindTp)
  522. return
  523. }
  524. if len(transitionID) == 0 {
  525. return
  526. }
  527. if opers, err = s.tranOpers(c, transitionID, bindTp); err != nil {
  528. log.Error("tranOpersByNet s.tranOpers(%v) error(%v) netid(%v) bindtp(%v)", transitionID, err, netIDs, bindTp)
  529. }
  530. return
  531. }
  532. func (s *Service) tranOpers(c context.Context, tranID []int64, bindTp []int8) (opers []*net.TranOperation, err error) {
  533. var (
  534. binds = []*net.TokenBind{}
  535. )
  536. opers = []*net.TranOperation{}
  537. if binds, err = s.tokenBindByElement(c, tranID, bindTp); err != nil {
  538. log.Error("tranOpers s.tokenBindByElement error(%v) tranid(%v) bindtp(%v)", err, tranID, bindTp)
  539. return
  540. }
  541. //bind.token_id维度去重 + bind.id维度排序
  542. tokenIDBind := map[string][]int64{}
  543. unique := map[string]*net.TranOperation{}
  544. for _, item := range binds {
  545. if _, exist := tokenIDBind[item.TokenID]; !exist {
  546. tokenIDBind[item.TokenID] = []int64{item.ID}
  547. unique[item.TokenID] = &net.TranOperation{
  548. ChName: item.ChName,
  549. }
  550. continue
  551. }
  552. tokenIDBind[item.TokenID] = append(tokenIDBind[item.TokenID], item.ID)
  553. }
  554. for uniqueTokenID, item := range unique {
  555. item.BindIDList = xstr.JoinInts(tokenIDBind[uniqueTokenID])
  556. opers = append(opers, item)
  557. }
  558. sort.Sort(net.TranOperationArr(opers))
  559. return
  560. }
  561. /**
  562. * 计算新节点的token集合
  563. * 启动流程节点,只提供newflowID, 找绑定的token
  564. * 流转过程中,根据bindIDs + 触发变迁的可允许有向线(变迁id + newflowid)计算新flow的结果:
  565. * 1. 若有静态绑定token,返回绑定的
  566. * 2. 若没静态绑定,且output="",返回bindID
  567. * 3. 若没静态绑定,且output!="",解析output计算---计算可能涉及到触发变迁/上一个flowid
  568. * 4. 其他情况,为配置错误,应该在配置时避免---todo
  569. * 5. start临时支持无令牌绑定情况
  570. */
  571. func (s *Service) computeNewFlowPackage(c context.Context, newFlowID int64, fromDir *net.Direction, submitPackage *net.TokenPackage, start bool) (resultTokens *net.TokenPackage, err error) {
  572. var (
  573. binds []*net.TokenBind
  574. )
  575. //flow绑定了tokens,直接返回
  576. if binds, err = s.tokenBindByElement(c, []int64{newFlowID}, []int8{net.BindTypeFlow}); err != nil {
  577. log.Error("computeNewFlowPackage s.tokenBindByElement error(%v) newflowid(%d) fromdir(%v) submit(%+v)", err, newFlowID, fromDir, submitPackage)
  578. return
  579. }
  580. if len(binds) > 0 {
  581. if resultTokens, err = s.newTokenPackage(c, binds, false); err != nil {
  582. log.Error("computeNewFlowPackage s.newTokenPackage error(%v) newflowid(%d) fromdir(%v) submit(%+v)", err, newFlowID, fromDir, submitPackage)
  583. }
  584. return
  585. }
  586. if start {
  587. return
  588. }
  589. //没绑定的,通过有向线的output计算
  590. if fromDir == nil {
  591. err = ecode.AegisFlowNoFromDir
  592. log.Error("computeNewFlowPackage newflowid(%d) has no tokens & fromdir, submit(%+v)", newFlowID, submitPackage)
  593. return
  594. }
  595. //output为空,根据提交内容
  596. if fromDir.Output == "" {
  597. resultTokens = submitPackage
  598. return
  599. }
  600. //todo--compute output,解析与prevFlowID + rid + submitTokens相关,return {{"rids":[],"tokens":[]}} --version2
  601. return
  602. }
  603. /**
  604. * 计算指定令牌关联的打包形式,包括:各类型的值+关联中文名+关联对应的令牌
  605. * sametokenid=true,需检查binds对应的tokenid是否一致
  606. */
  607. func (s *Service) newTokenPackage(c context.Context, binds []*net.TokenBind, sameTokenID bool) (res *net.TokenPackage, err error) {
  608. var (
  609. tokenIDList []int64
  610. tokens []*net.Token
  611. value interface{}
  612. sameTokenName string
  613. hitAudit bool
  614. )
  615. tokenIDStr := ""
  616. tokenID := ""
  617. if sameTokenID {
  618. tokenID = binds[0].TokenID
  619. sameTokenName = binds[0].ChName
  620. }
  621. for _, item := range binds {
  622. if sameTokenID && tokenID != item.TokenID {
  623. err = ecode.RequestErr
  624. log.Error("newTokenPackage binds diff token(%s!=%s) sameTokenID(%v)", tokenID, item.TokenID, sameTokenID)
  625. return
  626. }
  627. if item.Type == net.BindTypeTransition || item.Type == net.BindTypeTranBatch {
  628. hitAudit = true
  629. }
  630. tokenIDStr = tokenIDStr + "," + item.TokenID
  631. }
  632. tokenIDStr = strings.TrimLeft(tokenIDStr, ",")
  633. if tokenIDList, err = xstr.SplitInts(tokenIDStr); err != nil {
  634. log.Error("newTokenPackage xstr.SplitInts(%s) error(%v) sameTokenID(%v)", tokenIDStr, err, sameTokenID)
  635. return
  636. }
  637. if tokens, err = s.tokens(c, tokenIDList); err != nil {
  638. log.Error("newTokenPackage s.tokens(%v) error(%v) sameTokenID(%v)", tokenIDStr, err, sameTokenID)
  639. return
  640. }
  641. if len(tokens) == 0 {
  642. log.Error("newTokenPackage tokens(%s) not found sameTokenID(%v)", tokenIDStr, sameTokenID)
  643. err = ecode.AegisTokenNotFound
  644. return
  645. }
  646. values := map[string]interface{}{}
  647. chName := sameTokenName
  648. for _, item := range tokens {
  649. if value, err = item.FormatValue(); err != nil {
  650. log.Error("NewTokenPackage item.FormatValue(%+v) error(%v) sameTokenID(%v)", item, err, sameTokenID)
  651. return
  652. }
  653. values[item.Name] = value
  654. if sameTokenName == "" {
  655. chName = chName + item.ChName
  656. }
  657. }
  658. res = &net.TokenPackage{
  659. Values: values,
  660. TokenIDList: tokenIDList,
  661. ChName: chName,
  662. HitAudit: hitAudit,
  663. }
  664. return
  665. }
  666. /**
  667. * flowsByRunning 指定业务或netid,获取资源的运行节点, 只在一个节点上运行
  668. * rid反查现状得到flows,netid或businessid做过滤
  669. * businessID可选,netID可选,2者必须提供一个
  670. * onlyRunning 只过滤正常运行的资源现状
  671. */
  672. func (s *Service) flowsByRunning(c context.Context, rid int64, businessID int64, netID int64, onlyRunning bool) (runningFlow *net.Flow, runningNetID int64, err error) {
  673. var (
  674. n *net.Net
  675. nets []int64
  676. frs []*net.FlowResource
  677. )
  678. if rid <= 0 || (businessID <= 0 && netID == 0) {
  679. err = ecode.RequestErr
  680. log.Error("flowsByRunning rid(%d)/businessid(%d)+netid(%d) are empty, onlyrunning(%v)", rid, businessID, netID, onlyRunning)
  681. return
  682. }
  683. if netID > 0 {
  684. if n, err = s.netByID(c, netID); err != nil {
  685. log.Error("flowsByRunning s.netByID(%d) error(%v) rid(%d) businessid(%d) onlyrunning(%v)", netID, err, rid, businessID, onlyRunning)
  686. return
  687. }
  688. if n == nil || n.BusinessID != businessID {
  689. log.Error("flowsByRunning net(%d) in business(%d) not found, rid(%d) onlyrunning(%v)", netID, businessID, rid, onlyRunning)
  690. err = ecode.RequestErr
  691. return
  692. }
  693. nets = []int64{netID}
  694. } else {
  695. if nets, err = s.netIDByBusiness(c, businessID); err != nil {
  696. log.Error("flowsByRunning s.netIDByBusiness(%d) error(%v) rid(%d) onlyrunning(%v)", businessID, err, rid, onlyRunning)
  697. return
  698. }
  699. }
  700. if frs, err = s.gorm.FRByNetRID(c, nets, []int64{rid}, onlyRunning); err != nil {
  701. log.Error("flowsByRunning s.gorm.FRByNetRID(%d) error(%v) businessid(%d) netid(%d)", rid, err, businessID, netID)
  702. return
  703. }
  704. if len(frs) == 0 {
  705. log.Error("flowsByRunning rid(%d) not running in business(%d)/netid(%d) onlyrunning(%v)", rid, businessID, netID, onlyRunning)
  706. err = ecode.AegisNotRunInRange
  707. return
  708. }
  709. flowID := []int64{}
  710. for _, item := range frs {
  711. if runningNetID == 0 {
  712. runningNetID = item.NetID
  713. } else if item.NetID != runningNetID {
  714. log.Error("flowsByRunning rid(%d) running is both net(%d) & net(%d), business(%d), onlyrunning(%v)", rid, runningNetID, item.NetID, businessID, onlyRunning)
  715. err = ecode.AegisRunInDiffNet
  716. return
  717. }
  718. flowID = append(flowID, item.FlowID)
  719. }
  720. if len(flowID) > 1 {
  721. log.Error("flowsByRunning rid(%d) running in flows(%+v)>=2, businessid(%d)/netid(%d) runningnet(%d), onlyrunning(%v)", rid, flowID, businessID, netID, runningNetID, onlyRunning)
  722. err = ecode.AegisNotRunInFlow
  723. return
  724. }
  725. if runningFlow, err = s.flowByID(c, flowID[0]); err != nil {
  726. log.Error("flowsByRunning s.flowByID(%d) error(%v), rid(%d) businessid(%d) netid(%d), onlyrunning(%v)", flowID[0], err, rid, businessID, netID, onlyRunning)
  727. return
  728. }
  729. if runningFlow == nil {
  730. log.Error("flowsByRunning rid(%d) running in flows(%d) not found, businessid(%d) netid(%d) onlyrunnning(%v)", rid, flowID[0], businessID, netID, onlyRunning)
  731. err = ecode.AegisFlowNotFound
  732. return
  733. }
  734. return
  735. }
  736. /**
  737. * 流转到新节点且已更新现状表后,所需做的处理
  738. * 1. 现状流转日志
  739. * 2. 下游变迁处理,比如:人工变迁是否需要分发
  740. */
  741. func (s *Service) afterReachNewFlow(c context.Context, pm *net.TriggerResult, bizid int64) (err error) {
  742. s.sendNetTriggerLog(c, pm)
  743. err = s.prepareBeforeTrigger(c, []int64{pm.RID}, pm.NewFlowID, bizid)
  744. return
  745. }
  746. /**
  747. * 流转到达新节点,更新db操作,需在新节点计算token之后执行
  748. * 1. 现状表更新
  749. */
  750. func (s *Service) reachNewFlowDB(c context.Context, tx *gorm.DB, pm *net.TriggerResult) (err error) {
  751. var (
  752. flow *net.Flow
  753. )
  754. if pm.NetID <= 0 || pm.RID <= 0 || pm.NewFlowID <= 0 {
  755. log.Error("reachNewFlowDB params error(%+v)", pm)
  756. err = ecode.RequestErr
  757. return
  758. }
  759. //新节点与netid的同网检查
  760. if flow, err = s.flowByID(c, pm.NewFlowID); err != nil {
  761. log.Error("reachNewFlowDB s.flowByID error(%v) params(%+v)", err, pm)
  762. return
  763. }
  764. if flow == nil || flow.NetID != pm.NetID {
  765. log.Error("reachNewFlowDB s.flowByID not found/not same net, params(%+v), flow(%+v)", pm, flow)
  766. err = ecode.RequestErr
  767. return
  768. }
  769. if err = s.updateFlowResources(c, tx, pm.NetID, pm.RID, pm.NewFlowID); err != nil {
  770. log.Error("reachNewFlowDB s.changeFlowResource error(%v) params(%+v)", err, pm)
  771. return
  772. }
  773. //todo--更新现状token情况
  774. return
  775. }
  776. //取消指定资源的所有运行中流程
  777. func (s *Service) cancelNet(c context.Context, tx *gorm.DB, rids []int64) (res map[int64]string, err error) {
  778. var (
  779. frs []*net.FlowResource
  780. )
  781. if len(rids) == 0 {
  782. return
  783. }
  784. res = map[int64]string{}
  785. if frs, err = s.gorm.FRByUniques(c, rids, nil, true); err != nil {
  786. log.Error("cancelNet s.gorm.FRByUniques error(%v) rids(%+v)", err, rids)
  787. return
  788. }
  789. if len(frs) == 0 {
  790. return
  791. }
  792. if err = s.gorm.CancelFlowResource(c, tx, rids); err != nil {
  793. log.Error("cancelNet s.gorm.CancelFlowResource error(%+v) rids(%+v)", err, rids)
  794. return
  795. }
  796. //添加日志
  797. trigger := &net.TriggerResult{
  798. From: model.LogFromCancle,
  799. }
  800. for _, item := range frs {
  801. pref := res[item.RID]
  802. if pref != "" {
  803. pref = "," + pref
  804. }
  805. res[item.RID] = fmt.Sprintf("%s%d", pref, item.FlowID)
  806. trigger.RID = item.RID
  807. trigger.NetID = item.NetID
  808. trigger.OldFlowID = strconv.FormatInt(item.FlowID, 10)
  809. trigger.NewFlowID = item.FlowID
  810. s.sendNetTriggerLog(c, trigger)
  811. }
  812. return
  813. }