flw.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. package zk
  2. import (
  3. "bufio"
  4. "bytes"
  5. "fmt"
  6. "io/ioutil"
  7. "net"
  8. "regexp"
  9. "strconv"
  10. "strings"
  11. "time"
  12. )
  13. // FLWSrvr is a FourLetterWord helper function. In particular, this function pulls the srvr output
  14. // from the zookeeper instances and parses the output. A slice of *ServerStats structs are returned
  15. // as well as a boolean value to indicate whether this function processed successfully.
  16. //
  17. // If the boolean value is false there was a problem. If the *ServerStats slice is empty or nil,
  18. // then the error happened before we started to obtain 'srvr' values. Otherwise, one of the
  19. // servers had an issue and the "Error" value in the struct should be inspected to determine
  20. // which server had the issue.
  21. func FLWSrvr(servers []string, timeout time.Duration) ([]*ServerStats, bool) {
  22. // different parts of the regular expression that are required to parse the srvr output
  23. const (
  24. zrVer = `^Zookeeper version: ([A-Za-z0-9\.\-]+), built on (\d\d/\d\d/\d\d\d\d \d\d:\d\d [A-Za-z0-9:\+\-]+)`
  25. zrLat = `^Latency min/avg/max: (\d+)/(\d+)/(\d+)`
  26. zrNet = `^Received: (\d+).*\n^Sent: (\d+).*\n^Connections: (\d+).*\n^Outstanding: (\d+)`
  27. zrState = `^Zxid: (0x[A-Za-z0-9]+).*\n^Mode: (\w+).*\n^Node count: (\d+)`
  28. )
  29. // build the regex from the pieces above
  30. re, err := regexp.Compile(fmt.Sprintf(`(?m:\A%v.*\n%v.*\n%v.*\n%v)`, zrVer, zrLat, zrNet, zrState))
  31. if err != nil {
  32. return nil, false
  33. }
  34. imOk := true
  35. servers = FormatServers(servers)
  36. ss := make([]*ServerStats, len(servers))
  37. for i := range ss {
  38. response, err := fourLetterWord(servers[i], "srvr", timeout)
  39. if err != nil {
  40. ss[i] = &ServerStats{Error: err}
  41. imOk = false
  42. continue
  43. }
  44. matches := re.FindAllStringSubmatch(string(response), -1)
  45. if matches == nil {
  46. err := fmt.Errorf("unable to parse fields from zookeeper response (no regex matches)")
  47. ss[i] = &ServerStats{Error: err}
  48. imOk = false
  49. continue
  50. }
  51. match := matches[0][1:]
  52. // determine current server
  53. var srvrMode Mode
  54. switch match[10] {
  55. case "leader":
  56. srvrMode = ModeLeader
  57. case "follower":
  58. srvrMode = ModeFollower
  59. case "standalone":
  60. srvrMode = ModeStandalone
  61. default:
  62. srvrMode = ModeUnknown
  63. }
  64. buildTime, err := time.Parse("01/02/2006 15:04 MST", match[1])
  65. if err != nil {
  66. ss[i] = &ServerStats{Error: err}
  67. imOk = false
  68. continue
  69. }
  70. parsedInt, err := strconv.ParseInt(match[9], 0, 64)
  71. if err != nil {
  72. ss[i] = &ServerStats{Error: err}
  73. imOk = false
  74. continue
  75. }
  76. // the ZxID value is an int64 with two int32s packed inside
  77. // the high int32 is the epoch (i.e., number of leader elections)
  78. // the low int32 is the counter
  79. epoch := int32(parsedInt >> 32)
  80. counter := int32(parsedInt & 0xFFFFFFFF)
  81. // within the regex above, these values must be numerical
  82. // so we can avoid useless checking of the error return value
  83. minLatency, _ := strconv.ParseInt(match[2], 0, 64)
  84. avgLatency, _ := strconv.ParseInt(match[3], 0, 64)
  85. maxLatency, _ := strconv.ParseInt(match[4], 0, 64)
  86. recv, _ := strconv.ParseInt(match[5], 0, 64)
  87. sent, _ := strconv.ParseInt(match[6], 0, 64)
  88. cons, _ := strconv.ParseInt(match[7], 0, 64)
  89. outs, _ := strconv.ParseInt(match[8], 0, 64)
  90. ncnt, _ := strconv.ParseInt(match[11], 0, 64)
  91. ss[i] = &ServerStats{
  92. Sent: sent,
  93. Received: recv,
  94. NodeCount: ncnt,
  95. MinLatency: minLatency,
  96. AvgLatency: avgLatency,
  97. MaxLatency: maxLatency,
  98. Connections: cons,
  99. Outstanding: outs,
  100. Epoch: epoch,
  101. Counter: counter,
  102. BuildTime: buildTime,
  103. Mode: srvrMode,
  104. Version: match[0],
  105. }
  106. }
  107. return ss, imOk
  108. }
  109. // FLWRuok is a FourLetterWord helper function. In particular, this function
  110. // pulls the ruok output from each server.
  111. func FLWRuok(servers []string, timeout time.Duration) []bool {
  112. servers = FormatServers(servers)
  113. oks := make([]bool, len(servers))
  114. for i := range oks {
  115. response, err := fourLetterWord(servers[i], "ruok", timeout)
  116. if err != nil {
  117. continue
  118. }
  119. if bytes.Equal(response[:4], []byte("imok")) {
  120. oks[i] = true
  121. }
  122. }
  123. return oks
  124. }
  125. // FLWCons is a FourLetterWord helper function. In particular, this function
  126. // pulls the ruok output from each server.
  127. //
  128. // As with FLWSrvr, the boolean value indicates whether one of the requests had
  129. // an issue. The Clients struct has an Error value that can be checked.
  130. func FLWCons(servers []string, timeout time.Duration) ([]*ServerClients, bool) {
  131. const (
  132. zrAddr = `^ /((?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?):(?:\d+))\[\d+\]`
  133. zrPac = `\(queued=(\d+),recved=(\d+),sent=(\d+),sid=(0x[A-Za-z0-9]+),lop=(\w+),est=(\d+),to=(\d+),`
  134. zrSesh = `lcxid=(0x[A-Za-z0-9]+),lzxid=(0x[A-Za-z0-9]+),lresp=(\d+),llat=(\d+),minlat=(\d+),avglat=(\d+),maxlat=(\d+)\)`
  135. )
  136. re, err := regexp.Compile(fmt.Sprintf("%v%v%v", zrAddr, zrPac, zrSesh))
  137. if err != nil {
  138. return nil, false
  139. }
  140. servers = FormatServers(servers)
  141. sc := make([]*ServerClients, len(servers))
  142. imOk := true
  143. for i := range sc {
  144. response, err := fourLetterWord(servers[i], "cons", timeout)
  145. if err != nil {
  146. sc[i] = &ServerClients{Error: err}
  147. imOk = false
  148. continue
  149. }
  150. scan := bufio.NewScanner(bytes.NewReader(response))
  151. var clients []*ServerClient
  152. for scan.Scan() {
  153. line := scan.Bytes()
  154. if len(line) == 0 {
  155. continue
  156. }
  157. m := re.FindAllStringSubmatch(string(line), -1)
  158. if m == nil {
  159. err := fmt.Errorf("unable to parse fields from zookeeper response (no regex matches)")
  160. sc[i] = &ServerClients{Error: err}
  161. imOk = false
  162. continue
  163. }
  164. match := m[0][1:]
  165. queued, _ := strconv.ParseInt(match[1], 0, 64)
  166. recvd, _ := strconv.ParseInt(match[2], 0, 64)
  167. sent, _ := strconv.ParseInt(match[3], 0, 64)
  168. sid, _ := strconv.ParseInt(match[4], 0, 64)
  169. est, _ := strconv.ParseInt(match[6], 0, 64)
  170. timeout, _ := strconv.ParseInt(match[7], 0, 32)
  171. lcxid, _ := parseInt64(match[8])
  172. lzxid, _ := parseInt64(match[9])
  173. lresp, _ := strconv.ParseInt(match[10], 0, 64)
  174. llat, _ := strconv.ParseInt(match[11], 0, 32)
  175. minlat, _ := strconv.ParseInt(match[12], 0, 32)
  176. avglat, _ := strconv.ParseInt(match[13], 0, 32)
  177. maxlat, _ := strconv.ParseInt(match[14], 0, 32)
  178. clients = append(clients, &ServerClient{
  179. Queued: queued,
  180. Received: recvd,
  181. Sent: sent,
  182. SessionID: sid,
  183. Lcxid: int64(lcxid),
  184. Lzxid: int64(lzxid),
  185. Timeout: int32(timeout),
  186. LastLatency: int32(llat),
  187. MinLatency: int32(minlat),
  188. AvgLatency: int32(avglat),
  189. MaxLatency: int32(maxlat),
  190. Established: time.Unix(est, 0),
  191. LastResponse: time.Unix(lresp, 0),
  192. Addr: match[0],
  193. LastOperation: match[5],
  194. })
  195. }
  196. sc[i] = &ServerClients{Clients: clients}
  197. }
  198. return sc, imOk
  199. }
  200. // parseInt64 is similar to strconv.ParseInt, but it also handles hex values that represent negative numbers
  201. func parseInt64(s string) (int64, error) {
  202. if strings.HasPrefix(s, "0x") {
  203. i, err := strconv.ParseUint(s, 0, 64)
  204. return int64(i), err
  205. }
  206. return strconv.ParseInt(s, 0, 64)
  207. }
  208. func fourLetterWord(server, command string, timeout time.Duration) ([]byte, error) {
  209. conn, err := net.DialTimeout("tcp", server, timeout)
  210. if err != nil {
  211. return nil, err
  212. }
  213. // the zookeeper server should automatically close this socket
  214. // once the command has been processed, but better safe than sorry
  215. defer conn.Close()
  216. conn.SetWriteDeadline(time.Now().Add(timeout))
  217. _, err = conn.Write([]byte(command))
  218. if err != nil {
  219. return nil, err
  220. }
  221. conn.SetReadDeadline(time.Now().Add(timeout))
  222. return ioutil.ReadAll(conn)
  223. }