123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- package dao
- import (
- "context"
- "fmt"
- "strings"
- "time"
- "go-common/app/service/main/riot-search/conf"
- "go-common/app/service/main/riot-search/model"
- "go-common/library/database/sql"
- "go-common/library/log"
- "github.com/go-ego/riot"
- "github.com/go-ego/riot/types"
- )
// _selIncrement is the base query for incremental data that has passed
// review. It selects id and title from archive rows whose mtime is strictly
// greater than the first placeholder and less than or equal to the second.
// It carries no state filter or ordering of its own; callers append those.
var (
	_selIncrement = "SELECT id, title from archive where mtime>? and mtime<=?"
)
- // Dao dao
- type Dao struct {
- c *conf.Config
- searcher *riot.Engine
- db *sql.DB
- }
- // New init mysql db
- func New(c *conf.Config) (dao *Dao) {
- validateConfig(c.Riot)
- if c.UT {
- dao = &Dao{
- c: c,
- searcher: &riot.Engine{},
- db: sql.NewMySQL(c.Mysql),
- }
- dao.searcher.Init(types.EngineOpts{})
- return
- }
- dao = &Dao{
- c: c,
- searcher: &riot.Engine{},
- // db
- db: sql.NewMySQL(c.Mysql),
- }
- dao.searcher.Init(types.EngineOpts{
- GseDict: c.Riot.Dict,
- StopTokenFile: c.Riot.StopToken,
- NumShards: c.Riot.NumShards,
- IndexerOpts: &types.IndexerOpts{
- IndexType: types.FrequenciesIndex,
- DocCacheSize: 5000,
- },
- })
- return
- }
- func validateConfig(conf *conf.RiotConfig) {
- if conf.Dict == "" || conf.StopToken == "" {
- panic("must provide a dict and stop_token file")
- }
- if conf.FlushTime <= 0 {
- panic("flush time must larger than 0")
- }
- }
- // Close close the resource.
- func (d *Dao) Close() {
- d.db.Close()
- }
- // Ping dao ping
- func (d *Dao) Ping(c context.Context) error {
- return d.db.Ping(c)
- }
- // IncrementBackup select mtime>now-24h data
- func (d *Dao) IncrementBackup(c context.Context, stime, etime time.Time) (docs []*model.Document, err error) {
- var states []int
- for k := range model.PubStates.LegalStates {
- states = append(states, k)
- }
- query := _selIncrement + " and state in (" + strings.Trim(strings.Join(strings.Split(fmt.Sprint(states), " "), ","), "[]") + ")" + " order by id asc"
- rows, err := d.db.Query(c, query, stime, etime)
- log.Info("exec query(%s) args(stime:%v, etime:%v)", query, stime, etime)
- if err != nil {
- return
- }
- defer rows.Close()
- for rows.Next() {
- doc := &model.Document{}
- if err = rows.Scan(&doc.ID, &doc.Content); err != nil {
- return
- }
- docs = append(docs, doc)
- }
- err = rows.Err()
- return
- }
|