market_es.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package dao
  2. import (
  3. "context"
  4. "gopkg.in/olivere/elastic.v5"
  5. "strconv"
  6. "time"
  7. "go-common/app/job/openplatform/open-market/model"
  8. "go-common/library/log"
  9. )
  10. var (
  11. orderIndex = "product_order"
  12. orderType = "product_order_info"
  13. commentIndex = "open_mall"
  14. commentType = "ugc"
  15. marketIndex = "product_market"
  16. marketType = "product_market_info"
  17. )
  18. const (
  19. _orderStatusComplete = 2
  20. _dateFormat = "2006-01-02"
  21. _timeFormat = "2006-01-02 15:04:05"
  22. )
  23. //OrderData fetch ordercount by project and days
  24. func (d *Dao) OrderData(c context.Context, projectID int32, startTimeUnix int64) (orderData map[int32]int64, err error) {
  25. var (
  26. searchResult *elastic.SearchResult
  27. startTime string
  28. firstTime string
  29. firstDay time.Time
  30. startDay time.Time
  31. daysBefore = -1
  32. )
  33. orderData = make(map[int32]int64)
  34. startTime = time.Unix(startTimeUnix, 0).Add(time.Hour * 24).Format(_dateFormat)
  35. firstTime = time.Unix(startTimeUnix, 0).Add(time.Hour * 24).Add(time.Hour * 24 * -30).Format(_dateFormat)
  36. startDay, _ = time.Parse(_dateFormat, startTime)
  37. firstDay, _ = time.Parse(_dateFormat, firstTime)
  38. for {
  39. daysBefore++
  40. startDay = startDay.Add(time.Hour * -24)
  41. if !(startDay.Before(firstDay)) {
  42. rangeQuery := elastic.NewRangeQuery("pay_time")
  43. rangeQuery.Gte(startDay.Format(_timeFormat))
  44. rangeQuery.Lte(startDay.Add(time.Hour * 24).Format(_timeFormat))
  45. query := elastic.NewBoolQuery()
  46. query.Must(elastic.NewMatchQuery("project_id", projectID))
  47. query.Must(elastic.NewMatchQuery("status", _orderStatusComplete))
  48. query.Must(elastic.NewExistsQuery("pay_time"))
  49. query.Must(rangeQuery)
  50. searchResult, err = d.es.Search().
  51. Index(orderIndex).Type(orderType).
  52. Query(query).
  53. Size(0).
  54. Timeout(d.c.ElasticSearch.Timeout).
  55. Do(c)
  56. orderData[int32(daysBefore)] = searchResult.TotalHits()
  57. continue
  58. }
  59. break
  60. }
  61. return
  62. }
  63. // CommentData get comment info from ugc es
  64. func (d *Dao) CommentData(c context.Context, projectID int32, startTimeUnix int64) (commentData map[int32]int64, err error) {
  65. var (
  66. searchResult *elastic.SearchResult
  67. startTime string
  68. firstTime string
  69. firstDay time.Time
  70. startDay time.Time
  71. daysBefore = -1
  72. )
  73. commentData = make(map[int32]int64)
  74. startTime = time.Unix(startTimeUnix, 0).Add(time.Hour * 24).Format(_dateFormat)
  75. firstTime = time.Unix(startTimeUnix, 0).Add(time.Hour * 24).Add(time.Hour * 24 * -30).Format(_dateFormat)
  76. startDay, _ = time.Parse(_dateFormat, startTime)
  77. firstDay, _ = time.Parse(_dateFormat, firstTime)
  78. for {
  79. daysBefore++
  80. startDay = startDay.Add(time.Hour * -24)
  81. if !(startDay.Before(firstDay)) {
  82. rangeQuery := elastic.NewRangeQuery("ctime")
  83. rangeQuery.Gte(startDay.Unix() * 1000)
  84. rangeQuery.Lte(startDay.Add(time.Hour*24).Unix() * 1000)
  85. query := elastic.NewBoolQuery()
  86. query.Must(elastic.NewMatchQuery("subjectId", projectID))
  87. query.Must(elastic.NewMatchQuery("subjectType", 2))
  88. query.Must(rangeQuery)
  89. searchResult, err = d.esUgc.Search().
  90. Index(commentIndex).Type(commentType).
  91. Query(query).
  92. Size(0).
  93. Timeout(d.c.ElasticSearch.Timeout).
  94. Do(c)
  95. commentData[int32(daysBefore)] = searchResult.TotalHits()
  96. continue
  97. }
  98. break
  99. }
  100. return
  101. }
  102. //SaveData save result to es
  103. func (d *Dao) SaveData(c context.Context, project *model.Project) (err error) {
  104. exists, err := d.es.IndexExists(marketIndex).Do(context.Background())
  105. if err != nil {
  106. log.Error("check if index exists error (%v)", err)
  107. return
  108. }
  109. if !exists {
  110. if _, err = d.es.CreateIndex(marketIndex).Do(c); err != nil {
  111. log.Error("index name(%s) create err(%v)", marketIndex, err)
  112. return
  113. }
  114. }
  115. _, err = d.es.Index().
  116. Index(marketIndex).
  117. Type(marketType).
  118. Id(strconv.Itoa(int(project.ID))).
  119. BodyJson(project).
  120. Refresh("true").
  121. Do(c)
  122. if err != nil {
  123. log.Error("put es [%d] err(%v)", project.ID, err)
  124. }
  125. return
  126. }