dm_date.go 10.0 KB


  1. package business
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "go-common/app/job/main/search/dao"
  10. "go-common/app/job/main/search/model"
  11. xsql "go-common/library/database/sql"
  12. "go-common/library/log"
  13. "go-common/library/queue/databus"
  14. )
  15. const (
  16. minIDSQL = "SELECT id FROM dm_index_%03d WHERE ctime > ? ORDER BY id ASC LIMIT 1"
  17. )
  18. // DmDate .
  19. type DmDate struct {
  20. d *dao.Dao
  21. appid string
  22. attrs *model.Attrs
  23. db *xsql.DB
  24. dtb *databus.Databus
  25. offsets model.LoopOffsets
  26. mapData []model.MapData
  27. commits map[int32]*databus.Message
  28. frontTwelveMonthDate string
  29. tableName []string
  30. oidDayMap map[string]string
  31. }
  32. // NewDmDate .
  33. func NewDmDate(d *dao.Dao, appid string) (dd *DmDate) {
  34. dd = &DmDate{
  35. d: d,
  36. appid: appid,
  37. attrs: d.AttrPool[appid],
  38. offsets: make(map[int]*model.LoopOffset),
  39. commits: make(map[int32]*databus.Message),
  40. frontTwelveMonthDate: "2017-08-01",
  41. oidDayMap: make(map[string]string),
  42. }
  43. for i := dd.attrs.Table.TableFrom; i <= dd.attrs.Table.TableTo; i++ {
  44. dd.offsets[i] = &model.LoopOffset{}
  45. }
  46. dd.db = d.DBPool[dd.attrs.DBName]
  47. dd.dtb = d.DatabusPool[dd.attrs.Databus.Databus]
  48. return
  49. }
  50. // Business return business.
  51. func (dd *DmDate) Business() string {
  52. return dd.attrs.Business
  53. }
  54. // InitIndex init index.
  55. func (dd *DmDate) InitIndex(c context.Context) {
  56. var (
  57. indexAliasName string
  58. indexEntityName string
  59. )
  60. aliases, err := dd.d.GetAliases(dd.attrs.ESName, dd.attrs.Index.IndexAliasPrefix)
  61. now := time.Now()
  62. for i := -12; i < 18; i++ {
  63. newDate := now.AddDate(0, i, 0).Format("2006-01")
  64. indexAliasName = dd.attrs.Index.IndexAliasPrefix + strings.Replace(newDate, "-", "_", -1)
  65. indexEntityName = dd.attrs.Index.IndexEntityPrefix + strings.Replace(newDate, "-", "_", -1)
  66. if err != nil {
  67. dd.d.InitIndex(c, nil, dd.attrs.ESName, indexAliasName, indexEntityName, dd.attrs.Index.IndexMapping)
  68. } else {
  69. dd.d.InitIndex(c, aliases, dd.attrs.ESName, indexAliasName, indexEntityName, dd.attrs.Index.IndexMapping)
  70. }
  71. }
  72. }
  73. // InitOffset .
  74. func (dd *DmDate) InitOffset(c context.Context) {
  75. dd.d.InitOffset(c, dd.offsets[0], dd.attrs, dd.tableName)
  76. log.Info("in InitOffset")
  77. for i := dd.attrs.Table.TableFrom; i <= dd.attrs.Table.TableTo; i++ {
  78. var (
  79. id int64
  80. err error
  81. row *xsql.Row
  82. )
  83. row = dd.db.QueryRow(c, fmt.Sprintf(minIDSQL, i), dd.frontTwelveMonthDate)
  84. if err = row.Scan(&id); err != nil {
  85. if err == xsql.ErrNoRows {
  86. log.Info("in ErrNoRows")
  87. err = nil
  88. } else {
  89. log.Info("row.Scan error(%v)", err)
  90. log.Error("row.Scan error(%v)", err)
  91. time.Sleep(time.Second * 3)
  92. continue
  93. }
  94. }
  95. log.Info("here i am %d", i)
  96. dd.offsets[i] = &model.LoopOffset{}
  97. dd.offsets[i].OffsetID = id
  98. }
  99. log.Info("InitOffset over")
  100. }
  101. // Offset get offset.
  102. func (dd *DmDate) Offset(c context.Context) {
  103. for i := dd.attrs.Table.TableFrom; i <= dd.attrs.Table.TableTo; i++ {
  104. tableName := fmt.Sprintf("%s%0"+dd.attrs.Table.TableZero+"d", dd.attrs.Table.TablePrefix, i)
  105. offset, err := dd.d.Offset(c, dd.attrs.AppID, tableName)
  106. if err != nil {
  107. log.Error("dd.d.Offset error(%v)", err)
  108. time.Sleep(time.Second * 3)
  109. }
  110. dd.offsets[i].SetReview(offset.ReviewID, offset.ReviewTime)
  111. dd.offsets[i].SetOffset(offset.OffsetID(), offset.OffsetTime())
  112. }
  113. }
  114. // SetRecover set recover
  115. func (dd *DmDate) SetRecover(c context.Context, recoverID int64, recoverTime string, i int) {
  116. }
  117. // IncrMessages .
  118. func (dd *DmDate) IncrMessages(c context.Context) (length int, err error) {
  119. ticker := time.NewTicker(time.Duration(time.Millisecond * time.Duration(dd.attrs.Databus.Ticker)))
  120. defer ticker.Stop()
  121. timeStr := time.Now().Format("2006-01-02")
  122. t, _ := time.ParseInLocation("2006-01-02", timeStr, time.Local)
  123. tomorrowZeroTimestamp := t.AddDate(0, 0, 1).Unix()
  124. nowTimestamp := time.Now().Unix()
  125. if tomorrowZeroTimestamp-nowTimestamp < 180 {
  126. dd.oidDayMap = nil
  127. dd.oidDayMap = make(map[string]string)
  128. }
  129. for {
  130. select {
  131. case msg, ok := <-dd.dtb.Messages():
  132. if !ok {
  133. log.Error("databus: %s binlog consumer exit!!!", dd.attrs.Databus)
  134. break
  135. }
  136. m := &model.Message{}
  137. dd.commits[msg.Partition] = msg
  138. if err = json.Unmarshal(msg.Value, m); err != nil {
  139. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  140. continue
  141. }
  142. if m.Action == "insert" && strings.HasPrefix(m.Table, "dm_index") {
  143. var parseMap map[string]interface{}
  144. parseMap, err = dd.d.JSON2map(m.New)
  145. if err != nil {
  146. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  147. continue
  148. }
  149. newParseMap := dd.newDtbParseMap(c, parseMap)
  150. indexID := newParseMap["index_id"].(string)
  151. indexName := newParseMap["index_name"].(string)
  152. if _, exists := dd.oidDayMap[indexID]; exists {
  153. continue
  154. }
  155. dd.oidDayMap[indexID] = indexName
  156. dd.mapData = append(dd.mapData, newParseMap)
  157. }
  158. if len(dd.mapData) < dd.attrs.Databus.AggCount {
  159. continue
  160. }
  161. case <-ticker.C:
  162. }
  163. break
  164. }
  165. if len(dd.mapData) > 0 {
  166. dd.mapData, err = dd.d.ExtraData(c, dd.mapData, dd.attrs, "dtb", []string{})
  167. }
  168. length = len(dd.mapData)
  169. //amd.d.extraData(c, amd, "dtb")
  170. return
  171. }
  172. // AllMessages .
  173. func (dd *DmDate) AllMessages(c context.Context) (length int, err error) {
  174. dd.mapData = []model.MapData{}
  175. for i := dd.attrs.Table.TableFrom; i <= dd.attrs.Table.TableTo; i++ {
  176. var rows *xsql.Rows
  177. if dd.offsets[i].OffsetID == 0 {
  178. continue
  179. }
  180. if rows, err = dd.db.Query(c, fmt.Sprintf(dd.attrs.DataSQL.SQLByID, dd.attrs.DataSQL.SQLFields, i), dd.offsets[i].OffsetID, dd.attrs.Other.Size); err != nil {
  181. log.Error("AllMessages db.Query error(%v)", err)
  182. return
  183. }
  184. tempList := []model.MapData{}
  185. for rows.Next() {
  186. item, row := dao.InitMapData(dd.attrs.DataSQL.DataIndexFields)
  187. if err = rows.Scan(row...); err != nil {
  188. log.Error("appMultipleDatabus.AllMessages rows.Scan() error(%v)", err)
  189. continue
  190. }
  191. newParseMap := dd.newParseMap(c, item)
  192. ctime, ok := newParseMap["ctime"].(*interface{})
  193. if ok {
  194. dbTime := (*ctime).(time.Time)
  195. dbTimeStr := dbTime.Format("2006-01-02")
  196. t1, err1 := time.Parse("2006-01-02", dd.frontTwelveMonthDate)
  197. t2, err2 := time.Parse("2006-01-02", dbTimeStr)
  198. if err1 != nil || err2 != nil || t1.After(t2) {
  199. continue
  200. }
  201. } else {
  202. continue
  203. }
  204. tempList = append(tempList, newParseMap)
  205. dd.mapData = append(dd.mapData, newParseMap)
  206. }
  207. rows.Close()
  208. tmpLength := len(tempList)
  209. if tmpLength > 0 {
  210. dd.offsets[i].SetTempOffset(tempList[tmpLength-1].PrimaryID(), tempList[tmpLength-1].StrCTime())
  211. }
  212. }
  213. length = len(dd.mapData)
  214. if length > 0 {
  215. dd.mapData, err = dd.d.ExtraData(c, dd.mapData, dd.attrs, "db", []string{})
  216. }
  217. log.Info("length is %d", length)
  218. return
  219. }
  220. // BulkIndex .
  221. func (dd *DmDate) BulkIndex(c context.Context, start int, end int, writeEntityIndex bool) (err error) {
  222. partData := dd.mapData[start:end]
  223. // if dd.d.GetConfig(c).Business.Index {
  224. // err = dd.d.BulkDBData(c, dd.attrs, partData...)
  225. // } else {
  226. // err = dd.d.BulkDatabusData(c, dd.attrs, partData...)
  227. // }
  228. err = dd.d.BulkDBData(c, dd.attrs, writeEntityIndex, partData...)
  229. return
  230. }
  231. // Commit commit offset.
  232. func (dd *DmDate) Commit(c context.Context) (err error) {
  233. if dd.d.GetConfig(c).Business.Index {
  234. for i := dd.attrs.Table.TableFrom; i <= dd.attrs.Table.TableTo; i++ {
  235. tOffset := dd.offsets[i]
  236. if tOffset.TempOffsetID != 0 {
  237. tOffset.OffsetID = tOffset.TempOffsetID
  238. }
  239. if tOffset.TempOffsetTime != "" {
  240. tOffset.OffsetTime = tOffset.TempOffsetTime
  241. }
  242. tableName := fmt.Sprintf("%s%0"+dd.attrs.Table.TableZero+"d", dd.attrs.Table.TablePrefix, i)
  243. if err = dd.d.CommitOffset(c, tOffset, dd.attrs.AppID, tableName); err != nil {
  244. log.Error("appMultipleDatabus.Commit error(%v)", err)
  245. continue
  246. }
  247. }
  248. } else {
  249. for k, c := range dd.commits {
  250. if err = c.Commit(); err != nil {
  251. log.Error("appMultipleDatabus.Commit error(%v)", err)
  252. continue
  253. }
  254. delete(dd.commits, k)
  255. }
  256. }
  257. dd.mapData = []model.MapData{}
  258. return
  259. }
  260. // Sleep interval duration.
  261. func (dd *DmDate) Sleep(c context.Context) {
  262. }
  263. // Size return size.
  264. func (dd *DmDate) Size(c context.Context) int {
  265. return 0
  266. }
  267. // newParseMap .
  268. func (dd *DmDate) newParseMap(c context.Context, parseMap map[string]interface{}) (res map[string]interface{}) {
  269. res = parseMap
  270. indexName, strID := "", ""
  271. if res["month"] != nil {
  272. if month, ok := res["month"].(*interface{}); ok {
  273. mth := strings.Replace(dd.b2s((*month).([]uint8)), "-", "_", -1)
  274. indexName = "dm_date_" + mth
  275. }
  276. }
  277. if res["date"] != nil {
  278. if date, ok := res["date"].(*interface{}); ok {
  279. dte := strings.Replace(dd.b2s((*date).([]uint8)), "-", "_", -1)
  280. if oid, ok := res["oid"].(*interface{}); ok {
  281. strID = strconv.FormatInt((*oid).(int64), 10) + "_" + dte
  282. }
  283. }
  284. }
  285. res["index_name"] = indexName
  286. res["index_id"] = strID
  287. return
  288. }
  289. // newDtbParseMap .
  290. func (dd *DmDate) newDtbParseMap(c context.Context, parseMap map[string]interface{}) (res map[string]interface{}) {
  291. res = parseMap
  292. indexName, strID, mth, dte, id := "", "", "", "", ""
  293. if res["ctime"] != nil {
  294. if ctime, ok := res["ctime"].(string); ok {
  295. t, _ := time.Parse("2006-01-02 15:04:05", ctime)
  296. mth = t.Format("2006-01")
  297. dte = t.Format("2006-01-02")
  298. indexName = "dm_date_" + strings.Replace(mth, "-", "_", -1)
  299. }
  300. }
  301. if res["oid"] != nil {
  302. if oid, ok := res["oid"].(int64); ok {
  303. strOid := strconv.FormatInt(oid, 10)
  304. strID = strOid + "_" + strings.Replace(dte, "-", "_", -1)
  305. }
  306. }
  307. if res["id"] != nil {
  308. if newID, ok := res["id"].(int64); ok {
  309. id = strconv.Itoa(int(newID))
  310. }
  311. }
  312. for k := range res {
  313. if k == "id" || k == "oid" {
  314. continue
  315. }
  316. delete(res, k)
  317. }
  318. res["index_name"] = indexName
  319. res["index_id"] = strID
  320. res["month"] = mth
  321. res["date"] = dte
  322. res["id"] = id
  323. return
  324. }
  325. // bs2 []uint8 to string.
  326. func (dd *DmDate) b2s(bs []uint8) string {
  327. b := make([]byte, len(bs))
  328. for i, v := range bs {
  329. b[i] = byte(v)
  330. }
  331. return string(b)
  332. }