es.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. package dao
  2. import (
  3. "context"
  4. "fmt"
  5. "reflect"
  6. "strconv"
  7. "time"
  8. "go-common/app/job/main/search/model"
  9. "go-common/library/log"
  10. "go-common/library/stat/prom"
  11. "gopkg.in/olivere/elastic.v5"
  12. )
  13. // BulkDatabusData 写入es数据来自databus.
  14. func (d *Dao) BulkDatabusData(c context.Context, attrs *model.Attrs, writeEntityIndex bool, bulkData ...model.MapData) (err error) {
  15. // TODO 需要兼容
  16. var (
  17. request elastic.BulkableRequest
  18. bulkRequest = d.ESPool[attrs.ESName].Bulk()
  19. //indexField = ""
  20. )
  21. //s := strings.Split(attrs.DataSQL.DataIndexSuffix, ";")
  22. //if len(s) >= 2 {
  23. // indexField = strings.Split(s[1], ":")[0]
  24. //}
  25. for _, b := range bulkData {
  26. var (
  27. indexName string
  28. strID string
  29. )
  30. if name, ok := b["index_name"]; ok {
  31. if indexName, ok = name.(string); ok {
  32. delete(b, "index_name")
  33. } else {
  34. log.Error("dao.es.BulkDBData index_name err")
  35. continue
  36. }
  37. } else {
  38. if !writeEntityIndex {
  39. indexName, _ = b.Index(attrs)
  40. } else {
  41. _, indexName = b.Index(attrs)
  42. }
  43. }
  44. if id, ok := b["index_id"]; ok {
  45. if strID, ok = id.(string); !ok {
  46. log.Error("es.BulkDBData.strID(%v)", id)
  47. continue
  48. }
  49. } else {
  50. if strID, ok = b.StrID(attrs); !ok {
  51. log.Error("es.BulkDBData.strID")
  52. continue
  53. }
  54. }
  55. if indexName == "" {
  56. continue
  57. }
  58. for _, v := range attrs.DataSQL.DataIndexRemoveFields {
  59. delete(b, v)
  60. }
  61. if _, ok := b["index_field"]; ok {
  62. delete(b, "index_field")
  63. //delete(b, indexField)
  64. delete(b, "ctime")
  65. delete(b, "mtime")
  66. }
  67. for k := range b {
  68. if !d.Contain(k, attrs.DataSQL.DataIndexFormatFields) {
  69. delete(b, k)
  70. }
  71. }
  72. key := []string{}
  73. for k := range b {
  74. key = append(key, k)
  75. }
  76. for _, k := range key {
  77. customType, ok := attrs.DataSQL.DataIndexFormatFields[k]
  78. if ok {
  79. switch customType {
  80. case "ip":
  81. switch b[k].(type) {
  82. case float64:
  83. ipFormat := b.InetNtoA(int64(b[k].(float64)))
  84. b[k+"_format"] = ipFormat
  85. case int64:
  86. ipFormat := b.InetNtoA(b[k].(int64))
  87. b[k+"_format"] = ipFormat
  88. }
  89. case "arr":
  90. var arr []int
  91. binaryAttributes := strconv.FormatInt(b[k].(int64), 2)
  92. for i := len(binaryAttributes) - 1; i >= 0; i-- {
  93. b := fmt.Sprintf("%c", binaryAttributes[i])
  94. if b == "1" {
  95. arr = append(arr, len(binaryAttributes)-i)
  96. }
  97. }
  98. b[k+"_format"] = arr
  99. case "bin":
  100. var arr []int
  101. binaryAttributes := strconv.FormatInt(b[k].(int64), 2)
  102. for i := len(binaryAttributes) - 1; i >= 0; i-- {
  103. b := fmt.Sprintf("%c", binaryAttributes[i])
  104. if b == "1" {
  105. arr = append(arr, len(binaryAttributes)-i)
  106. }
  107. }
  108. b[k] = arr
  109. case "workflow":
  110. if state, ok := b[k].(int64); ok {
  111. b["state"] = state & 15
  112. b["business_state"] = state >> 4 & 15
  113. delete(b, k)
  114. }
  115. case "time":
  116. if v, ok := b[k].(string); ok {
  117. if v == "0000-00-00 00:00:00" {
  118. b[k] = "0001-01-01 00:00:00"
  119. }
  120. }
  121. default:
  122. // as long as you happy
  123. }
  124. }
  125. }
  126. if strID == "" {
  127. request = elastic.NewBulkIndexRequest().Index(indexName).Type(attrs.Index.IndexType).Doc(b)
  128. } else {
  129. request = elastic.NewBulkUpdateRequest().Index(indexName).Type(attrs.Index.IndexType).Id(strID).Doc(b).DocAsUpsert(true)
  130. }
  131. //fmt.Println(request)
  132. bulkRequest.Add(request)
  133. }
  134. if bulkRequest.NumberOfActions() == 0 {
  135. return
  136. }
  137. now := time.Now()
  138. // prom.BusinessInfoCount.Add("redis:bulk:doc", int64(bulkRequest.NumberOfActions()))
  139. for i := 0; i < bulkRequest.NumberOfActions(); i++ {
  140. prom.BusinessInfoCount.Incr("redis:bulk:doc")
  141. }
  142. if _, err = bulkRequest.Do(c); err != nil {
  143. log.Error("appid(%s) bulk error(%v)", attrs.AppID, err)
  144. }
  145. prom.LibClient.Timing("redis:bulk", int64(time.Since(now)/time.Millisecond))
  146. return
  147. }
  148. // BulkDBData 写入es数据来自db.
  149. func (d *Dao) BulkDBData(c context.Context, attrs *model.Attrs, writeEntityIndex bool, bulkData ...model.MapData) (err error) {
  150. var (
  151. indexName string
  152. strID string
  153. request elastic.BulkableRequest
  154. bulkRequest = d.ESPool[attrs.ESName].Bulk()
  155. )
  156. for _, b := range bulkData {
  157. if name, ok := b["index_name"]; ok {
  158. if indexName, ok = name.(string); ok {
  159. delete(b, "index_name")
  160. } else {
  161. log.Error("dao.es.BulkDBData index_name err")
  162. continue
  163. }
  164. } else {
  165. if !writeEntityIndex {
  166. indexName, _ = b.Index(attrs)
  167. } else {
  168. _, indexName = b.Index(attrs)
  169. }
  170. }
  171. if id, ok := b["index_id"]; ok {
  172. if strID, ok = id.(string); !ok {
  173. log.Error("es.BulkDBData.strID(%v)", id)
  174. continue
  175. }
  176. } else {
  177. if strID, ok = b.StrID(attrs); !ok {
  178. log.Error("es.BulkDBData.strID")
  179. continue
  180. }
  181. }
  182. if indexName == "" || strID == "" {
  183. continue
  184. }
  185. //attr提供要去除掉的字段,不往ES中写
  186. for _, v := range attrs.DataSQL.DataIndexRemoveFields {
  187. delete(b, v)
  188. }
  189. request = elastic.NewBulkUpdateRequest().Index(indexName).Type(attrs.Index.IndexType).Id(strID).Doc(b).DocAsUpsert(true).RetryOnConflict(3)
  190. //fmt.Println(request)
  191. bulkRequest.Add(request)
  192. }
  193. if bulkRequest.NumberOfActions() == 0 {
  194. // 注意这里request格式问题,会引起action为0
  195. return
  196. }
  197. log.Info("insert number is %d", bulkRequest.NumberOfActions())
  198. now := time.Now()
  199. // prom.BusinessInfoCount.Add("redis:bulk:doc", int64(bulkRequest.NumberOfActions()))
  200. for i := 0; i < bulkRequest.NumberOfActions(); i++ {
  201. prom.BusinessInfoCount.Incr("redis:bulk:doc")
  202. }
  203. if _, err = bulkRequest.Do(c); err != nil {
  204. log.Error("appid(%s) bulk error(%v)", attrs.AppID, err)
  205. }
  206. prom.LibClient.Timing("redis:bulk", int64(time.Since(now)/time.Millisecond))
  207. return
  208. }
  209. // pingEsCluster ping es cluster
  210. func (d *Dao) pingESCluster(ctx context.Context) (err error) {
  211. //for name, client := range d.ESPool {
  212. // if _, _, err = client.Ping(d.c.Es[name].Addr[0]).Do(ctx); err != nil {
  213. // d.PromError("Es:Ping", "%s:Ping error(%v)", name, err)
  214. // return
  215. // }
  216. //}
  217. return
  218. }
  219. // Contain .
  220. func (d *Dao) Contain(obj interface{}, target interface{}) bool {
  221. targetValue := reflect.ValueOf(target)
  222. switch reflect.TypeOf(target).Kind() {
  223. case reflect.Slice, reflect.Array:
  224. for i := 0; i < targetValue.Len(); i++ {
  225. if targetValue.Index(i).Interface() == obj {
  226. return true
  227. }
  228. }
  229. case reflect.Map:
  230. if targetValue.MapIndex(reflect.ValueOf(obj)).IsValid() {
  231. return true
  232. }
  233. }
  234. return false
  235. }