123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668 |
- /*
- *
- * Copyright 2018 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- // Package channelz defines APIs for enabling channelz service, entry
- // registration/deletion, and accessing channelz data. It also defines channelz
- // metric struct formats.
- //
- // All APIs in this package are experimental.
- package channelz
- import (
- "sort"
- "sync"
- "sync/atomic"
- "time"
- "google.golang.org/grpc/grpclog"
- )
// defaultMaxTraceEntry is the initial (and reset) value of maxTraceEntry: the
// maximum number of trace events kept per channel/subchannel.
const defaultMaxTraceEntry int32 = 30
- var (
- db dbWrapper
- idGen idGenerator
- // EntryPerPage defines the number of channelz entries to be shown on a web page.
- EntryPerPage = 50
- curState int32
- maxTraceEntry = defaultMaxTraceEntry
- )
- // TurnOn turns on channelz data collection.
- func TurnOn() {
- if !IsOn() {
- NewChannelzStorage()
- atomic.StoreInt32(&curState, 1)
- }
- }
- // IsOn returns whether channelz data collection is on.
- func IsOn() bool {
- return atomic.CompareAndSwapInt32(&curState, 1, 1)
- }
- // SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel).
- // Setting it to 0 will disable channel tracing.
- func SetMaxTraceEntry(i int32) {
- atomic.StoreInt32(&maxTraceEntry, i)
- }
- // ResetMaxTraceEntryToDefault resets the maximum number of trace entry per entity to default.
- func ResetMaxTraceEntryToDefault() {
- atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
- }
- func getMaxTraceEntry() int {
- i := atomic.LoadInt32(&maxTraceEntry)
- return int(i)
- }
- // dbWarpper wraps around a reference to internal channelz data storage, and
- // provide synchronized functionality to set and get the reference.
- type dbWrapper struct {
- mu sync.RWMutex
- DB *channelMap
- }
- func (d *dbWrapper) set(db *channelMap) {
- d.mu.Lock()
- d.DB = db
- d.mu.Unlock()
- }
- func (d *dbWrapper) get() *channelMap {
- d.mu.RLock()
- defer d.mu.RUnlock()
- return d.DB
- }
- // NewChannelzStorage initializes channelz data storage and id generator.
- //
- // Note: This function is exported for testing purpose only. User should not call
- // it in most cases.
- func NewChannelzStorage() {
- db.set(&channelMap{
- topLevelChannels: make(map[int64]struct{}),
- channels: make(map[int64]*channel),
- listenSockets: make(map[int64]*listenSocket),
- normalSockets: make(map[int64]*normalSocket),
- servers: make(map[int64]*server),
- subChannels: make(map[int64]*subChannel),
- })
- idGen.reset()
- }
- // GetTopChannels returns a slice of top channel's ChannelMetric, along with a
- // boolean indicating whether there's more top channels to be queried for.
- //
- // The arg id specifies that only top channel with id at or above it will be included
- // in the result. The returned slice is up to a length of EntryPerPage, and is
- // sorted in ascending id order.
- func GetTopChannels(id int64) ([]*ChannelMetric, bool) {
- return db.get().GetTopChannels(id)
- }
- // GetServers returns a slice of server's ServerMetric, along with a
- // boolean indicating whether there's more servers to be queried for.
- //
- // The arg id specifies that only server with id at or above it will be included
- // in the result. The returned slice is up to a length of EntryPerPage, and is
- // sorted in ascending id order.
- func GetServers(id int64) ([]*ServerMetric, bool) {
- return db.get().GetServers(id)
- }
- // GetServerSockets returns a slice of server's (identified by id) normal socket's
- // SocketMetric, along with a boolean indicating whether there's more sockets to
- // be queried for.
- //
- // The arg startID specifies that only sockets with id at or above it will be
- // included in the result. The returned slice is up to a length of EntryPerPage,
- // and is sorted in ascending id order.
- func GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
- return db.get().GetServerSockets(id, startID)
- }
- // GetChannel returns the ChannelMetric for the channel (identified by id).
- func GetChannel(id int64) *ChannelMetric {
- return db.get().GetChannel(id)
- }
- // GetSubChannel returns the SubChannelMetric for the subchannel (identified by id).
- func GetSubChannel(id int64) *SubChannelMetric {
- return db.get().GetSubChannel(id)
- }
- // GetSocket returns the SocketInternalMetric for the socket (identified by id).
- func GetSocket(id int64) *SocketMetric {
- return db.get().GetSocket(id)
- }
- // RegisterChannel registers the given channel c in channelz database with ref
- // as its reference name, and add it to the child list of its parent (identified
- // by pid). pid = 0 means no parent. It returns the unique channelz tracking id
- // assigned to this channel.
- func RegisterChannel(c Channel, pid int64, ref string) int64 {
- id := idGen.genID()
- cn := &channel{
- refName: ref,
- c: c,
- subChans: make(map[int64]string),
- nestedChans: make(map[int64]string),
- id: id,
- pid: pid,
- trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
- }
- if pid == 0 {
- db.get().addChannel(id, cn, true, pid, ref)
- } else {
- db.get().addChannel(id, cn, false, pid, ref)
- }
- return id
- }
- // RegisterSubChannel registers the given channel c in channelz database with ref
- // as its reference name, and add it to the child list of its parent (identified
- // by pid). It returns the unique channelz tracking id assigned to this subchannel.
- func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
- if pid == 0 {
- grpclog.Error("a SubChannel's parent id cannot be 0")
- return 0
- }
- id := idGen.genID()
- sc := &subChannel{
- refName: ref,
- c: c,
- sockets: make(map[int64]string),
- id: id,
- pid: pid,
- trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
- }
- db.get().addSubChannel(id, sc, pid, ref)
- return id
- }
- // RegisterServer registers the given server s in channelz database. It returns
- // the unique channelz tracking id assigned to this server.
- func RegisterServer(s Server, ref string) int64 {
- id := idGen.genID()
- svr := &server{
- refName: ref,
- s: s,
- sockets: make(map[int64]string),
- listenSockets: make(map[int64]string),
- id: id,
- }
- db.get().addServer(id, svr)
- return id
- }
- // RegisterListenSocket registers the given listen socket s in channelz database
- // with ref as its reference name, and add it to the child list of its parent
- // (identified by pid). It returns the unique channelz tracking id assigned to
- // this listen socket.
- func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
- if pid == 0 {
- grpclog.Error("a ListenSocket's parent id cannot be 0")
- return 0
- }
- id := idGen.genID()
- ls := &listenSocket{refName: ref, s: s, id: id, pid: pid}
- db.get().addListenSocket(id, ls, pid, ref)
- return id
- }
- // RegisterNormalSocket registers the given normal socket s in channelz database
- // with ref as its reference name, and add it to the child list of its parent
- // (identified by pid). It returns the unique channelz tracking id assigned to
- // this normal socket.
- func RegisterNormalSocket(s Socket, pid int64, ref string) int64 {
- if pid == 0 {
- grpclog.Error("a NormalSocket's parent id cannot be 0")
- return 0
- }
- id := idGen.genID()
- ns := &normalSocket{refName: ref, s: s, id: id, pid: pid}
- db.get().addNormalSocket(id, ns, pid, ref)
- return id
- }
- // RemoveEntry removes an entry with unique channelz trakcing id to be id from
- // channelz database.
- func RemoveEntry(id int64) {
- db.get().removeEntry(id)
- }
- // TraceEventDesc is what the caller of AddTraceEvent should provide to describe the event to be added
- // to the channel trace.
- // The Parent field is optional. It is used for event that will be recorded in the entity's parent
- // trace also.
- type TraceEventDesc struct {
- Desc string
- Severity Severity
- Parent *TraceEventDesc
- }
- // AddTraceEvent adds trace related to the entity with specified id, using the provided TraceEventDesc.
- func AddTraceEvent(id int64, desc *TraceEventDesc) {
- if getMaxTraceEntry() == 0 {
- return
- }
- db.get().traceEvent(id, desc)
- }
- // channelMap is the storage data structure for channelz.
- // Methods of channelMap can be divided in two two categories with respect to locking.
- // 1. Methods acquire the global lock.
- // 2. Methods that can only be called when global lock is held.
- // A second type of method need always to be called inside a first type of method.
- type channelMap struct {
- mu sync.RWMutex
- topLevelChannels map[int64]struct{}
- servers map[int64]*server
- channels map[int64]*channel
- subChannels map[int64]*subChannel
- listenSockets map[int64]*listenSocket
- normalSockets map[int64]*normalSocket
- }
- func (c *channelMap) addServer(id int64, s *server) {
- c.mu.Lock()
- s.cm = c
- c.servers[id] = s
- c.mu.Unlock()
- }
- func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) {
- c.mu.Lock()
- cn.cm = c
- cn.trace.cm = c
- c.channels[id] = cn
- if isTopChannel {
- c.topLevelChannels[id] = struct{}{}
- } else {
- c.findEntry(pid).addChild(id, cn)
- }
- c.mu.Unlock()
- }
- func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) {
- c.mu.Lock()
- sc.cm = c
- sc.trace.cm = c
- c.subChannels[id] = sc
- c.findEntry(pid).addChild(id, sc)
- c.mu.Unlock()
- }
- func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref string) {
- c.mu.Lock()
- ls.cm = c
- c.listenSockets[id] = ls
- c.findEntry(pid).addChild(id, ls)
- c.mu.Unlock()
- }
- func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref string) {
- c.mu.Lock()
- ns.cm = c
- c.normalSockets[id] = ns
- c.findEntry(pid).addChild(id, ns)
- c.mu.Unlock()
- }
- // removeEntry triggers the removal of an entry, which may not indeed delete the entry, if it has to
- // wait on the deletion of its children and until no other entity's channel trace references it.
- // It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully
- // shutting down server will lead to the server being also deleted.
- func (c *channelMap) removeEntry(id int64) {
- c.mu.Lock()
- c.findEntry(id).triggerDelete()
- c.mu.Unlock()
- }
- // c.mu must be held by the caller
- func (c *channelMap) decrTraceRefCount(id int64) {
- e := c.findEntry(id)
- if v, ok := e.(tracedChannel); ok {
- v.decrTraceRefCount()
- e.deleteSelfIfReady()
- }
- }
- // c.mu must be held by the caller.
- func (c *channelMap) findEntry(id int64) entry {
- var v entry
- var ok bool
- if v, ok = c.channels[id]; ok {
- return v
- }
- if v, ok = c.subChannels[id]; ok {
- return v
- }
- if v, ok = c.servers[id]; ok {
- return v
- }
- if v, ok = c.listenSockets[id]; ok {
- return v
- }
- if v, ok = c.normalSockets[id]; ok {
- return v
- }
- return &dummyEntry{idNotFound: id}
- }
- // c.mu must be held by the caller
- // deleteEntry simply deletes an entry from the channelMap. Before calling this
- // method, caller must check this entry is ready to be deleted, i.e removeEntry()
- // has been called on it, and no children still exist.
- // Conditionals are ordered by the expected frequency of deletion of each entity
- // type, in order to optimize performance.
- func (c *channelMap) deleteEntry(id int64) {
- var ok bool
- if _, ok = c.normalSockets[id]; ok {
- delete(c.normalSockets, id)
- return
- }
- if _, ok = c.subChannels[id]; ok {
- delete(c.subChannels, id)
- return
- }
- if _, ok = c.channels[id]; ok {
- delete(c.channels, id)
- delete(c.topLevelChannels, id)
- return
- }
- if _, ok = c.listenSockets[id]; ok {
- delete(c.listenSockets, id)
- return
- }
- if _, ok = c.servers[id]; ok {
- delete(c.servers, id)
- return
- }
- }
- func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) {
- c.mu.Lock()
- child := c.findEntry(id)
- childTC, ok := child.(tracedChannel)
- if !ok {
- c.mu.Unlock()
- return
- }
- childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
- if desc.Parent != nil {
- parent := c.findEntry(child.getParentID())
- var chanType RefChannelType
- switch child.(type) {
- case *channel:
- chanType = RefChannel
- case *subChannel:
- chanType = RefSubChannel
- }
- if parentTC, ok := parent.(tracedChannel); ok {
- parentTC.getChannelTrace().append(&TraceEvent{
- Desc: desc.Parent.Desc,
- Severity: desc.Parent.Severity,
- Timestamp: time.Now(),
- RefID: id,
- RefName: childTC.getRefName(),
- RefType: chanType,
- })
- childTC.incrTraceRefCount()
- }
- }
- c.mu.Unlock()
- }
// int64Slice implements sort.Interface for []int64, ordering ids ascending.
type int64Slice []int64

func (x int64Slice) Len() int           { return len(x) }
func (x int64Slice) Less(a, b int) bool { return x[a] < x[b] }
func (x int64Slice) Swap(a, b int)      { x[b], x[a] = x[a], x[b] }
// copyMap returns a new, non-nil map holding the same id-to-name pairs as m,
// so callers can use the result after the source map changes.
func copyMap(m map[int64]string) map[int64]string {
	dup := make(map[int64]string, len(m))
	for id, name := range m {
		dup[id] = name
	}
	return dup
}
// min returns the smaller of a and b.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
- func (c *channelMap) GetTopChannels(id int64) ([]*ChannelMetric, bool) {
- c.mu.RLock()
- l := len(c.topLevelChannels)
- ids := make([]int64, 0, l)
- cns := make([]*channel, 0, min(l, EntryPerPage))
- for k := range c.topLevelChannels {
- ids = append(ids, k)
- }
- sort.Sort(int64Slice(ids))
- idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
- count := 0
- var end bool
- var t []*ChannelMetric
- for i, v := range ids[idx:] {
- if count == EntryPerPage {
- break
- }
- if cn, ok := c.channels[v]; ok {
- cns = append(cns, cn)
- t = append(t, &ChannelMetric{
- NestedChans: copyMap(cn.nestedChans),
- SubChans: copyMap(cn.subChans),
- })
- count++
- }
- if i == len(ids[idx:])-1 {
- end = true
- break
- }
- }
- c.mu.RUnlock()
- if count == 0 {
- end = true
- }
- for i, cn := range cns {
- t[i].ChannelData = cn.c.ChannelzMetric()
- t[i].ID = cn.id
- t[i].RefName = cn.refName
- t[i].Trace = cn.trace.dumpData()
- }
- return t, end
- }
- func (c *channelMap) GetServers(id int64) ([]*ServerMetric, bool) {
- c.mu.RLock()
- l := len(c.servers)
- ids := make([]int64, 0, l)
- ss := make([]*server, 0, min(l, EntryPerPage))
- for k := range c.servers {
- ids = append(ids, k)
- }
- sort.Sort(int64Slice(ids))
- idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
- count := 0
- var end bool
- var s []*ServerMetric
- for i, v := range ids[idx:] {
- if count == EntryPerPage {
- break
- }
- if svr, ok := c.servers[v]; ok {
- ss = append(ss, svr)
- s = append(s, &ServerMetric{
- ListenSockets: copyMap(svr.listenSockets),
- })
- count++
- }
- if i == len(ids[idx:])-1 {
- end = true
- break
- }
- }
- c.mu.RUnlock()
- if count == 0 {
- end = true
- }
- for i, svr := range ss {
- s[i].ServerData = svr.s.ChannelzMetric()
- s[i].ID = svr.id
- s[i].RefName = svr.refName
- }
- return s, end
- }
- func (c *channelMap) GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
- var svr *server
- var ok bool
- c.mu.RLock()
- if svr, ok = c.servers[id]; !ok {
- // server with id doesn't exist.
- c.mu.RUnlock()
- return nil, true
- }
- svrskts := svr.sockets
- l := len(svrskts)
- ids := make([]int64, 0, l)
- sks := make([]*normalSocket, 0, min(l, EntryPerPage))
- for k := range svrskts {
- ids = append(ids, k)
- }
- sort.Sort(int64Slice(ids))
- idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
- count := 0
- var end bool
- for i, v := range ids[idx:] {
- if count == EntryPerPage {
- break
- }
- if ns, ok := c.normalSockets[v]; ok {
- sks = append(sks, ns)
- count++
- }
- if i == len(ids[idx:])-1 {
- end = true
- break
- }
- }
- c.mu.RUnlock()
- if count == 0 {
- end = true
- }
- var s []*SocketMetric
- for _, ns := range sks {
- sm := &SocketMetric{}
- sm.SocketData = ns.s.ChannelzMetric()
- sm.ID = ns.id
- sm.RefName = ns.refName
- s = append(s, sm)
- }
- return s, end
- }
- func (c *channelMap) GetChannel(id int64) *ChannelMetric {
- cm := &ChannelMetric{}
- var cn *channel
- var ok bool
- c.mu.RLock()
- if cn, ok = c.channels[id]; !ok {
- // channel with id doesn't exist.
- c.mu.RUnlock()
- return nil
- }
- cm.NestedChans = copyMap(cn.nestedChans)
- cm.SubChans = copyMap(cn.subChans)
- // cn.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of cn.c when
- // holding the lock to prevent potential data race.
- chanCopy := cn.c
- c.mu.RUnlock()
- cm.ChannelData = chanCopy.ChannelzMetric()
- cm.ID = cn.id
- cm.RefName = cn.refName
- cm.Trace = cn.trace.dumpData()
- return cm
- }
- func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
- cm := &SubChannelMetric{}
- var sc *subChannel
- var ok bool
- c.mu.RLock()
- if sc, ok = c.subChannels[id]; !ok {
- // subchannel with id doesn't exist.
- c.mu.RUnlock()
- return nil
- }
- cm.Sockets = copyMap(sc.sockets)
- // sc.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of sc.c when
- // holding the lock to prevent potential data race.
- chanCopy := sc.c
- c.mu.RUnlock()
- cm.ChannelData = chanCopy.ChannelzMetric()
- cm.ID = sc.id
- cm.RefName = sc.refName
- cm.Trace = sc.trace.dumpData()
- return cm
- }
- func (c *channelMap) GetSocket(id int64) *SocketMetric {
- sm := &SocketMetric{}
- c.mu.RLock()
- if ls, ok := c.listenSockets[id]; ok {
- c.mu.RUnlock()
- sm.SocketData = ls.s.ChannelzMetric()
- sm.ID = ls.id
- sm.RefName = ls.refName
- return sm
- }
- if ns, ok := c.normalSockets[id]; ok {
- c.mu.RUnlock()
- sm.SocketData = ns.s.ChannelzMetric()
- sm.ID = ns.id
- sm.RefName = ns.refName
- return sm
- }
- c.mu.RUnlock()
- return nil
- }
// idGenerator issues increasing int64 ids. The counter is only touched through
// sync/atomic, so genID and reset may be called concurrently.
type idGenerator struct {
	id int64 // last id handed out; 0 means none yet
}

// reset rewinds the counter so that the next genID returns 1.
func (g *idGenerator) reset() {
	atomic.StoreInt64(&g.id, 0)
}

// genID atomically increments the counter and returns the new value.
func (g *idGenerator) genID() int64 {
	next := atomic.AddInt64(&g.id, 1)
	return next
}
|