12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097 |
- package service
- import (
- "context"
- "encoding/json"
- "fmt"
- "time"
- "go-common/app/admin/main/aegis/model/common"
- "go-common/app/admin/main/aegis/model/net"
- "go-common/library/log"
- "go-common/library/xstr"
- )
- const netCacheNull = "{null}"
- //ICacheObjDest 对象缓存接口
- type ICacheObjDest interface {
- //GetKey 获取对象缓存的key
- GetKey(id int64, opt ...interface{}) (k []string)
- //AppendString--v is from cache, to decode & append to dest, && check null
- AppendString(id int64, v string) (ok bool, err error)
- //AppendRaw--get from db, to encode & append to dest & to convert to string
- AppendRaw(c context.Context, s *Service, miss []int64, kopt []interface{}, opt ...interface{}) (missCache map[string]string, err error)
- }
- //ICacheDecoder 解析字符串
- type ICacheDecoder interface {
- //--get from cache, to decode & append to dest, && check null
- AppendString(id int64, v string) (ok bool, err error)
- }
- //FCacheKeyPro 根据id获取key
- type FCacheKeyPro func(id int64, opt ...interface{}) (k []string)
- //FCacheAggregateRelation db获取聚合字段和对象id的关联
- type FCacheAggregateRelation func(c context.Context, miss []int64, kopt []interface{}, opt ...interface{}) (rela map[int64][]int64, err error)
- //FCacheRawByAggr db根据聚合值,获取对象集合和关联
- type FCacheRawByAggr func(c context.Context, miss []int64, objDest ICacheObjDest, kopt []interface{}, opt ...interface{}) (objCache map[string]string, aggrRelation map[int64][]int64, err error)
- //AggrCacheDest 聚合缓存
- type AggrCacheDest struct {
- Dest map[int64][]int64
- GetKey FCacheKeyPro
- AggrRelaRaw FCacheAggregateRelation
- }
- //AppendString 实现ICacheDecoder接口
- func (a *AggrCacheDest) AppendString(id int64, v string) (ok bool, err error) {
- var (
- res []int64
- )
- if a.Dest == nil {
- a.Dest = map[int64][]int64{}
- }
- if res, err = xstr.SplitInts(v); err != nil {
- log.Error("AggrCacheDest AppendString(%s) error(%v)", v, err)
- return
- }
- ok = true
- a.Dest[id] = res
- return
- }
- //AppendRelationRaw 根据聚合字段值,从db获取聚合字段与对象id的关联
- func (a *AggrCacheDest) AppendRelationRaw(c context.Context, miss []int64, kopt []interface{}, opt ...interface{}) (missCache map[string]string, err error) {
- var (
- res map[int64][]int64
- )
- if res, err = a.AggrRelaRaw(c, miss, kopt, opt...); err != nil {
- log.Error("AggrCacheDest AppendRelationRaw error(%v) miss(%+v)", err, miss)
- return
- }
- if len(res) == 0 {
- return
- }
- a.Dest = common.CopyMap(res, a.Dest, true)
- missCache = make(map[string]string, len(res))
- for aggr, ids := range res {
- v := xstr.JoinInts(ids)
- for _, k := range a.GetKey(aggr, kopt...) {
- missCache[k] = v
- }
- }
- return
- }
- //CacheWrap 以net对象为例子,可以根据business_id字段聚合
- type CacheWrap struct {
- ObjDest ICacheObjDest //对象集合
- AggrObjRaw FCacheRawByAggr //根据聚合字段获取对象集合from db
- AggregateDest *AggrCacheDest //对象的某个聚合字段与对象id的集合
- }
- //AppendAggregateRaw 从db获取聚合字段映射的对象集合&关联
- func (w *CacheWrap) AppendAggregateRaw(c context.Context, miss []int64, kopt []interface{}, opt ...interface{}) (missCache map[string]string, err error) {
- var (
- aggrRela map[int64][]int64
- )
- if missCache, aggrRela, err = w.AggrObjRaw(c, miss, w.ObjDest, kopt, opt...); err != nil {
- return
- }
- if missCache == nil {
- missCache = map[string]string{}
- }
- for aggrFieldVal, objIDList := range aggrRela {
- v := xstr.JoinInts(objIDList)
- for _, k := range w.AggregateDest.GetKey(aggrFieldVal, kopt...) {
- missCache[k] = v
- }
- }
- w.AggregateDest.Dest = common.CopyMap(aggrRela, w.AggregateDest.Dest, true)
- return
- }
- //getFromCache 获取缓存并解析
- func (s *Service) getFromCache(c context.Context, ids []int64, keyPro FCacheKeyPro, dest ICacheDecoder, kopt []interface{}) (miss []int64) {
- var (
- caches []string
- err error
- ok bool
- )
- if len(ids) == 0 {
- return
- }
- //get from cache, chech whether miss
- ids = common.Unique(ids, true)
- keys := []string{}
- keymap := map[string]int64{}
- for _, id := range ids {
- for _, k := range keyPro(id, kopt...) {
- if keymap[k] != 0 {
- continue
- }
- keymap[k] = id
- keys = append(keys, k)
- }
- }
- if caches, err = s.redis.MGet(c, keys...); err != nil {
- miss = ids
- err = nil
- return
- }
- log.Info("getFromCache caches(%+v) keys(%+v)", caches, keys)
- missmap := map[int64]int64{}
- for i, item := range caches {
- id := keymap[keys[i]]
- if item != "" && item != netCacheNull {
- if ok, err = dest.AppendString(id, item); err == nil && ok {
- continue
- }
- }
- if missmap[id] > 0 || item == netCacheNull {
- continue
- }
- miss = append(miss, id)
- missmap[id] = id
- }
- log.Info("getFromCache miss(%+v) keys(%+v)", miss, keys)
- return
- }
- func (s *Service) setNetCache() {
- for {
- missCache, open := <-s.netCacheCh
- if !open {
- log.Info("s.netCacheCh closed!")
- break
- }
- s.redis.SetMulti(context.TODO(), missCache)
- time.Sleep(time.Millisecond * 1)
- }
- }
- //objCache 根据对象id,获取对象
- func (s *Service) objCache(c context.Context, ids []int64, dest ICacheObjDest, kopt []interface{}, opt ...interface{}) (err error) {
- var (
- miss []int64
- missCache map[string]string
- )
- //get from cache
- if miss = s.getFromCache(c, ids, dest.GetKey, dest, kopt); len(miss) == 0 {
- return
- }
- //get miss from db
- if missCache, err = dest.AppendRaw(c, s, miss, kopt, opt...); err != nil {
- return
- }
- //set miss to cache
- if missCache == nil {
- missCache = map[string]string{}
- }
- for _, missid := range miss {
- missks := dest.GetKey(missid, kopt...)
- for _, missk := range missks {
- if _, exist := missCache[missk]; exist {
- continue
- }
- missCache[missk] = netCacheNull
- }
- }
- log.Info("objCache missCache(%+v)", missCache)
- s.netCacheCh <- missCache
- return
- }
- //aggregateRelationCache 根据聚合值,获取聚合字段与对象的映射关系
- func (s *Service) aggregateRelationCache(c context.Context, aggrID []int64, dest *AggrCacheDest, kopt []interface{}, aggrOpt ...interface{}) (err error) {
- var (
- miss []int64
- missCache map[string]string
- )
- //get from cache
- if miss = s.getFromCache(c, aggrID, dest.GetKey, dest, kopt); len(miss) == 0 {
- return
- }
- //get miss from db
- if missCache, err = dest.AppendRelationRaw(c, miss, kopt, aggrOpt...); err != nil {
- return
- }
- //set miss to cache
- if missCache == nil {
- missCache = map[string]string{}
- }
- for _, missid := range miss {
- missks := dest.GetKey(missid, kopt...)
- for _, missk := range missks {
- if _, exist := missCache[missk]; exist {
- continue
- }
- missCache[missk] = netCacheNull
- }
- }
- log.Info("aggregateRelationCache missCache(%+v)", missCache)
- s.netCacheCh <- missCache
- return
- }
- //aggregateCache 根据聚合字段的值,获取对应的对象,比如:根据flowid获取获取其绑定关系binds
- func (s *Service) aggregateCache(c context.Context, aggrID []int64, wrap *CacheWrap, aggrkopt, objkopt, aggrOpt, objOpt []interface{}) (err error) {
- var (
- ptrAggrMiss, ptrAggrAll, ptrCache, ptrObjMiss string = "aggrmiss", "aggrall", "cache", "objmiss"
- current string
- needObjRelat map[int64][]int64
- aggrMiss, objMiss []int64
- missCache, missObjCache map[string]string
- )
- //获取flowid对应哪些binds的映射关系
- if aggrMiss = s.getFromCache(c, aggrID, wrap.AggregateDest.GetKey, wrap.AggregateDest, aggrkopt); len(aggrMiss) == 0 {
- current = ptrAggrAll
- } else {
- current = ptrAggrMiss
- }
- needObjRelat = common.CopyMap(wrap.AggregateDest.Dest, needObjRelat, true)
- if current == ptrAggrMiss {
- if missCache, err = wrap.AppendAggregateRaw(c, aggrMiss, aggrkopt, aggrOpt...); err != nil {
- return
- }
- if len(needObjRelat) <= 0 { //全部从db获取
- current = ptrCache
- } else { //部分缓存获取聚合字段与对象id的关联,部分db根据聚合字段获取对象体
- current = ptrAggrAll
- }
- }
- if current == ptrAggrAll {
- objID := []int64{}
- for _, obji := range needObjRelat {
- objID = append(objID, obji...)
- }
- if objMiss = s.getFromCache(c, objID, wrap.ObjDest.GetKey, wrap.ObjDest, objkopt); len(objMiss) == 0 {
- return
- }
- current = ptrObjMiss
- }
- if missCache == nil {
- missCache = map[string]string{}
- }
- if current == ptrObjMiss {
- if missObjCache, err = wrap.ObjDest.AppendRaw(c, s, objMiss, objkopt, objOpt...); err != nil {
- return
- }
- for k, v := range missObjCache {
- missCache[k] = v
- }
- current = ptrCache
- }
- if current == ptrCache {
- for _, missid := range aggrMiss {
- missks := wrap.AggregateDest.GetKey(missid, aggrkopt...)
- for _, missk := range missks {
- if _, exist := missCache[missk]; exist {
- continue
- }
- missCache[missk] = netCacheNull
- }
- }
- for _, missid := range objMiss {
- missks := wrap.ObjDest.GetKey(missid, objkopt...)
- for _, missk := range missks {
- if _, exist := missCache[missk]; exist {
- continue
- }
- missCache[missk] = netCacheNull
- }
- }
- log.Info("aggregateCache missCache(%+v)", missCache)
- s.netCacheCh <- missCache
- }
- return
- }
- //delCache 删除缓存
- func (s *Service) delCache(c context.Context, keys []string) (err error) {
- if len(keys) == 0 {
- return
- }
- if err = s.redis.DelMulti(c, keys...); err != nil {
- log.Error("delCache s.redis.DelMulti error(%v) keys(%+v)", err, keys)
- }
- return
- }
- /**
- * 业务应用
- * 需要缓存的对象
- ******************************************
- * one2many
- * key type val
- * flow int64+avail []*tokenbind,err--ok
- * flow int64+dir []*dir,err---ok
- * netid int64 []*flow,err---ok
- * netid []int64,avail,disp []int64,err--transitionid---ok
- * tranid []int64,isbatch []*tokenbind,err---ok
- * tranid int64,avail []*dir.err--ok
- * biz int64 []int64,err---netid ---ok
- *
- * one2one
- * ids []int64,xxx,yy []*struct,err --- ok
- ******************************************
- */
- //net的聚合字段business_id
- type netArr struct {
- dest []*net.Net
- }
- func (n *netArr) GetKey(id int64, opt ...interface{}) []string {
- return []string{fmt.Sprintf("net:%d", id)}
- }
- func (n *netArr) AppendString(id int64, v string) (ok bool, err error) {
- one := &net.Net{}
- if err = json.Unmarshal([]byte(v), one); err != nil {
- log.Error("netArr AppendString json.Unmarshal error(%v) v(%s)", err, v)
- return
- }
- ok = true
- n.dest = append(n.dest, one)
- return
- }
- func (n *netArr) AppendRaw(c context.Context, s *Service, miss []int64, kopt []interface{}, opt ...interface{}) (missCache map[string]string, err error) {
- var (
- res []*net.Net
- bs []byte
- )
- if res, err = s.gorm.Nets(c, miss); err != nil {
- log.Error("netArr AppendRaw error(%+v) miss(%+v)", err, miss)
- return
- }
- n.dest = append(n.dest, res...)
- missCache = map[string]string{}
- for _, item := range res {
- if bs, err = json.Marshal(item); err != nil {
- log.Error("netArr AppendRaw json.Marshal error(%v) item(%+v)", err, item)
- continue
- }
- v := string(bs)
- for _, k := range n.GetKey(item.ID, kopt...) {
- missCache[k] = v
- }
- }
- return
- }
- func (s *Service) bizNetRelation(c context.Context, miss []int64, kopt []interface{}, opt ...interface{}) (rela map[int64][]int64, err error) {
- rela, err = s.gorm.NetIDByBusiness(c, miss)
- return
- }
- func (s *Service) bizNetKey(id int64, opt ...interface{}) []string {
- return []string{fmt.Sprintf("business_net:%d", id)}
- }
- func (s *Service) netIDByBusiness(c context.Context, business int64) (res []int64, err error) {
- aggr := &AggrCacheDest{
- GetKey: s.bizNetKey,
- AggrRelaRaw: s.bizNetRelation,
- }
- if err = s.aggregateRelationCache(c, []int64{business}, aggr, nil); err != nil {
- return
- }
- res = aggr.Dest[business]
- return
- }
- func (s *Service) netByID(c context.Context, id int64) (res *net.Net, err error) {
- dest := &netArr{}
- if err = s.objCache(c, []int64{id}, dest, nil); err != nil {
- return
- }
- if len(dest.dest) == 0 {
- return
- }
- res = dest.dest[0]
- return
- }
- func (s *Service) delNetCache(c context.Context, n *net.Net) (err error) {
- objdest := &netArr{}
- keys := objdest.GetKey(n.ID)
- keys = append(keys, s.bizNetKey(n.BusinessID)...)
- err = s.delCache(c, keys)
- return
- }
- //tokenArr 根据id查询对象
- type tokenArr struct {
- dest []*net.Token
- }
- func (n *tokenArr) GetKey(id int64, opt ...interface{}) []string {
- return []string{fmt.Sprintf("token:%d", id)}
- }
- func (n *tokenArr) AppendString(id int64, v string) (ok bool, err error) {
- one := &net.Token{}
- if err = json.Unmarshal([]byte(v), one); err != nil {
- log.Error("tokenArr AppendString json.Unmarshal error(%v) v(%s)", err, v)
- return
- }
- ok = true
- n.dest = append(n.dest, one)
- return
- }
- func (n *tokenArr) AppendRaw(c context.Context, s *Service, miss []int64, kopt []interface{}, opt ...interface{}) (missCache map[string]string, err error) {
- var (
- res []*net.Token
- bs []byte
- )
- if res, err = s.gorm.Tokens(c, miss); err != nil {
- log.Error("tokenArr AppendRaw error(%+v) miss(%+v)", err, miss)
- return
- }
- n.dest = append(n.dest, res...)
- missCache = map[string]string{}
- for _, item := range res {
- if bs, err = json.Marshal(item); err != nil {
- log.Error("tokenArr AppendRaw json.Marshal error(%v) item(%+v)", err, item)
- continue
- }
- v := string(bs)
- for _, k := range n.GetKey(item.ID, kopt...) {
- missCache[k] = v
- }
- }
- return
- }
- func (s *Service) tokens(c context.Context, ids []int64) (res []*net.Token, err error) {
- objdest := &tokenArr{}
- if err = s.objCache(c, ids, objdest, nil); err != nil {
- return
- }
- res = objdest.dest
- return
- }
- //token_bind的聚合字段element_id
- type bindArr struct {
- dest []*net.TokenBind
- }
- func (n *bindArr) GetKey(id int64, opt ...interface{}) []string {
- return []string{fmt.Sprintf("bind:%d", id)}
- }
- func (n *bindArr) AppendString(id int64, v string) (ok bool, err error) {
- one := &net.TokenBind{}
- if err = json.Unmarshal([]byte(v), one); err != nil {
- log.Error("bindArr AppendString json.Unmarshal error(%v) v(%s)", err, v)
- return
- }
- ok = true
- n.dest = append(n.dest, one)
- return
- }
- func (n *bindArr) AppendRaw(c context.Context, s *Service, miss []int64, kopt []interface{}, opt ...interface{}) (missCache map[string]string, err error) {
- var (
- res []*net.TokenBind
- bs []byte
- )
- if res, err = s.gorm.TokenBinds(c, miss); err != nil {
- log.Error("bindArr AppendRaw error(%+v) miss(%+v)", err, miss)
- return
- }
- n.dest = append(n.dest, res...)
- missCache = map[string]string{}
- for _, item := range res {
- if bs, err = json.Marshal(item); err != nil {
- log.Error("bindArr AppendRaw json.Marshal error(%v) item(%+v)", err, item)
- continue
- }
- v := string(bs)
- for _, k := range n.GetKey(item.ID, kopt...) {
- missCache[k] = v
- }
- }
- return
- }
- func (s *Service) elementBindKey(id int64, opt ...interface{}) []string {
- return []string{fmt.Sprintf("element_bind:%d", id)}
- }
- //全部类型的正常状态的绑定
- func (s *Service) elementBindAppendRaw(c context.Context, miss []int64, objdest ICacheObjDest, kopt []interface{}, opt ...interface{}) (objCache map[string]string, aggrRelation map[int64][]int64, err error) {
- var (
- res map[int64][]*net.TokenBind
- bs []byte
- )
- objdester := objdest.(*bindArr)
- if res, err = s.gorm.TokenBindByElement(c, miss, net.BindTypes, true); err != nil {
- return
- }
- aggrRelation = map[int64][]int64{}
- objCache = map[string]string{}
- for eid, tbs := range res {
- for _, item := range tbs {
- if bs, err = json.Marshal(item); err != nil {
- return
- }
- v := string(bs)
- for _, k := range objdest.GetKey(item.ID, kopt...) {
- objCache[k] = v
- }
- objdester.dest = append(objdester.dest, item)
- aggrRelation[eid] = append(aggrRelation[eid], item.ID)
- }
- }
- return
- }
- func (s *Service) tokenBindByElement(c context.Context, eid []int64, tp []int8) (res []*net.TokenBind, err error) {
- objdest := &bindArr{}
- w := &CacheWrap{
- ObjDest: objdest,
- AggrObjRaw: s.elementBindAppendRaw,
- AggregateDest: &AggrCacheDest{
- GetKey: s.elementBindKey,
- },
- }
- //获取eid对应的所有类型的bind对象
- if err = s.aggregateCache(c, eid, w, nil, nil, nil, nil); err != nil {
- return
- }
- res = []*net.TokenBind{}
- for _, item := range objdest.dest {
- included := false
- for _, bindtp := range tp {
- if bindtp == item.Type {
- included = true
- break
- }
- }
- if included {
- res = append(res, item)
- }
- }
- return
- }
- func (s *Service) tokenBinds(c context.Context, ids []int64, onlyAvailable bool) (res []*net.TokenBind, err error) {
- objdest := &bindArr{}
- if err = s.objCache(c, ids, objdest, nil); err != nil {
- return
- }
- if !onlyAvailable {
- res = objdest.dest
- return
- }
- for _, item := range objdest.dest {
- if item.IsAvailable() {
- res = append(res, item)
- }
- }
- return
- }
- //flow的聚合字段是net_id
- type flowArr struct {
- dest []*net.Flow
- }
- func (n *flowArr) GetKey(id int64, opt ...interface{}) []string {
- return []string{fmt.Sprintf("flow:%d", id)}
- }
- func (n *flowArr) AppendString(id int64, v string) (ok bool, err error) {
- one := &net.Flow{}
- if err = json.Unmarshal([]byte(v), one); err != nil {
- log.Error("flowArr AppendString json.Unmarshal error(%v) v(%s)", err, v)
- return
- }
- ok = true
- n.dest = append(n.dest, one)
- return
- }
- func (n *flowArr) AppendRaw(c context.Context, s *Service, miss []int64, kopt []interface{}, opt ...interface{}) (missCache map[string]string, err error) {
- var (
- res []*net.Flow
- bs []byte
- )
- if res, err = s.gorm.Flows(c, miss); err != nil {
- log.Error("flowArr AppendRaw error(%+v) miss(%+v)", err, miss)
- return
- }
- n.dest = append(n.dest, res...)
- missCache = map[string]string{}
- for _, item := range res {
- if bs, err = json.Marshal(item); err != nil {
- log.Error("flowArr AppendRaw json.Marshal error(%v) item(%+v)", err, item)
- continue
- }
- v := string(bs)
- for _, k := range n.GetKey(item.ID, kopt...) {
- missCache[k] = v
- }
- }
- return
- }
- func (s *Service) netFlowKey(id int64, opt ...interface{}) []string {
- return []string{fmt.Sprintf("net_flow:%d", id)}
- }
- func (s *Service) netFlowAppendRaw(c context.Context, miss []int64, objdest ICacheObjDest, kopt []interface{}, opt ...interface{}) (objCache map[string]string, aggrRelation map[int64][]int64, err error) {
- var (
- res []*net.Flow
- bs []byte
- )
- objdester := objdest.(*flowArr)
- if res, err = s.gorm.FlowsByNet(c, miss); err != nil {
- return
- }
- aggrRelation = map[int64][]int64{}
- objCache = map[string]string{}
- for _, item := range res {
- if bs, err = json.Marshal(item); err != nil {
- return
- }
- v := string(bs)
- for _, k := range objdest.GetKey(item.ID, kopt...) {
- objCache[k] = v
- }
- objdester.dest = append(objdester.dest, item)
- aggrRelation[item.NetID] = append(aggrRelation[item.NetID], item.ID)
- }
- return
- }
- func (s *Service) netFlowRelation(c context.Context, miss []int64, kopt []interface{}, opt ...interface{}) (rela map[int64][]int64, err error) {
- rela, err = s.gorm.FlowIDByNet(c, miss)
- return
- }
- func (s *Service) flowIDByNet(c context.Context, nid int64) (res []int64, err error) {
- dest := &AggrCacheDest{
- GetKey: s.netFlowKey,
- AggrRelaRaw: s.netFlowRelation,
- }
- if err = s.aggregateRelationCache(c, []int64{nid}, dest, nil); err != nil {
- return
- }
- res = dest.Dest[nid]
- return
- }
- func (s *Service) flowsByNet(c context.Context, nid int64) (res []*net.Flow, err error) {
- objdest := &flowArr{}
- w := &CacheWrap{
- ObjDest: objdest,
- AggrObjRaw: s.netFlowAppendRaw,
- AggregateDest: &AggrCacheDest{
- GetKey: s.netFlowKey,
- },
- }
- if err = s.aggregateCache(c, []int64{nid}, w, nil, nil, nil, nil); err != nil {
- return
- }
- res = objdest.dest
- return
- }
- func (s *Service) flows(c context.Context, ids []int64, onlyAvailable bool) (list []*net.Flow, err error) {
- objdest := &flowArr{}
- if err = s.objCache(c, ids, objdest, nil); err != nil {
- return
- }
- if !onlyAvailable {
- list = objdest.dest
- return
- }
- for _, item := range objdest.dest {
- if item.IsAvailable() {
- list = append(list, item)
- }
- }
- return
- }
- func (s *Service) flowByID(c context.Context, id int64) (f *net.Flow, err error) {
- var list []*net.Flow
- if list, err = s.flows(c, []int64{id}, false); err != nil {
- return
- }
- if len(list) == 0 {
- return
- }
- f = list[0]
- return
- }
- func (s *Service) delFlowCache(c context.Context, f *net.Flow, changedBind []int64) (err error) {
- objdest := &flowArr{}
- keys := objdest.GetKey(f.ID)
- keys = append(keys, s.netFlowKey(f.NetID)...)
- keys = append(keys, s.elementBindKey(f.ID)...)
- binds := &bindArr{}
- for _, bind := range changedBind {
- keys = append(keys, binds.GetKey(bind)...)
- }
- err = s.delCache(c, keys)
- return
- }
- //transition的聚合字段是net_id
- type tranArr struct {
- dest []*net.Transition
- }
- func (n *tranArr) GetKey(id int64, opt ...interface{}) []string {
- return []string{fmt.Sprintf("transition:%d", id)}
- }
- func (n *tranArr) AppendString(id int64, v string) (ok bool, err error) {
- one := &net.Transition{}
- if err = json.Unmarshal([]byte(v), one); err != nil {
- log.Error("tranArr AppendString json.Unmarshal error(%v) v(%s)", err, v)
- return
- }
- ok = true
- n.dest = append(n.dest, one)
- return
- }
- func (n *tranArr) AppendRaw(c context.Context, s *Service, miss []int64, kopt []interface{}, opt ...interface{}) (missCache map[string]string, err error) {
- var (
- res []*net.Transition
- bs []byte
- )
- if res, err = s.gorm.Transitions(c, miss); err != nil {
- log.Error("tranArr AppendRaw error(%+v) miss(%+v)", err, miss)
- return
- }
- n.dest = append(n.dest, res...)
- missCache = map[string]string{}
- for _, item := range res {
- if bs, err = json.Marshal(item); err != nil {
- log.Error("tranArr AppendRaw json.Marshal error(%v) item(%+v)", err, item)
- continue
- }
- v := string(bs)
- for _, k := range n.GetKey(item.ID, kopt...) {
- missCache[k] = v
- }
- }
- return
- }
- func (s *Service) netTranKey(id int64, opt ...interface{}) []string {
- return []string{fmt.Sprintf("net_transition_dispatch%v_avail%v:%d", opt[0].(bool), opt[1].(bool), id)}
- }
- func (s *Service) netTranRelation(c context.Context, miss []int64, kopt []interface{}, opt ...interface{}) (rela map[int64][]int64, err error) {
- rela, err = s.gorm.TransitionIDByNet(c, miss, opt[0].(bool), opt[1].(bool))
- return
- }
- func (s *Service) tranIDByNet(c context.Context, nid []int64, onlyDispatch bool, onlyAvailable bool) (res []int64, err error) {
- dest := &AggrCacheDest{
- GetKey: s.netTranKey,
- AggrRelaRaw: s.netTranRelation,
- }
- if err = s.aggregateRelationCache(c, nid, dest, []interface{}{onlyDispatch, onlyAvailable}, onlyDispatch, onlyAvailable); err != nil {
- return
- }
- for _, ids := range dest.Dest {
- res = append(res, ids...)
- }
- return
- }
- func (s *Service) transitions(c context.Context, ids []int64, onlyAvailable bool) (list []*net.Transition, err error) {
- objdest := &tranArr{}
- if err = s.objCache(c, ids, objdest, nil); err != nil {
- return
- }
- if !onlyAvailable {
- list = objdest.dest
- return
- }
- for _, item := range objdest.dest {
- if item.IsAvailable() {
- list = append(list, item)
- }
- }
- return
- }
- func (s *Service) transitionByID(c context.Context, id int64) (res *net.Transition, err error) {
- var list []*net.Transition
- if list, err = s.transitions(c, []int64{id}, false); err != nil {
- return
- }
- if len(list) == 0 {
- return
- }
- res = list[0]
- return
- }
- func (s *Service) delTranCache(c context.Context, f *net.Transition, changedBind []int64) (err error) {
- objdest := &tranArr{}
- keys := objdest.GetKey(f.ID)
- keys = append(keys, s.netTranKey(f.NetID, true, true)...)
- keys = append(keys, s.netTranKey(f.NetID, true, false)...)
- keys = append(keys, s.netTranKey(f.NetID, false, true)...)
- keys = append(keys, s.netTranKey(f.NetID, false, false)...)
- keys = append(keys, s.elementBindKey(f.ID)...)
- binds := &bindArr{}
- for _, bind := range changedBind {
- keys = append(keys, binds.GetKey(bind)...)
- }
- err = s.delCache(c, keys)
- return
- }
- //direction的聚合字段是flow_id/transition_id
- type dirArr struct {
- dest []*net.Direction
- }
- func (n *dirArr) GetKey(id int64, opt ...interface{}) []string {
- return []string{fmt.Sprintf("direction:%d", id)}
- }
- func (n *dirArr) AppendString(id int64, v string) (ok bool, err error) {
- one := &net.Direction{}
- if err = json.Unmarshal([]byte(v), one); err != nil {
- log.Error("dirArr AppendString json.Unmarshal error(%v) v(%s)", err, v)
- return
- }
- ok = true
- n.dest = append(n.dest, one)
- return
- }
- func (n *dirArr) AppendRaw(c context.Context, s *Service, miss []int64, kopt []interface{}, opt ...interface{}) (missCache map[string]string, err error) {
- var (
- res []*net.Direction
- bs []byte
- )
- if res, err = s.gorm.Directions(c, miss); err != nil {
- log.Error("dirArr AppendRaw error(%+v) miss(%+v)", err, miss)
- return
- }
- n.dest = append(n.dest, res...)
- missCache = map[string]string{}
- for _, item := range res {
- if bs, err = json.Marshal(item); err != nil {
- log.Error("dirArr AppendRaw json.Marshal error(%v) item(%+v)", err, item)
- continue
- }
- v := string(bs)
- for _, k := range n.GetKey(item.ID, kopt...) {
- missCache[k] = v
- }
- }
- return
- }
- func (s *Service) flowDirKey(id int64, opt ...interface{}) []string {
- di := opt[0].([]int8)
- res := make([]string, len(di))
- for i, d := range di {
- res[i] = fmt.Sprintf("flow_direction_%d:%d", d, id)
- }
- return res
- }
- func (s *Service) tranDirKey(id int64, opt ...interface{}) []string {
- di := opt[0].([]int8)
- avail := opt[1].(bool)
- res := make([]string, len(di))
- for i, d := range di {
- res[i] = fmt.Sprintf("transition_direction_avail%v_%d:%d", avail, d, id)
- }
- return res
- }
- func (s *Service) flowDirAppendRaw(c context.Context, miss []int64, objdest ICacheObjDest, kopt []interface{}, opt ...interface{}) (objCache map[string]string, aggrRelation map[int64][]int64, err error) {
- var (
- res []*net.Direction
- bs []byte
- )
- objdester := objdest.(*dirArr)
- if res, err = s.gorm.DirectionByFlowID(c, miss, opt[0].(int8)); err != nil {
- return
- }
- aggrRelation = map[int64][]int64{}
- objCache = map[string]string{}
- for _, item := range res {
- if bs, err = json.Marshal(item); err != nil {
- return
- }
- v := string(bs)
- for _, k := range objdest.GetKey(item.ID, kopt...) {
- objCache[k] = v
- }
- objdester.dest = append(objdester.dest, item)
- aggrRelation[item.FlowID] = append(aggrRelation[item.FlowID], item.ID)
- }
- return
- }
- func (s *Service) tranDirAppendRaw(c context.Context, miss []int64, objdest ICacheObjDest, kopt []interface{}, opt ...interface{}) (objCache map[string]string, aggrRelation map[int64][]int64, err error) {
- var (
- res []*net.Direction
- bs []byte
- )
- objdester := objdest.(*dirArr)
- if res, err = s.gorm.DirectionByTransitionID(c, miss, opt[0].(int8), opt[1].(bool)); err != nil {
- return
- }
- aggrRelation = map[int64][]int64{}
- objCache = map[string]string{}
- for _, item := range res {
- if bs, err = json.Marshal(item); err != nil {
- return
- }
- v := string(bs)
- for _, k := range objdest.GetKey(item.ID, kopt...) {
- objCache[k] = v
- }
- objdester.dest = append(objdester.dest, item)
- aggrRelation[item.TransitionID] = append(aggrRelation[item.TransitionID], item.ID)
- }
- return
- }
- func (s *Service) dirByFlow(c context.Context, nid []int64, dir int8) (res []*net.Direction, err error) {
- objdest := &dirArr{}
- w := &CacheWrap{
- ObjDest: objdest,
- AggrObjRaw: s.flowDirAppendRaw,
- AggregateDest: &AggrCacheDest{
- GetKey: s.flowDirKey,
- },
- }
- if err = s.aggregateCache(c, nid, w, []interface{}{[]int8{dir}}, nil, []interface{}{dir}, nil); err != nil {
- return
- }
- res = objdest.dest
- return
- }
- func (s *Service) dirByTran(c context.Context, nid []int64, dir int8, onlyAvailable bool) (res []*net.Direction, err error) {
- objdest := &dirArr{}
- w := &CacheWrap{
- ObjDest: objdest,
- AggrObjRaw: s.tranDirAppendRaw,
- AggregateDest: &AggrCacheDest{
- GetKey: s.tranDirKey,
- },
- }
- if err = s.aggregateCache(c, nid, w, []interface{}{[]int8{dir}, onlyAvailable}, nil, []interface{}{dir, onlyAvailable}, nil); err != nil {
- return
- }
- res = objdest.dest
- return
- }
- func (s *Service) directions(c context.Context, ids []int64) (list []*net.Direction, err error) {
- objdest := &dirArr{}
- if err = s.objCache(c, ids, objdest, nil); err != nil {
- return
- }
- list = objdest.dest
- return
- }
- func (s *Service) delDirCache(c context.Context, dir *net.Direction) (err error) {
- objdest := &dirArr{}
- keys := objdest.GetKey(dir.ID)
- keys = append(keys, s.flowDirKey(dir.FlowID, []int8{net.DirInput, net.DirOutput})...)
- keys = append(keys, s.tranDirKey(dir.TransitionID, []int8{net.DirInput, net.DirOutput}, true)...)
- keys = append(keys, s.tranDirKey(dir.TransitionID, []int8{net.DirInput, net.DirOutput}, false)...)
- err = s.delCache(c, keys)
- return
- }
|