123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609 |
- package zk
- import (
- "encoding/binary"
- "errors"
- "log"
- "reflect"
- "runtime"
- "time"
- )
- var (
- ErrUnhandledFieldType = errors.New("zk: unhandled field type")
- ErrPtrExpected = errors.New("zk: encode/decode expect a non-nil pointer to struct")
- ErrShortBuffer = errors.New("zk: buffer too small")
- )
- type defaultLogger struct{}
- func (defaultLogger) Printf(format string, a ...interface{}) {
- log.Printf(format, a...)
- }
- type ACL struct {
- Perms int32
- Scheme string
- ID string
- }
- type Stat struct {
- Czxid int64 // The zxid of the change that caused this znode to be created.
- Mzxid int64 // The zxid of the change that last modified this znode.
- Ctime int64 // The time in milliseconds from epoch when this znode was created.
- Mtime int64 // The time in milliseconds from epoch when this znode was last modified.
- Version int32 // The number of changes to the data of this znode.
- Cversion int32 // The number of changes to the children of this znode.
- Aversion int32 // The number of changes to the ACL of this znode.
- EphemeralOwner int64 // The session id of the owner of this znode if the znode is an ephemeral node. If it is not an ephemeral node, it will be zero.
- DataLength int32 // The length of the data field of this znode.
- NumChildren int32 // The number of children of this znode.
- Pzxid int64 // last modified children
- }
- // ServerClient is the information for a single Zookeeper client and its session.
- // This is used to parse/extract the output fo the `cons` command.
- type ServerClient struct {
- Queued int64
- Received int64
- Sent int64
- SessionID int64
- Lcxid int64
- Lzxid int64
- Timeout int32
- LastLatency int32
- MinLatency int32
- AvgLatency int32
- MaxLatency int32
- Established time.Time
- LastResponse time.Time
- Addr string
- LastOperation string // maybe?
- Error error
- }
- // ServerClients is a struct for the FLWCons() function. It's used to provide
- // the list of Clients.
- //
- // This is needed because FLWCons() takes multiple servers.
- type ServerClients struct {
- Clients []*ServerClient
- Error error
- }
- // ServerStats is the information pulled from the Zookeeper `stat` command.
- type ServerStats struct {
- Sent int64
- Received int64
- NodeCount int64
- MinLatency int64
- AvgLatency int64
- MaxLatency int64
- Connections int64
- Outstanding int64
- Epoch int32
- Counter int32
- BuildTime time.Time
- Mode Mode
- Version string
- Error error
- }
- type requestHeader struct {
- Xid int32
- Opcode int32
- }
- type responseHeader struct {
- Xid int32
- Zxid int64
- Err ErrCode
- }
- type multiHeader struct {
- Type int32
- Done bool
- Err ErrCode
- }
- type auth struct {
- Type int32
- Scheme string
- Auth []byte
- }
- // Generic request structs
- type pathRequest struct {
- Path string
- }
- type PathVersionRequest struct {
- Path string
- Version int32
- }
- type pathWatchRequest struct {
- Path string
- Watch bool
- }
- type pathResponse struct {
- Path string
- }
- type statResponse struct {
- Stat Stat
- }
- //
- type CheckVersionRequest PathVersionRequest
- type closeRequest struct{}
- type closeResponse struct{}
- type connectRequest struct {
- ProtocolVersion int32
- LastZxidSeen int64
- TimeOut int32
- SessionID int64
- Passwd []byte
- }
- type connectResponse struct {
- ProtocolVersion int32
- TimeOut int32
- SessionID int64
- Passwd []byte
- }
- type CreateRequest struct {
- Path string
- Data []byte
- Acl []ACL
- Flags int32
- }
- type createResponse pathResponse
- type DeleteRequest PathVersionRequest
- type deleteResponse struct{}
- type errorResponse struct {
- Err int32
- }
- type existsRequest pathWatchRequest
- type existsResponse statResponse
- type getAclRequest pathRequest
- type getAclResponse struct {
- Acl []ACL
- Stat Stat
- }
- type getChildrenRequest pathRequest
- type getChildrenResponse struct {
- Children []string
- }
- type getChildren2Request pathWatchRequest
- type getChildren2Response struct {
- Children []string
- Stat Stat
- }
- type getDataRequest pathWatchRequest
- type getDataResponse struct {
- Data []byte
- Stat Stat
- }
- type getMaxChildrenRequest pathRequest
- type getMaxChildrenResponse struct {
- Max int32
- }
- type getSaslRequest struct {
- Token []byte
- }
- type pingRequest struct{}
- type pingResponse struct{}
- type setAclRequest struct {
- Path string
- Acl []ACL
- Version int32
- }
- type setAclResponse statResponse
- type SetDataRequest struct {
- Path string
- Data []byte
- Version int32
- }
- type setDataResponse statResponse
- type setMaxChildren struct {
- Path string
- Max int32
- }
- type setSaslRequest struct {
- Token string
- }
- type setSaslResponse struct {
- Token string
- }
- type setWatchesRequest struct {
- RelativeZxid int64
- DataWatches []string
- ExistWatches []string
- ChildWatches []string
- }
- type setWatchesResponse struct{}
- type syncRequest pathRequest
- type syncResponse pathResponse
- type setAuthRequest auth
- type setAuthResponse struct{}
- type multiRequestOp struct {
- Header multiHeader
- Op interface{}
- }
- type multiRequest struct {
- Ops []multiRequestOp
- DoneHeader multiHeader
- }
- type multiResponseOp struct {
- Header multiHeader
- String string
- Stat *Stat
- Err ErrCode
- }
- type multiResponse struct {
- Ops []multiResponseOp
- DoneHeader multiHeader
- }
- func (r *multiRequest) Encode(buf []byte) (int, error) {
- total := 0
- for _, op := range r.Ops {
- op.Header.Done = false
- n, err := encodePacketValue(buf[total:], reflect.ValueOf(op))
- if err != nil {
- return total, err
- }
- total += n
- }
- r.DoneHeader.Done = true
- n, err := encodePacketValue(buf[total:], reflect.ValueOf(r.DoneHeader))
- if err != nil {
- return total, err
- }
- total += n
- return total, nil
- }
- func (r *multiRequest) Decode(buf []byte) (int, error) {
- r.Ops = make([]multiRequestOp, 0)
- r.DoneHeader = multiHeader{-1, true, -1}
- total := 0
- for {
- header := &multiHeader{}
- n, err := decodePacketValue(buf[total:], reflect.ValueOf(header))
- if err != nil {
- return total, err
- }
- total += n
- if header.Done {
- r.DoneHeader = *header
- break
- }
- req := requestStructForOp(header.Type)
- if req == nil {
- return total, ErrAPIError
- }
- n, err = decodePacketValue(buf[total:], reflect.ValueOf(req))
- if err != nil {
- return total, err
- }
- total += n
- r.Ops = append(r.Ops, multiRequestOp{*header, req})
- }
- return total, nil
- }
- func (r *multiResponse) Decode(buf []byte) (int, error) {
- var multiErr error
- r.Ops = make([]multiResponseOp, 0)
- r.DoneHeader = multiHeader{-1, true, -1}
- total := 0
- for {
- header := &multiHeader{}
- n, err := decodePacketValue(buf[total:], reflect.ValueOf(header))
- if err != nil {
- return total, err
- }
- total += n
- if header.Done {
- r.DoneHeader = *header
- break
- }
- res := multiResponseOp{Header: *header}
- var w reflect.Value
- switch header.Type {
- default:
- return total, ErrAPIError
- case opError:
- w = reflect.ValueOf(&res.Err)
- case opCreate:
- w = reflect.ValueOf(&res.String)
- case opSetData:
- res.Stat = new(Stat)
- w = reflect.ValueOf(res.Stat)
- case opCheck, opDelete:
- }
- if w.IsValid() {
- n, err := decodePacketValue(buf[total:], w)
- if err != nil {
- return total, err
- }
- total += n
- }
- r.Ops = append(r.Ops, res)
- if multiErr == nil && res.Err != errOk {
- // Use the first error as the error returned from Multi().
- multiErr = res.Err.toError()
- }
- }
- return total, multiErr
- }
- type watcherEvent struct {
- Type EventType
- State State
- Path string
- }
- type decoder interface {
- Decode(buf []byte) (int, error)
- }
- type encoder interface {
- Encode(buf []byte) (int, error)
- }
- func decodePacket(buf []byte, st interface{}) (n int, err error) {
- defer func() {
- if r := recover(); r != nil {
- if e, ok := r.(runtime.Error); ok && e.Error() == "runtime error: slice bounds out of range" {
- err = ErrShortBuffer
- } else {
- panic(r)
- }
- }
- }()
- v := reflect.ValueOf(st)
- if v.Kind() != reflect.Ptr || v.IsNil() {
- return 0, ErrPtrExpected
- }
- return decodePacketValue(buf, v)
- }
- func decodePacketValue(buf []byte, v reflect.Value) (int, error) {
- rv := v
- kind := v.Kind()
- if kind == reflect.Ptr {
- if v.IsNil() {
- v.Set(reflect.New(v.Type().Elem()))
- }
- v = v.Elem()
- kind = v.Kind()
- }
- n := 0
- switch kind {
- default:
- return n, ErrUnhandledFieldType
- case reflect.Struct:
- if de, ok := rv.Interface().(decoder); ok {
- return de.Decode(buf)
- } else if de, ok := v.Interface().(decoder); ok {
- return de.Decode(buf)
- } else {
- for i := 0; i < v.NumField(); i++ {
- field := v.Field(i)
- n2, err := decodePacketValue(buf[n:], field)
- n += n2
- if err != nil {
- return n, err
- }
- }
- }
- case reflect.Bool:
- v.SetBool(buf[n] != 0)
- n++
- case reflect.Int32:
- v.SetInt(int64(binary.BigEndian.Uint32(buf[n : n+4])))
- n += 4
- case reflect.Int64:
- v.SetInt(int64(binary.BigEndian.Uint64(buf[n : n+8])))
- n += 8
- case reflect.String:
- ln := int(binary.BigEndian.Uint32(buf[n : n+4]))
- v.SetString(string(buf[n+4 : n+4+ln]))
- n += 4 + ln
- case reflect.Slice:
- switch v.Type().Elem().Kind() {
- default:
- count := int(binary.BigEndian.Uint32(buf[n : n+4]))
- n += 4
- values := reflect.MakeSlice(v.Type(), count, count)
- v.Set(values)
- for i := 0; i < count; i++ {
- n2, err := decodePacketValue(buf[n:], values.Index(i))
- n += n2
- if err != nil {
- return n, err
- }
- }
- case reflect.Uint8:
- ln := int(int32(binary.BigEndian.Uint32(buf[n : n+4])))
- if ln < 0 {
- n += 4
- v.SetBytes(nil)
- } else {
- bytes := make([]byte, ln)
- copy(bytes, buf[n+4:n+4+ln])
- v.SetBytes(bytes)
- n += 4 + ln
- }
- }
- }
- return n, nil
- }
- func encodePacket(buf []byte, st interface{}) (n int, err error) {
- defer func() {
- if r := recover(); r != nil {
- if e, ok := r.(runtime.Error); ok && e.Error() == "runtime error: slice bounds out of range" {
- err = ErrShortBuffer
- } else {
- panic(r)
- }
- }
- }()
- v := reflect.ValueOf(st)
- if v.Kind() != reflect.Ptr || v.IsNil() {
- return 0, ErrPtrExpected
- }
- return encodePacketValue(buf, v)
- }
- func encodePacketValue(buf []byte, v reflect.Value) (int, error) {
- rv := v
- for v.Kind() == reflect.Ptr || v.Kind() == reflect.Interface {
- v = v.Elem()
- }
- n := 0
- switch v.Kind() {
- default:
- return n, ErrUnhandledFieldType
- case reflect.Struct:
- if en, ok := rv.Interface().(encoder); ok {
- return en.Encode(buf)
- } else if en, ok := v.Interface().(encoder); ok {
- return en.Encode(buf)
- } else {
- for i := 0; i < v.NumField(); i++ {
- field := v.Field(i)
- n2, err := encodePacketValue(buf[n:], field)
- n += n2
- if err != nil {
- return n, err
- }
- }
- }
- case reflect.Bool:
- if v.Bool() {
- buf[n] = 1
- } else {
- buf[n] = 0
- }
- n++
- case reflect.Int32:
- binary.BigEndian.PutUint32(buf[n:n+4], uint32(v.Int()))
- n += 4
- case reflect.Int64:
- binary.BigEndian.PutUint64(buf[n:n+8], uint64(v.Int()))
- n += 8
- case reflect.String:
- str := v.String()
- binary.BigEndian.PutUint32(buf[n:n+4], uint32(len(str)))
- copy(buf[n+4:n+4+len(str)], []byte(str))
- n += 4 + len(str)
- case reflect.Slice:
- switch v.Type().Elem().Kind() {
- default:
- count := v.Len()
- startN := n
- n += 4
- for i := 0; i < count; i++ {
- n2, err := encodePacketValue(buf[n:], v.Index(i))
- n += n2
- if err != nil {
- return n, err
- }
- }
- binary.BigEndian.PutUint32(buf[startN:startN+4], uint32(count))
- case reflect.Uint8:
- if v.IsNil() {
- binary.BigEndian.PutUint32(buf[n:n+4], uint32(0xffffffff))
- n += 4
- } else {
- bytes := v.Bytes()
- binary.BigEndian.PutUint32(buf[n:n+4], uint32(len(bytes)))
- copy(buf[n+4:n+4+len(bytes)], bytes)
- n += 4 + len(bytes)
- }
- }
- }
- return n, nil
- }
- func requestStructForOp(op int32) interface{} {
- switch op {
- case opClose:
- return &closeRequest{}
- case opCreate:
- return &CreateRequest{}
- case opDelete:
- return &DeleteRequest{}
- case opExists:
- return &existsRequest{}
- case opGetAcl:
- return &getAclRequest{}
- case opGetChildren:
- return &getChildrenRequest{}
- case opGetChildren2:
- return &getChildren2Request{}
- case opGetData:
- return &getDataRequest{}
- case opPing:
- return &pingRequest{}
- case opSetAcl:
- return &setAclRequest{}
- case opSetData:
- return &SetDataRequest{}
- case opSetWatches:
- return &setWatchesRequest{}
- case opSync:
- return &syncRequest{}
- case opSetAuth:
- return &setAuthRequest{}
- case opCheck:
- return &CheckVersionRequest{}
- case opMulti:
- return &multiRequest{}
- }
- return nil
- }
|