mysql.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "go-common/app/infra/notify/model"
  6. )
  7. const (
  8. _loadNotify = "SELECT n.id,topic.cluster,topic.topic,a.group,n.callback,n.concurrent,n.filter,n.mtime FROM notify AS n LEFT JOIN auth2 as a ON a.id=n.gid LEFT JOIN topic ON a.topic_id=topic.id WHERE n.state=1 AND n.zone=?"
  9. _loadPub = `SELECT a.group,topic.cluster,topic.topic,a.operation,app2.app_secret FROM auth2 AS a LEFT JOIN app2 ON app2.id=a.app_id LEFT JOIN topic ON topic.id=a.topic_id`
  10. _selFilter = "SELECT `filters` FROM filters WHERE nid=?"
  11. _addFailBk = "INSERT INTO fail_backup(topic,`group`,`cluster`,msg,`index`) VALUE (?,?,?,?,?)"
  12. _delFailBk = "DELETE FROM fail_backup where id=?"
  13. _loadFailBk = "SELECT id,topic,`group`,`cluster`,`msg`,`offset`,`index` FROM fail_backup"
  14. )
  15. // LoadNotify load all notify config.
  16. func (d *Dao) LoadNotify(c context.Context, zone string) (ns []*model.Watcher, err error) {
  17. rows, err := d.db.Query(c, _loadNotify, zone)
  18. if err != nil {
  19. return
  20. }
  21. defer rows.Close()
  22. for rows.Next() {
  23. n := new(model.Watcher)
  24. if err = rows.Scan(&n.ID, &n.Cluster, &n.Topic, &n.Group, &n.Callback, &n.Concurrent, &n.Filter, &n.Mtime); err != nil {
  25. return
  26. }
  27. ns = append(ns, n)
  28. }
  29. return
  30. }
  31. // LoadPub load all pub config.
  32. func (d *Dao) LoadPub(c context.Context) (ps []*model.Pub, err error) {
  33. rows, err := d.db.Query(c, _loadPub)
  34. if err != nil {
  35. return
  36. }
  37. defer rows.Close()
  38. for rows.Next() {
  39. n := new(model.Pub)
  40. if err = rows.Scan(&n.Group, &n.Cluster, &n.Topic, &n.Operation, &n.AppSecret); err != nil {
  41. return
  42. }
  43. ps = append(ps, n)
  44. }
  45. return
  46. }
  47. // Filters get filter condition.
  48. func (d *Dao) Filters(c context.Context, id int64) (fs []*model.Filter, err error) {
  49. rows := d.db.QueryRow(c, _selFilter, id)
  50. if err != nil {
  51. return
  52. }
  53. var filters string
  54. if err = rows.Scan(&filters); err != nil {
  55. return
  56. }
  57. err = json.Unmarshal([]byte(filters), &fs)
  58. return
  59. }
  60. // AddFailBk add fail msg to fail backup.
  61. func (d *Dao) AddFailBk(c context.Context, topic, group, cluster, msg string, index int64) (id int64, err error) {
  62. res, err := d.db.Exec(c, _addFailBk, topic, group, cluster, msg, index)
  63. if err != nil {
  64. return
  65. }
  66. return res.LastInsertId()
  67. }
  68. // DelFailBk del msg from fail backup.
  69. func (d *Dao) DelFailBk(c context.Context, id int64) (affected int64, err error) {
  70. res, err := d.db.Exec(c, _delFailBk, id)
  71. if err != nil {
  72. return
  73. }
  74. return res.RowsAffected()
  75. }
  76. // LoadFailBk load all fail backup msg.
  77. func (d *Dao) LoadFailBk(c context.Context) (fbs []*model.FailBackup, err error) {
  78. rows, err := d.db.Query(c, _loadFailBk)
  79. if err != nil {
  80. return
  81. }
  82. defer rows.Close()
  83. for rows.Next() {
  84. fb := new(model.FailBackup)
  85. if err = rows.Scan(&fb.ID, &fb.Topic, &fb.Group, &fb.Cluster, &fb.Msg, &fb.Offset, &fb.Index); err != nil {
  86. return
  87. }
  88. fbs = append(fbs, fb)
  89. }
  90. return
  91. }