search_aggs_pipeline_serial_diff.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. // Copyright 2012-present Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package elastic
  5. // SerialDiffAggregation implements serial differencing.
  6. // Serial differencing is a technique where values in a time series are
  7. // subtracted from itself at different time lags or periods.
  8. //
  9. // For more details, see
  10. // https://www.elastic.co/guide/en/elasticsearch/reference/5.2/search-aggregations-pipeline-serialdiff-aggregation.html
  11. type SerialDiffAggregation struct {
  12. format string
  13. gapPolicy string
  14. lag *int
  15. subAggregations map[string]Aggregation
  16. meta map[string]interface{}
  17. bucketsPaths []string
  18. }
  19. // NewSerialDiffAggregation creates and initializes a new SerialDiffAggregation.
  20. func NewSerialDiffAggregation() *SerialDiffAggregation {
  21. return &SerialDiffAggregation{
  22. subAggregations: make(map[string]Aggregation),
  23. bucketsPaths: make([]string, 0),
  24. }
  25. }
  26. func (a *SerialDiffAggregation) Format(format string) *SerialDiffAggregation {
  27. a.format = format
  28. return a
  29. }
  30. // GapPolicy defines what should be done when a gap in the series is discovered.
  31. // Valid values include "insert_zeros" or "skip". Default is "insert_zeros".
  32. func (a *SerialDiffAggregation) GapPolicy(gapPolicy string) *SerialDiffAggregation {
  33. a.gapPolicy = gapPolicy
  34. return a
  35. }
  36. // GapInsertZeros inserts zeros for gaps in the series.
  37. func (a *SerialDiffAggregation) GapInsertZeros() *SerialDiffAggregation {
  38. a.gapPolicy = "insert_zeros"
  39. return a
  40. }
  41. // GapSkip skips gaps in the series.
  42. func (a *SerialDiffAggregation) GapSkip() *SerialDiffAggregation {
  43. a.gapPolicy = "skip"
  44. return a
  45. }
  46. // Lag specifies the historical bucket to subtract from the current value.
  47. // E.g. a lag of 7 will subtract the current value from the value 7 buckets
  48. // ago. Lag must be a positive, non-zero integer.
  49. func (a *SerialDiffAggregation) Lag(lag int) *SerialDiffAggregation {
  50. a.lag = &lag
  51. return a
  52. }
  53. // SubAggregation adds a sub-aggregation to this aggregation.
  54. func (a *SerialDiffAggregation) SubAggregation(name string, subAggregation Aggregation) *SerialDiffAggregation {
  55. a.subAggregations[name] = subAggregation
  56. return a
  57. }
  58. // Meta sets the meta data to be included in the aggregation response.
  59. func (a *SerialDiffAggregation) Meta(metaData map[string]interface{}) *SerialDiffAggregation {
  60. a.meta = metaData
  61. return a
  62. }
  63. // BucketsPath sets the paths to the buckets to use for this pipeline aggregator.
  64. func (a *SerialDiffAggregation) BucketsPath(bucketsPaths ...string) *SerialDiffAggregation {
  65. a.bucketsPaths = append(a.bucketsPaths, bucketsPaths...)
  66. return a
  67. }
  68. func (a *SerialDiffAggregation) Source() (interface{}, error) {
  69. source := make(map[string]interface{})
  70. params := make(map[string]interface{})
  71. source["serial_diff"] = params
  72. if a.format != "" {
  73. params["format"] = a.format
  74. }
  75. if a.gapPolicy != "" {
  76. params["gap_policy"] = a.gapPolicy
  77. }
  78. if a.lag != nil {
  79. params["lag"] = *a.lag
  80. }
  81. // Add buckets paths
  82. switch len(a.bucketsPaths) {
  83. case 0:
  84. case 1:
  85. params["buckets_path"] = a.bucketsPaths[0]
  86. default:
  87. params["buckets_path"] = a.bucketsPaths
  88. }
  89. // AggregationBuilder (SubAggregations)
  90. if len(a.subAggregations) > 0 {
  91. aggsMap := make(map[string]interface{})
  92. source["aggregations"] = aggsMap
  93. for name, aggregate := range a.subAggregations {
  94. src, err := aggregate.Source()
  95. if err != nil {
  96. return nil, err
  97. }
  98. aggsMap[name] = src
  99. }
  100. }
  101. // Add Meta data if available
  102. if len(a.meta) > 0 {
  103. source["meta"] = a.meta
  104. }
  105. return source, nil
  106. }