redis.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "go-common/app/tool/saga/model"
  7. "go-common/library/cache/redis"
  8. "github.com/pkg/errors"
  9. )
  10. func mergeTaskKey(taskType int) string {
  11. return fmt.Sprintf("saga_task_%d", taskType)
  12. }
  13. func mrIIDKey(mrIID int) string {
  14. return fmt.Sprintf("saga_mrIID_%d", mrIID)
  15. }
  16. func (d *Dao) pingRedis(c context.Context) (err error) {
  17. conn := d.redis.Get(c)
  18. _, err = conn.Do("SET", "PING", "PONG")
  19. conn.Close()
  20. return
  21. }
  22. // AddMRIID ...
  23. func (d *Dao) AddMRIID(c context.Context, mrIID int, expire int) (err error) {
  24. var (
  25. key = mrIIDKey(mrIID)
  26. conn = d.redis.Get(c)
  27. )
  28. defer conn.Close()
  29. if err = conn.Send("SET", key, mrIID); err != nil {
  30. return
  31. }
  32. if err = conn.Flush(); err != nil {
  33. return
  34. }
  35. if _, err = conn.Receive(); err != nil {
  36. return
  37. }
  38. if _, err = conn.Do("EXPIRE", key, expire); err != nil {
  39. return
  40. }
  41. return
  42. }
  43. // ExistMRIID ...
  44. func (d *Dao) ExistMRIID(c context.Context, mrIID int) (ok bool, err error) {
  45. var (
  46. key = mrIIDKey(mrIID)
  47. conn = d.redis.Get(c)
  48. )
  49. defer conn.Close()
  50. if _, err = redis.Int(conn.Do("GET", key)); err != nil {
  51. if err == redis.ErrNil {
  52. err = nil
  53. }
  54. return false, err
  55. }
  56. return true, nil
  57. }
  58. // DeleteMRIID ...
  59. func (d *Dao) DeleteMRIID(c context.Context, mrIID int) (err error) {
  60. var (
  61. key = mrIIDKey(mrIID)
  62. conn = d.redis.Get(c)
  63. )
  64. defer conn.Close()
  65. if err = conn.Send("DEL", key); err != nil {
  66. return
  67. }
  68. if err = conn.Flush(); err != nil {
  69. return
  70. }
  71. if _, err = conn.Receive(); err != nil {
  72. return
  73. }
  74. return
  75. }
  76. // PushMergeTask ...
  77. func (d *Dao) PushMergeTask(c context.Context, taskType int, taskInfo *model.TaskInfo) (err error) {
  78. var (
  79. key = mergeTaskKey(taskType)
  80. conn = d.redis.Get(c)
  81. bs []byte
  82. )
  83. defer conn.Close()
  84. if bs, err = json.Marshal(taskInfo); err != nil {
  85. err = errors.WithStack(err)
  86. return
  87. }
  88. if err = conn.Send("LPUSH", key, bs); err != nil {
  89. return
  90. }
  91. if err = conn.Flush(); err != nil {
  92. return
  93. }
  94. if _, err = conn.Receive(); err != nil {
  95. return
  96. }
  97. return
  98. }
  99. // DeleteMergeTask ...
  100. func (d *Dao) DeleteMergeTask(c context.Context, taskType int, taskInfo *model.TaskInfo) (err error) {
  101. var (
  102. key = mergeTaskKey(taskType)
  103. conn = d.redis.Get(c)
  104. bs []byte
  105. )
  106. defer conn.Close()
  107. if bs, err = json.Marshal(taskInfo); err != nil {
  108. err = errors.WithStack(err)
  109. return
  110. }
  111. if err = conn.Send("LREM", key, 0, bs); err != nil {
  112. return
  113. }
  114. if err = conn.Flush(); err != nil {
  115. return
  116. }
  117. if _, err = conn.Receive(); err != nil {
  118. return
  119. }
  120. return
  121. }
  122. // MergeTasks ...
  123. func (d *Dao) MergeTasks(c context.Context, taskType int) (count int, taskInfos []*model.TaskInfo, err error) {
  124. var (
  125. key = mergeTaskKey(taskType)
  126. values [][]byte
  127. conn = d.redis.Get(c)
  128. )
  129. defer conn.Close()
  130. if count, err = redis.Int(conn.Do("LLEN", key)); err != nil {
  131. return
  132. }
  133. if values, err = redis.ByteSlices(conn.Do("LRANGE", key, 0, -1)); err != nil {
  134. if err == redis.ErrNil {
  135. err = nil
  136. }
  137. return
  138. }
  139. taskInfos = make([]*model.TaskInfo, 0, count)
  140. for _, value := range values {
  141. taskInfo := &model.TaskInfo{}
  142. if err = json.Unmarshal(value, &taskInfo); err != nil {
  143. err = errors.WithStack(err)
  144. return
  145. }
  146. taskInfos = append(taskInfos, taskInfo)
  147. //taskInfos = append([]*model.TaskInfo{taskInfo}, taskInfos...)
  148. }
  149. return
  150. }
  151. func mergeInfoKey(projID int, branch string) string {
  152. return fmt.Sprintf("saga_mergeInfo_%d_%s", projID, branch)
  153. }
  154. func pathOwnerKey(projID int, branch string, path string) string {
  155. return fmt.Sprintf("saga_PathOwner_%d_%s_%s", projID, branch, path)
  156. }
  157. func pathReviewerKey(projID int, branch string, path string) string {
  158. return fmt.Sprintf("saga_PathReviewer_%d_%s_%s", projID, branch, path)
  159. }
  160. func authInfoKey(projID int, mrIID int) string {
  161. return fmt.Sprintf("saga_auth_%d_%d", projID, mrIID)
  162. }
  163. // SetMergeInfo ...
  164. func (d *Dao) SetMergeInfo(c context.Context, projID int, branch string, mergeInfo *model.MergeInfo) (err error) {
  165. var (
  166. key = mergeInfoKey(projID, branch)
  167. conn = d.redis.Get(c)
  168. bs []byte
  169. )
  170. defer conn.Close()
  171. if bs, err = json.Marshal(mergeInfo); err != nil {
  172. err = errors.WithStack(err)
  173. return
  174. }
  175. if err = conn.Send("SET", key, bs); err != nil {
  176. return
  177. }
  178. if err = conn.Flush(); err != nil {
  179. return
  180. }
  181. if _, err = conn.Receive(); err != nil {
  182. return
  183. }
  184. return
  185. }
  186. // MergeInfo ...
  187. func (d *Dao) MergeInfo(c context.Context, projID int, branch string) (ok bool, mergeInfo *model.MergeInfo, err error) {
  188. var (
  189. key = mergeInfoKey(projID, branch)
  190. value []byte
  191. conn = d.redis.Get(c)
  192. )
  193. defer conn.Close()
  194. if value, err = redis.Bytes(conn.Do("GET", key)); err != nil {
  195. if err == redis.ErrNil {
  196. err = nil
  197. }
  198. return
  199. }
  200. mergeInfo = &model.MergeInfo{}
  201. if err = json.Unmarshal(value, &mergeInfo); err != nil {
  202. err = errors.WithStack(err)
  203. return
  204. }
  205. ok = true
  206. return
  207. }
  208. // DeleteMergeInfo ...
  209. func (d *Dao) DeleteMergeInfo(c context.Context, projID int, branch string) (err error) {
  210. var (
  211. key = mergeInfoKey(projID, branch)
  212. conn = d.redis.Get(c)
  213. )
  214. defer conn.Close()
  215. if err = conn.Send("DEL", key); err != nil {
  216. return
  217. }
  218. if err = conn.Flush(); err != nil {
  219. return
  220. }
  221. if _, err = conn.Receive(); err != nil {
  222. return
  223. }
  224. return
  225. }
  226. // SetPathOwner ...
  227. func (d *Dao) SetPathOwner(c context.Context, projID int, branch string, path string, owners []string) (err error) {
  228. var (
  229. key = pathOwnerKey(projID, branch, path)
  230. conn = d.redis.Get(c)
  231. bs []byte
  232. )
  233. defer conn.Close()
  234. if bs, err = json.Marshal(owners); err != nil {
  235. err = errors.WithStack(err)
  236. return
  237. }
  238. if err = conn.Send("SET", key, bs); err != nil {
  239. return
  240. }
  241. if err = conn.Flush(); err != nil {
  242. return
  243. }
  244. if _, err = conn.Receive(); err != nil {
  245. return
  246. }
  247. return
  248. }
  249. // PathOwner ...
  250. func (d *Dao) PathOwner(c context.Context, projID int, branch string, path string) (owners []string, err error) {
  251. var (
  252. key = pathOwnerKey(projID, branch, path)
  253. conn = d.redis.Get(c)
  254. bs []byte
  255. )
  256. defer conn.Close()
  257. if bs, err = redis.Bytes(conn.Do("GET", key)); err != nil {
  258. if err == redis.ErrNil {
  259. err = nil
  260. }
  261. return
  262. }
  263. if err = json.Unmarshal(bs, &owners); err != nil {
  264. err = errors.WithStack(err)
  265. return
  266. }
  267. return
  268. }
  269. // SetPathReviewer ...
  270. func (d *Dao) SetPathReviewer(c context.Context, projID int, branch string, path string, reviewers []string) (err error) {
  271. var (
  272. key = pathReviewerKey(projID, branch, path)
  273. conn = d.redis.Get(c)
  274. bs []byte
  275. )
  276. defer conn.Close()
  277. if bs, err = json.Marshal(reviewers); err != nil {
  278. return errors.WithStack(err)
  279. }
  280. if err = conn.Send("SET", key, bs); err != nil {
  281. return
  282. }
  283. if err = conn.Flush(); err != nil {
  284. return
  285. }
  286. if _, err = conn.Receive(); err != nil {
  287. return
  288. }
  289. return
  290. }
  291. // PathReviewer ...
  292. func (d *Dao) PathReviewer(c context.Context, projID int, branch string, path string) (reviewers []string, err error) {
  293. var (
  294. key = pathReviewerKey(projID, branch, path)
  295. conn = d.redis.Get(c)
  296. bs []byte
  297. )
  298. defer conn.Close()
  299. if bs, err = redis.Bytes(conn.Do("GET", key)); err != nil {
  300. if err == redis.ErrNil {
  301. err = nil
  302. }
  303. return
  304. }
  305. if err = json.Unmarshal(bs, &reviewers); err != nil {
  306. err = errors.WithStack(err)
  307. return
  308. }
  309. return
  310. }
  311. // pathAuthKey ...
  312. func pathAuthKey(projID int, branch string, path string) string {
  313. return fmt.Sprintf("saga_path_auth_%d_%s_%s", projID, branch, path)
  314. }
  315. // PathAuthR ...
  316. func (d *Dao) PathAuthR(c context.Context, projID int, branch string, path string) (authUsers *model.AuthUsers, err error) {
  317. var (
  318. key = pathAuthKey(projID, branch, path)
  319. conn = d.redis.Get(c)
  320. bs []byte
  321. )
  322. defer conn.Close()
  323. if bs, err = redis.Bytes(conn.Do("GET", key)); err != nil {
  324. if err == redis.ErrNil {
  325. err = nil
  326. }
  327. return
  328. }
  329. authUsers = new(model.AuthUsers)
  330. if err = json.Unmarshal(bs, authUsers); err != nil {
  331. err = errors.WithStack(err)
  332. return
  333. }
  334. return
  335. }
  336. // SetPathAuthR ...
  337. func (d *Dao) SetPathAuthR(c context.Context, projID int, branch string, path string, authUsers *model.AuthUsers) (err error) {
  338. var (
  339. key = pathAuthKey(projID, branch, path)
  340. conn = d.redis.Get(c)
  341. bs []byte
  342. )
  343. defer conn.Close()
  344. if bs, err = json.Marshal(authUsers); err != nil {
  345. return errors.WithStack(err)
  346. }
  347. if err = conn.Send("SET", key, bs); err != nil {
  348. return
  349. }
  350. if err = conn.Flush(); err != nil {
  351. return
  352. }
  353. if _, err = conn.Receive(); err != nil {
  354. return
  355. }
  356. return
  357. }
  358. // DeletePathAuthR ...
  359. func (d *Dao) DeletePathAuthR(c context.Context, projID int, branch string, path string) (err error) {
  360. var (
  361. key = pathAuthKey(projID, branch, path)
  362. conn = d.redis.Get(c)
  363. )
  364. defer conn.Close()
  365. if err = conn.Send("DEL", key); err != nil {
  366. return
  367. }
  368. if err = conn.Flush(); err != nil {
  369. return
  370. }
  371. if _, err = conn.Receive(); err != nil {
  372. return
  373. }
  374. return
  375. }
  376. // SetReportStatus ...
  377. func (d *Dao) SetReportStatus(c context.Context, projID int, mrIID int, result bool) (err error) {
  378. var (
  379. key = authInfoKey(projID, mrIID)
  380. conn = d.redis.Get(c)
  381. bs []byte
  382. )
  383. defer conn.Close()
  384. if bs, err = json.Marshal(result); err != nil {
  385. return errors.WithStack(err)
  386. }
  387. if err = conn.Send("SET", key, bs); err != nil {
  388. return
  389. }
  390. if err = conn.Flush(); err != nil {
  391. return
  392. }
  393. if _, err = conn.Receive(); err != nil {
  394. return
  395. }
  396. return
  397. }
  398. // ReportStatus ...
  399. func (d *Dao) ReportStatus(c context.Context, projID int, mrIID int) (result bool, err error) {
  400. var (
  401. key = authInfoKey(projID, mrIID)
  402. value []byte
  403. conn = d.redis.Get(c)
  404. )
  405. defer conn.Close()
  406. if value, err = redis.Bytes(conn.Do("GET", key)); err != nil {
  407. if err == redis.ErrNil {
  408. err = nil
  409. }
  410. return
  411. }
  412. if err = json.Unmarshal(value, &result); err != nil {
  413. err = errors.WithStack(err)
  414. return
  415. }
  416. return
  417. }
  418. // DeleteReportStatus ...
  419. func (d *Dao) DeleteReportStatus(c context.Context, projID int, mrIID int) (err error) {
  420. var (
  421. key = authInfoKey(projID, mrIID)
  422. conn = d.redis.Get(c)
  423. )
  424. defer conn.Close()
  425. if err = conn.Send("DEL", key); err != nil {
  426. return
  427. }
  428. if err = conn.Flush(); err != nil {
  429. return
  430. }
  431. if _, err = conn.Receive(); err != nil {
  432. return
  433. }
  434. return
  435. }