package service import ( "context" "encoding/json" "fmt" "math" "strconv" "strings" "time" "go-common/app/admin/main/aegis/model" "go-common/app/admin/main/aegis/model/common" "go-common/app/admin/main/aegis/model/net" "go-common/app/admin/main/aegis/model/resource" "go-common/library/ecode" "go-common/library/log" "go-common/library/queue/databus/report" "go-common/library/xstr" ) // send to log service func (s *Service) sendAuditLog(c context.Context, action string, opt *model.SubmitOptions, flowres interface{}, logtype int) (err error) { // send logData := &report.ManagerInfo{ Uname: opt.Uname, UID: opt.UID, Business: model.LogBusinessAudit, Type: logtype, Oid: opt.RID, Action: action, Ctime: time.Now(), Index: []interface{}{opt.BusinessID, opt.NewFlowID, opt.TaskID, strconv.Itoa(opt.Result.State)}, Content: map[string]interface{}{ "opt": opt, "flow": flowres, }, } if err = report.Manager(logData); err != nil { log.Error("report.Manager(%+v) error(%v)", logData, err) } return } // send to log service func (s *Service) sendTaskConsumerLog(c context.Context, action string, opt *common.BaseOptions) (err error) { logData := &report.ManagerInfo{ Uname: opt.Uname, UID: opt.UID, Business: model.LogBusinessTask, Type: model.LogTypeTaskConsumer, Oid: 0, Action: action, Ctime: time.Now(), Index: []interface{}{opt.BusinessID, opt.FlowID, opt.Role}, } if err = report.Manager(logData); err != nil { log.Error("report.Manager(%+v) error(%v)", logData, err) } return } // send to log service func (s *Service) sendRscLog(c context.Context, acction string, opt *model.AddOption, res *net.TriggerResult, update interface{}, err error) { var rid, flowid int64 if res != nil { flowid = res.NewFlowID rid = res.RID } // send logData := &report.ManagerInfo{ Uname: "business", UID: 399, Business: model.LogBusinessResource, Type: model.LogTypeFromAdd, Oid: rid, Action: acction, Ctime: time.Now(), Index: []interface{}{opt.BusinessID, flowid, opt.OID}, Content: map[string]interface{}{ "opt": opt, "res": res, "update": update, "err": err, }, } if err1 := report.Manager(logData); err1 != nil { log.Error("report.Manager(%+v) error(%v)", logData, err1) } } func (s *Service) sendRscCancleLog(c context.Context, BusinessID int64, oids []string, uid int64, username string, err error) { logData := &report.ManagerInfo{ Uname: username, UID: uid, Business: model.LogBusinessResource, Type: model.LogTypeFromCancle, Oid: 0, Action: "cancle", Ctime: time.Now(), Index: []interface{}{BusinessID}, Content: map[string]interface{}{ "oids": oids, "err": err, }, } if err1 := report.Manager(logData); err1 != nil { log.Error("report.Manager(%+v) error(%v)", logData, err1) } } func (s *Service) sendRscSubmitLog(c context.Context, action string, opt *model.SubmitOptions, res interface{}) { logData := &report.ManagerInfo{ Uname: opt.Uname, UID: opt.UID, Business: model.LogBusinessResource, Type: model.LogTypeFormAuditor, Oid: opt.RID, Action: action, Ctime: time.Now(), Index: []interface{}{opt.BusinessID, opt.FlowID, opt.OID}, Content: map[string]interface{}{ "opt": opt, "res": res, }, } if err := report.Manager(logData); err != nil { log.Error("report.Manager(%+v) error(%v)", logData, err) } } /** * 记录流程流转日志 * oid=rid,action=new_flow_id, index=[net_id, old_flow_id, transition_id, from], content=submit+result * from=单个提交/批量提交/跳流程/启动/取消 */ func (s *Service) sendNetTriggerLog(c context.Context, pm *net.TriggerResult) (err error) { var ( submitValue, resValue []byte tran string content = map[string]interface{}{} ) if len(pm.TransitionID) > 0 { tran = xstr.JoinInts(pm.TransitionID) } if pm.SubmitToken != nil { if submitValue, err = json.Marshal(pm.SubmitToken); err != nil { log.Error("sendNetTriggerLog json.Marshal error(%v) submit(%v)", err, pm.SubmitToken) return } content["submit"] = string(submitValue) } if pm.ResultToken != nil { if resValue, err = json.Marshal(pm.ResultToken); err != nil { log.Error("sendNetTriggerLog json.Marshal error(%v) result(%v)", err, pm.ResultToken) return } content["result"] = string(resValue) } data := &report.ManagerInfo{ Business: model.LogBusinessNet, Type: model.LogTypeNetTrigger, Oid: pm.RID, Action: strconv.FormatInt(pm.NewFlowID, 10), Ctime: time.Now(), Index: []interface{}{pm.NetID, tran, pm.From, pm.OldFlowID}, Content: content, } log.Info("sendNetTriggerLog start send log(%+v)", data) report.Manager(data) return } /** * oid: 各元素id, type=level, action=禁用/创建/更新/启用, ctime=time.now, index=[net_id, ch_name, flow_id, tran_id], content=diff * level in (net/token/token_bind_flow/token_bind_transition/flow/transition/direction) * diff如下: * token: obj=name+compare+value(type) * token_bind: obj=flow_chname/tran_chname:从token_obj变成token_obj, ch_name从xx变成xx * flow: ch_name从xx变成xx, name从xx变成xx * tran: ch_name从xx变成xx, name从xx变成xx,trigger从xx变成xx,limit从xx变成xx * dir:direction从xx变成xx,order从xx变成xx,guard从xx变成xx,output从xx变成yy * */ func (s *Service) sendNetConfLog(c context.Context, tp int, oper *model.NetConfOper) (err error) { data := &report.ManagerInfo{ UID: oper.UID, Uname: "", Business: model.LogBusinessNetConf, Type: tp, Oid: oper.OID, Action: oper.Action, Ctime: time.Now(), Index: []interface{}{oper.NetID, oper.FlowID, oper.TranID, oper.ChName}, Content: map[string]interface{}{ "diff": strings.Join(oper.Diff, "\r\n"), }, } log.Info("sendNetConfLog data(%+v)", data) report.Manager(data) return } //SearchAuditLogCSV 操作日志结果csv func (s *Service) SearchAuditLogCSV(c context.Context, pm *model.SearchAuditLogParam) (csv [][]string, err error) { var ( res []*model.SearchAuditLog ) if res, _, err = s.SearchAuditLog(c, pm); err != nil { return } csv = make([][]string, len(res)+1) csv[0] = []string{"rid", "oid", "task id", "状态", "操作时间", "操作人", "其他信息"} for i, item := range res { csv[i+1] = []string{ strconv.FormatInt(item.RID, 10), item.OID, strconv.FormatInt(item.TaskID, 10), item.State, item.Stime, fmt.Sprintf("%s(%s)", item.Uname, item.Department), item.Extra, } } return } //SearchAuditLog 查询审核日志 func (s *Service) SearchAuditLog(c context.Context, pm *model.SearchAuditLogParam) (res []*model.SearchAuditLog, p common.Pager, err error) { var ( logs *model.SearchLogResult ridoid, udepartment map[int64]string oidrid map[string]int64 ) p = common.Pager{ Ps: pm.Ps, Pn: pm.Pn, } //oid转换成rid查询 if len(pm.OID) > 0 { if oidrid, err = s.gorm.ResIDByOID(c, pm.BusinessID, pm.OID); err != nil { log.Error("SearchAuditLog s.gorm.ResIDByOID error(%+v) pm(%+v)", err, pm) return } if len(oidrid) == 0 { return } ridoid = map[int64]string{} for oid, rid := range oidrid { pm.RID = append(pm.RID, rid) ridoid[rid] = oid } } if logs, err = s.searchAuditLog(c, pm); err != nil { err = ecode.AegisSearchErr return } p.Total = logs.Page.Total if len(logs.Result) == 0 { return } uids := []int64{} rids := []int64{} unameuids := []int64{} uidunameexist := map[int64]string{} res = make([]*model.SearchAuditLog, len(logs.Result)) for i, item := range logs.Result { if item.UID > 0 { uids = append(uids, item.UID) } oid, exist := ridoid[item.OID] if !exist { rids = append(rids, item.OID) } if item.Uname != "" { uidunameexist[item.UID] = item.Uname } else { item.Uname = uidunameexist[item.UID] } if item.Uname == "" && item.UID > 0 { unameuids = append(unameuids, item.UID) } change := &model.Change{} if err = json.Unmarshal([]byte(item.Extra), &change); err != nil { log.Error("searchAuditLog json.Unmarshal error(%v) extra(%s) pm(%+v)", err, item.Extra, pm) return } flowaction, submitopt := change.GetSubmitOper() res[i] = &model.SearchAuditLog{ RID: item.OID, OID: oid, TaskID: item.Int2, State: item.Str0, Stime: item.Ctime, UID: item.UID, Uname: item.Uname, Department: "", Extra: fmt.Sprintf("操作详情:[%s]%s %s", item.Action, flowaction, submitopt), } } //由搜索结果提供了rid if len(rids) > 0 { if ridoid, err = s.gorm.ResOIDByID(c, rids); err != nil { return } } unames, _ := s.http.GetUnames(c, unameuids) udepartment, _ = s.http.GetUdepartment(c, uids) for _, item := range res { if item.OID == "" { item.OID = ridoid[item.RID] } if item.Uname == "" && item.UID > 0 { item.Uname = unames[item.UID] } item.Department = udepartment[item.UID] } return } func (s *Service) trackAuditLog(c context.Context, pm *model.SearchAuditLogParam) (res []*model.TrackAudit, err error) { var ( logs *model.SearchLogResult flowch map[int64]string ) res = []*model.TrackAudit{} if logs, err = s.searchAuditLog(c, pm); err != nil { log.Error("trackAuditLog s.searchAuditLog error(%v) pm(%+v)", err, pm) return } res = make([]*model.TrackAudit, len(logs.Result)) flows := []int64{} for i, item := range logs.Result { change := &model.Change{} if err = json.Unmarshal([]byte(item.Extra), change); err != nil { log.Error("trackAuditLog json.Unmarshal error(%v) pm(%+v)", err, pm) return } res[i] = &model.TrackAudit{ Ctime: item.Ctime, FlowID: []int64{}, State: "", Uname: item.Uname, } if int(item.Type) == model.LogTypeAuditCancel { res[i].State = "删除" } if change.Flow == nil { continue } if int(item.Type) == model.LogTypeAuditCancel { var one []int64 one, err = xstr.SplitInts(change.Flow.OldFlowID.String()) if err != nil { log.Error("trackAuditLog xstr.SplitInts(%s) error(%v)", change.Flow.OldFlowID.String(), err) err = nil continue } if len(one) == 0 { continue } res[i].FlowID = one flows = append(flows, one...) continue } flows = append(flows, change.Flow.NewFlowID) res[i].FlowID = []int64{change.Flow.NewFlowID} if change.Flow.ResultToken != nil { res[i].State = change.Flow.ResultToken.ChName } } //get flows names if len(flows) == 0 { return } if flowch, err = s.gorm.ColumnMapString(c, net.TableFlow, "ch_name", flows, ""); err != nil { log.Error("trackAuditLog s.gorm.ColumnMapString error(%v) pm(%+v)", err, pm) return } for _, item := range res { fnames := make([]string, len(item.FlowID)) for i, fid := range item.FlowID { fnames[i] = flowch[fid] } item.FlowName = strings.Join(fnames, ",") } return } //TrackResource 资源信息追踪, 获取资源add/update日志,并分页,以此为基准,获取对应时间端内的资源audit日志;若add/update日志只有不超过1页,则获取全部audit日志;超过1页,最后一页会返回剩余的全部audit日志 func (s *Service) TrackResource(c context.Context, pm *model.TrackParam) (res *model.TrackInfo, p common.Pager, err error) { var ( obj *resource.Resource rsc []*model.TrackRsc audit []*model.TrackAudit rela [][]int LogMinTime = "2018-11-01 10:00:00" ) if obj, err = s.gorm.ResourceByOID(c, pm.OID, pm.BusinessID); err != nil || obj == nil { log.Error("TrackResource s.gorm.ResourceByOID error(%v)/not found, pm(%+v)", err, pm) return } if rsc, p, err = s.searchResourceLog(c, obj.ID, pm.Pn, pm.Ps); err != nil { err = ecode.AegisSearchErr return } //超过部分不需要查询audit topn := int(math.Ceil(float64(p.Total) / float64(p.Ps))) if (topn > 0 && topn < p.Pn) || (topn <= 0 && p.Pn > 1) { return } //没有资源日志,则不查询审核日志--资源日志添加失败,还是需要展示审核日志啊,审核日志分页有规律 ap := &model.SearchAuditLogParam{ BusinessID: pm.BusinessID, RID: []int64{obj.ID}, CtimeFrom: LogMinTime, CtimeTo: "", Ps: 1000, //一次性拿出来所有的日志 } //对于audit日志,当add日志各种情况下会返回如下:no data(p.Total <= 0)---全量, 1页(p.Total <= p.Ps)---全量, 2或多页(p1=最新->p1.lasttime, p2=p1.lasttime-p2.lasttime,...pn=pn-1.lasttime-mintime) if p.Total > p.Ps { //有多页 llen := len(rsc) if llen > 0 && topn > p.Pn { ap.CtimeFrom = rsc[llen-1].Ctime } if p.Pn > 1 { ap.CtimeTo = pm.LastPageTime } } if audit, err = s.trackAuditLog(c, ap); err != nil { err = ecode.AegisSearchErr return } //根据ctime聚合,以资源日志为基准 llen := len(rsc) + 2 rscctime := make([]string, llen) rscctime[0] = time.Now().Format("2006-01-02 15:04:05") //max for i, item := range rsc { rscctime[i+1] = item.Ctime } rscctime[llen-1] = time.Time{}.Format("2006-01-02 15:04:05") //min index := 0 for i := 1; i < llen; i++ { rel := []int{} for ; index < len(audit); index++ { t := audit[index].Ctime if t >= rscctime[i] && t < rscctime[i-1] { rel = append(rel, index) continue } break } if i == llen-1 && len(rel) == 0 { continue } rela = append(rela, rel) } res = &model.TrackInfo{ Add: rsc, Audit: audit, Relation: rela, } return } func (s *Service) searchAuditLog(c context.Context, pm *model.SearchAuditLogParam) (resp *model.SearchLogResult, err error) { args := &model.ParamsQueryLog{ Business: model.LogBusinessAudit, Oid: pm.RID, CtimeFrom: pm.CtimeFrom, CtimeTo: pm.CtimeTo, Int2: pm.TaskID, Uname: pm.Username, } if pm.State != "" { args.Str0 = []string{pm.State} } if pm.BusinessID > 0 { args.Int0 = []int64{pm.BusinessID} } escm := model.EsCommon{ Ps: pm.Ps, Pn: pm.Pn, Order: "ctime", Sort: "desc", } return s.http.QueryLogSearch(c, args, escm) } func (s *Service) auditLogByRID(c context.Context, rid int64) (ls []string, err error) { resp, err := s.searchAuditLog(c, &model.SearchAuditLogParam{ RID: []int64{rid}, Ps: 1000, Pn: 1}) if err != nil || resp == nil { return } for _, result := range resp.Result { change := &model.Change{} if err = json.Unmarshal([]byte(result.Extra), &change); err != nil { log.Error("json.Unmarshal error(%v)", err) return } flowaction, submitopt := change.GetSubmitOper() // 时间 + 操作人 + 操作/state + 操作内容 l := fmt.Sprintf("%s %s[%s] %s %s", result.Ctime, result.Uname, result.Action, flowaction, submitopt) ls = append(ls, l) } return } func (s *Service) searchWeightLog(c context.Context, taskid int64, pn, ps int) (ls []*model.WeightLog, count int, err error) { args := &model.ParamsQueryLog{ Business: model.LogBusinessTask, Type: model.LogTYpeTaskWeight, Oid: []int64{taskid}, Action: []string{"weight"}, } escm := model.EsCommon{ Pn: pn, Ps: ps, Order: "ctime", Sort: "desc", } resp, err := s.http.QueryLogSearch(c, args, escm) if err != nil || resp == nil { return } count = resp.Page.Total for _, result := range resp.Result { logitem := make(map[string]*model.WeightLog) if err = json.Unmarshal([]byte(result.Extra), &logitem); err != nil { log.Error("json.Unmarshal error(%v)", err) return } ls = append(ls, logitem["weightlog"]) } return } func (s *Service) searchConsumerLog(c context.Context, bizid, flowid int64, action []string, uids []int64, ps int) (at map[int64]string, err error) { args := &model.ParamsQueryLog{ Business: model.LogBusinessTask, Type: model.LogTypeTaskConsumer, Action: action, UID: uids, CtimeFrom: time.Now().Add(-24 * time.Hour * 7).Format("2006-01-02 15:04:05"), } if bizid > 0 { args.Int0 = []int64{bizid} } if flowid > 0 { args.Int1 = []int64{flowid} } escm := model.EsCommon{ Order: "ctime", Sort: "desc", Pn: 1, Ps: ps, Group: "uid", } resp, err := s.http.QueryLogSearch(c, args, escm) if err != nil || resp == nil { return } at = make(map[int64]string) for _, item := range resp.Result { if ct, ok := at[item.UID]; ok { if item.Ctime > ct { at[item.UID] = item.Ctime } } else { at[item.UID] = item.Ctime } } return } func (s *Service) searchResourceLog(c context.Context, rid int64, pn, ps int) (result []*model.TrackRsc, p common.Pager, err error) { //根据ctime降序排列 args := &model.ParamsQueryLog{ Business: model.LogBusinessResource, Type: model.LogTypeFromAdd, Oid: []int64{rid}, } p = common.Pager{ Pn: pn, Ps: ps, } escm := model.EsCommon{ Order: "ctime", Sort: "desc", Pn: pn, Ps: ps, } resp, err := s.http.QueryLogSearch(c, args, escm) if err != nil || resp == nil { return } p.Total = resp.Page.Total result = make([]*model.TrackRsc, len(resp.Result)) for i, item := range resp.Result { extra := struct { Opt map[string]interface{} `json:"opt"` }{} if err = json.Unmarshal([]byte(item.Extra), &extra); err != nil { log.Error("ResourceLog json.Unmarshal error(%v) extra(%s)", err, item.Extra) return } result[i] = &model.TrackRsc{ Ctime: item.Ctime, Content: extra.Opt["content"].(string), Detail: extra.Opt, } } //content变化,由于result是根据创建时间降序排列的,以result的最后一个为基础, 向result[0]判断 content := "" for i := len(result) - 1; i >= 0; i-- { item := result[i] //固定content字段比较变化 if item.Content != content { content = item.Content continue } item.Content = "" } return }