123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431 |
- package service
- import (
- "context"
- "fmt"
- "sort"
- "time"
- "go-common/app/admin/main/aegis/model"
- "go-common/app/admin/main/aegis/model/net"
- "go-common/library/ecode"
- "go-common/library/log"
- "github.com/jinzhu/gorm"
- )
- /**
- * 业务下所有需要分发任务的flow,用net+flow中文名拼接
- */
- func (s *Service) dispatchFlow(c context.Context, businessID []int64, limitFlow []int64) (res map[int64]map[int64]string, err error) {
- var (
- nets []*net.Net
- tranID []int64
- dirs []*net.Direction
- flows []*net.Flow
- )
- //biz:flow:flow_name
- res = map[int64]map[int64]string{}
- //业务下所有可用网
- if nets, err = s.gorm.NetsByBusiness(c, businessID, false); err != nil {
- log.Error("dispatchFlow s.gorm.NetsByBusiness(%v) error(%v)", businessID, err)
- return
- }
- netID := []int64{}
- netMap := map[int64]*net.Net{}
- for _, item := range nets {
- netID = append(netID, item.ID)
- netMap[item.ID] = item
- }
- //网下所有可用变迁
- if tranID, err = s.tranIDByNet(c, netID, true, false); err != nil {
- log.Error("dispatchFlow s.gorm.TransitionIDByNet(%v) error(%v) businessid(%d)", netID, err, businessID)
- return
- }
- if len(tranID) == 0 {
- return
- }
- //变迁所有可用的被指向的有向线
- if dirs, err = s.gorm.DirectionByTransitionID(c, tranID, net.DirInput, false); err != nil {
- log.Error("dispatchFlow s.gorm.DirectionByTransitionID error(%v) businessid(%d)", err, businessID)
- return
- }
- limitFlowMap := map[int64]int{}
- for _, item := range limitFlow {
- limitFlowMap[item] = 1
- }
- accessFlow := []int64{}
- for _, item := range dirs {
- if len(limitFlowMap) > 0 && limitFlowMap[item.FlowID] <= 0 {
- continue
- }
- accessFlow = append(accessFlow, item.FlowID)
- }
- if len(accessFlow) == 0 {
- return
- }
- //拼接每个节点的中文名
- if flows, err = s.flows(c, accessFlow, false); err != nil {
- log.Error("dispatchFlow s.flows error(%v) businessid(%d)", err, businessID)
- return
- }
- sort.Sort(net.FlowArr(flows))
- for _, item := range flows {
- if item == nil || netMap[item.NetID] == nil {
- continue
- }
- nt := netMap[item.NetID]
- if _, exist := res[nt.BusinessID]; !exist {
- res[nt.BusinessID] = map[int64]string{}
- }
- res[nt.BusinessID][item.ID] = nt.ChName + item.ChName
- }
- return
- }
- //ShowFlow .
- func (s *Service) ShowFlow(c context.Context, id int64) (r *net.ShowFlowResult, err error) {
- var (
- f *net.Flow
- details map[int64][]*net.TokenBind
- n *net.Net
- )
- if f, err = s.gorm.FlowByID(c, id); err != nil {
- return
- }
- if details, err = s.gorm.TokenBindByElement(c, []int64{id}, []int8{net.BindTypeFlow}, true); err != nil {
- return
- }
- if n, err = s.gorm.NetByID(c, f.NetID); err != nil {
- return
- }
- r = &net.ShowFlowResult{
- Flow: f,
- Tokens: details[id],
- IsStart: n.StartFlowID == id,
- }
- return
- }
- //GetFlowList .
- func (s *Service) GetFlowList(c context.Context, pm *net.ListNetElementParam) (result *net.ListFlowRes, err error) {
- var (
- flowID []int64
- tks map[int64][]*net.TokenBind
- n *net.Net
- uid = []int64{}
- unames map[int64]string
- )
- if result, err = s.gorm.FlowList(c, pm); err != nil {
- return
- }
- if len(result.Result) == 0 {
- return
- }
- for _, item := range result.Result {
- flowID = append(flowID, item.ID)
- uid = append(uid, item.UID)
- }
- if tks, err = s.gorm.TokenBindByElement(c, flowID, []int8{net.BindTypeFlow}, true); err != nil {
- return
- }
- if n, err = s.gorm.NetByID(c, pm.NetID); err != nil {
- return
- }
- if unames, err = s.http.GetUnames(c, uid); err != nil {
- log.Error("GetFlowList s.http.GetUnames error(%v)", err)
- err = nil
- }
- for _, item := range result.Result {
- item.IsStart = item.ID == n.StartFlowID
- item.Username = unames[item.UID]
- for _, bd := range tks[item.ID] {
- item.Tokens = append(item.Tokens, bd.ChName)
- }
- }
- return
- }
- //GetFlowByNet .
- func (s *Service) GetFlowByNet(c context.Context, netID int64) (result map[int64]string, err error) {
- var (
- flows []*net.Flow
- )
- result = map[int64]string{}
- if flows, err = s.gorm.FlowsByNet(c, []int64{netID}); err != nil {
- log.Error("GetFlowByNet s.gorm.FlowsByNet(%d) error(%v)", netID, err)
- return
- }
- for _, item := range flows {
- result[item.ID] = item.ChName
- }
- return
- }
- func (s *Service) checkFlowUnique(c context.Context, netID int64, name string) (err error, msg string) {
- var exist *net.Flow
- if exist, err = s.gorm.FlowByUnique(c, netID, name); err != nil {
- log.Error("checkFlowUnique s.gorm.FlowByUnique(%d,%s) error(%v)", netID, name, err)
- return
- }
- if exist != nil {
- err = ecode.AegisUniqueAlreadyExist
- msg = fmt.Sprintf(ecode.AegisUniqueAlreadyExist.Message(), "节点", name)
- }
- return
- }
- func (s *Service) checkStartFlowBind(oldFlow *net.Flow, tokenIDList []int64) (err error, msg string) {
- if oldFlow != nil && !oldFlow.IsAvailable() {
- err = ecode.AegisFlowDisabled
- msg = fmt.Sprintf("%s,不能作为初始节点", ecode.AegisFlowDisabled.Message())
- return
- }
- //第一版动态审核初始接入状态:敏感待审、非敏感待审、高频转发待审,非一个确定性值,而系统不提供条件判断和guard解析,由
- //配置初始节点没有令牌,而业务start时自动传入state支持(后续接入统一初始状态进而条件分状态的逻辑后,去掉state字段,且加上该判断)
- //if len(tokenIDList) == 0 {
- // err = ecode.AegisFlowNoToken
- // msg = fmt.Sprintf("%s,不能作为初始节点", ecode.AegisFlowNoToken.Message())
- //}
- return
- }
- //AddFlow .
- func (s *Service) AddFlow(c context.Context, uid int64, f *net.FlowEditParam) (id int64, err error, msg string) {
- var (
- tx *gorm.DB
- diff = []string{}
- diffBind string
- )
- if err, msg = s.checkFlowUnique(c, f.NetID, f.Name); err != nil {
- return
- }
- if f.IsStart {
- if err, msg = s.checkStartFlowBind(nil, f.TokenIDList); err != nil {
- return
- }
- diff = append(diff, model.LogFieldTemp(model.LogFieldStartFlow, f.IsStart, false, false))
- }
- flow := &net.Flow{
- NetID: f.NetID,
- Name: f.Name,
- ChName: f.ChName,
- Description: f.Description,
- UID: uid,
- }
- //db update
- tx, err = s.gorm.BeginTx(c)
- if err != nil {
- log.Error("AddFlow s.gorm.BeginTx error(%v)", err)
- return
- }
- if err = s.gorm.AddItem(c, tx, flow); err != nil {
- tx.Rollback()
- return
- }
- if diffBind, _, err, msg = s.compareFlowBind(c, tx, flow.ID, f.TokenIDList, false); err != nil {
- log.Error("AddFlow s.compareFlowBind error(%v) params(%+v)", err, f)
- tx.Rollback()
- return
- }
- if diffBind != "" {
- diff = append(diff, diffBind)
- }
- if f.IsStart {
- if err = s.gorm.NetBindStartFlow(c, tx, flow.NetID, flow.ID); err != nil {
- tx.Rollback()
- return
- }
- }
- if err = tx.Commit().Error; err != nil {
- log.Error("AddFlow tx.Commit error(%v)", err)
- return
- }
- id = flow.ID
- //日志
- diff = append(diff, model.LogFieldTemp(model.LogFieldChName, f.ChName, "", false))
- diff = append(diff, model.LogFieldTemp(model.LogFieldName, f.Name, "", false))
- oper := &model.NetConfOper{
- OID: flow.ID,
- Action: model.LogNetActionNew,
- UID: flow.UID,
- NetID: flow.NetID,
- ChName: flow.ChName,
- FlowID: flow.ID,
- Diff: diff,
- }
- s.sendNetConfLog(c, model.LogTypeFlowConf, oper)
- return
- }
- // UpdateFlow .
- func (s *Service) UpdateFlow(c context.Context, uid int64, f *net.FlowEditParam) (err error, msg string) {
- var (
- old *net.Flow
- n *net.Net
- startFlowID int64 = -1
- updates = map[string]interface{}{}
- tx *gorm.DB
- diff = []string{}
- diffBind string
- changedBind []int64
- )
- if old, err = s.gorm.FlowByID(c, f.ID); err != nil {
- log.Error("UpdateFlow s.gorm.FlowByID(%d) error(%v)", f.ID, err)
- return
- }
- if n, err = s.gorm.NetByID(c, old.NetID); err != nil {
- log.Error("UpdateFlow s.gorm.NetByID(%d) error(%v) flowid(%d)", old.NetID, err, f.ID)
- return
- }
- if f.IsStart && n.StartFlowID != f.ID {
- startFlowID = f.ID
- } else if !f.IsStart && n.StartFlowID == f.ID {
- startFlowID = 0
- }
- if f.IsStart {
- if err, msg = s.checkStartFlowBind(old, f.TokenIDList); err != nil {
- return
- }
- diff = append(diff, model.LogFieldTemp(model.LogFieldStartFlow, true, false, true))
- }
- if f.Name != old.Name {
- if err, msg = s.checkFlowUnique(c, old.NetID, f.Name); err != nil {
- return
- }
- diff = append(diff, model.LogFieldTemp(model.LogFieldName, f.Name, old.Name, true))
- old.Name = f.Name
- updates["name"] = f.Name
- }
- if f.ChName != old.ChName {
- diff = append(diff, model.LogFieldTemp(model.LogFieldChName, f.ChName, old.ChName, true))
- old.ChName = f.ChName
- updates["ch_name"] = f.ChName
- }
- if f.Description != old.Description {
- old.Description = f.Description
- updates["description"] = f.Description
- }
- //db update
- tx, err = s.gorm.BeginTx(c)
- if err != nil {
- log.Error("UpdateFlow s.gorm.BeginTx error(%v)", err)
- return
- }
- if len(updates) > 0 {
- if err = s.gorm.UpdateFields(c, tx, net.TableFlow, old.ID, updates); err != nil {
- tx.Rollback()
- return
- }
- }
- if startFlowID >= 0 {
- if err = s.gorm.NetBindStartFlow(c, tx, n.ID, startFlowID); err != nil {
- tx.Rollback()
- return
- }
- }
- if diffBind, changedBind, err, msg = s.compareFlowBind(c, tx, f.ID, f.TokenIDList, true); err != nil {
- log.Error("updateFlow s.compareFlowBind error(%v) params(%+v)", err, f)
- tx.Rollback()
- return
- }
- if diffBind != "" {
- diff = append(diff, diffBind)
- }
- if err = tx.Commit().Error; err != nil {
- log.Error("UpdateFlow tx.Commit error(%v)", err)
- return
- }
- s.delFlowCache(c, old, changedBind)
- //日志
- if len(diff) == 0 {
- return
- }
- oper := &model.NetConfOper{
- OID: old.ID,
- Action: model.LogNetActionUpdate,
- UID: uid,
- NetID: old.NetID,
- ChName: old.ChName,
- FlowID: old.ID,
- Diff: diff,
- }
- s.sendNetConfLog(c, model.LogTypeFlowConf, oper)
- return
- }
- //SwitchFlow .
- func (s *Service) SwitchFlow(c context.Context, id int64, needDisable bool) (err error) {
- var (
- old *net.Flow
- n *net.Net
- dirs []*net.Direction
- action string
- )
- if old, err = s.gorm.FlowByID(c, id); err != nil {
- log.Error("SwitchFlow s.gorm.FlowByID(%d) error(%v) needDisable(%v)", id, err, needDisable)
- return
- }
- log.Info("SwitchFlow id(%d) needdisable(%v) old-flow(%+v)", id, needDisable, old)
- available := old.IsAvailable()
- if available == !needDisable {
- return
- }
- if needDisable {
- if dirs, err = s.gorm.DirectionByFlowID(c, []int64{id}, 0); err != nil {
- log.Error("SwitchFlow s.gorm.DirectionByFlowID(%d) error(%v)", id, err)
- return
- }
- if len(dirs) > 0 {
- log.Error("SwitchFlow dir by flow(%d) founded", id)
- err = ecode.AegisFlowBinded
- return
- }
- if n, err = s.gorm.NetByID(c, old.NetID); err != nil {
- log.Error("SwitchFlow s.gorm.NetByID(%d) error(%v) flow(%d)", old.NetID, err, id)
- return
- }
- if n.StartFlowID == id {
- log.Error("SwitchFlow net(%d).startflow=flow(%d) founded", n.ID, id)
- err = ecode.AegisFlowBinded
- return
- }
- old.DisableTime = time.Now()
- action = model.LogNetActionDisable
- } else {
- old.DisableTime = net.Recovered
- action = model.LogNetActionAvailable
- }
- if err = s.gorm.UpdateFields(c, nil, net.TableFlow, id, map[string]interface{}{"disable_time": old.DisableTime}); err != nil {
- return
- }
- s.delFlowCache(c, old, nil)
- //日志
- oper := &model.NetConfOper{
- OID: old.ID,
- Action: action,
- UID: old.UID,
- NetID: old.NetID,
- ChName: old.ChName,
- FlowID: old.ID,
- }
- s.sendNetConfLog(c, model.LogTypeFlowConf, oper)
- return
- }
|