infoc.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package http
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "hash/crc32"
  7. "io/ioutil"
  8. "net/http"
  9. "go-common/app/infra/canal/conf"
  10. "go-common/app/infra/canal/infoc"
  11. config "go-common/library/conf"
  12. "go-common/library/ecode"
  13. bm "go-common/library/net/http/blademaster"
  14. "github.com/BurntSushi/toml"
  15. "github.com/siddontang/go-mysql/canal"
  16. )
  17. const (
  18. _heartHeat = 60
  19. _readTimeout = 90
  20. _flavor = "mysql"
  21. _updateUser = "canal"
  22. _updateMark = "infoc"
  23. )
  24. // InfocConf .
  25. type infocConf struct {
  26. Addr string `json:"db_addr"`
  27. User string `json:"user"`
  28. Pass string `json:"pass"`
  29. InfocDBs []*infocDB `json:"databases"`
  30. }
  31. // InfocDB .
  32. type infocDB struct {
  33. Schema string `json:"schema"`
  34. Tables []*infoTable `json:"tables"`
  35. LancerAddr string `json:"lancer_addr"`
  36. LancerTaskID string `json:"lancer_task_id"`
  37. LancerReportAddr string `json:"lancer_report_addr"`
  38. Proto string `json:"proto"`
  39. }
  40. // InfoTable .
  41. type infoTable struct {
  42. Name string `json:"name"`
  43. OmitFlied []string `json:"omit_field"`
  44. OmitAction []string `json:"omit_action"`
  45. }
  46. func infocPost(c *bm.Context) {
  47. var (
  48. ics []*infocConf
  49. bs []byte
  50. err error
  51. buf *bytes.Buffer
  52. )
  53. content := make(map[string]string)
  54. if bs, err = ioutil.ReadAll(c.Request.Body); err != nil {
  55. c.AbortWithStatus(http.StatusInternalServerError)
  56. return
  57. }
  58. if err = json.Unmarshal(bs, &ics); err != nil {
  59. c.AbortWithStatus(http.StatusInternalServerError)
  60. return
  61. }
  62. for _, ifc := range ics {
  63. databases := make([]*conf.Database, len(ifc.InfocDBs))
  64. for idx, infocDB := range ifc.InfocDBs {
  65. tables := make([]*conf.CTable, len(infocDB.Tables))
  66. for ix, table := range infocDB.Tables {
  67. tables[ix] = &conf.CTable{
  68. Name: table.Name,
  69. OmitAction: table.OmitAction,
  70. OmitField: table.OmitFlied,
  71. }
  72. }
  73. databases[idx] = &conf.Database{
  74. Schema: infocDB.Schema,
  75. Infoc: &infoc.Config{
  76. TaskID: infocDB.LancerTaskID,
  77. Addr: infocDB.LancerAddr,
  78. ReporterAddr: infocDB.LancerReportAddr,
  79. Proto: infocDB.Proto,
  80. },
  81. CTables: tables,
  82. }
  83. }
  84. ic := &conf.InsConf{
  85. Databases: databases,
  86. Config: &canal.Config{
  87. Addr: ifc.Addr,
  88. User: ifc.User,
  89. Password: ifc.Pass,
  90. ServerID: crc32.ChecksumIEEE([]byte(ifc.Addr)),
  91. Flavor: _flavor,
  92. HeartbeatPeriod: _heartHeat,
  93. ReadTimeout: _readTimeout,
  94. },
  95. }
  96. var isc = &struct {
  97. InsConf *conf.InsConf `toml:"instance"`
  98. }{
  99. InsConf: ic,
  100. }
  101. buf = new(bytes.Buffer)
  102. if err = toml.NewEncoder(buf).Encode(isc); err != nil {
  103. c.AbortWithStatus(http.StatusInternalServerError)
  104. return
  105. }
  106. content[fmt.Sprintf("%v.toml", ifc.Addr)] = buf.String()
  107. }
  108. for cn, cv := range content {
  109. value, err := conf.ConfClient.ConfIng(cn)
  110. if err == nil {
  111. err = conf.ConfClient.Update(value.CID, cv, _updateUser, _updateMark)
  112. } else if err == ecode.NothingFound {
  113. err = conf.ConfClient.Create(cn, cv, _updateUser, _updateMark)
  114. }
  115. if err != nil {
  116. c.AbortWithStatus(http.StatusInternalServerError)
  117. return
  118. }
  119. }
  120. }
  121. func infocCurrent(c *bm.Context) {
  122. var (
  123. ok bool
  124. result []*config.Value
  125. )
  126. if result, ok = conf.ConfClient.Configs(); !ok {
  127. c.Status(http.StatusInternalServerError)
  128. return
  129. }
  130. ics := make([]*infocConf, 0, len(result))
  131. for _, ns := range result {
  132. var ic struct {
  133. InsConf *conf.InsConf `toml:"instance"`
  134. }
  135. if _, err := toml.Decode(ns.Config, &ic); err != nil {
  136. c.AbortWithStatus(http.StatusInternalServerError)
  137. return
  138. }
  139. if ic.InsConf == nil {
  140. continue
  141. }
  142. icf := &infocConf{
  143. Addr: ic.InsConf.Addr,
  144. User: ic.InsConf.User,
  145. Pass: ic.InsConf.Password,
  146. }
  147. for _, icdb := range ic.InsConf.Databases {
  148. if icdb.Infoc == nil {
  149. continue
  150. }
  151. tables := make([]*infoTable, len(icdb.CTables))
  152. for idx, ctable := range icdb.CTables {
  153. tables[idx] = &infoTable{
  154. Name: ctable.Name,
  155. OmitFlied: ctable.OmitField,
  156. OmitAction: ctable.OmitAction,
  157. }
  158. }
  159. icf.InfocDBs = append(icf.InfocDBs, &infocDB{
  160. Schema: icdb.Schema,
  161. Tables: tables,
  162. LancerAddr: icdb.Infoc.Addr,
  163. LancerTaskID: icdb.Infoc.TaskID,
  164. })
  165. }
  166. ics = append(ics, icf)
  167. }
  168. c.JSON(ics, nil)
  169. }