dao.go 17 KB


  1. package dao
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "go-common/app/job/main/search/conf"
  11. "go-common/app/job/main/search/model"
  12. "go-common/library/xstr"
  13. // "go-common/database/hbase"
  14. xsql "go-common/library/database/sql"
  15. "go-common/library/log"
  16. "go-common/library/log/infoc"
  17. "go-common/library/queue/databus"
  18. "go-common/library/stat/prom"
  19. "gopkg.in/olivere/elastic.v5"
  20. )
  // errorsCount is the prom business-error counter; PromError increments it
  // once per reported error name.
  var errorsCount = prom.BusinessErrCount

  const (
  	// business
  	// _searchDB is the name of the search db in the configured db pool.
  	// It is used for the attr, offset and manager tables.
  	_searchDB = "search"
  )
  // App is the set of operations one business index job exposes.
  // Implementations are stored in Dao.AppPool, keyed by appid.
  //
  // NOTE(review): no implementation is visible in this file, so the
  // per-method notes below are inferred from names and signatures only —
  // confirm against the concrete apps.
  type App interface {
  	// Business returns a string, presumably the business name — TODO confirm.
  	Business() string
  	// InitIndex presumably prepares the ES indices/aliases of the app — TODO confirm.
  	InitIndex(c context.Context)
  	// InitOffset presumably seeds the offset rows of the app — TODO confirm.
  	InitOffset(c context.Context)
  	// Offset presumably loads the current offset — TODO confirm.
  	Offset(c context.Context)
  	// SetRecover receives a recover id/time and an index i; Dao.SetRecover
  	// forwards its arguments here unchanged.
  	SetRecover(c context.Context, recoverID int64, recoverTime string, i int)
  	// IncrMessages returns a length and an error; presumably the number of
  	// incremental messages fetched — TODO confirm.
  	IncrMessages(c context.Context) (length int, err error)
  	// AllMessages returns a length and an error; presumably the number of
  	// rows fetched for a full rebuild — TODO confirm.
  	AllMessages(c context.Context) (length int, err error)
  	// BulkIndex presumably indexes the buffered rows in [start, end) — TODO confirm.
  	BulkIndex(c context.Context, start, end int, writeEntityIndex bool) (err error)
  	// Commit presumably persists the progress made so far — TODO confirm.
  	Commit(c context.Context) (err error)
  	// Sleep presumably pauses between two rounds — TODO confirm.
  	Sleep(c context.Context)
  	// Size returns an int, presumably the batch size — TODO confirm.
  	Size(c context.Context) (size int)
  }
  // Dao bundles the configuration and every external resource pool used by
  // the search job. New fills all fields except AppPool.
  type Dao struct {
  	// c is the config handed to New.
  	c *conf.Config
  	// smsClient, built by newSMS in New.
  	sms *sms
  	// search db: the DBPool entry named _searchDB; New panics when it is missing.
  	SearchDB *xsql.DB
  	// hbase *hbase.Client
  	// BusinessPool maps AppID to its app info (filled by newBusinessPool).
  	BusinessPool map[string]model.BsnAppInfo
  	// AttrPool maps each BusinessPool key to its attrs (filled by newAttrPool).
  	AttrPool map[string]*model.Attrs
  	// AppPool maps appid to its App. New does not populate it.
  	// NOTE(review): it must be assigned elsewhere before SetRecover is used — confirm.
  	AppPool map[string]App
  	// DBPool holds one db handle per entry of c.DB, keyed by db name.
  	DBPool map[string]*xsql.DB
  	// ESPool holds one client per entry of c.Es that could be connected,
  	// keyed by cluster name.
  	ESPool map[string]*elastic.Client
  	// DatabusPool holds a databus per business that has a matching entry in
  	// c.Databus; it is left empty when c.Business.Index is set.
  	DatabusPool map[string]*databus.Databus
  	// InfoCPool holds an infoc per business that has a matching entry in
  	// c.InfoC; it is left empty when c.Business.Index is set.
  	InfoCPool map[string]*infoc.Infoc
  }
  57. // New .
  58. func New(c *conf.Config) (d *Dao) {
  59. d = &Dao{
  60. c: c,
  61. DBPool: newDbPool(c),
  62. }
  63. // check search db
  64. if d.SearchDB = d.DBPool[_searchDB]; d.SearchDB == nil {
  65. panic("SearchDB must config")
  66. }
  67. d.sms = newSMS(d)
  68. d.BusinessPool = newBusinessPool(d)
  69. d.AttrPool = newAttrPool(d)
  70. d.ESPool = newEsPool(c, d)
  71. // consumer
  72. d.DatabusPool = newDatabusPool(c, d)
  73. d.InfoCPool = newInfoCPool(c, d)
  74. return
  75. }
  76. // newDatabusPool .
  77. func newDatabusPool(c *conf.Config, d *Dao) (pool map[string]*databus.Databus) {
  78. pool = make(map[string]*databus.Databus)
  79. if c.Business.Index {
  80. return
  81. }
  82. for name := range d.BusinessPool {
  83. if config, ok := c.Databus[name]; ok {
  84. pool[name] = databus.New(config)
  85. }
  86. }
  87. return
  88. }
  89. // newInfoCPool .
  90. func newInfoCPool(c *conf.Config, d *Dao) (pool map[string]*infoc.Infoc) {
  91. pool = map[string]*infoc.Infoc{}
  92. if c.Business.Index {
  93. return
  94. }
  95. for k := range d.BusinessPool {
  96. if n, ok := c.InfoC[k]; ok {
  97. pool[k] = infoc.New(n)
  98. }
  99. }
  100. return
  101. }
  102. // newBusinessPool all appid info from one business
  103. func newBusinessPool(d *Dao) (pool map[string]model.BsnAppInfo) {
  104. pool = map[string]model.BsnAppInfo{}
  105. if bns, err := newBusiness(d, d.c.Business.Env); err == nil {
  106. for _, v := range bns.bInfo.AppInfo {
  107. if v.AppID != "" {
  108. pool[v.AppID] = v
  109. }
  110. }
  111. }
  112. return
  113. }
  114. // newAttrPool .
  115. func newAttrPool(d *Dao) (pool map[string]*model.Attrs) {
  116. pool = make(map[string]*model.Attrs)
  117. for k := range d.BusinessPool {
  118. ar := newAttr(d, k)
  119. pool[k] = ar.attrs
  120. }
  121. //fmt.Println("strace:attr-pool>", pool)
  122. return
  123. }
  124. // SetRecover set recover.
  125. func (d *Dao) SetRecover(c context.Context, appid string, recoverID int64, recoverTime string, i int) {
  126. d.AppPool[appid].SetRecover(c, recoverID, recoverTime, i)
  127. }
  128. // newDbPool db combo
  129. func newDbPool(c *conf.Config) (pool map[string]*xsql.DB) {
  130. pool = make(map[string]*xsql.DB)
  131. for dbName, config := range c.DB {
  132. pool[dbName] = xsql.NewMySQL(config)
  133. }
  134. return
  135. }
  136. // newEsCluster cluster action
  137. func newEsPool(c *conf.Config, d *Dao) (esCluster map[string]*elastic.Client) {
  138. esCluster = make(map[string]*elastic.Client)
  139. for esName, e := range c.Es {
  140. if client, err := elastic.NewClient(elastic.SetURL(e.Addr...)); err == nil {
  141. esCluster[esName] = client
  142. } else {
  143. d.PromError("es:集群连接失败", "cluster: %s, %v", esName, err)
  144. if err := d.SendSMS(fmt.Sprintf("[search-job]%s集群连接失败", esName)); err != nil {
  145. d.PromError("es:集群连接短信失败", "cluster: %s, %v", esName, err)
  146. }
  147. }
  148. }
  149. return
  150. }
  151. // PromError .
  152. func (d *Dao) PromError(name string, format string, args ...interface{}) {
  153. errorsCount.Incr(name)
  154. log.Error(format, args)
  155. }
  156. // Close close dao
  157. func (d *Dao) Close() {
  158. for _, db := range d.DBPool {
  159. db.Close()
  160. }
  161. }
  162. // Ping health of db.
  163. func (d *Dao) Ping(c context.Context) (err error) {
  164. // TODO 循环ping
  165. if err = d.SearchDB.Ping(c); err != nil {
  166. d.PromError("db:ping", "")
  167. return
  168. }
  169. if err = d.pingESCluster(c); err != nil {
  170. d.PromError("es:ping", "d.pingESCluster error(%v)", err)
  171. return
  172. }
  173. return
  174. }
  175. // GetAliases get all aliases by indexAliasPrefix
  176. func (d *Dao) GetAliases(esName, indexAliasPrefix string) (aliases map[string]bool, err error) {
  177. aliases = map[string]bool{}
  178. if _, ok := d.ESPool[esName]; !ok {
  179. log.Error("GetAliases 集群不存在 (%s)", esName)
  180. return
  181. }
  182. if aliasesRes, err := d.ESPool[esName].Aliases().Index(indexAliasPrefix + "*").Do(context.TODO()); err != nil {
  183. log.Error("GetAliases(%s*) failed", indexAliasPrefix)
  184. } else {
  185. for _, indexDetails := range aliasesRes.Indices {
  186. for _, v := range indexDetails.Aliases {
  187. if v.AliasName != "" {
  188. aliases[v.AliasName] = true
  189. }
  190. }
  191. }
  192. }
  193. return
  194. }
  195. // InitIndex create entity indecies & aliases if necessary
  196. func (d *Dao) InitIndex(c context.Context, aliases map[string]bool, esName, indexAliasName, indexEntityName, indexMapping string) {
  197. if indexMapping == "" {
  198. log.Error("indexEntityName(%s) mapping is epmty", indexEntityName)
  199. return
  200. }
  201. for {
  202. exists, err := d.ESPool[esName].IndexExists(indexEntityName).Do(c)
  203. if err != nil {
  204. time.Sleep(time.Second * 3)
  205. continue
  206. }
  207. if !exists {
  208. if _, err := d.ESPool[esName].CreateIndex(indexEntityName).Body(indexMapping).Do(c); err != nil {
  209. log.Error("indexEntityName(%s) create err(%v)", indexEntityName, err)
  210. time.Sleep(time.Second * 3)
  211. continue
  212. }
  213. }
  214. break
  215. }
  216. // add aliases if necessary
  217. if aliases != nil && indexAliasName != indexEntityName {
  218. if _, ok := aliases[indexAliasName]; !ok {
  219. if _, err := d.ESPool[esName].Alias().Add(indexEntityName, indexAliasName).Do(context.TODO()); err != nil {
  220. log.Error("indexEntityName(%s) failed to add alias indexAliasName(%s) err(%v)", indexEntityName, indexAliasName, err)
  221. }
  222. }
  223. }
  224. }
  225. // InitOffset init offset to offset table .
  226. func (d *Dao) InitOffset(c context.Context, offset *model.LoopOffset, attrs *model.Attrs, arr []string) {
  227. for {
  228. if err := d.bulkInitOffset(c, offset, attrs, arr); err != nil {
  229. log.Error("project(%s) initOffset(%v)", attrs.AppID, err)
  230. time.Sleep(time.Second * 3)
  231. continue
  232. }
  233. break
  234. }
  235. }
  236. // InitMapData init each field struct
  237. func InitMapData(fields []string) (item model.MapData, row []interface{}) {
  238. item = make(map[string]interface{})
  239. for _, v := range fields {
  240. item[v] = new(interface{})
  241. }
  242. for _, v := range fields {
  243. row = append(row, item[v])
  244. }
  245. return
  246. }
  247. // UpdateOffsetByMap .
  248. func UpdateOffsetByMap(offsets *model.LoopOffset, mapData ...model.MapData) {
  249. var (
  250. id int64
  251. mtime string
  252. )
  253. length := len(mapData)
  254. if length == 0 {
  255. return
  256. }
  257. offsetTime := offsets.OffsetTime
  258. lastRes := mapData[length-1]
  259. id = lastRes.PrimaryID()
  260. lastMtime := lastRes.StrMTime()
  261. //fmt.Println("real", lastMtime, id, offsets.OffsetID)
  262. if (id != offsets.OffsetID) && (offsetTime == lastMtime) {
  263. offsets.IsLoop = true
  264. } else {
  265. if offsets.IsLoop {
  266. for _, p := range mapData {
  267. tempMtime := p.StrMTime()
  268. if tempMtime == offsetTime {
  269. continue
  270. }
  271. id = p.PrimaryID()
  272. mtime = tempMtime
  273. break
  274. }
  275. } else {
  276. mtime = lastMtime
  277. }
  278. offsets.IsLoop = false
  279. }
  280. offsets.SetTempOffset(id, mtime)
  281. }
  282. // CommitOffset .
  283. func (d *Dao) CommitOffset(c context.Context, offset *model.LoopOffset, appid, tableName string) (err error) {
  284. if offset.TempOffsetID != 0 {
  285. offset.SetOffset(offset.TempOffsetID, "")
  286. }
  287. if offset.TempOffsetTime != "" {
  288. offset.SetOffset(0, offset.TempOffsetTime)
  289. }
  290. if offset.TempRecoverID >= 0 {
  291. offset.SetRecoverOffset(offset.TempRecoverID, "")
  292. }
  293. if offset.TempRecoverTime != "" {
  294. offset.SetRecoverOffset(-1, offset.TempRecoverTime)
  295. }
  296. err = d.updateOffset(c, offset, appid, tableName)
  297. return
  298. }
  299. // JSON2map json to map.
  300. func (d *Dao) JSON2map(rowJSON json.RawMessage) (result map[string]interface{}, err error) {
  301. decoder := json.NewDecoder(bytes.NewReader(rowJSON))
  302. decoder.UseNumber()
  303. if err = decoder.Decode(&result); err != nil {
  304. log.Error("JSON2map.Unmarshal(%s) error(%v)", rowJSON, err)
  305. return nil, err
  306. }
  307. // json.Number转int64
  308. for k, v := range result {
  309. switch t := v.(type) {
  310. case json.Number:
  311. if result[k], err = t.Int64(); err != nil {
  312. log.Error("JSON2map.json.Number(%v)(%v)", t, err)
  313. return nil, err
  314. }
  315. }
  316. }
  317. return
  318. }
  319. // ExtraData .
  320. func (d *Dao) ExtraData(c context.Context, mapData []model.MapData, attrs *model.Attrs, way string, tags []string) (md []model.MapData, err error) {
  321. md = mapData
  322. switch way {
  323. case "db":
  324. for i, item := range mapData {
  325. item.TransData(attrs)
  326. for k, v := range item {
  327. md[i][k] = v
  328. }
  329. }
  330. case "dtb":
  331. for i, item := range mapData {
  332. item.TransDtb(attrs)
  333. for k, v := range item {
  334. md[i][k] = v
  335. }
  336. }
  337. }
  338. for _, ex := range attrs.DataExtras {
  339. // db exists or not
  340. if _, ok := d.DBPool[ex.DBName]; !ok {
  341. log.Error("ExtraData d.DBPool excludes:%s", ex.DBName)
  342. continue
  343. }
  344. if len(tags) != 0 {
  345. for _, v := range tags {
  346. if v != ex.Tag {
  347. continue
  348. }
  349. switch ex.Type {
  350. case "slice":
  351. md, err = d.extraDataSlice(c, md, attrs, ex)
  352. default:
  353. md, err = d.extraDataDefault(c, md, attrs, ex)
  354. }
  355. }
  356. } else {
  357. switch ex.Type {
  358. case "slice":
  359. md, err = d.extraDataSlice(c, md, attrs, ex)
  360. default:
  361. md, err = d.extraDataDefault(c, md, attrs, ex)
  362. }
  363. }
  364. }
  365. return
  366. }
  // extraDataDefault enriches every row of mapData with the columns loaded
  // from the extra data source ex and returns the same slice (md aliases
  // mapData; rows are modified in place).
  //
  // ex.Condition["in_field"] names the row field(s) used to look the extra
  // data up:
  //   - a single field: the int64 values of that field are collected, the
  //     extra rows are fetched with "where in" queries, indexed by the value
  //     of column ex.InField and merged into the matching rows;
  //   - several comma-separated fields: one query is issued per row with the
  //     field values as query arguments, and every returned row is merged
  //     into that row.
  //
  // ex.Condition["include"], in the form "field=value", restricts the single
  // field mode to rows whose field, formatted as a decimal int64, equals value.
  //
  // ex.TableFormat (comma-separated) selects the table layout in single field
  // mode: empty or "single" queries one table in batches of 200 ids; "int"
  // groups the ids by shard number and queries each shard.
  //
  // A failed query is logged and returned at once. A failed Scan is logged
  // and the row is skipped.
  // NOTE(review): err keeps the error of a failed Scan until the next Query
  // overwrites it, so the function can return a non-nil err after a
  // successful merge — confirm this is intended.
  // NOTE(review): the .(int64) assertions are unchecked and panic on any other
  // dynamic type.
  func (d *Dao) extraDataDefault(c context.Context, mapData []model.MapData, attrs *model.Attrs, ex model.AttrDataExtra) (md []model.MapData, err error) {
  	md = mapData
  	// filter ids from in_fields
  	var (
  		ids     []int64
  		items   map[int64]model.MapData
  		include []string
  	)
  	cdtInField := ex.Condition["in_field"]
  	// items indexes the fetched extra rows by the value of ex.InField
  	items = make(map[int64]model.MapData)
  	if cld, ok := ex.Condition["include"]; ok {
  		// include[0] is the field name, include[1] the expected value
  		include = strings.Split(cld, "=")
  	}
  	var rows *xsql.Rows
  	if cdtInFields := strings.Split(cdtInField, ","); len(cdtInFields) == 1 { //FIXME support locating one row by a primary key made of several conditions
  		for _, m := range mapData {
  			if v, ok := m[cdtInField]; ok {
  				if len(include) >= 2 { //TODO support more kinds
  					if cldVal, ok := m[include[0]]; ok && strconv.FormatInt(cldVal.(int64), 10) == include[1] {
  						ids = append(ids, v.(int64))
  					}
  				} else {
  					ids = append(ids, v.(int64)) //TODO deduplicate
  				}
  			}
  		}
  		// query extra data
  		//TODO for a business with many sharded tables, an oversized loop size can bring 500k rows at once; a "where in" on one table is then rejected or times out
  		if len(ids) > 0 {
  			if tableFormat := strings.Split(ex.TableFormat, ","); ex.TableFormat == "" || tableFormat[0] == "single" {
  				// single table: walk ids in windows of 200; flag marks the last window
  				i := 0
  				flag := false
  				//TODO drawback: memory hungry
  				for {
  					var id []int64
  					if (i+1)*200 < len(ids) {
  						id = ids[i*200 : (i+1)*200]
  					} else {
  						id = ids[i*200:]
  						flag = true
  					}
  					// ex.SQL is a format string receiving the joined ids; a constant
  					// " and 1 = ? " condition is appended with argument 1
  					// (presumably so the query always carries a bound argument — TODO confirm)
  					rows, err = d.DBPool[ex.DBName].Query(c, fmt.Sprintf(ex.SQL, xstr.JoinInts(id))+" and 1 = ? ", 1)
  					if err != nil {
  						log.Error("extraDataDefault db.Query error(%v)", err)
  						return
  					}
  					for rows.Next() {
  						item, row := InitMapData(ex.Fields)
  						if err = rows.Scan(row...); err != nil {
  							log.Error("extraDataDefault rows.Scan() error(%v)", err)
  							continue
  						}
  						if v, ok := item[ex.InField]; ok {
  							if v2, ok := v.(*interface{}); ok {
  								item.TransData(attrs)
  								items[(*v2).(int64)] = item
  							}
  						}
  						// fmt.Println(item)
  					}
  					rows.Close()
  					i++
  					if flag {
  						break
  					}
  				}
  			} else if tableFormat[0] == "int" {
  				// sharded tables: formatData maps a shard number to the ids stored in it
  				formatData := make(map[int64][]int64)
  				// dbid holds, per id, the value the shard number is computed from
  				var dbid = []int64{}
  				if len(tableFormat) >= 6 { // danmaku reports are sharded by article id while rows are matched by dmid
  					for _, m := range mapData {
  						if v, ok := m[tableFormat[5]]; ok {
  							dbid = append(dbid, v.(int64)) // deduplicate
  						}
  					}
  				} else {
  					dbid = ids
  				}
  				// dbid and ids are walked in parallel below, so they must have the same length.
  				// NOTE(review): this returns a nil err.
  				if len(dbid) != len(ids) {
  					log.Error("tableFormat[5] len error(%v)(%v)", len(dbid), len(ids))
  					return
  				}
  				for i := 0; i < len(ids); i++ {
  					// NOTE(review): d shadows the receiver inside this loop, and
  					// tableFormat[2] is parsed again on every iteration.
  					d, e := strconv.ParseInt(tableFormat[2], 10, 64)
  					if e != nil {
  						log.Error("extraDataDefault strconv.Atoi() error(%v)", e)
  						continue
  					}
  					// shard number = value modulo (tableFormat[2] + 1)
  					d = dbid[i] % (d + 1)
  					if d < 0 { // dirty data is possible
  						continue
  					}
  					formatData[d] = append(formatData[d], ids[i])
  				}
  				// v is the shard number, k the ids of that shard; ex.SQL receives both
  				for v, k := range formatData {
  					rows, err = d.DBPool[ex.DBName].Query(c, fmt.Sprintf(ex.SQL, v, xstr.JoinInts(k))+" and 1 = ? ", 1)
  					if err != nil {
  						log.Error("extraDataDefaultTableFormat db.Query error(%v)", err)
  						return
  					}
  					for rows.Next() {
  						item, row := InitMapData(ex.Fields)
  						if err = rows.Scan(row...); err != nil {
  							log.Error("extraDataDefaultTableFormat rows.Scan() error(%v)", err)
  							continue
  						}
  						if v, ok := item[ex.InField]; ok {
  							if v2, ok := v.(*interface{}); ok {
  								item.TransData(attrs)
  								items[(*v2).(int64)] = item
  							}
  						}
  					}
  					rows.Close()
  				}
  			}
  		}
  		// fmt.Println("ids:", ids, "items:", items)
  		// merge data
  		for i, m := range mapData {
  			// rows excluded by the include condition were never looked up
  			if len(include) >= 2 { //TODO support more kinds
  				if cldVal, ok := m[include[0]]; !ok || strconv.FormatInt(cldVal.(int64), 10) != include[1] {
  					continue
  				}
  			}
  			if k, ok := m[cdtInField]; ok {
  				if item, ok := items[k.(int64)]; ok {
  					for _, v := range ex.RemoveFields {
  						delete(item, v)
  					}
  					item.TransData(attrs)
  					for k, v := range item {
  						md[i][k] = v
  					}
  				}
  			}
  		}
  		//fmt.Println(md)
  	} else {
  		// several in_field names: ex.SQL is used as is, with the row's values
  		// of those fields bound as query arguments, one query per row
  		for i, m := range mapData {
  			var value []interface{}
  			for _, v := range cdtInFields {
  				value = append(value, m[v])
  			}
  			rows, err = d.DBPool[ex.DBName].Query(c, ex.SQL, value...)
  			if err != nil {
  				log.Error("extraDataDefault db.Query error(%v)", err)
  				return
  			}
  			for rows.Next() {
  				item, row := InitMapData(ex.Fields)
  				if err = rows.Scan(row...); err != nil {
  					log.Error("extraDataDefault rows.Scan() error(%v)", err)
  					continue
  				}
  				item.TransData(attrs)
  				for _, v := range ex.RemoveFields {
  					delete(item, v)
  				}
  				for k, v := range item {
  					md[i][k] = v
  				}
  			}
  			rows.Close()
  		}
  	}
  	return
  }
  536. // extraData-slice
  537. func (d *Dao) extraDataSlice(c context.Context, mapData []model.MapData, attrs *model.Attrs, ex model.AttrDataExtra) (md []model.MapData, err error) {
  538. md = mapData
  539. // filter ids from in_fields
  540. var (
  541. ids []int64
  542. items map[string]map[string][]interface{}
  543. include []string
  544. )
  545. cdtInField := ex.Condition["in_field"]
  546. items = make(map[string]map[string][]interface{})
  547. sliceFields := strings.Split(ex.SliceField, ",")
  548. if cld, ok := ex.Condition["include"]; ok {
  549. include = strings.Split(cld, "=")
  550. }
  551. for _, m := range mapData {
  552. if v, ok := m[cdtInField]; ok {
  553. if len(include) >= 2 { //TODO 支持多种
  554. if cldVal, ok := m[include[0]]; ok && strconv.FormatInt(cldVal.(int64), 10) == include[1] {
  555. ids = append(ids, v.(int64))
  556. }
  557. } else {
  558. ids = append(ids, v.(int64)) //TODO 加去重
  559. }
  560. }
  561. }
  562. // query extra data
  563. if len(ids) > 0 {
  564. var rows *xsql.Rows
  565. rows, err = d.DBPool[ex.DBName].Query(c, fmt.Sprintf(ex.SQL, xstr.JoinInts(ids))+" and 1 = ? ", 1)
  566. if err != nil {
  567. log.Error("extraDataSlice db.Query error(%v)", err)
  568. return
  569. }
  570. for rows.Next() {
  571. item, row := InitMapData(ex.Fields)
  572. if err = rows.Scan(row...); err != nil {
  573. log.Error("extraDataSlice rows.Scan() error(%v)", err)
  574. continue
  575. }
  576. if v, ok := item[ex.InField]; ok {
  577. if v2, ok := v.(*interface{}); ok {
  578. var key string
  579. switch (*v2).(type) {
  580. case int, int8, int16, int32, int64:
  581. key = strconv.FormatInt((*v2).(int64), 10)
  582. case []uint, []uint8, []uint16, []uint32, []uint64:
  583. key = string((*v2).([]byte))
  584. }
  585. for _, sf := range sliceFields {
  586. if _, ok := items[key]; !ok {
  587. items[key] = make(map[string][]interface{})
  588. }
  589. var res interface{}
  590. if v3, ok := item[sf].(*interface{}); ok {
  591. switch (*v3).(type) {
  592. case []uint, []uint8, []uint16, []uint32, []uint64:
  593. res = string((*v3).([]byte))
  594. default:
  595. res = v3
  596. }
  597. }
  598. items[key][sf] = append(items[key][sf], res)
  599. }
  600. }
  601. }
  602. }
  603. rows.Close()
  604. }
  605. //log.Info("items:%v", items)
  606. // merge data
  607. for i, m := range mapData {
  608. if len(include) >= 2 { //TODO 支持多种
  609. if cldVal, ok := m[include[0]]; !ok || strconv.FormatInt(cldVal.(int64), 10) != include[1] {
  610. continue
  611. }
  612. }
  613. if v, ok := m[cdtInField]; ok {
  614. if item, ok := items[strconv.FormatInt(v.(int64), 10)]; ok {
  615. for _, sf := range sliceFields {
  616. if list, ok := item[sf]; ok {
  617. md[i][sf] = list
  618. }
  619. }
  620. } else {
  621. for _, sf := range sliceFields {
  622. md[i][sf] = []int64{}
  623. }
  624. }
  625. }
  626. }
  627. // for _, v := range md {
  628. // log.Info("md:%v", v)
  629. // }
  630. return
  631. }
  632. // GetConfig .
  633. func (d *Dao) GetConfig(c context.Context) *conf.Config {
  634. return d.c
  635. }