bulk_index_request.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. // Copyright 2012-present Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package elastic
  5. //go:generate easyjson bulk_index_request.go
  6. import (
  7. "encoding/json"
  8. "fmt"
  9. "strings"
  10. )
  11. // BulkIndexRequest is a request to add a document to Elasticsearch.
  12. //
  13. // See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-bulk.html
  14. // for details.
  15. type BulkIndexRequest struct {
  16. BulkableRequest
  17. index string
  18. typ string
  19. id string
  20. opType string
  21. routing string
  22. parent string
  23. version int64 // default is MATCH_ANY
  24. versionType string // default is "internal"
  25. doc interface{}
  26. pipeline string
  27. retryOnConflict *int
  28. ttl string
  29. source []string
  30. useEasyJSON bool
  31. }
  32. //easyjson:json
  33. type bulkIndexRequestCommand map[string]bulkIndexRequestCommandOp
  34. //easyjson:json
  35. type bulkIndexRequestCommandOp struct {
  36. Id string `json:"_id,omitempty"`
  37. Index string `json:"_index,omitempty"`
  38. TTL string `json:"_ttl,omitempty"`
  39. Type string `json:"_type,omitempty"`
  40. Parent string `json:"_parent,omitempty"`
  41. RetryOnConflict *int `json:"_retry_on_conflict,omitempty"`
  42. Routing string `json:"_routing,omitempty"`
  43. Version int64 `json:"_version,omitempty"`
  44. VersionType string `json:"_version_type,omitempty"`
  45. Pipeline string `json:"pipeline,omitempty"`
  46. }
  47. // NewBulkIndexRequest returns a new BulkIndexRequest.
  48. // The operation type is "index" by default.
  49. func NewBulkIndexRequest() *BulkIndexRequest {
  50. return &BulkIndexRequest{
  51. opType: "index",
  52. }
  53. }
  54. // UseEasyJSON is an experimental setting that enables serialization
  55. // with github.com/mailru/easyjson, which should in faster serialization
  56. // time and less allocations, but removed compatibility with encoding/json,
  57. // usage of unsafe etc. See https://github.com/mailru/easyjson#issues-notes-and-limitations
  58. // for details. This setting is disabled by default.
  59. func (r *BulkIndexRequest) UseEasyJSON(enable bool) *BulkIndexRequest {
  60. r.useEasyJSON = enable
  61. return r
  62. }
  63. // Index specifies the Elasticsearch index to use for this index request.
  64. // If unspecified, the index set on the BulkService will be used.
  65. func (r *BulkIndexRequest) Index(index string) *BulkIndexRequest {
  66. r.index = index
  67. r.source = nil
  68. return r
  69. }
  70. // Type specifies the Elasticsearch type to use for this index request.
  71. // If unspecified, the type set on the BulkService will be used.
  72. func (r *BulkIndexRequest) Type(typ string) *BulkIndexRequest {
  73. r.typ = typ
  74. r.source = nil
  75. return r
  76. }
  77. // Id specifies the identifier of the document to index.
  78. func (r *BulkIndexRequest) Id(id string) *BulkIndexRequest {
  79. r.id = id
  80. r.source = nil
  81. return r
  82. }
  83. // OpType specifies if this request should follow create-only or upsert
  84. // behavior. This follows the OpType of the standard document index API.
  85. // See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-index_.html#operation-type
  86. // for details.
  87. func (r *BulkIndexRequest) OpType(opType string) *BulkIndexRequest {
  88. r.opType = opType
  89. r.source = nil
  90. return r
  91. }
  92. // Routing specifies a routing value for the request.
  93. func (r *BulkIndexRequest) Routing(routing string) *BulkIndexRequest {
  94. r.routing = routing
  95. r.source = nil
  96. return r
  97. }
  98. // Parent specifies the identifier of the parent document (if available).
  99. func (r *BulkIndexRequest) Parent(parent string) *BulkIndexRequest {
  100. r.parent = parent
  101. r.source = nil
  102. return r
  103. }
  104. // Version indicates the version of the document as part of an optimistic
  105. // concurrency model.
  106. func (r *BulkIndexRequest) Version(version int64) *BulkIndexRequest {
  107. r.version = version
  108. r.source = nil
  109. return r
  110. }
  111. // VersionType specifies how versions are created. It can be e.g. internal,
  112. // external, external_gte, or force.
  113. //
  114. // See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-index_.html#index-versioning
  115. // for details.
  116. func (r *BulkIndexRequest) VersionType(versionType string) *BulkIndexRequest {
  117. r.versionType = versionType
  118. r.source = nil
  119. return r
  120. }
  121. // Doc specifies the document to index.
  122. func (r *BulkIndexRequest) Doc(doc interface{}) *BulkIndexRequest {
  123. r.doc = doc
  124. r.source = nil
  125. return r
  126. }
  127. // RetryOnConflict specifies how often to retry in case of a version conflict.
  128. func (r *BulkIndexRequest) RetryOnConflict(retryOnConflict int) *BulkIndexRequest {
  129. r.retryOnConflict = &retryOnConflict
  130. r.source = nil
  131. return r
  132. }
  133. // TTL is an expiration time for the document.
  134. func (r *BulkIndexRequest) TTL(ttl string) *BulkIndexRequest {
  135. r.ttl = ttl
  136. r.source = nil
  137. return r
  138. }
  139. // Pipeline to use while processing the request.
  140. func (r *BulkIndexRequest) Pipeline(pipeline string) *BulkIndexRequest {
  141. r.pipeline = pipeline
  142. r.source = nil
  143. return r
  144. }
  145. // String returns the on-wire representation of the index request,
  146. // concatenated as a single string.
  147. func (r *BulkIndexRequest) String() string {
  148. lines, err := r.Source()
  149. if err != nil {
  150. return fmt.Sprintf("error: %v", err)
  151. }
  152. return strings.Join(lines, "\n")
  153. }
  154. // Source returns the on-wire representation of the index request,
  155. // split into an action-and-meta-data line and an (optional) source line.
  156. // See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/docs-bulk.html
  157. // for details.
  158. func (r *BulkIndexRequest) Source() ([]string, error) {
  159. // { "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
  160. // { "field1" : "value1" }
  161. if r.source != nil {
  162. return r.source, nil
  163. }
  164. lines := make([]string, 2)
  165. // "index" ...
  166. indexCommand := bulkIndexRequestCommandOp{
  167. Index: r.index,
  168. Type: r.typ,
  169. Id: r.id,
  170. Routing: r.routing,
  171. Parent: r.parent,
  172. Version: r.version,
  173. VersionType: r.versionType,
  174. RetryOnConflict: r.retryOnConflict,
  175. TTL: r.ttl,
  176. Pipeline: r.pipeline,
  177. }
  178. command := bulkIndexRequestCommand{
  179. r.opType: indexCommand,
  180. }
  181. var err error
  182. var body []byte
  183. if r.useEasyJSON {
  184. // easyjson
  185. body, err = command.MarshalJSON()
  186. } else {
  187. // encoding/json
  188. body, err = json.Marshal(command)
  189. }
  190. if err != nil {
  191. return nil, err
  192. }
  193. lines[0] = string(body)
  194. // "field1" ...
  195. if r.doc != nil {
  196. switch t := r.doc.(type) {
  197. default:
  198. body, err := json.Marshal(r.doc)
  199. if err != nil {
  200. return nil, err
  201. }
  202. lines[1] = string(body)
  203. case json.RawMessage:
  204. lines[1] = string(t)
  205. case *json.RawMessage:
  206. lines[1] = string(*t)
  207. case string:
  208. lines[1] = t
  209. case *string:
  210. lines[1] = *t
  211. }
  212. } else {
  213. lines[1] = "{}"
  214. }
  215. r.source = lines
  216. return lines, nil
  217. }