123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- package http
- import (
- "bytes"
- "encoding/json"
- "fmt"
- "hash/crc32"
- "io/ioutil"
- "net/http"
- "go-common/app/infra/canal/conf"
- "go-common/app/infra/canal/infoc"
- config "go-common/library/conf"
- "go-common/library/ecode"
- bm "go-common/library/net/http/blademaster"
- "github.com/BurntSushi/toml"
- "github.com/siddontang/go-mysql/canal"
- )
// Fixed values stamped into every generated canal instance config and into
// every write issued through the config client.
const (
	// _heartHeat is assigned to canal.Config.HeartbeatPeriod.
	// NOTE(review): the unit is whatever canal.Config expects for that
	// field (presumably seconds) — confirm against the canal package.
	_heartHeat = 60
	// _readTimeout is assigned to canal.Config.ReadTimeout.
	// NOTE(review): same unit caveat as _heartHeat — confirm.
	_readTimeout = 90
	// _flavor is assigned to canal.Config.Flavor.
	_flavor = "mysql"

	// _updateUser and _updateMark are passed as the trailing two arguments
	// of conf.ConfClient.Update and conf.ConfClient.Create.
	_updateUser = "canal"
	_updateMark = "infoc"
)
- // InfocConf .
- type infocConf struct {
- Addr string `json:"db_addr"`
- User string `json:"user"`
- Pass string `json:"pass"`
- InfocDBs []*infocDB `json:"databases"`
- }
- // InfocDB .
- type infocDB struct {
- Schema string `json:"schema"`
- Tables []*infoTable `json:"tables"`
- LancerAddr string `json:"lancer_addr"`
- LancerTaskID string `json:"lancer_task_id"`
- LancerReportAddr string `json:"lancer_report_addr"`
- Proto string `json:"proto"`
- }
// infoTable is the JSON representation of one table entry; its fields are
// copied to and from conf.CTable by the infoc handlers.
type infoTable struct {
	// Name is the table name.
	Name string `json:"name"`
	// OmitFlied corresponds to conf.CTable.OmitField (JSON key
	// "omit_field"). The misspelled Go name is kept because other code in
	// this file refers to it.
	OmitFlied []string `json:"omit_field"`
	// OmitAction corresponds to conf.CTable.OmitAction.
	OmitAction []string `json:"omit_action"`
}
- func infocPost(c *bm.Context) {
- var (
- ics []*infocConf
- bs []byte
- err error
- buf *bytes.Buffer
- )
- content := make(map[string]string)
- if bs, err = ioutil.ReadAll(c.Request.Body); err != nil {
- c.AbortWithStatus(http.StatusInternalServerError)
- return
- }
- if err = json.Unmarshal(bs, &ics); err != nil {
- c.AbortWithStatus(http.StatusInternalServerError)
- return
- }
- for _, ifc := range ics {
- databases := make([]*conf.Database, len(ifc.InfocDBs))
- for idx, infocDB := range ifc.InfocDBs {
- tables := make([]*conf.CTable, len(infocDB.Tables))
- for ix, table := range infocDB.Tables {
- tables[ix] = &conf.CTable{
- Name: table.Name,
- OmitAction: table.OmitAction,
- OmitField: table.OmitFlied,
- }
- }
- databases[idx] = &conf.Database{
- Schema: infocDB.Schema,
- Infoc: &infoc.Config{
- TaskID: infocDB.LancerTaskID,
- Addr: infocDB.LancerAddr,
- ReporterAddr: infocDB.LancerReportAddr,
- Proto: infocDB.Proto,
- },
- CTables: tables,
- }
- }
- ic := &conf.InsConf{
- Databases: databases,
- Config: &canal.Config{
- Addr: ifc.Addr,
- User: ifc.User,
- Password: ifc.Pass,
- ServerID: crc32.ChecksumIEEE([]byte(ifc.Addr)),
- Flavor: _flavor,
- HeartbeatPeriod: _heartHeat,
- ReadTimeout: _readTimeout,
- },
- }
- var isc = &struct {
- InsConf *conf.InsConf `toml:"instance"`
- }{
- InsConf: ic,
- }
- buf = new(bytes.Buffer)
- if err = toml.NewEncoder(buf).Encode(isc); err != nil {
- c.AbortWithStatus(http.StatusInternalServerError)
- return
- }
- content[fmt.Sprintf("%v.toml", ifc.Addr)] = buf.String()
- }
- for cn, cv := range content {
- value, err := conf.ConfClient.ConfIng(cn)
- if err == nil {
- err = conf.ConfClient.Update(value.CID, cv, _updateUser, _updateMark)
- } else if err == ecode.NothingFound {
- err = conf.ConfClient.Create(cn, cv, _updateUser, _updateMark)
- }
- if err != nil {
- c.AbortWithStatus(http.StatusInternalServerError)
- return
- }
- }
- }
- func infocCurrent(c *bm.Context) {
- var (
- ok bool
- result []*config.Value
- )
- if result, ok = conf.ConfClient.Configs(); !ok {
- c.Status(http.StatusInternalServerError)
- return
- }
- ics := make([]*infocConf, 0, len(result))
- for _, ns := range result {
- var ic struct {
- InsConf *conf.InsConf `toml:"instance"`
- }
- if _, err := toml.Decode(ns.Config, &ic); err != nil {
- c.AbortWithStatus(http.StatusInternalServerError)
- return
- }
- if ic.InsConf == nil {
- continue
- }
- icf := &infocConf{
- Addr: ic.InsConf.Addr,
- User: ic.InsConf.User,
- Pass: ic.InsConf.Password,
- }
- for _, icdb := range ic.InsConf.Databases {
- if icdb.Infoc == nil {
- continue
- }
- tables := make([]*infoTable, len(icdb.CTables))
- for idx, ctable := range icdb.CTables {
- tables[idx] = &infoTable{
- Name: ctable.Name,
- OmitFlied: ctable.OmitField,
- OmitAction: ctable.OmitAction,
- }
- }
- icf.InfocDBs = append(icf.InfocDBs, &infocDB{
- Schema: icdb.Schema,
- Tables: tables,
- LancerAddr: icdb.Infoc.Addr,
- LancerTaskID: icdb.Infoc.TaskID,
- })
- }
- ics = append(ics, icf)
- }
- c.JSON(ics, nil)
- }
|