canal_conf.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. package conf
  import (
  	"fmt"
  	"io/ioutil"
  	"net"
  	"path/filepath"
  	"regexp"
  	"strings"

  	"go-common/app/infra/canal/infoc"
  	"go-common/library/conf"
  	"go-common/library/log"
  	"go-common/library/queue/databus"
  	xtime "go-common/library/time"

  	"github.com/BurntSushi/toml"
  	"github.com/siddontang/go-mysql/canal"
  	"github.com/siddontang/go-mysql/client"
  	"github.com/siddontang/go-mysql/mysql"
  )
  18. var (
  19. // config change event
  20. event = make(chan *InsConf, 1)
  21. hbaseEvent = make(chan *HBaseInsConf, 1)
  22. tidbEvent = make(chan *TiDBInsConf, 1)
  23. canalPath string
  24. )
  // Addition holds the additional attributes attached to a subscribed table.
  type Addition struct {
  	// PrimaryKey is the list of columns used as the kafka msg key.
  	PrimaryKey []string `toml:"primarykey"`
  	// OmitField is the list of fields that will be ignored in the table.
  	OmitField []string `toml:"omitfield"`
  }
  // CTable describes one table subscription of a canal database.
  type CTable struct {
  	// PrimaryKey is the list of columns used as the kafka msg key.
  	PrimaryKey []string `toml:"primarykey"`
  	// OmitField is the list of fields that will be ignored in the table.
  	OmitField []string `toml:"omitfield"`
  	// OmitAction is the list of actions that will be ignored in the table.
  	OmitAction []string `toml:"omitaction"`
  	// Name is the table name; it is compiled as a regular expression by
  	// Database.CheckTable.
  	Name string `toml:"name"`
  	// Tables collects the real table names that matched Name; it is
  	// appended to by Database.CheckTable.
  	Tables []string
  }
  38. // Database represent mysql db
  39. type Database struct {
  40. Schema string `toml:"schema"`
  41. Databus *databus.Config `toml:"databus"`
  42. Infoc *infoc.Config `toml:"infoc"`
  43. CTables []*CTable `toml:"table"`
  44. TableMap map[string]*Addition
  45. }
  46. // CheckTable check database tables.
  47. func (db *Database) CheckTable(addr, user, passwd string) (err error) {
  48. var (
  49. conn *client.Conn
  50. res *mysql.Result
  51. regex *regexp.Regexp
  52. table string
  53. )
  54. db.TableMap = make(map[string]*Addition)
  55. if conn, err = client.Connect(addr, user, passwd, db.Schema); err != nil {
  56. return
  57. }
  58. defer conn.Close()
  59. if res, err = conn.Execute(fmt.Sprintf("SHOW TABLES FROM `%s`", db.Schema)); err != nil {
  60. log.Error("conn.Execute() error(%v)", err)
  61. return
  62. }
  63. for _, ctable := range db.CTables {
  64. if regex, err = regexp.Compile(ctable.Name); err != nil {
  65. log.Error("regexp.Compile(%s) error(%v)", ctable.Name, err)
  66. return
  67. }
  68. for _, value := range res.Values {
  69. table = fmt.Sprintf("%s", value[0])
  70. if regex.MatchString(table) {
  71. db.TableMap[table] = &Addition{
  72. PrimaryKey: ctable.PrimaryKey,
  73. OmitField: ctable.OmitField,
  74. }
  75. ctable.Tables = append(ctable.Tables, table)
  76. }
  77. }
  78. if len(ctable.Tables) == 0 {
  79. return fmt.Errorf("addr(%s) db(%s) subscribles nothing,table(%s) is empty", addr, db.Schema, ctable.Name)
  80. }
  81. }
  82. return
  83. }
  84. // InsConf instance config
  85. type InsConf struct {
  86. *canal.Config
  87. MonitorPeriod xtime.Duration `toml:"monitor_period"`
  88. MonitorOff bool `toml:"monitor_off"`
  89. Databases []*Database `toml:"db"`
  90. MasterInfo *MasterInfoConfig `toml:"masterinfo"`
  91. }
  // HBaseTable describes one hbase table subscription.
  type HBaseTable struct {
  	// Name is the table name.
  	Name string `toml:"name"`
  	// OmitField is the list of fields that will be ignored in the table.
  	OmitField []string `toml:"omitfield"`
  }
  97. // HBaseDatabase hbase database.
  98. type HBaseDatabase struct {
  99. Tables []*HBaseTable `toml:"table"`
  100. Databus *databus.Config `toml:"databus"`
  101. }
  102. // HBaseInsConf hbase instance config.
  103. type HBaseInsConf struct {
  104. Cluster string
  105. Root string
  106. Addrs []string
  107. MonitorPeriod xtime.Duration `toml:"monitor_period"`
  108. MonitorOff bool `toml:"monitor_off"`
  109. Databases []*HBaseDatabase `toml:"db"`
  110. MasterInfo *MasterInfoConfig `toml:"masterinfo"`
  111. }
  112. // CanalConfig config struct
  113. type CanalConfig struct {
  114. Instances []*InsConf `toml:"instance"`
  115. HBaseInstances []*HBaseInsConf `toml:"hbase_instance"`
  116. TiDBInstances []*TiDBInsConf `toml:"tidb_instance"`
  117. }
  118. func newInsConf(fn, fc string) (c *InsConf, err error) {
  119. var ic struct {
  120. InsConf *InsConf `toml:"instance"`
  121. }
  122. ipPort := strings.TrimSuffix(fn, ".toml")
  123. if _, _, err = net.SplitHostPort(ipPort); err != nil {
  124. return
  125. }
  126. if _, err = toml.Decode(fc, &ic); err != nil {
  127. return
  128. }
  129. if ic.InsConf == nil {
  130. err = fmt.Errorf("file(%s) cannot decode toml", fn)
  131. return
  132. }
  133. if ic.InsConf.Addr != ipPort {
  134. err = fmt.Errorf("file(%s) name not equal addr(%s)", fn, ic.InsConf.Addr)
  135. return
  136. }
  137. if ic.InsConf.MasterInfo == nil {
  138. ic.InsConf.MasterInfo = Conf.MasterInfo
  139. }
  140. return ic.InsConf, nil
  141. }
  142. func newHBaseConf(fn, fc string) (c *HBaseInsConf, err error) {
  143. var ic struct {
  144. InsConf *HBaseInsConf `toml:"instance"`
  145. }
  146. if _, err = toml.Decode(fc, &ic); err != nil {
  147. return
  148. }
  149. if ic.InsConf == nil {
  150. err = fmt.Errorf("file(%s) cannot decode toml", fn)
  151. return
  152. }
  153. cluster := strings.TrimSuffix(fn, ".hbase.toml")
  154. if ic.InsConf.Cluster != cluster {
  155. err = fmt.Errorf("file(%s) name not equal name(%s)", cluster, ic.InsConf.Cluster)
  156. return
  157. }
  158. if ic.InsConf.MasterInfo == nil {
  159. ic.InsConf.MasterInfo = Conf.MasterInfo
  160. }
  161. return ic.InsConf, nil
  162. }
  163. // LoadCanalConf load canal config.
  164. func LoadCanalConf() (c *CanalConfig, err error) {
  165. var (
  166. result []*conf.Value
  167. ok bool
  168. )
  169. if canalPath != "" {
  170. result, err = localCanal()
  171. } else {
  172. result, ok = ConfClient.Configs()
  173. if !ok {
  174. panic("no canal-config")
  175. }
  176. }
  177. c = new(CanalConfig)
  178. im := map[string]struct{}{}
  179. for _, ns := range result {
  180. if ns.Name == "canal.toml" || ns.Name == "common.toml" {
  181. continue
  182. }
  183. if strings.HasSuffix(ns.Name, ".hbase.toml") {
  184. var ic *HBaseInsConf
  185. if ic, err = newHBaseConf(ns.Name, ns.Config); err != nil {
  186. err = fmt.Errorf("file(%s) decode error(%v)", ns.Name, err)
  187. return
  188. }
  189. c.HBaseInstances = append(c.HBaseInstances, ic)
  190. } else if strings.HasSuffix(ns.Name, ".tidb.toml") {
  191. var ic *TiDBInsConf
  192. if ic, err = newTiDBConf(ns.Name, ns.Config); err != nil {
  193. err = fmt.Errorf("file(%s) decode error(%v)", ns.Name, err)
  194. return
  195. }
  196. c.TiDBInstances = append(c.TiDBInstances, ic)
  197. } else {
  198. var ic *InsConf
  199. if !strings.HasSuffix(ns.Name, ".toml") {
  200. err = fmt.Errorf("file(%s) name is not a toml", ns.Name)
  201. continue
  202. }
  203. if ic, err = newInsConf(ns.Name, ns.Config); err != nil {
  204. err = fmt.Errorf("file(%s) decode error(%v)", ns.Name, err)
  205. return
  206. }
  207. if _, ok := im[ic.Addr]; ok {
  208. err = fmt.Errorf("file(%s) repeat with other toml", ns.Name)
  209. return
  210. }
  211. im[ic.Addr] = struct{}{}
  212. c.Instances = append(c.Instances, ic)
  213. }
  214. }
  215. if canalPath == "" {
  216. go func() {
  217. for name := range ConfClient.Event() {
  218. log.Info("config(%s) reload", name)
  219. reloadConfig(name)
  220. }
  221. }()
  222. }
  223. return
  224. }
  225. func localCanal() (vs []*conf.Value, ok error) {
  226. fs, err := ioutil.ReadDir(canalPath)
  227. if err != nil {
  228. panic(err)
  229. }
  230. for _, f := range fs {
  231. if !strings.HasSuffix(f.Name(), ".toml") {
  232. continue
  233. }
  234. ct, err := ioutil.ReadFile(canalPath + f.Name())
  235. if err != nil {
  236. continue
  237. }
  238. vs = append(vs, &conf.Value{
  239. Name: f.Name(),
  240. Config: string(ct),
  241. })
  242. }
  243. return
  244. }
  245. // Event returns config change event chan,
  246. func Event() chan *InsConf {
  247. return event
  248. }
  249. // HBaseEvent returns config change event chan,
  250. func HBaseEvent() chan *HBaseInsConf {
  251. return hbaseEvent
  252. }
  253. func reloadConfig(name string) {
  254. var (
  255. cf string
  256. ok bool
  257. )
  258. if name == "canal.toml" || name == "common.toml" {
  259. LoadCanal()
  260. return
  261. }
  262. if !strings.HasSuffix(name, ".toml") {
  263. return
  264. }
  265. if cf, ok = ConfClient.Value(name); !ok {
  266. // TODO(felix): auto reload? or restart hard?
  267. return
  268. }
  269. if strings.HasSuffix(name, ".hbase.toml") {
  270. ic, err := newHBaseConf(name, cf)
  271. if err != nil {
  272. return
  273. }
  274. hbaseEvent <- ic
  275. } else if strings.HasSuffix(name, ".tidb.toml") {
  276. ic, err := newTiDBConf(name, cf)
  277. if err != nil {
  278. return
  279. }
  280. tidbEvent <- ic
  281. } else {
  282. ic, err := newInsConf(name, cf)
  283. if err != nil {
  284. return
  285. }
  286. event <- ic
  287. }
  288. }