update.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package elastic
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "net/url"
  7. "strings"
  8. "time"
  9. "go-common/library/ecode"
  10. )
  11. // Update elastic upsert
  12. type Update struct {
  13. *Elastic
  14. business string
  15. data map[string][]interface{}
  16. insert bool
  17. }
  18. // NewUpdate new a request every update
  19. func (e *Elastic) NewUpdate(business string) *Update {
  20. return &Update{
  21. Elastic: e,
  22. business: business,
  23. data: make(map[string][]interface{}),
  24. }
  25. }
  26. // IndexByMod index by mod
  27. func (us *Update) IndexByMod(prefix string, val, mod int64) string {
  28. tmp := mod - 1
  29. var digit int
  30. for tmp > 0 {
  31. tmp /= 10
  32. digit++
  33. }
  34. format := fmt.Sprintf("%s_%%0%dd", prefix, digit)
  35. return fmt.Sprintf(format, val%mod)
  36. }
  37. // IndexByTime index by time
  38. func (us *Update) IndexByTime(prefix string, typ indexType, t time.Time) (index string) {
  39. year := t.Format("2006")
  40. month := t.Format("01")
  41. switch typ {
  42. case IndexTypeYear:
  43. index = strings.Join([]string{prefix, year}, "_")
  44. case IndexTypeMonth:
  45. index = strings.Join([]string{prefix, year, month}, "_")
  46. case IndexTypeDay:
  47. day := t.Format("02")
  48. index = strings.Join([]string{prefix, year, month, day}, "_")
  49. case IndexTypeWeek:
  50. index = strings.Join([]string{prefix, year, month, weeks[t.Day()/8]}, "_")
  51. }
  52. return
  53. }
  54. // AddData add data items to request 'data' param
  55. func (us *Update) AddData(index string, data interface{}) *Update {
  56. if data == nil {
  57. return us
  58. }
  59. us.data[index] = append(us.data[index], data)
  60. return us
  61. }
  62. // HasData weather data is empty or not
  63. func (us *Update) HasData() bool {
  64. if us.data == nil {
  65. return false
  66. }
  67. return len(us.data) > 0
  68. }
  69. // Insert set insert flag, it means 'replace'
  70. func (us *Update) Insert() *Update {
  71. us.insert = true
  72. return us
  73. }
  74. // Do post a request
  75. func (us *Update) Do(ctx context.Context) (err error) {
  76. data, err := json.Marshal(us.data)
  77. if err != nil {
  78. return
  79. }
  80. params := url.Values{}
  81. params.Add("business", us.business)
  82. params.Add("data", string(data))
  83. params.Add("insert", fmt.Sprintf("%t", us.insert))
  84. response := new(response)
  85. if err = us.client.Post(ctx, us.c.Host+_pathUpsert, "", params, &response); err != nil {
  86. return
  87. }
  88. if !ecode.Int(response.Code).Equal(ecode.OK) {
  89. err = ecode.Int(response.Code)
  90. }
  91. return
  92. }
  93. // Params get query parameters
  94. func (us *Update) Params() string {
  95. data, _ := json.Marshal(us.data)
  96. return fmt.Sprintf("business=%s&insert=%t&data=%s", us.business, us.insert, data)
  97. }