net_cache.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "time"
  7. "go-common/app/admin/main/aegis/model/common"
  8. "go-common/app/admin/main/aegis/model/net"
  9. "go-common/library/log"
  10. "go-common/library/xstr"
  11. )
  12. const netCacheNull = "{null}"
  13. //ICacheObjDest 对象缓存接口
  14. type ICacheObjDest interface {
  15. //GetKey 获取对象缓存的key
  16. GetKey(id int64, opt ...interface{}) (k []string)
  17. //AppendString--v is from cache, to decode & append to dest, && check null
  18. AppendString(id int64, v string) (ok bool, err error)
  19. //AppendRaw--get from db, to encode & append to dest & to convert to string
  20. AppendRaw(c context.Context, s *Service, miss []int64, kopt []interface{}, opt ...interface{}) (missCache map[string]string, err error)
  21. }
  22. //ICacheDecoder 解析字符串
  23. type ICacheDecoder interface {
  24. //--get from cache, to decode & append to dest, && check null
  25. AppendString(id int64, v string) (ok bool, err error)
  26. }
  27. //FCacheKeyPro 根据id获取key
  28. type FCacheKeyPro func(id int64, opt ...interface{}) (k []string)
  29. //FCacheAggregateRelation db获取聚合字段和对象id的关联
  30. type FCacheAggregateRelation func(c context.Context, miss []int64, kopt []interface{}, opt ...interface{}) (rela map[int64][]int64, err error)
  31. //FCacheRawByAggr db根据聚合值,获取对象集合和关联
  32. type FCacheRawByAggr func(c context.Context, miss []int64, objDest ICacheObjDest, kopt []interface{}, opt ...interface{}) (objCache map[string]string, aggrRelation map[int64][]int64, err error)
  33. //AggrCacheDest 聚合缓存
  34. type AggrCacheDest struct {
  35. Dest map[int64][]int64
  36. GetKey FCacheKeyPro
  37. AggrRelaRaw FCacheAggregateRelation
  38. }
  39. //AppendString 实现ICacheDecoder接口
  40. func (a *AggrCacheDest) AppendString(id int64, v string) (ok bool, err error) {
  41. var (
  42. res []int64
  43. )
  44. if a.Dest == nil {
  45. a.Dest = map[int64][]int64{}
  46. }
  47. if res, err = xstr.SplitInts(v); err != nil {
  48. log.Error("AggrCacheDest AppendString(%s) error(%v)", v, err)
  49. return
  50. }
  51. ok = true
  52. a.Dest[id] = res
  53. return
  54. }
  55. //AppendRelationRaw 根据聚合字段值,从db获取聚合字段与对象id的关联
  56. func (a *AggrCacheDest) AppendRelationRaw(c context.Context, miss []int64, kopt []interface{}, opt ...interface{}) (missCache map[string]string, err error) {
  57. var (
  58. res map[int64][]int64
  59. )
  60. if res, err = a.AggrRelaRaw(c, miss, kopt, opt...); err != nil {
  61. log.Error("AggrCacheDest AppendRelationRaw error(%v) miss(%+v)", err, miss)
  62. return
  63. }
  64. if len(res) == 0 {
  65. return
  66. }
  67. a.Dest = common.CopyMap(res, a.Dest, true)
  68. missCache = make(map[string]string, len(res))
  69. for aggr, ids := range res {
  70. v := xstr.JoinInts(ids)
  71. for _, k := range a.GetKey(aggr, kopt...) {
  72. missCache[k] = v
  73. }
  74. }
  75. return
  76. }
  77. //CacheWrap 以net对象为例子,可以根据business_id字段聚合
  78. type CacheWrap struct {
  79. ObjDest ICacheObjDest //对象集合
  80. AggrObjRaw FCacheRawByAggr //根据聚合字段获取对象集合from db
  81. AggregateDest *AggrCacheDest //对象的某个聚合字段与对象id的集合
  82. }
  83. //AppendAggregateRaw 从db获取聚合字段映射的对象集合&关联
  84. func (w *CacheWrap) AppendAggregateRaw(c context.Context, miss []int64, kopt []interface{}, opt ...interface{}) (missCache map[string]string, err error) {
  85. var (
  86. aggrRela map[int64][]int64
  87. )
  88. if missCache, aggrRela, err = w.AggrObjRaw(c, miss, w.ObjDest, kopt, opt...); err != nil {
  89. return
  90. }
  91. if missCache == nil {
  92. missCache = map[string]string{}
  93. }
  94. for aggrFieldVal, objIDList := range aggrRela {
  95. v := xstr.JoinInts(objIDList)
  96. for _, k := range w.AggregateDest.GetKey(aggrFieldVal, kopt...) {
  97. missCache[k] = v
  98. }
  99. }
  100. w.AggregateDest.Dest = common.CopyMap(aggrRela, w.AggregateDest.Dest, true)
  101. return
  102. }
  103. //getFromCache 获取缓存并解析
  104. func (s *Service) getFromCache(c context.Context, ids []int64, keyPro FCacheKeyPro, dest ICacheDecoder, kopt []interface{}) (miss []int64) {
  105. var (
  106. caches []string
  107. err error
  108. ok bool
  109. )
  110. if len(ids) == 0 {
  111. return
  112. }
  113. //get from cache, chech whether miss
  114. ids = common.Unique(ids, true)
  115. keys := []string{}
  116. keymap := map[string]int64{}
  117. for _, id := range ids {
  118. for _, k := range keyPro(id, kopt...) {
  119. if keymap[k] != 0 {
  120. continue
  121. }
  122. keymap[k] = id
  123. keys = append(keys, k)
  124. }
  125. }
  126. if caches, err = s.redis.MGet(c, keys...); err != nil {
  127. miss = ids
  128. err = nil
  129. return
  130. }
  131. log.Info("getFromCache caches(%+v) keys(%+v)", caches, keys)
  132. missmap := map[int64]int64{}
  133. for i, item := range caches {
  134. id := keymap[keys[i]]
  135. if item != "" && item != netCacheNull {
  136. if ok, err = dest.AppendString(id, item); err == nil && ok {
  137. continue
  138. }
  139. }
  140. if missmap[id] > 0 || item == netCacheNull {
  141. continue
  142. }
  143. miss = append(miss, id)
  144. missmap[id] = id
  145. }
  146. log.Info("getFromCache miss(%+v) keys(%+v)", miss, keys)
  147. return
  148. }
  149. func (s *Service) setNetCache() {
  150. for {
  151. missCache, open := <-s.netCacheCh
  152. if !open {
  153. log.Info("s.netCacheCh closed!")
  154. break
  155. }
  156. s.redis.SetMulti(context.TODO(), missCache)
  157. time.Sleep(time.Millisecond * 1)
  158. }
  159. }
  160. //objCache 根据对象id,获取对象
  161. func (s *Service) objCache(c context.Context, ids []int64, dest ICacheObjDest, kopt []interface{}, opt ...interface{}) (err error) {
  162. var (
  163. miss []int64
  164. missCache map[string]string
  165. )
  166. //get from cache
  167. if miss = s.getFromCache(c, ids, dest.GetKey, dest, kopt); len(miss) == 0 {
  168. return
  169. }
  170. //get miss from db
  171. if missCache, err = dest.AppendRaw(c, s, miss, kopt, opt...); err != nil {
  172. return
  173. }
  174. //set miss to cache
  175. if missCache == nil {
  176. missCache = map[string]string{}
  177. }
  178. for _, missid := range miss {
  179. missks := dest.GetKey(missid, kopt...)
  180. for _, missk := range missks {
  181. if _, exist := missCache[missk]; exist {
  182. continue
  183. }
  184. missCache[missk] = netCacheNull
  185. }
  186. }
  187. log.Info("objCache missCache(%+v)", missCache)
  188. s.netCacheCh <- missCache
  189. return
  190. }
  191. //aggregateRelationCache 根据聚合值,获取聚合字段与对象的映射关系
  192. func (s *Service) aggregateRelationCache(c context.Context, aggrID []int64, dest *AggrCacheDest, kopt []interface{}, aggrOpt ...interface{}) (err error) {
  193. var (
  194. miss []int64
  195. missCache map[string]string
  196. )
  197. //get from cache
  198. if miss = s.getFromCache(c, aggrID, dest.GetKey, dest, kopt); len(miss) == 0 {
  199. return
  200. }
  201. //get miss from db
  202. if missCache, err = dest.AppendRelationRaw(c, miss, kopt, aggrOpt...); err != nil {
  203. return
  204. }
  205. //set miss to cache
  206. if missCache == nil {
  207. missCache = map[string]string{}
  208. }
  209. for _, missid := range miss {
  210. missks := dest.GetKey(missid, kopt...)
  211. for _, missk := range missks {
  212. if _, exist := missCache[missk]; exist {
  213. continue
  214. }
  215. missCache[missk] = netCacheNull
  216. }
  217. }
  218. log.Info("aggregateRelationCache missCache(%+v)", missCache)
  219. s.netCacheCh <- missCache
  220. return
  221. }
  222. //aggregateCache 根据聚合字段的值,获取对应的对象,比如:根据flowid获取获取其绑定关系binds
  223. func (s *Service) aggregateCache(c context.Context, aggrID []int64, wrap *CacheWrap, aggrkopt, objkopt, aggrOpt, objOpt []interface{}) (err error) {
  224. var (
  225. ptrAggrMiss, ptrAggrAll, ptrCache, ptrObjMiss string = "aggrmiss", "aggrall", "cache", "objmiss"
  226. current string
  227. needObjRelat map[int64][]int64
  228. aggrMiss, objMiss []int64
  229. missCache, missObjCache map[string]string
  230. )
  231. //获取flowid对应哪些binds的映射关系
  232. if aggrMiss = s.getFromCache(c, aggrID, wrap.AggregateDest.GetKey, wrap.AggregateDest, aggrkopt); len(aggrMiss) == 0 {
  233. current = ptrAggrAll
  234. } else {
  235. current = ptrAggrMiss
  236. }
  237. needObjRelat = common.CopyMap(wrap.AggregateDest.Dest, needObjRelat, true)
  238. if current == ptrAggrMiss {
  239. if missCache, err = wrap.AppendAggregateRaw(c, aggrMiss, aggrkopt, aggrOpt...); err != nil {
  240. return
  241. }
  242. if len(needObjRelat) <= 0 { //全部从db获取
  243. current = ptrCache
  244. } else { //部分缓存获取聚合字段与对象id的关联,部分db根据聚合字段获取对象体
  245. current = ptrAggrAll
  246. }
  247. }
  248. if current == ptrAggrAll {
  249. objID := []int64{}
  250. for _, obji := range needObjRelat {
  251. objID = append(objID, obji...)
  252. }
  253. if objMiss = s.getFromCache(c, objID, wrap.ObjDest.GetKey, wrap.ObjDest, objkopt); len(objMiss) == 0 {
  254. return
  255. }
  256. current = ptrObjMiss
  257. }
  258. if missCache == nil {
  259. missCache = map[string]string{}
  260. }
  261. if current == ptrObjMiss {
  262. if missObjCache, err = wrap.ObjDest.AppendRaw(c, s, objMiss, objkopt, objOpt...); err != nil {
  263. return
  264. }
  265. for k, v := range missObjCache {
  266. missCache[k] = v
  267. }
  268. current = ptrCache
  269. }
  270. if current == ptrCache {
  271. for _, missid := range aggrMiss {
  272. missks := wrap.AggregateDest.GetKey(missid, aggrkopt...)
  273. for _, missk := range missks {
  274. if _, exist := missCache[missk]; exist {
  275. continue
  276. }
  277. missCache[missk] = netCacheNull
  278. }
  279. }
  280. for _, missid := range objMiss {
  281. missks := wrap.ObjDest.GetKey(missid, objkopt...)
  282. for _, missk := range missks {
  283. if _, exist := missCache[missk]; exist {
  284. continue
  285. }
  286. missCache[missk] = netCacheNull
  287. }
  288. }
  289. log.Info("aggregateCache missCache(%+v)", missCache)
  290. s.netCacheCh <- missCache
  291. }
  292. return
  293. }
  294. //delCache 删除缓存
  295. func (s *Service) delCache(c context.Context, keys []string) (err error) {
  296. if len(keys) == 0 {
  297. return
  298. }
  299. if err = s.redis.DelMulti(c, keys...); err != nil {
  300. log.Error("delCache s.redis.DelMulti error(%v) keys(%+v)", err, keys)
  301. }
  302. return
  303. }
/**
 * Business usage
 * Objects that need to be cached
 ******************************************
 * one2many
 * key type val
 * flow int64+avail []*tokenbind,err--ok
 * flow int64+dir []*dir,err---ok
 * netid int64 []*flow,err---ok
 * netid []int64,avail,disp []int64,err--transitionid---ok
 * tranid []int64,isbatch []*tokenbind,err---ok
 * tranid int64,avail []*dir.err--ok
 * biz int64 []int64,err---netid ---ok
 *
 * one2one
 * ids []int64,xxx,yy []*struct,err --- ok
 ******************************************
 */
  322. //net的聚合字段business_id
  323. type netArr struct {
  324. dest []*net.Net
  325. }
  326. func (n *netArr) GetKey(id int64, opt ...interface{}) []string {
  327. return []string{fmt.Sprintf("net:%d", id)}
  328. }
  329. func (n *netArr) AppendString(id int64, v string) (ok bool, err error) {
  330. one := &net.Net{}
  331. if err = json.Unmarshal([]byte(v), one); err != nil {
  332. log.Error("netArr AppendString json.Unmarshal error(%v) v(%s)", err, v)
  333. return
  334. }
  335. ok = true
  336. n.dest = append(n.dest, one)
  337. return
  338. }
  339. func (n *netArr) AppendRaw(c context.Context, s *Service, miss []int64, kopt []interface{}, opt ...interface{}) (missCache map[string]string, err error) {
  340. var (
  341. res []*net.Net
  342. bs []byte
  343. )
  344. if res, err = s.gorm.Nets(c, miss); err != nil {
  345. log.Error("netArr AppendRaw error(%+v) miss(%+v)", err, miss)
  346. return
  347. }
  348. n.dest = append(n.dest, res...)
  349. missCache = map[string]string{}
  350. for _, item := range res {
  351. if bs, err = json.Marshal(item); err != nil {
  352. log.Error("netArr AppendRaw json.Marshal error(%v) item(%+v)", err, item)
  353. continue
  354. }
  355. v := string(bs)
  356. for _, k := range n.GetKey(item.ID, kopt...) {
  357. missCache[k] = v
  358. }
  359. }
  360. return
  361. }
  362. func (s *Service) bizNetRelation(c context.Context, miss []int64, kopt []interface{}, opt ...interface{}) (rela map[int64][]int64, err error) {
  363. rela, err = s.gorm.NetIDByBusiness(c, miss)
  364. return
  365. }
  366. func (s *Service) bizNetKey(id int64, opt ...interface{}) []string {
  367. return []string{fmt.Sprintf("business_net:%d", id)}
  368. }
  369. func (s *Service) netIDByBusiness(c context.Context, business int64) (res []int64, err error) {
  370. aggr := &AggrCacheDest{
  371. GetKey: s.bizNetKey,
  372. AggrRelaRaw: s.bizNetRelation,
  373. }
  374. if err = s.aggregateRelationCache(c, []int64{business}, aggr, nil); err != nil {
  375. return
  376. }
  377. res = aggr.Dest[business]
  378. return
  379. }
  380. func (s *Service) netByID(c context.Context, id int64) (res *net.Net, err error) {
  381. dest := &netArr{}
  382. if err = s.objCache(c, []int64{id}, dest, nil); err != nil {
  383. return
  384. }
  385. if len(dest.dest) == 0 {
  386. return
  387. }
  388. res = dest.dest[0]
  389. return
  390. }
  391. func (s *Service) delNetCache(c context.Context, n *net.Net) (err error) {
  392. objdest := &netArr{}
  393. keys := objdest.GetKey(n.ID)
  394. keys = append(keys, s.bizNetKey(n.BusinessID)...)
  395. err = s.delCache(c, keys)
  396. return
  397. }
  398. //tokenArr 根据id查询对象
  399. type tokenArr struct {
  400. dest []*net.Token
  401. }
  402. func (n *tokenArr) GetKey(id int64, opt ...interface{}) []string {
  403. return []string{fmt.Sprintf("token:%d", id)}
  404. }
  405. func (n *tokenArr) AppendString(id int64, v string) (ok bool, err error) {
  406. one := &net.Token{}
  407. if err = json.Unmarshal([]byte(v), one); err != nil {
  408. log.Error("tokenArr AppendString json.Unmarshal error(%v) v(%s)", err, v)
  409. return
  410. }
  411. ok = true
  412. n.dest = append(n.dest, one)
  413. return
  414. }
  415. func (n *tokenArr) AppendRaw(c context.Context, s *Service, miss []int64, kopt []interface{}, opt ...interface{}) (missCache map[string]string, err error) {
  416. var (
  417. res []*net.Token
  418. bs []byte
  419. )
  420. if res, err = s.gorm.Tokens(c, miss); err != nil {
  421. log.Error("tokenArr AppendRaw error(%+v) miss(%+v)", err, miss)
  422. return
  423. }
  424. n.dest = append(n.dest, res...)
  425. missCache = map[string]string{}
  426. for _, item := range res {
  427. if bs, err = json.Marshal(item); err != nil {
  428. log.Error("tokenArr AppendRaw json.Marshal error(%v) item(%+v)", err, item)
  429. continue
  430. }
  431. v := string(bs)
  432. for _, k := range n.GetKey(item.ID, kopt...) {
  433. missCache[k] = v
  434. }
  435. }
  436. return
  437. }
  438. func (s *Service) tokens(c context.Context, ids []int64) (res []*net.Token, err error) {
  439. objdest := &tokenArr{}
  440. if err = s.objCache(c, ids, objdest, nil); err != nil {
  441. return
  442. }
  443. res = objdest.dest
  444. return
  445. }
  446. //token_bind的聚合字段element_id
  447. type bindArr struct {
  448. dest []*net.TokenBind
  449. }
  450. func (n *bindArr) GetKey(id int64, opt ...interface{}) []string {
  451. return []string{fmt.Sprintf("bind:%d", id)}
  452. }
  453. func (n *bindArr) AppendString(id int64, v string) (ok bool, err error) {
  454. one := &net.TokenBind{}
  455. if err = json.Unmarshal([]byte(v), one); err != nil {
  456. log.Error("bindArr AppendString json.Unmarshal error(%v) v(%s)", err, v)
  457. return
  458. }
  459. ok = true
  460. n.dest = append(n.dest, one)
  461. return
  462. }
  463. func (n *bindArr) AppendRaw(c context.Context, s *Service, miss []int64, kopt []interface{}, opt ...interface{}) (missCache map[string]string, err error) {
  464. var (
  465. res []*net.TokenBind
  466. bs []byte
  467. )
  468. if res, err = s.gorm.TokenBinds(c, miss); err != nil {
  469. log.Error("bindArr AppendRaw error(%+v) miss(%+v)", err, miss)
  470. return
  471. }
  472. n.dest = append(n.dest, res...)
  473. missCache = map[string]string{}
  474. for _, item := range res {
  475. if bs, err = json.Marshal(item); err != nil {
  476. log.Error("bindArr AppendRaw json.Marshal error(%v) item(%+v)", err, item)
  477. continue
  478. }
  479. v := string(bs)
  480. for _, k := range n.GetKey(item.ID, kopt...) {
  481. missCache[k] = v
  482. }
  483. }
  484. return
  485. }
  486. func (s *Service) elementBindKey(id int64, opt ...interface{}) []string {
  487. return []string{fmt.Sprintf("element_bind:%d", id)}
  488. }
  489. //全部类型的正常状态的绑定
  490. func (s *Service) elementBindAppendRaw(c context.Context, miss []int64, objdest ICacheObjDest, kopt []interface{}, opt ...interface{}) (objCache map[string]string, aggrRelation map[int64][]int64, err error) {
  491. var (
  492. res map[int64][]*net.TokenBind
  493. bs []byte
  494. )
  495. objdester := objdest.(*bindArr)
  496. if res, err = s.gorm.TokenBindByElement(c, miss, net.BindTypes, true); err != nil {
  497. return
  498. }
  499. aggrRelation = map[int64][]int64{}
  500. objCache = map[string]string{}
  501. for eid, tbs := range res {
  502. for _, item := range tbs {
  503. if bs, err = json.Marshal(item); err != nil {
  504. return
  505. }
  506. v := string(bs)
  507. for _, k := range objdest.GetKey(item.ID, kopt...) {
  508. objCache[k] = v
  509. }
  510. objdester.dest = append(objdester.dest, item)
  511. aggrRelation[eid] = append(aggrRelation[eid], item.ID)
  512. }
  513. }
  514. return
  515. }
  516. func (s *Service) tokenBindByElement(c context.Context, eid []int64, tp []int8) (res []*net.TokenBind, err error) {
  517. objdest := &bindArr{}
  518. w := &CacheWrap{
  519. ObjDest: objdest,
  520. AggrObjRaw: s.elementBindAppendRaw,
  521. AggregateDest: &AggrCacheDest{
  522. GetKey: s.elementBindKey,
  523. },
  524. }
  525. //获取eid对应的所有类型的bind对象
  526. if err = s.aggregateCache(c, eid, w, nil, nil, nil, nil); err != nil {
  527. return
  528. }
  529. res = []*net.TokenBind{}
  530. for _, item := range objdest.dest {
  531. included := false
  532. for _, bindtp := range tp {
  533. if bindtp == item.Type {
  534. included = true
  535. break
  536. }
  537. }
  538. if included {
  539. res = append(res, item)
  540. }
  541. }
  542. return
  543. }
  544. func (s *Service) tokenBinds(c context.Context, ids []int64, onlyAvailable bool) (res []*net.TokenBind, err error) {
  545. objdest := &bindArr{}
  546. if err = s.objCache(c, ids, objdest, nil); err != nil {
  547. return
  548. }
  549. if !onlyAvailable {
  550. res = objdest.dest
  551. return
  552. }
  553. for _, item := range objdest.dest {
  554. if item.IsAvailable() {
  555. res = append(res, item)
  556. }
  557. }
  558. return
  559. }
  560. //flow的聚合字段是net_id
  561. type flowArr struct {
  562. dest []*net.Flow
  563. }
  564. func (n *flowArr) GetKey(id int64, opt ...interface{}) []string {
  565. return []string{fmt.Sprintf("flow:%d", id)}
  566. }
  567. func (n *flowArr) AppendString(id int64, v string) (ok bool, err error) {
  568. one := &net.Flow{}
  569. if err = json.Unmarshal([]byte(v), one); err != nil {
  570. log.Error("flowArr AppendString json.Unmarshal error(%v) v(%s)", err, v)
  571. return
  572. }
  573. ok = true
  574. n.dest = append(n.dest, one)
  575. return
  576. }
  577. func (n *flowArr) AppendRaw(c context.Context, s *Service, miss []int64, kopt []interface{}, opt ...interface{}) (missCache map[string]string, err error) {
  578. var (
  579. res []*net.Flow
  580. bs []byte
  581. )
  582. if res, err = s.gorm.Flows(c, miss); err != nil {
  583. log.Error("flowArr AppendRaw error(%+v) miss(%+v)", err, miss)
  584. return
  585. }
  586. n.dest = append(n.dest, res...)
  587. missCache = map[string]string{}
  588. for _, item := range res {
  589. if bs, err = json.Marshal(item); err != nil {
  590. log.Error("flowArr AppendRaw json.Marshal error(%v) item(%+v)", err, item)
  591. continue
  592. }
  593. v := string(bs)
  594. for _, k := range n.GetKey(item.ID, kopt...) {
  595. missCache[k] = v
  596. }
  597. }
  598. return
  599. }
  600. func (s *Service) netFlowKey(id int64, opt ...interface{}) []string {
  601. return []string{fmt.Sprintf("net_flow:%d", id)}
  602. }
  603. func (s *Service) netFlowAppendRaw(c context.Context, miss []int64, objdest ICacheObjDest, kopt []interface{}, opt ...interface{}) (objCache map[string]string, aggrRelation map[int64][]int64, err error) {
  604. var (
  605. res []*net.Flow
  606. bs []byte
  607. )
  608. objdester := objdest.(*flowArr)
  609. if res, err = s.gorm.FlowsByNet(c, miss); err != nil {
  610. return
  611. }
  612. aggrRelation = map[int64][]int64{}
  613. objCache = map[string]string{}
  614. for _, item := range res {
  615. if bs, err = json.Marshal(item); err != nil {
  616. return
  617. }
  618. v := string(bs)
  619. for _, k := range objdest.GetKey(item.ID, kopt...) {
  620. objCache[k] = v
  621. }
  622. objdester.dest = append(objdester.dest, item)
  623. aggrRelation[item.NetID] = append(aggrRelation[item.NetID], item.ID)
  624. }
  625. return
  626. }
  627. func (s *Service) netFlowRelation(c context.Context, miss []int64, kopt []interface{}, opt ...interface{}) (rela map[int64][]int64, err error) {
  628. rela, err = s.gorm.FlowIDByNet(c, miss)
  629. return
  630. }
  631. func (s *Service) flowIDByNet(c context.Context, nid int64) (res []int64, err error) {
  632. dest := &AggrCacheDest{
  633. GetKey: s.netFlowKey,
  634. AggrRelaRaw: s.netFlowRelation,
  635. }
  636. if err = s.aggregateRelationCache(c, []int64{nid}, dest, nil); err != nil {
  637. return
  638. }
  639. res = dest.Dest[nid]
  640. return
  641. }
  642. func (s *Service) flowsByNet(c context.Context, nid int64) (res []*net.Flow, err error) {
  643. objdest := &flowArr{}
  644. w := &CacheWrap{
  645. ObjDest: objdest,
  646. AggrObjRaw: s.netFlowAppendRaw,
  647. AggregateDest: &AggrCacheDest{
  648. GetKey: s.netFlowKey,
  649. },
  650. }
  651. if err = s.aggregateCache(c, []int64{nid}, w, nil, nil, nil, nil); err != nil {
  652. return
  653. }
  654. res = objdest.dest
  655. return
  656. }
  657. func (s *Service) flows(c context.Context, ids []int64, onlyAvailable bool) (list []*net.Flow, err error) {
  658. objdest := &flowArr{}
  659. if err = s.objCache(c, ids, objdest, nil); err != nil {
  660. return
  661. }
  662. if !onlyAvailable {
  663. list = objdest.dest
  664. return
  665. }
  666. for _, item := range objdest.dest {
  667. if item.IsAvailable() {
  668. list = append(list, item)
  669. }
  670. }
  671. return
  672. }
  673. func (s *Service) flowByID(c context.Context, id int64) (f *net.Flow, err error) {
  674. var list []*net.Flow
  675. if list, err = s.flows(c, []int64{id}, false); err != nil {
  676. return
  677. }
  678. if len(list) == 0 {
  679. return
  680. }
  681. f = list[0]
  682. return
  683. }
  684. func (s *Service) delFlowCache(c context.Context, f *net.Flow, changedBind []int64) (err error) {
  685. objdest := &flowArr{}
  686. keys := objdest.GetKey(f.ID)
  687. keys = append(keys, s.netFlowKey(f.NetID)...)
  688. keys = append(keys, s.elementBindKey(f.ID)...)
  689. binds := &bindArr{}
  690. for _, bind := range changedBind {
  691. keys = append(keys, binds.GetKey(bind)...)
  692. }
  693. err = s.delCache(c, keys)
  694. return
  695. }
  696. //transition的聚合字段是net_id
  697. type tranArr struct {
  698. dest []*net.Transition
  699. }
  700. func (n *tranArr) GetKey(id int64, opt ...interface{}) []string {
  701. return []string{fmt.Sprintf("transition:%d", id)}
  702. }
  703. func (n *tranArr) AppendString(id int64, v string) (ok bool, err error) {
  704. one := &net.Transition{}
  705. if err = json.Unmarshal([]byte(v), one); err != nil {
  706. log.Error("tranArr AppendString json.Unmarshal error(%v) v(%s)", err, v)
  707. return
  708. }
  709. ok = true
  710. n.dest = append(n.dest, one)
  711. return
  712. }
  713. func (n *tranArr) AppendRaw(c context.Context, s *Service, miss []int64, kopt []interface{}, opt ...interface{}) (missCache map[string]string, err error) {
  714. var (
  715. res []*net.Transition
  716. bs []byte
  717. )
  718. if res, err = s.gorm.Transitions(c, miss); err != nil {
  719. log.Error("tranArr AppendRaw error(%+v) miss(%+v)", err, miss)
  720. return
  721. }
  722. n.dest = append(n.dest, res...)
  723. missCache = map[string]string{}
  724. for _, item := range res {
  725. if bs, err = json.Marshal(item); err != nil {
  726. log.Error("tranArr AppendRaw json.Marshal error(%v) item(%+v)", err, item)
  727. continue
  728. }
  729. v := string(bs)
  730. for _, k := range n.GetKey(item.ID, kopt...) {
  731. missCache[k] = v
  732. }
  733. }
  734. return
  735. }
  736. func (s *Service) netTranKey(id int64, opt ...interface{}) []string {
  737. return []string{fmt.Sprintf("net_transition_dispatch%v_avail%v:%d", opt[0].(bool), opt[1].(bool), id)}
  738. }
  739. func (s *Service) netTranRelation(c context.Context, miss []int64, kopt []interface{}, opt ...interface{}) (rela map[int64][]int64, err error) {
  740. rela, err = s.gorm.TransitionIDByNet(c, miss, opt[0].(bool), opt[1].(bool))
  741. return
  742. }
  743. func (s *Service) tranIDByNet(c context.Context, nid []int64, onlyDispatch bool, onlyAvailable bool) (res []int64, err error) {
  744. dest := &AggrCacheDest{
  745. GetKey: s.netTranKey,
  746. AggrRelaRaw: s.netTranRelation,
  747. }
  748. if err = s.aggregateRelationCache(c, nid, dest, []interface{}{onlyDispatch, onlyAvailable}, onlyDispatch, onlyAvailable); err != nil {
  749. return
  750. }
  751. for _, ids := range dest.Dest {
  752. res = append(res, ids...)
  753. }
  754. return
  755. }
  756. func (s *Service) transitions(c context.Context, ids []int64, onlyAvailable bool) (list []*net.Transition, err error) {
  757. objdest := &tranArr{}
  758. if err = s.objCache(c, ids, objdest, nil); err != nil {
  759. return
  760. }
  761. if !onlyAvailable {
  762. list = objdest.dest
  763. return
  764. }
  765. for _, item := range objdest.dest {
  766. if item.IsAvailable() {
  767. list = append(list, item)
  768. }
  769. }
  770. return
  771. }
  772. func (s *Service) transitionByID(c context.Context, id int64) (res *net.Transition, err error) {
  773. var list []*net.Transition
  774. if list, err = s.transitions(c, []int64{id}, false); err != nil {
  775. return
  776. }
  777. if len(list) == 0 {
  778. return
  779. }
  780. res = list[0]
  781. return
  782. }
  783. func (s *Service) delTranCache(c context.Context, f *net.Transition, changedBind []int64) (err error) {
  784. objdest := &tranArr{}
  785. keys := objdest.GetKey(f.ID)
  786. keys = append(keys, s.netTranKey(f.NetID, true, true)...)
  787. keys = append(keys, s.netTranKey(f.NetID, true, false)...)
  788. keys = append(keys, s.netTranKey(f.NetID, false, true)...)
  789. keys = append(keys, s.netTranKey(f.NetID, false, false)...)
  790. keys = append(keys, s.elementBindKey(f.ID)...)
  791. binds := &bindArr{}
  792. for _, bind := range changedBind {
  793. keys = append(keys, binds.GetKey(bind)...)
  794. }
  795. err = s.delCache(c, keys)
  796. return
  797. }
  798. //direction的聚合字段是flow_id/transition_id
  799. type dirArr struct {
  800. dest []*net.Direction
  801. }
  802. func (n *dirArr) GetKey(id int64, opt ...interface{}) []string {
  803. return []string{fmt.Sprintf("direction:%d", id)}
  804. }
  805. func (n *dirArr) AppendString(id int64, v string) (ok bool, err error) {
  806. one := &net.Direction{}
  807. if err = json.Unmarshal([]byte(v), one); err != nil {
  808. log.Error("dirArr AppendString json.Unmarshal error(%v) v(%s)", err, v)
  809. return
  810. }
  811. ok = true
  812. n.dest = append(n.dest, one)
  813. return
  814. }
  815. func (n *dirArr) AppendRaw(c context.Context, s *Service, miss []int64, kopt []interface{}, opt ...interface{}) (missCache map[string]string, err error) {
  816. var (
  817. res []*net.Direction
  818. bs []byte
  819. )
  820. if res, err = s.gorm.Directions(c, miss); err != nil {
  821. log.Error("dirArr AppendRaw error(%+v) miss(%+v)", err, miss)
  822. return
  823. }
  824. n.dest = append(n.dest, res...)
  825. missCache = map[string]string{}
  826. for _, item := range res {
  827. if bs, err = json.Marshal(item); err != nil {
  828. log.Error("dirArr AppendRaw json.Marshal error(%v) item(%+v)", err, item)
  829. continue
  830. }
  831. v := string(bs)
  832. for _, k := range n.GetKey(item.ID, kopt...) {
  833. missCache[k] = v
  834. }
  835. }
  836. return
  837. }
  838. func (s *Service) flowDirKey(id int64, opt ...interface{}) []string {
  839. di := opt[0].([]int8)
  840. res := make([]string, len(di))
  841. for i, d := range di {
  842. res[i] = fmt.Sprintf("flow_direction_%d:%d", d, id)
  843. }
  844. return res
  845. }
  846. func (s *Service) tranDirKey(id int64, opt ...interface{}) []string {
  847. di := opt[0].([]int8)
  848. avail := opt[1].(bool)
  849. res := make([]string, len(di))
  850. for i, d := range di {
  851. res[i] = fmt.Sprintf("transition_direction_avail%v_%d:%d", avail, d, id)
  852. }
  853. return res
  854. }
  855. func (s *Service) flowDirAppendRaw(c context.Context, miss []int64, objdest ICacheObjDest, kopt []interface{}, opt ...interface{}) (objCache map[string]string, aggrRelation map[int64][]int64, err error) {
  856. var (
  857. res []*net.Direction
  858. bs []byte
  859. )
  860. objdester := objdest.(*dirArr)
  861. if res, err = s.gorm.DirectionByFlowID(c, miss, opt[0].(int8)); err != nil {
  862. return
  863. }
  864. aggrRelation = map[int64][]int64{}
  865. objCache = map[string]string{}
  866. for _, item := range res {
  867. if bs, err = json.Marshal(item); err != nil {
  868. return
  869. }
  870. v := string(bs)
  871. for _, k := range objdest.GetKey(item.ID, kopt...) {
  872. objCache[k] = v
  873. }
  874. objdester.dest = append(objdester.dest, item)
  875. aggrRelation[item.FlowID] = append(aggrRelation[item.FlowID], item.ID)
  876. }
  877. return
  878. }
  879. func (s *Service) tranDirAppendRaw(c context.Context, miss []int64, objdest ICacheObjDest, kopt []interface{}, opt ...interface{}) (objCache map[string]string, aggrRelation map[int64][]int64, err error) {
  880. var (
  881. res []*net.Direction
  882. bs []byte
  883. )
  884. objdester := objdest.(*dirArr)
  885. if res, err = s.gorm.DirectionByTransitionID(c, miss, opt[0].(int8), opt[1].(bool)); err != nil {
  886. return
  887. }
  888. aggrRelation = map[int64][]int64{}
  889. objCache = map[string]string{}
  890. for _, item := range res {
  891. if bs, err = json.Marshal(item); err != nil {
  892. return
  893. }
  894. v := string(bs)
  895. for _, k := range objdest.GetKey(item.ID, kopt...) {
  896. objCache[k] = v
  897. }
  898. objdester.dest = append(objdester.dest, item)
  899. aggrRelation[item.TransitionID] = append(aggrRelation[item.TransitionID], item.ID)
  900. }
  901. return
  902. }
  903. func (s *Service) dirByFlow(c context.Context, nid []int64, dir int8) (res []*net.Direction, err error) {
  904. objdest := &dirArr{}
  905. w := &CacheWrap{
  906. ObjDest: objdest,
  907. AggrObjRaw: s.flowDirAppendRaw,
  908. AggregateDest: &AggrCacheDest{
  909. GetKey: s.flowDirKey,
  910. },
  911. }
  912. if err = s.aggregateCache(c, nid, w, []interface{}{[]int8{dir}}, nil, []interface{}{dir}, nil); err != nil {
  913. return
  914. }
  915. res = objdest.dest
  916. return
  917. }
  918. func (s *Service) dirByTran(c context.Context, nid []int64, dir int8, onlyAvailable bool) (res []*net.Direction, err error) {
  919. objdest := &dirArr{}
  920. w := &CacheWrap{
  921. ObjDest: objdest,
  922. AggrObjRaw: s.tranDirAppendRaw,
  923. AggregateDest: &AggrCacheDest{
  924. GetKey: s.tranDirKey,
  925. },
  926. }
  927. if err = s.aggregateCache(c, nid, w, []interface{}{[]int8{dir}, onlyAvailable}, nil, []interface{}{dir, onlyAvailable}, nil); err != nil {
  928. return
  929. }
  930. res = objdest.dest
  931. return
  932. }
  933. func (s *Service) directions(c context.Context, ids []int64) (list []*net.Direction, err error) {
  934. objdest := &dirArr{}
  935. if err = s.objCache(c, ids, objdest, nil); err != nil {
  936. return
  937. }
  938. list = objdest.dest
  939. return
  940. }
  941. func (s *Service) delDirCache(c context.Context, dir *net.Direction) (err error) {
  942. objdest := &dirArr{}
  943. keys := objdest.GetKey(dir.ID)
  944. keys = append(keys, s.flowDirKey(dir.FlowID, []int8{net.DirInput, net.DirOutput})...)
  945. keys = append(keys, s.tranDirKey(dir.TransitionID, []int8{net.DirInput, net.DirOutput}, true)...)
  946. keys = append(keys, s.tranDirKey(dir.TransitionID, []int8{net.DirInput, net.DirOutput}, false)...)
  947. err = s.delCache(c, keys)
  948. return
  949. }