dao.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package dao
  import (
  	"context"
  	"fmt"
  	"sort"
  	"strings"
  	"time"

  	"go-common/app/service/main/riot-search/conf"
  	"go-common/app/service/main/riot-search/model"
  	"go-common/library/database/sql"
  	"go-common/library/log"

  	"github.com/go-ego/riot"
  	"github.com/go-ego/riot/types"
  )
  // _selIncrement is the base query for incremental data that has passed
  // review: it selects the id and title of archive rows whose mtime lies in the
  // window (start, end]. IncrementBackup appends the state filter and the
  // ordering clause before executing it.
  //
  // It is a constant so the SQL text cannot be reassigned at runtime.
  const _selIncrement = "SELECT id, title from archive where mtime>? and mtime<=?"
  16. // Dao dao
  17. type Dao struct {
  18. c *conf.Config
  19. searcher *riot.Engine
  20. db *sql.DB
  21. }
  22. // New init mysql db
  23. func New(c *conf.Config) (dao *Dao) {
  24. validateConfig(c.Riot)
  25. if c.UT {
  26. dao = &Dao{
  27. c: c,
  28. searcher: &riot.Engine{},
  29. db: sql.NewMySQL(c.Mysql),
  30. }
  31. dao.searcher.Init(types.EngineOpts{})
  32. return
  33. }
  34. dao = &Dao{
  35. c: c,
  36. searcher: &riot.Engine{},
  37. // db
  38. db: sql.NewMySQL(c.Mysql),
  39. }
  40. dao.searcher.Init(types.EngineOpts{
  41. GseDict: c.Riot.Dict,
  42. StopTokenFile: c.Riot.StopToken,
  43. NumShards: c.Riot.NumShards,
  44. IndexerOpts: &types.IndexerOpts{
  45. IndexType: types.FrequenciesIndex,
  46. DocCacheSize: 5000,
  47. },
  48. })
  49. return
  50. }
  51. func validateConfig(conf *conf.RiotConfig) {
  52. if conf.Dict == "" || conf.StopToken == "" {
  53. panic("must provide a dict and stop_token file")
  54. }
  55. if conf.FlushTime <= 0 {
  56. panic("flush time must larger than 0")
  57. }
  58. }
  59. // Close close the resource.
  60. func (d *Dao) Close() {
  61. d.db.Close()
  62. }
  63. // Ping dao ping
  64. func (d *Dao) Ping(c context.Context) error {
  65. return d.db.Ping(c)
  66. }
  67. // IncrementBackup select mtime>now-24h data
  68. func (d *Dao) IncrementBackup(c context.Context, stime, etime time.Time) (docs []*model.Document, err error) {
  69. var states []int
  70. for k := range model.PubStates.LegalStates {
  71. states = append(states, k)
  72. }
  73. query := _selIncrement + " and state in (" + strings.Trim(strings.Join(strings.Split(fmt.Sprint(states), " "), ","), "[]") + ")" + " order by id asc"
  74. rows, err := d.db.Query(c, query, stime, etime)
  75. log.Info("exec query(%s) args(stime:%v, etime:%v)", query, stime, etime)
  76. if err != nil {
  77. return
  78. }
  79. defer rows.Close()
  80. for rows.Next() {
  81. doc := &model.Document{}
  82. if err = rows.Scan(&doc.ID, &doc.Content); err != nil {
  83. return
  84. }
  85. docs = append(docs, doc)
  86. }
  87. err = rows.Err()
  88. return
  89. }