123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- package riot
- import (
- "sync/atomic"
- "github.com/go-ego/riot/types"
- )
- type indexerAddDocReq struct {
- doc *types.DocIndex
- forceUpdate bool
- }
- type indexerLookupReq struct {
- countDocsOnly bool
- tokens []string
- labels []string
- docIds map[uint64]bool
- options types.RankOpts
- rankerReturnChan chan rankerReturnReq
- orderless bool
- logic types.Logic
- }
// indexerRemoveDocReq is the message consumed by indexerRemoveDocWorker: one
// document ID to hand to a shard's indexer for removal.
type indexerRemoveDocReq struct {
	// docId identifies the document to remove. The worker only counts the
	// removal when docId is non-zero.
	docId uint64

	// forceUpdate is forwarded to RemoveDocToCache; when set, the worker also
	// increments the engine's force-update counter.
	forceUpdate bool
}
- func (engine *Engine) indexerAddDocWorker(shard int) {
- for {
- request := <-engine.indexerAddDocChans[shard]
- engine.indexers[shard].AddDocToCache(request.doc, request.forceUpdate)
- if request.doc != nil {
- atomic.AddUint64(&engine.numTokenIndexAdded,
- uint64(len(request.doc.Keywords)))
- engine.loc.Lock()
- atomic.AddUint64(&engine.numDocsIndexed, 1)
-
- engine.loc.Unlock()
- }
- if request.forceUpdate {
- engine.loc.Lock()
- atomic.AddUint64(&engine.numDocsForceUpdated, 1)
- engine.loc.Unlock()
- }
- }
- }
- func (engine *Engine) indexerRemoveDocWorker(shard int) {
- for {
- request := <-engine.indexerRemoveDocChans[shard]
- engine.indexers[shard].RemoveDocToCache(request.docId, request.forceUpdate)
- if request.docId != 0 {
- engine.loc.Lock()
- atomic.AddUint64(&engine.numDocsRemoved, 1)
- engine.loc.Unlock()
- }
- if request.forceUpdate {
- engine.loc.Lock()
- atomic.AddUint64(&engine.numDocsForceUpdated, 1)
- engine.loc.Unlock()
- }
- }
- }
- func (engine *Engine) orderLess(
- request indexerLookupReq, docs []types.IndexedDoc) {
- if engine.initOptions.IDOnly {
- var outputDocs []types.ScoredID
-
- for _, d := range docs {
- outputDocs = append(outputDocs, types.ScoredID{
- DocId: d.DocId,
- TokenSnippetLocs: d.TokenSnippetLocs,
- TokenLocs: d.TokenLocs})
- }
- request.rankerReturnChan <- rankerReturnReq{
- docs: types.ScoredIDs(outputDocs),
- numDocs: len(outputDocs),
- }
- return
- }
- var outputDocs []types.ScoredDoc
-
- for _, d := range docs {
- outputDocs = append(outputDocs, types.ScoredDoc{
- DocId: d.DocId,
- TokenSnippetLocs: d.TokenSnippetLocs,
- TokenLocs: d.TokenLocs})
- }
- request.rankerReturnChan <- rankerReturnReq{
- docs: types.ScoredDocs(outputDocs),
- numDocs: len(outputDocs),
- }
- }
- func (engine *Engine) indexerLookupWorker(shard int) {
- for {
- request := <-engine.indexerLookupChans[shard]
- var (
- docs []types.IndexedDoc
- numDocs int
- )
- if request.docIds == nil {
- docs, numDocs = engine.indexers[shard].Lookup(
- request.tokens, request.labels,
- nil, request.countDocsOnly, request.logic)
-
-
- } else {
- docs, numDocs = engine.indexers[shard].Lookup(
- request.tokens, request.labels,
- request.docIds, request.countDocsOnly, request.logic)
-
-
- }
- if request.countDocsOnly {
- request.rankerReturnChan <- rankerReturnReq{numDocs: numDocs}
- continue
- }
- if len(docs) == 0 {
- request.rankerReturnChan <- rankerReturnReq{}
- continue
- }
- if request.orderless {
-
- engine.orderLess(request, docs)
- continue
- }
- rankerRequest := rankerRankReq{
- countDocsOnly: request.countDocsOnly,
- docs: docs,
- options: request.options,
- rankerReturnChan: request.rankerReturnChan,
- }
- engine.rankerRankChans[shard] <- rankerRequest
- }
- }
|