structs.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609
  1. package zk
import (
	"encoding/binary"
	"errors"
	"log"
	"reflect"
	"runtime"
	"strings"
	"time"
)
// Sentinel errors returned by the packet encoding/decoding helpers in this file.
var (
	// ErrUnhandledFieldType is returned by encodePacketValue and
	// decodePacketValue when they meet a reflect.Kind they do not support.
	ErrUnhandledFieldType = errors.New("zk: unhandled field type")
	// ErrPtrExpected is returned by encodePacket and decodePacket when the
	// value handed to them is not a non-nil pointer.
	ErrPtrExpected = errors.New("zk: encode/decode expect a non-nil pointer to struct")
	// ErrShortBuffer is returned by encodePacket and decodePacket when the
	// supplied buffer is too small for the packet being processed.
	ErrShortBuffer = errors.New("zk: buffer too small")
)
// defaultLogger is a stateless logger that forwards everything to the
// standard library's log package.
type defaultLogger struct{}

// Printf formats its arguments according to format and emits the result
// through log.Printf.
func (defaultLogger) Printf(format string, a ...interface{}) {
	log.Printf(format, a...)
}
// ACL is a single access-control entry. It appears as an element of the Acl
// slices in CreateRequest, setAclRequest and getAclResponse, and is
// serialized field by field, in declaration order, by the reflection-based
// codec in this file. Do not reorder the fields.
type ACL struct {
	Perms  int32 // NOTE(review): presumably a permission bit mask; the constants are not visible here.
	Scheme string
	ID     string
}
// Stat is the metadata record of a znode. It is embedded in several response
// types in this file and is decoded field by field, in declaration order, so
// the field order must not change.
type Stat struct {
	Czxid          int64 // The zxid of the change that caused this znode to be created.
	Mzxid          int64 // The zxid of the change that last modified this znode.
	Ctime          int64 // The time in milliseconds from epoch when this znode was created.
	Mtime          int64 // The time in milliseconds from epoch when this znode was last modified.
	Version        int32 // The number of changes to the data of this znode.
	Cversion       int32 // The number of changes to the children of this znode.
	Aversion       int32 // The number of changes to the ACL of this znode.
	EphemeralOwner int64 // The session id of the owner of this znode if the znode is an ephemeral node. If it is not an ephemeral node, it will be zero.
	DataLength     int32 // The length of the data field of this znode.
	NumChildren    int32 // The number of children of this znode.
	Pzxid          int64 // last modified children
}
// ServerClient is the information for a single Zookeeper client and its session.
// This is used to parse/extract the output of the `cons` command.
//
// Error carries any failure associated with this entry.
// NOTE(review): the code that fills this struct is outside this file; confirm
// which fields are guaranteed to be populated when Error is non-nil.
type ServerClient struct {
	Queued        int64
	Received      int64
	Sent          int64
	SessionID     int64
	Lcxid         int64
	Lzxid         int64
	Timeout       int32
	LastLatency   int32
	MinLatency    int32
	AvgLatency    int32
	MaxLatency    int32
	Established   time.Time
	LastResponse  time.Time
	Addr          string
	LastOperation string // maybe?
	Error         error
}
// ServerClients is a struct for the FLWCons() function. It's used to provide
// the list of Clients.
//
// This is needed because FLWCons() takes multiple servers.
type ServerClients struct {
	Clients []*ServerClient // one entry per client connection reported
	Error   error           // NOTE(review): presumably the per-server failure, if any; confirm against FLWCons.
}
// ServerStats is the information pulled from the Zookeeper `stat` command.
//
// NOTE(review): units of the latency fields and the meaning of Epoch/Counter
// are not visible in this file; confirm against the parser that fills them.
type ServerStats struct {
	Sent        int64
	Received    int64
	NodeCount   int64
	MinLatency  int64
	AvgLatency  int64
	MaxLatency  int64
	Connections int64
	Outstanding int64
	Epoch       int32
	Counter     int32
	BuildTime   time.Time
	Mode        Mode
	Version     string
	Error       error
}
// The types below are wire records. The reflection-based codec in this file
// (encodePacketValue / decodePacketValue) walks their fields in declaration
// order, so field order and field types define the byte layout.

// requestHeader holds the Xid and Opcode of a request.
// NOTE(review): presumably written ahead of every request body; the caller is
// outside this file.
type requestHeader struct {
	Xid    int32
	Opcode int32
}

// responseHeader holds the Xid, Zxid and error code of a response.
type responseHeader struct {
	Xid  int32
	Zxid int64
	Err  ErrCode
}

// multiHeader precedes every operation inside a multi request/response.
// A header with Done set marks the end of the operation list (see
// multiRequest.Encode and the Decode methods).
type multiHeader struct {
	Type int32 // opcode of the operation that follows
	Done bool  // true only on the terminating header
	Err  ErrCode
}

// auth is the record underlying setAuthRequest.
type auth struct {
	Type   int32
	Scheme string
	Auth   []byte
}
// Generic request structs
//
// These shapes are shared by several operations; the concrete request and
// response types further down are defined in terms of them.

// pathRequest is a request that carries only a path.
type pathRequest struct {
	Path string
}

// PathVersionRequest is a request that carries a path and a version.
type PathVersionRequest struct {
	Path    string
	Version int32
}

// pathWatchRequest is a request that carries a path and a watch flag.
type pathWatchRequest struct {
	Path  string
	Watch bool
}

// pathResponse is a response that carries only a path.
type pathResponse struct {
	Path string
}

// statResponse is a response that carries only a Stat.
type statResponse struct {
	Stat Stat
}

// CheckVersionRequest is the request body that requestStructForOp returns
// for opCheck.
type CheckVersionRequest PathVersionRequest

// closeRequest is the (empty) request body for opClose.
type closeRequest struct{}

// closeResponse is an empty response body.
type closeResponse struct{}
// connectRequest is the record sent to open a session.
// NOTE(review): field semantics (e.g. the unit of TimeOut) are not visible in
// this file; confirm against the connection code.
type connectRequest struct {
	ProtocolVersion int32
	LastZxidSeen    int64
	TimeOut         int32
	SessionID       int64
	Passwd          []byte
}

// connectResponse is the counterpart of connectRequest.
type connectResponse struct {
	ProtocolVersion int32
	TimeOut         int32
	SessionID       int64
	Passwd          []byte
}
// CreateRequest is the request body that requestStructForOp returns for
// opCreate.
type CreateRequest struct {
	Path  string
	Data  []byte
	Acl   []ACL
	Flags int32
}

// createResponse carries the path returned for a create.
type createResponse pathResponse

// DeleteRequest is the request body that requestStructForOp returns for
// opDelete.
type DeleteRequest PathVersionRequest

// deleteResponse is an empty response body.
type deleteResponse struct{}

// errorResponse carries a bare error code.
type errorResponse struct {
	Err int32
}

// existsRequest is the request body for opExists.
type existsRequest pathWatchRequest

// existsResponse carries the Stat returned for an exists call.
type existsResponse statResponse

// getAclRequest is the request body for opGetAcl.
type getAclRequest pathRequest

// getAclResponse carries an ACL list together with a Stat.
type getAclResponse struct {
	Acl  []ACL
	Stat Stat
}

// getChildrenRequest is the request body for opGetChildren.
type getChildrenRequest pathRequest

// getChildrenResponse carries a list of child names.
type getChildrenResponse struct {
	Children []string
}

// getChildren2Request is the request body for opGetChildren2.
type getChildren2Request pathWatchRequest

// getChildren2Response carries a list of child names together with a Stat.
type getChildren2Response struct {
	Children []string
	Stat     Stat
}

// getDataRequest is the request body for opGetData.
type getDataRequest pathWatchRequest

// getDataResponse carries a data payload together with a Stat.
type getDataResponse struct {
	Data []byte
	Stat Stat
}

// getMaxChildrenRequest carries only a path.
type getMaxChildrenRequest pathRequest

// getMaxChildrenResponse carries a single int32 limit.
type getMaxChildrenResponse struct {
	Max int32
}

// getSaslRequest carries a SASL token as raw bytes.
type getSaslRequest struct {
	Token []byte
}

// pingRequest is the (empty) request body for opPing.
type pingRequest struct{}

// pingResponse is an empty response body.
type pingResponse struct{}
// setAclRequest is the request body for opSetAcl.
type setAclRequest struct {
	Path    string
	Acl     []ACL
	Version int32
}

// setAclResponse carries the Stat returned for a set-ACL call.
type setAclResponse statResponse

// SetDataRequest is the request body that requestStructForOp returns for
// opSetData.
type SetDataRequest struct {
	Path    string
	Data    []byte
	Version int32
}

// setDataResponse carries the Stat returned for a set-data call.
type setDataResponse statResponse

// setMaxChildren carries a path and an int32 limit.
type setMaxChildren struct {
	Path string
	Max  int32
}

// setSaslRequest carries a SASL token as a string.
// NOTE(review): getSaslRequest uses []byte for its token while this type uses
// string; confirm the asymmetry is intentional.
type setSaslRequest struct {
	Token string
}

// setSaslResponse carries a SASL token as a string.
type setSaslResponse struct {
	Token string
}

// setWatchesRequest is the request body for opSetWatches. It carries a zxid
// and three lists of paths, one per watch category.
type setWatchesRequest struct {
	RelativeZxid int64
	DataWatches  []string
	ExistWatches []string
	ChildWatches []string
}

// setWatchesResponse is an empty response body.
type setWatchesResponse struct{}

// syncRequest is the request body for opSync.
type syncRequest pathRequest

// syncResponse carries the path returned for a sync.
type syncResponse pathResponse

// setAuthRequest is the request body for opSetAuth.
type setAuthRequest auth

// setAuthResponse is an empty response body.
type setAuthResponse struct{}
// multiRequestOp is one operation inside a multi request: a header followed
// by the operation's own request body.
type multiRequestOp struct {
	Header multiHeader
	Op     interface{} // pointer to the request struct for Header.Type
}

// multiRequest is a batch of operations. It has custom Encode and Decode
// methods, so the generic codec delegates to those instead of walking its
// fields.
type multiRequest struct {
	Ops        []multiRequestOp
	DoneHeader multiHeader // terminating header; Encode forces its Done flag to true
}

// multiResponseOp is the decoded result of one operation of a multi request.
// Which of String, Stat and Err is filled depends on Header.Type (see
// multiResponse.Decode).
type multiResponseOp struct {
	Header multiHeader
	String string // filled for opCreate
	Stat   *Stat  // filled for opSetData, nil otherwise
	Err    ErrCode
}

// multiResponse is the decoded reply to a multi request. It has a custom
// Decode method.
type multiResponse struct {
	Ops        []multiResponseOp
	DoneHeader multiHeader // terminating header as read from the wire
}
  229. func (r *multiRequest) Encode(buf []byte) (int, error) {
  230. total := 0
  231. for _, op := range r.Ops {
  232. op.Header.Done = false
  233. n, err := encodePacketValue(buf[total:], reflect.ValueOf(op))
  234. if err != nil {
  235. return total, err
  236. }
  237. total += n
  238. }
  239. r.DoneHeader.Done = true
  240. n, err := encodePacketValue(buf[total:], reflect.ValueOf(r.DoneHeader))
  241. if err != nil {
  242. return total, err
  243. }
  244. total += n
  245. return total, nil
  246. }
  247. func (r *multiRequest) Decode(buf []byte) (int, error) {
  248. r.Ops = make([]multiRequestOp, 0)
  249. r.DoneHeader = multiHeader{-1, true, -1}
  250. total := 0
  251. for {
  252. header := &multiHeader{}
  253. n, err := decodePacketValue(buf[total:], reflect.ValueOf(header))
  254. if err != nil {
  255. return total, err
  256. }
  257. total += n
  258. if header.Done {
  259. r.DoneHeader = *header
  260. break
  261. }
  262. req := requestStructForOp(header.Type)
  263. if req == nil {
  264. return total, ErrAPIError
  265. }
  266. n, err = decodePacketValue(buf[total:], reflect.ValueOf(req))
  267. if err != nil {
  268. return total, err
  269. }
  270. total += n
  271. r.Ops = append(r.Ops, multiRequestOp{*header, req})
  272. }
  273. return total, nil
  274. }
  275. func (r *multiResponse) Decode(buf []byte) (int, error) {
  276. var multiErr error
  277. r.Ops = make([]multiResponseOp, 0)
  278. r.DoneHeader = multiHeader{-1, true, -1}
  279. total := 0
  280. for {
  281. header := &multiHeader{}
  282. n, err := decodePacketValue(buf[total:], reflect.ValueOf(header))
  283. if err != nil {
  284. return total, err
  285. }
  286. total += n
  287. if header.Done {
  288. r.DoneHeader = *header
  289. break
  290. }
  291. res := multiResponseOp{Header: *header}
  292. var w reflect.Value
  293. switch header.Type {
  294. default:
  295. return total, ErrAPIError
  296. case opError:
  297. w = reflect.ValueOf(&res.Err)
  298. case opCreate:
  299. w = reflect.ValueOf(&res.String)
  300. case opSetData:
  301. res.Stat = new(Stat)
  302. w = reflect.ValueOf(res.Stat)
  303. case opCheck, opDelete:
  304. }
  305. if w.IsValid() {
  306. n, err := decodePacketValue(buf[total:], w)
  307. if err != nil {
  308. return total, err
  309. }
  310. total += n
  311. }
  312. r.Ops = append(r.Ops, res)
  313. if multiErr == nil && res.Err != errOk {
  314. // Use the first error as the error returned from Multi().
  315. multiErr = res.Err.toError()
  316. }
  317. }
  318. return total, multiErr
  319. }
// watcherEvent is the wire record of a watch notification: an event type, a
// state and the path concerned. Fields are decoded in declaration order.
type watcherEvent struct {
	Type  EventType
	State State
	Path  string
}
// decoder is implemented by types that parse themselves from a buffer.
// decodePacketValue hands struct values that satisfy it to Decode instead of
// walking their fields. Decode returns the number of bytes consumed.
type decoder interface {
	Decode(buf []byte) (int, error)
}

// encoder is implemented by types that serialize themselves into a buffer.
// encodePacketValue hands struct values that satisfy it to Encode instead of
// walking their fields. Encode returns the number of bytes written.
type encoder interface {
	Encode(buf []byte) (int, error)
}
  331. func decodePacket(buf []byte, st interface{}) (n int, err error) {
  332. defer func() {
  333. if r := recover(); r != nil {
  334. if e, ok := r.(runtime.Error); ok && e.Error() == "runtime error: slice bounds out of range" {
  335. err = ErrShortBuffer
  336. } else {
  337. panic(r)
  338. }
  339. }
  340. }()
  341. v := reflect.ValueOf(st)
  342. if v.Kind() != reflect.Ptr || v.IsNil() {
  343. return 0, ErrPtrExpected
  344. }
  345. return decodePacketValue(buf, v)
  346. }
  347. func decodePacketValue(buf []byte, v reflect.Value) (int, error) {
  348. rv := v
  349. kind := v.Kind()
  350. if kind == reflect.Ptr {
  351. if v.IsNil() {
  352. v.Set(reflect.New(v.Type().Elem()))
  353. }
  354. v = v.Elem()
  355. kind = v.Kind()
  356. }
  357. n := 0
  358. switch kind {
  359. default:
  360. return n, ErrUnhandledFieldType
  361. case reflect.Struct:
  362. if de, ok := rv.Interface().(decoder); ok {
  363. return de.Decode(buf)
  364. } else if de, ok := v.Interface().(decoder); ok {
  365. return de.Decode(buf)
  366. } else {
  367. for i := 0; i < v.NumField(); i++ {
  368. field := v.Field(i)
  369. n2, err := decodePacketValue(buf[n:], field)
  370. n += n2
  371. if err != nil {
  372. return n, err
  373. }
  374. }
  375. }
  376. case reflect.Bool:
  377. v.SetBool(buf[n] != 0)
  378. n++
  379. case reflect.Int32:
  380. v.SetInt(int64(binary.BigEndian.Uint32(buf[n : n+4])))
  381. n += 4
  382. case reflect.Int64:
  383. v.SetInt(int64(binary.BigEndian.Uint64(buf[n : n+8])))
  384. n += 8
  385. case reflect.String:
  386. ln := int(binary.BigEndian.Uint32(buf[n : n+4]))
  387. v.SetString(string(buf[n+4 : n+4+ln]))
  388. n += 4 + ln
  389. case reflect.Slice:
  390. switch v.Type().Elem().Kind() {
  391. default:
  392. count := int(binary.BigEndian.Uint32(buf[n : n+4]))
  393. n += 4
  394. values := reflect.MakeSlice(v.Type(), count, count)
  395. v.Set(values)
  396. for i := 0; i < count; i++ {
  397. n2, err := decodePacketValue(buf[n:], values.Index(i))
  398. n += n2
  399. if err != nil {
  400. return n, err
  401. }
  402. }
  403. case reflect.Uint8:
  404. ln := int(int32(binary.BigEndian.Uint32(buf[n : n+4])))
  405. if ln < 0 {
  406. n += 4
  407. v.SetBytes(nil)
  408. } else {
  409. bytes := make([]byte, ln)
  410. copy(bytes, buf[n+4:n+4+ln])
  411. v.SetBytes(bytes)
  412. n += 4 + ln
  413. }
  414. }
  415. }
  416. return n, nil
  417. }
  418. func encodePacket(buf []byte, st interface{}) (n int, err error) {
  419. defer func() {
  420. if r := recover(); r != nil {
  421. if e, ok := r.(runtime.Error); ok && e.Error() == "runtime error: slice bounds out of range" {
  422. err = ErrShortBuffer
  423. } else {
  424. panic(r)
  425. }
  426. }
  427. }()
  428. v := reflect.ValueOf(st)
  429. if v.Kind() != reflect.Ptr || v.IsNil() {
  430. return 0, ErrPtrExpected
  431. }
  432. return encodePacketValue(buf, v)
  433. }
  434. func encodePacketValue(buf []byte, v reflect.Value) (int, error) {
  435. rv := v
  436. for v.Kind() == reflect.Ptr || v.Kind() == reflect.Interface {
  437. v = v.Elem()
  438. }
  439. n := 0
  440. switch v.Kind() {
  441. default:
  442. return n, ErrUnhandledFieldType
  443. case reflect.Struct:
  444. if en, ok := rv.Interface().(encoder); ok {
  445. return en.Encode(buf)
  446. } else if en, ok := v.Interface().(encoder); ok {
  447. return en.Encode(buf)
  448. } else {
  449. for i := 0; i < v.NumField(); i++ {
  450. field := v.Field(i)
  451. n2, err := encodePacketValue(buf[n:], field)
  452. n += n2
  453. if err != nil {
  454. return n, err
  455. }
  456. }
  457. }
  458. case reflect.Bool:
  459. if v.Bool() {
  460. buf[n] = 1
  461. } else {
  462. buf[n] = 0
  463. }
  464. n++
  465. case reflect.Int32:
  466. binary.BigEndian.PutUint32(buf[n:n+4], uint32(v.Int()))
  467. n += 4
  468. case reflect.Int64:
  469. binary.BigEndian.PutUint64(buf[n:n+8], uint64(v.Int()))
  470. n += 8
  471. case reflect.String:
  472. str := v.String()
  473. binary.BigEndian.PutUint32(buf[n:n+4], uint32(len(str)))
  474. copy(buf[n+4:n+4+len(str)], []byte(str))
  475. n += 4 + len(str)
  476. case reflect.Slice:
  477. switch v.Type().Elem().Kind() {
  478. default:
  479. count := v.Len()
  480. startN := n
  481. n += 4
  482. for i := 0; i < count; i++ {
  483. n2, err := encodePacketValue(buf[n:], v.Index(i))
  484. n += n2
  485. if err != nil {
  486. return n, err
  487. }
  488. }
  489. binary.BigEndian.PutUint32(buf[startN:startN+4], uint32(count))
  490. case reflect.Uint8:
  491. if v.IsNil() {
  492. binary.BigEndian.PutUint32(buf[n:n+4], uint32(0xffffffff))
  493. n += 4
  494. } else {
  495. bytes := v.Bytes()
  496. binary.BigEndian.PutUint32(buf[n:n+4], uint32(len(bytes)))
  497. copy(buf[n+4:n+4+len(bytes)], bytes)
  498. n += 4 + len(bytes)
  499. }
  500. }
  501. }
  502. return n, nil
  503. }
  504. func requestStructForOp(op int32) interface{} {
  505. switch op {
  506. case opClose:
  507. return &closeRequest{}
  508. case opCreate:
  509. return &CreateRequest{}
  510. case opDelete:
  511. return &DeleteRequest{}
  512. case opExists:
  513. return &existsRequest{}
  514. case opGetAcl:
  515. return &getAclRequest{}
  516. case opGetChildren:
  517. return &getChildrenRequest{}
  518. case opGetChildren2:
  519. return &getChildren2Request{}
  520. case opGetData:
  521. return &getDataRequest{}
  522. case opPing:
  523. return &pingRequest{}
  524. case opSetAcl:
  525. return &setAclRequest{}
  526. case opSetData:
  527. return &SetDataRequest{}
  528. case opSetWatches:
  529. return &setWatchesRequest{}
  530. case opSync:
  531. return &syncRequest{}
  532. case opSetAuth:
  533. return &setAuthRequest{}
  534. case opCheck:
  535. return &CheckVersionRequest{}
  536. case opMulti:
  537. return &multiRequest{}
  538. }
  539. return nil
  540. }