flow.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "sort"
  6. "time"
  7. "go-common/app/admin/main/aegis/model"
  8. "go-common/app/admin/main/aegis/model/net"
  9. "go-common/library/ecode"
  10. "go-common/library/log"
  11. "github.com/jinzhu/gorm"
  12. )
  13. /**
  14. * 业务下所有需要分发任务的flow,用net+flow中文名拼接
  15. */
  16. func (s *Service) dispatchFlow(c context.Context, businessID []int64, limitFlow []int64) (res map[int64]map[int64]string, err error) {
  17. var (
  18. nets []*net.Net
  19. tranID []int64
  20. dirs []*net.Direction
  21. flows []*net.Flow
  22. )
  23. //biz:flow:flow_name
  24. res = map[int64]map[int64]string{}
  25. //业务下所有可用网
  26. if nets, err = s.gorm.NetsByBusiness(c, businessID, false); err != nil {
  27. log.Error("dispatchFlow s.gorm.NetsByBusiness(%v) error(%v)", businessID, err)
  28. return
  29. }
  30. netID := []int64{}
  31. netMap := map[int64]*net.Net{}
  32. for _, item := range nets {
  33. netID = append(netID, item.ID)
  34. netMap[item.ID] = item
  35. }
  36. //网下所有可用变迁
  37. if tranID, err = s.tranIDByNet(c, netID, true, false); err != nil {
  38. log.Error("dispatchFlow s.gorm.TransitionIDByNet(%v) error(%v) businessid(%d)", netID, err, businessID)
  39. return
  40. }
  41. if len(tranID) == 0 {
  42. return
  43. }
  44. //变迁所有可用的被指向的有向线
  45. if dirs, err = s.gorm.DirectionByTransitionID(c, tranID, net.DirInput, false); err != nil {
  46. log.Error("dispatchFlow s.gorm.DirectionByTransitionID error(%v) businessid(%d)", err, businessID)
  47. return
  48. }
  49. limitFlowMap := map[int64]int{}
  50. for _, item := range limitFlow {
  51. limitFlowMap[item] = 1
  52. }
  53. accessFlow := []int64{}
  54. for _, item := range dirs {
  55. if len(limitFlowMap) > 0 && limitFlowMap[item.FlowID] <= 0 {
  56. continue
  57. }
  58. accessFlow = append(accessFlow, item.FlowID)
  59. }
  60. if len(accessFlow) == 0 {
  61. return
  62. }
  63. //拼接每个节点的中文名
  64. if flows, err = s.flows(c, accessFlow, false); err != nil {
  65. log.Error("dispatchFlow s.flows error(%v) businessid(%d)", err, businessID)
  66. return
  67. }
  68. sort.Sort(net.FlowArr(flows))
  69. for _, item := range flows {
  70. if item == nil || netMap[item.NetID] == nil {
  71. continue
  72. }
  73. nt := netMap[item.NetID]
  74. if _, exist := res[nt.BusinessID]; !exist {
  75. res[nt.BusinessID] = map[int64]string{}
  76. }
  77. res[nt.BusinessID][item.ID] = nt.ChName + item.ChName
  78. }
  79. return
  80. }
  81. //ShowFlow .
  82. func (s *Service) ShowFlow(c context.Context, id int64) (r *net.ShowFlowResult, err error) {
  83. var (
  84. f *net.Flow
  85. details map[int64][]*net.TokenBind
  86. n *net.Net
  87. )
  88. if f, err = s.gorm.FlowByID(c, id); err != nil {
  89. return
  90. }
  91. if details, err = s.gorm.TokenBindByElement(c, []int64{id}, []int8{net.BindTypeFlow}, true); err != nil {
  92. return
  93. }
  94. if n, err = s.gorm.NetByID(c, f.NetID); err != nil {
  95. return
  96. }
  97. r = &net.ShowFlowResult{
  98. Flow: f,
  99. Tokens: details[id],
  100. IsStart: n.StartFlowID == id,
  101. }
  102. return
  103. }
  104. //GetFlowList .
  105. func (s *Service) GetFlowList(c context.Context, pm *net.ListNetElementParam) (result *net.ListFlowRes, err error) {
  106. var (
  107. flowID []int64
  108. tks map[int64][]*net.TokenBind
  109. n *net.Net
  110. uid = []int64{}
  111. unames map[int64]string
  112. )
  113. if result, err = s.gorm.FlowList(c, pm); err != nil {
  114. return
  115. }
  116. if len(result.Result) == 0 {
  117. return
  118. }
  119. for _, item := range result.Result {
  120. flowID = append(flowID, item.ID)
  121. uid = append(uid, item.UID)
  122. }
  123. if tks, err = s.gorm.TokenBindByElement(c, flowID, []int8{net.BindTypeFlow}, true); err != nil {
  124. return
  125. }
  126. if n, err = s.gorm.NetByID(c, pm.NetID); err != nil {
  127. return
  128. }
  129. if unames, err = s.http.GetUnames(c, uid); err != nil {
  130. log.Error("GetFlowList s.http.GetUnames error(%v)", err)
  131. err = nil
  132. }
  133. for _, item := range result.Result {
  134. item.IsStart = item.ID == n.StartFlowID
  135. item.Username = unames[item.UID]
  136. for _, bd := range tks[item.ID] {
  137. item.Tokens = append(item.Tokens, bd.ChName)
  138. }
  139. }
  140. return
  141. }
  142. //GetFlowByNet .
  143. func (s *Service) GetFlowByNet(c context.Context, netID int64) (result map[int64]string, err error) {
  144. var (
  145. flows []*net.Flow
  146. )
  147. result = map[int64]string{}
  148. if flows, err = s.gorm.FlowsByNet(c, []int64{netID}); err != nil {
  149. log.Error("GetFlowByNet s.gorm.FlowsByNet(%d) error(%v)", netID, err)
  150. return
  151. }
  152. for _, item := range flows {
  153. result[item.ID] = item.ChName
  154. }
  155. return
  156. }
  157. func (s *Service) checkFlowUnique(c context.Context, netID int64, name string) (err error, msg string) {
  158. var exist *net.Flow
  159. if exist, err = s.gorm.FlowByUnique(c, netID, name); err != nil {
  160. log.Error("checkFlowUnique s.gorm.FlowByUnique(%d,%s) error(%v)", netID, name, err)
  161. return
  162. }
  163. if exist != nil {
  164. err = ecode.AegisUniqueAlreadyExist
  165. msg = fmt.Sprintf(ecode.AegisUniqueAlreadyExist.Message(), "节点", name)
  166. }
  167. return
  168. }
  169. func (s *Service) checkStartFlowBind(oldFlow *net.Flow, tokenIDList []int64) (err error, msg string) {
  170. if oldFlow != nil && !oldFlow.IsAvailable() {
  171. err = ecode.AegisFlowDisabled
  172. msg = fmt.Sprintf("%s,不能作为初始节点", ecode.AegisFlowDisabled.Message())
  173. return
  174. }
  175. //第一版动态审核初始接入状态:敏感待审、非敏感待审、高频转发待审,非一个确定性值,而系统不提供条件判断和guard解析,由
  176. //配置初始节点没有令牌,而业务start时自动传入state支持(后续接入统一初始状态进而条件分状态的逻辑后,去掉state字段,且加上该判断)
  177. //if len(tokenIDList) == 0 {
  178. // err = ecode.AegisFlowNoToken
  179. // msg = fmt.Sprintf("%s,不能作为初始节点", ecode.AegisFlowNoToken.Message())
  180. //}
  181. return
  182. }
  183. //AddFlow .
  184. func (s *Service) AddFlow(c context.Context, uid int64, f *net.FlowEditParam) (id int64, err error, msg string) {
  185. var (
  186. tx *gorm.DB
  187. diff = []string{}
  188. diffBind string
  189. )
  190. if err, msg = s.checkFlowUnique(c, f.NetID, f.Name); err != nil {
  191. return
  192. }
  193. if f.IsStart {
  194. if err, msg = s.checkStartFlowBind(nil, f.TokenIDList); err != nil {
  195. return
  196. }
  197. diff = append(diff, model.LogFieldTemp(model.LogFieldStartFlow, f.IsStart, false, false))
  198. }
  199. flow := &net.Flow{
  200. NetID: f.NetID,
  201. Name: f.Name,
  202. ChName: f.ChName,
  203. Description: f.Description,
  204. UID: uid,
  205. }
  206. //db update
  207. tx, err = s.gorm.BeginTx(c)
  208. if err != nil {
  209. log.Error("AddFlow s.gorm.BeginTx error(%v)", err)
  210. return
  211. }
  212. if err = s.gorm.AddItem(c, tx, flow); err != nil {
  213. tx.Rollback()
  214. return
  215. }
  216. if diffBind, _, err, msg = s.compareFlowBind(c, tx, flow.ID, f.TokenIDList, false); err != nil {
  217. log.Error("AddFlow s.compareFlowBind error(%v) params(%+v)", err, f)
  218. tx.Rollback()
  219. return
  220. }
  221. if diffBind != "" {
  222. diff = append(diff, diffBind)
  223. }
  224. if f.IsStart {
  225. if err = s.gorm.NetBindStartFlow(c, tx, flow.NetID, flow.ID); err != nil {
  226. tx.Rollback()
  227. return
  228. }
  229. }
  230. if err = tx.Commit().Error; err != nil {
  231. log.Error("AddFlow tx.Commit error(%v)", err)
  232. return
  233. }
  234. id = flow.ID
  235. //日志
  236. diff = append(diff, model.LogFieldTemp(model.LogFieldChName, f.ChName, "", false))
  237. diff = append(diff, model.LogFieldTemp(model.LogFieldName, f.Name, "", false))
  238. oper := &model.NetConfOper{
  239. OID: flow.ID,
  240. Action: model.LogNetActionNew,
  241. UID: flow.UID,
  242. NetID: flow.NetID,
  243. ChName: flow.ChName,
  244. FlowID: flow.ID,
  245. Diff: diff,
  246. }
  247. s.sendNetConfLog(c, model.LogTypeFlowConf, oper)
  248. return
  249. }
  250. // UpdateFlow .
  251. func (s *Service) UpdateFlow(c context.Context, uid int64, f *net.FlowEditParam) (err error, msg string) {
  252. var (
  253. old *net.Flow
  254. n *net.Net
  255. startFlowID int64 = -1
  256. updates = map[string]interface{}{}
  257. tx *gorm.DB
  258. diff = []string{}
  259. diffBind string
  260. changedBind []int64
  261. )
  262. if old, err = s.gorm.FlowByID(c, f.ID); err != nil {
  263. log.Error("UpdateFlow s.gorm.FlowByID(%d) error(%v)", f.ID, err)
  264. return
  265. }
  266. if n, err = s.gorm.NetByID(c, old.NetID); err != nil {
  267. log.Error("UpdateFlow s.gorm.NetByID(%d) error(%v) flowid(%d)", old.NetID, err, f.ID)
  268. return
  269. }
  270. if f.IsStart && n.StartFlowID != f.ID {
  271. startFlowID = f.ID
  272. } else if !f.IsStart && n.StartFlowID == f.ID {
  273. startFlowID = 0
  274. }
  275. if f.IsStart {
  276. if err, msg = s.checkStartFlowBind(old, f.TokenIDList); err != nil {
  277. return
  278. }
  279. diff = append(diff, model.LogFieldTemp(model.LogFieldStartFlow, true, false, true))
  280. }
  281. if f.Name != old.Name {
  282. if err, msg = s.checkFlowUnique(c, old.NetID, f.Name); err != nil {
  283. return
  284. }
  285. diff = append(diff, model.LogFieldTemp(model.LogFieldName, f.Name, old.Name, true))
  286. old.Name = f.Name
  287. updates["name"] = f.Name
  288. }
  289. if f.ChName != old.ChName {
  290. diff = append(diff, model.LogFieldTemp(model.LogFieldChName, f.ChName, old.ChName, true))
  291. old.ChName = f.ChName
  292. updates["ch_name"] = f.ChName
  293. }
  294. if f.Description != old.Description {
  295. old.Description = f.Description
  296. updates["description"] = f.Description
  297. }
  298. //db update
  299. tx, err = s.gorm.BeginTx(c)
  300. if err != nil {
  301. log.Error("UpdateFlow s.gorm.BeginTx error(%v)", err)
  302. return
  303. }
  304. if len(updates) > 0 {
  305. if err = s.gorm.UpdateFields(c, tx, net.TableFlow, old.ID, updates); err != nil {
  306. tx.Rollback()
  307. return
  308. }
  309. }
  310. if startFlowID >= 0 {
  311. if err = s.gorm.NetBindStartFlow(c, tx, n.ID, startFlowID); err != nil {
  312. tx.Rollback()
  313. return
  314. }
  315. }
  316. if diffBind, changedBind, err, msg = s.compareFlowBind(c, tx, f.ID, f.TokenIDList, true); err != nil {
  317. log.Error("updateFlow s.compareFlowBind error(%v) params(%+v)", err, f)
  318. tx.Rollback()
  319. return
  320. }
  321. if diffBind != "" {
  322. diff = append(diff, diffBind)
  323. }
  324. if err = tx.Commit().Error; err != nil {
  325. log.Error("UpdateFlow tx.Commit error(%v)", err)
  326. return
  327. }
  328. s.delFlowCache(c, old, changedBind)
  329. //日志
  330. if len(diff) == 0 {
  331. return
  332. }
  333. oper := &model.NetConfOper{
  334. OID: old.ID,
  335. Action: model.LogNetActionUpdate,
  336. UID: uid,
  337. NetID: old.NetID,
  338. ChName: old.ChName,
  339. FlowID: old.ID,
  340. Diff: diff,
  341. }
  342. s.sendNetConfLog(c, model.LogTypeFlowConf, oper)
  343. return
  344. }
  345. //SwitchFlow .
  346. func (s *Service) SwitchFlow(c context.Context, id int64, needDisable bool) (err error) {
  347. var (
  348. old *net.Flow
  349. n *net.Net
  350. dirs []*net.Direction
  351. action string
  352. )
  353. if old, err = s.gorm.FlowByID(c, id); err != nil {
  354. log.Error("SwitchFlow s.gorm.FlowByID(%d) error(%v) needDisable(%v)", id, err, needDisable)
  355. return
  356. }
  357. log.Info("SwitchFlow id(%d) needdisable(%v) old-flow(%+v)", id, needDisable, old)
  358. available := old.IsAvailable()
  359. if available == !needDisable {
  360. return
  361. }
  362. if needDisable {
  363. if dirs, err = s.gorm.DirectionByFlowID(c, []int64{id}, 0); err != nil {
  364. log.Error("SwitchFlow s.gorm.DirectionByFlowID(%d) error(%v)", id, err)
  365. return
  366. }
  367. if len(dirs) > 0 {
  368. log.Error("SwitchFlow dir by flow(%d) founded", id)
  369. err = ecode.AegisFlowBinded
  370. return
  371. }
  372. if n, err = s.gorm.NetByID(c, old.NetID); err != nil {
  373. log.Error("SwitchFlow s.gorm.NetByID(%d) error(%v) flow(%d)", old.NetID, err, id)
  374. return
  375. }
  376. if n.StartFlowID == id {
  377. log.Error("SwitchFlow net(%d).startflow=flow(%d) founded", n.ID, id)
  378. err = ecode.AegisFlowBinded
  379. return
  380. }
  381. old.DisableTime = time.Now()
  382. action = model.LogNetActionDisable
  383. } else {
  384. old.DisableTime = net.Recovered
  385. action = model.LogNetActionAvailable
  386. }
  387. if err = s.gorm.UpdateFields(c, nil, net.TableFlow, id, map[string]interface{}{"disable_time": old.DisableTime}); err != nil {
  388. return
  389. }
  390. s.delFlowCache(c, old, nil)
  391. //日志
  392. oper := &model.NetConfOper{
  393. OID: old.ID,
  394. Action: action,
  395. UID: old.UID,
  396. NetID: old.NetID,
  397. ChName: old.ChName,
  398. FlowID: old.ID,
  399. }
  400. s.sendNetConfLog(c, model.LogTypeFlowConf, oper)
  401. return
  402. }