server_java.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package zk
  2. import (
  3. "fmt"
  4. "io"
  5. "os"
  6. "os/exec"
  7. "path/filepath"
  8. )
  9. type ErrMissingServerConfigField string
  10. func (e ErrMissingServerConfigField) Error() string {
  11. return fmt.Sprintf("zk: missing server config field '%s'", string(e))
  12. }
  13. const (
  14. DefaultServerTickTime = 2000
  15. DefaultServerInitLimit = 10
  16. DefaultServerSyncLimit = 5
  17. DefaultServerAutoPurgeSnapRetainCount = 3
  18. DefaultPeerPort = 2888
  19. DefaultLeaderElectionPort = 3888
  20. )
  21. type ServerConfigServer struct {
  22. ID int
  23. Host string
  24. PeerPort int
  25. LeaderElectionPort int
  26. }
  27. type ServerConfig struct {
  28. TickTime int // Number of milliseconds of each tick
  29. InitLimit int // Number of ticks that the initial synchronization phase can take
  30. SyncLimit int // Number of ticks that can pass between sending a request and getting an acknowledgement
  31. DataDir string // Direcrory where the snapshot is stored
  32. ClientPort int // Port at which clients will connect
  33. AutoPurgeSnapRetainCount int // Number of snapshots to retain in dataDir
  34. AutoPurgePurgeInterval int // Purge task internal in hours (0 to disable auto purge)
  35. Servers []ServerConfigServer
  36. }
  37. func (sc ServerConfig) Marshall(w io.Writer) error {
  38. if sc.DataDir == "" {
  39. return ErrMissingServerConfigField("dataDir")
  40. }
  41. fmt.Fprintf(w, "dataDir=%s\n", sc.DataDir)
  42. if sc.TickTime <= 0 {
  43. sc.TickTime = DefaultServerTickTime
  44. }
  45. fmt.Fprintf(w, "tickTime=%d\n", sc.TickTime)
  46. if sc.InitLimit <= 0 {
  47. sc.InitLimit = DefaultServerInitLimit
  48. }
  49. fmt.Fprintf(w, "initLimit=%d\n", sc.InitLimit)
  50. if sc.SyncLimit <= 0 {
  51. sc.SyncLimit = DefaultServerSyncLimit
  52. }
  53. fmt.Fprintf(w, "syncLimit=%d\n", sc.SyncLimit)
  54. if sc.ClientPort <= 0 {
  55. sc.ClientPort = DefaultPort
  56. }
  57. fmt.Fprintf(w, "clientPort=%d\n", sc.ClientPort)
  58. if sc.AutoPurgePurgeInterval > 0 {
  59. if sc.AutoPurgeSnapRetainCount <= 0 {
  60. sc.AutoPurgeSnapRetainCount = DefaultServerAutoPurgeSnapRetainCount
  61. }
  62. fmt.Fprintf(w, "autopurge.snapRetainCount=%d\n", sc.AutoPurgeSnapRetainCount)
  63. fmt.Fprintf(w, "autopurge.purgeInterval=%d\n", sc.AutoPurgePurgeInterval)
  64. }
  65. if len(sc.Servers) > 0 {
  66. for _, srv := range sc.Servers {
  67. if srv.PeerPort <= 0 {
  68. srv.PeerPort = DefaultPeerPort
  69. }
  70. if srv.LeaderElectionPort <= 0 {
  71. srv.LeaderElectionPort = DefaultLeaderElectionPort
  72. }
  73. fmt.Fprintf(w, "server.%d=%s:%d:%d\n", srv.ID, srv.Host, srv.PeerPort, srv.LeaderElectionPort)
  74. }
  75. }
  76. return nil
  77. }
  78. var jarSearchPaths = []string{
  79. "zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar",
  80. "../zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar",
  81. "/usr/share/java/zookeeper-*.jar",
  82. "/usr/local/zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar",
  83. "/usr/local/Cellar/zookeeper/*/libexec/contrib/fatjar/zookeeper-*-fatjar.jar",
  84. }
  85. func findZookeeperFatJar() string {
  86. var paths []string
  87. zkPath := os.Getenv("ZOOKEEPER_PATH")
  88. if zkPath == "" {
  89. paths = jarSearchPaths
  90. } else {
  91. paths = []string{filepath.Join(zkPath, "contrib/fatjar/zookeeper-*-fatjar.jar")}
  92. }
  93. for _, path := range paths {
  94. matches, _ := filepath.Glob(path)
  95. // TODO: could sort by version and pick latest
  96. if len(matches) > 0 {
  97. return matches[0]
  98. }
  99. }
  100. return ""
  101. }
  102. type Server struct {
  103. JarPath string
  104. ConfigPath string
  105. Stdout, Stderr io.Writer
  106. cmd *exec.Cmd
  107. }
  108. func (srv *Server) Start() error {
  109. if srv.JarPath == "" {
  110. srv.JarPath = findZookeeperFatJar()
  111. if srv.JarPath == "" {
  112. return fmt.Errorf("zk: unable to find server jar")
  113. }
  114. }
  115. srv.cmd = exec.Command("java", "-jar", srv.JarPath, "server", srv.ConfigPath)
  116. srv.cmd.Stdout = srv.Stdout
  117. srv.cmd.Stderr = srv.Stderr
  118. return srv.cmd.Start()
  119. }
  120. func (srv *Server) Stop() error {
  121. srv.cmd.Process.Signal(os.Kill)
  122. return srv.cmd.Wait()
  123. }