node_proc.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package tidb
  2. import (
  3. "time"
  4. "go-common/library/log"
  5. )
  6. func (db *DB) nodeproc(e <-chan struct{}) {
  7. if db.dis == nil {
  8. return
  9. }
  10. for {
  11. <-e
  12. nodes := db.nodeList()
  13. if len(nodes) == 0 {
  14. continue
  15. }
  16. cm := make(map[string]*conn)
  17. var conns []*conn
  18. for _, conn := range db.conns {
  19. cm[conn.addr] = conn
  20. }
  21. for _, node := range nodes {
  22. if cm[node] != nil {
  23. conns = append(conns, cm[node])
  24. continue
  25. }
  26. c, err := db.connectDSN(genDSN(db.conf.DSN, node))
  27. if err == nil {
  28. conns = append(conns, c)
  29. } else {
  30. log.Error("tidb: connect addr: %s err: %+v", node, err)
  31. }
  32. }
  33. if len(conns) == 0 {
  34. log.Error("tidb: no nodes ignore event")
  35. continue
  36. }
  37. oldConns := db.conns
  38. db.mutex.Lock()
  39. db.conns = conns
  40. db.mutex.Unlock()
  41. log.Info("tidb: new nodes: %v", nodes)
  42. var removedConn []*conn
  43. for _, conn := range oldConns {
  44. var exist bool
  45. for _, c := range conns {
  46. if c.addr == conn.addr {
  47. exist = true
  48. break
  49. }
  50. }
  51. if !exist {
  52. removedConn = append(removedConn, conn)
  53. }
  54. }
  55. go db.closeConns(removedConn)
  56. }
  57. }
  58. func (db *DB) closeConns(conns []*conn) {
  59. if len(conns) == 0 {
  60. return
  61. }
  62. du := db.conf.QueryTimeout
  63. if db.conf.ExecTimeout > du {
  64. du = db.conf.ExecTimeout
  65. }
  66. if db.conf.TranTimeout > du {
  67. du = db.conf.TranTimeout
  68. }
  69. time.Sleep(time.Duration(du))
  70. for _, conn := range conns {
  71. err := conn.Close()
  72. if err != nil {
  73. log.Error("tidb: close removed conn: %s err: %v", conn.addr, err)
  74. } else {
  75. log.Info("tidb: close removed conn: %s", conn.addr)
  76. }
  77. }
  78. }