indexer_worker.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. // Copyright 2013 Hui Chen
  2. // Copyright 2016 ego authors
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License"): you may
  5. // not use this file except in compliance with the License. You may obtain
  6. // a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. // License for the specific language governing permissions and limitations
  14. // under the License.
  15. package riot
  16. import (
  17. "sync/atomic"
  18. "github.com/go-ego/riot/types"
  19. )
  20. type indexerAddDocReq struct {
  21. doc *types.DocIndex
  22. forceUpdate bool
  23. }
  24. type indexerLookupReq struct {
  25. countDocsOnly bool
  26. tokens []string
  27. labels []string
  28. docIds map[uint64]bool
  29. options types.RankOpts
  30. rankerReturnChan chan rankerReturnReq
  31. orderless bool
  32. logic types.Logic
  33. }
  34. type indexerRemoveDocReq struct {
  35. docId uint64
  36. forceUpdate bool
  37. }
  38. func (engine *Engine) indexerAddDocWorker(shard int) {
  39. for {
  40. request := <-engine.indexerAddDocChans[shard]
  41. engine.indexers[shard].AddDocToCache(request.doc, request.forceUpdate)
  42. if request.doc != nil {
  43. atomic.AddUint64(&engine.numTokenIndexAdded,
  44. uint64(len(request.doc.Keywords)))
  45. engine.loc.Lock()
  46. atomic.AddUint64(&engine.numDocsIndexed, 1)
  47. // engine.numDocsIndexed++
  48. engine.loc.Unlock()
  49. }
  50. if request.forceUpdate {
  51. engine.loc.Lock()
  52. atomic.AddUint64(&engine.numDocsForceUpdated, 1)
  53. engine.loc.Unlock()
  54. }
  55. }
  56. }
  57. func (engine *Engine) indexerRemoveDocWorker(shard int) {
  58. for {
  59. request := <-engine.indexerRemoveDocChans[shard]
  60. engine.indexers[shard].RemoveDocToCache(request.docId, request.forceUpdate)
  61. if request.docId != 0 {
  62. engine.loc.Lock()
  63. atomic.AddUint64(&engine.numDocsRemoved, 1)
  64. engine.loc.Unlock()
  65. }
  66. if request.forceUpdate {
  67. engine.loc.Lock()
  68. atomic.AddUint64(&engine.numDocsForceUpdated, 1)
  69. engine.loc.Unlock()
  70. }
  71. }
  72. }
  73. func (engine *Engine) orderLess(
  74. request indexerLookupReq, docs []types.IndexedDoc) {
  75. if engine.initOptions.IDOnly {
  76. var outputDocs []types.ScoredID
  77. // var outputDocs types.ScoredIDs
  78. for _, d := range docs {
  79. outputDocs = append(outputDocs, types.ScoredID{
  80. DocId: d.DocId,
  81. TokenSnippetLocs: d.TokenSnippetLocs,
  82. TokenLocs: d.TokenLocs})
  83. }
  84. request.rankerReturnChan <- rankerReturnReq{
  85. docs: types.ScoredIDs(outputDocs),
  86. numDocs: len(outputDocs),
  87. }
  88. return
  89. }
  90. var outputDocs []types.ScoredDoc
  91. // var outputDocs types.ScoredDocs
  92. for _, d := range docs {
  93. outputDocs = append(outputDocs, types.ScoredDoc{
  94. DocId: d.DocId,
  95. TokenSnippetLocs: d.TokenSnippetLocs,
  96. TokenLocs: d.TokenLocs})
  97. }
  98. request.rankerReturnChan <- rankerReturnReq{
  99. docs: types.ScoredDocs(outputDocs),
  100. numDocs: len(outputDocs),
  101. }
  102. }
  103. func (engine *Engine) indexerLookupWorker(shard int) {
  104. for {
  105. request := <-engine.indexerLookupChans[shard]
  106. var (
  107. docs []types.IndexedDoc
  108. numDocs int
  109. )
  110. if request.docIds == nil {
  111. docs, numDocs = engine.indexers[shard].Lookup(
  112. request.tokens, request.labels,
  113. nil, request.countDocsOnly, request.logic)
  114. // docs, numDocs = engine.indexers[shard].Lookup(request.tokens,
  115. // request.labels, nil, request.countDocsOnly)
  116. } else {
  117. docs, numDocs = engine.indexers[shard].Lookup(
  118. request.tokens, request.labels,
  119. request.docIds, request.countDocsOnly, request.logic)
  120. // docs, numDocs = engine.indexers[shard].Lookup(request.tokens,
  121. // request.labels, request.docIds, request.countDocsOnly)
  122. }
  123. if request.countDocsOnly {
  124. request.rankerReturnChan <- rankerReturnReq{numDocs: numDocs}
  125. continue
  126. }
  127. if len(docs) == 0 {
  128. request.rankerReturnChan <- rankerReturnReq{}
  129. continue
  130. }
  131. if request.orderless {
  132. // var outputDocs interface{}
  133. engine.orderLess(request, docs)
  134. continue
  135. }
  136. rankerRequest := rankerRankReq{
  137. countDocsOnly: request.countDocsOnly,
  138. docs: docs,
  139. options: request.options,
  140. rankerReturnChan: request.rankerReturnChan,
  141. }
  142. engine.rankerRankChans[shard] <- rankerRequest
  143. }
  144. }