service.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. //go:generate ./regenerate.sh
  19. // Package service provides an implementation for channelz service server.
  20. package service
  21. import (
  22. "context"
  23. "net"
  24. "time"
  25. "github.com/golang/protobuf/ptypes"
  26. durpb "github.com/golang/protobuf/ptypes/duration"
  27. wrpb "github.com/golang/protobuf/ptypes/wrappers"
  28. "google.golang.org/grpc"
  29. channelzgrpc "google.golang.org/grpc/channelz/grpc_channelz_v1"
  30. channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
  31. "google.golang.org/grpc/codes"
  32. "google.golang.org/grpc/connectivity"
  33. "google.golang.org/grpc/credentials"
  34. "google.golang.org/grpc/internal/channelz"
  35. "google.golang.org/grpc/status"
  36. )
// init calls channelz.TurnOn when this package is initialized, so importing
// the package is enough to trigger that call.
func init() {
	channelz.TurnOn()
}
  40. func convertToPtypesDuration(sec int64, usec int64) *durpb.Duration {
  41. return ptypes.DurationProto(time.Duration(sec*1e9 + usec*1e3))
  42. }
  43. // RegisterChannelzServiceToServer registers the channelz service to the given server.
  44. func RegisterChannelzServiceToServer(s *grpc.Server) {
  45. channelzgrpc.RegisterChannelzServer(s, newCZServer())
  46. }
  47. func newCZServer() channelzgrpc.ChannelzServer {
  48. return &serverImpl{}
  49. }
// serverImpl is the channelzgrpc.ChannelzServer implementation returned by
// newCZServer. It has no fields.
type serverImpl struct{}
  51. func connectivityStateToProto(s connectivity.State) *channelzpb.ChannelConnectivityState {
  52. switch s {
  53. case connectivity.Idle:
  54. return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_IDLE}
  55. case connectivity.Connecting:
  56. return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_CONNECTING}
  57. case connectivity.Ready:
  58. return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_READY}
  59. case connectivity.TransientFailure:
  60. return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_TRANSIENT_FAILURE}
  61. case connectivity.Shutdown:
  62. return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_SHUTDOWN}
  63. default:
  64. return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_UNKNOWN}
  65. }
  66. }
  67. func channelTraceToProto(ct *channelz.ChannelTrace) *channelzpb.ChannelTrace {
  68. pbt := &channelzpb.ChannelTrace{}
  69. pbt.NumEventsLogged = ct.EventNum
  70. if ts, err := ptypes.TimestampProto(ct.CreationTime); err == nil {
  71. pbt.CreationTimestamp = ts
  72. }
  73. var events []*channelzpb.ChannelTraceEvent
  74. for _, e := range ct.Events {
  75. cte := &channelzpb.ChannelTraceEvent{
  76. Description: e.Desc,
  77. Severity: channelzpb.ChannelTraceEvent_Severity(e.Severity),
  78. }
  79. if ts, err := ptypes.TimestampProto(e.Timestamp); err == nil {
  80. cte.Timestamp = ts
  81. }
  82. if e.RefID != 0 {
  83. switch e.RefType {
  84. case channelz.RefChannel:
  85. cte.ChildRef = &channelzpb.ChannelTraceEvent_ChannelRef{ChannelRef: &channelzpb.ChannelRef{ChannelId: e.RefID, Name: e.RefName}}
  86. case channelz.RefSubChannel:
  87. cte.ChildRef = &channelzpb.ChannelTraceEvent_SubchannelRef{SubchannelRef: &channelzpb.SubchannelRef{SubchannelId: e.RefID, Name: e.RefName}}
  88. }
  89. }
  90. events = append(events, cte)
  91. }
  92. pbt.Events = events
  93. return pbt
  94. }
  95. func channelMetricToProto(cm *channelz.ChannelMetric) *channelzpb.Channel {
  96. c := &channelzpb.Channel{}
  97. c.Ref = &channelzpb.ChannelRef{ChannelId: cm.ID, Name: cm.RefName}
  98. c.Data = &channelzpb.ChannelData{
  99. State: connectivityStateToProto(cm.ChannelData.State),
  100. Target: cm.ChannelData.Target,
  101. CallsStarted: cm.ChannelData.CallsStarted,
  102. CallsSucceeded: cm.ChannelData.CallsSucceeded,
  103. CallsFailed: cm.ChannelData.CallsFailed,
  104. }
  105. if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil {
  106. c.Data.LastCallStartedTimestamp = ts
  107. }
  108. nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans))
  109. for id, ref := range cm.NestedChans {
  110. nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref})
  111. }
  112. c.ChannelRef = nestedChans
  113. subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans))
  114. for id, ref := range cm.SubChans {
  115. subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref})
  116. }
  117. c.SubchannelRef = subChans
  118. sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets))
  119. for id, ref := range cm.Sockets {
  120. sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
  121. }
  122. c.SocketRef = sockets
  123. c.Data.Trace = channelTraceToProto(cm.Trace)
  124. return c
  125. }
  126. func subChannelMetricToProto(cm *channelz.SubChannelMetric) *channelzpb.Subchannel {
  127. sc := &channelzpb.Subchannel{}
  128. sc.Ref = &channelzpb.SubchannelRef{SubchannelId: cm.ID, Name: cm.RefName}
  129. sc.Data = &channelzpb.ChannelData{
  130. State: connectivityStateToProto(cm.ChannelData.State),
  131. Target: cm.ChannelData.Target,
  132. CallsStarted: cm.ChannelData.CallsStarted,
  133. CallsSucceeded: cm.ChannelData.CallsSucceeded,
  134. CallsFailed: cm.ChannelData.CallsFailed,
  135. }
  136. if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil {
  137. sc.Data.LastCallStartedTimestamp = ts
  138. }
  139. nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans))
  140. for id, ref := range cm.NestedChans {
  141. nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref})
  142. }
  143. sc.ChannelRef = nestedChans
  144. subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans))
  145. for id, ref := range cm.SubChans {
  146. subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref})
  147. }
  148. sc.SubchannelRef = subChans
  149. sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets))
  150. for id, ref := range cm.Sockets {
  151. sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
  152. }
  153. sc.SocketRef = sockets
  154. sc.Data.Trace = channelTraceToProto(cm.Trace)
  155. return sc
  156. }
  157. func securityToProto(se credentials.ChannelzSecurityValue) *channelzpb.Security {
  158. switch v := se.(type) {
  159. case *credentials.TLSChannelzSecurityValue:
  160. return &channelzpb.Security{Model: &channelzpb.Security_Tls_{Tls: &channelzpb.Security_Tls{
  161. CipherSuite: &channelzpb.Security_Tls_StandardName{StandardName: v.StandardName},
  162. LocalCertificate: v.LocalCertificate,
  163. RemoteCertificate: v.RemoteCertificate,
  164. }}}
  165. case *credentials.OtherChannelzSecurityValue:
  166. otherSecurity := &channelzpb.Security_OtherSecurity{
  167. Name: v.Name,
  168. }
  169. if anyval, err := ptypes.MarshalAny(v.Value); err == nil {
  170. otherSecurity.Value = anyval
  171. }
  172. return &channelzpb.Security{Model: &channelzpb.Security_Other{Other: otherSecurity}}
  173. }
  174. return nil
  175. }
  176. func addrToProto(a net.Addr) *channelzpb.Address {
  177. switch a.Network() {
  178. case "udp":
  179. // TODO: Address_OtherAddress{}. Need proto def for Value.
  180. case "ip":
  181. // Note zone info is discarded through the conversion.
  182. return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPAddr).IP}}}
  183. case "ip+net":
  184. // Note mask info is discarded through the conversion.
  185. return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPNet).IP}}}
  186. case "tcp":
  187. // Note zone info is discarded through the conversion.
  188. return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.TCPAddr).IP, Port: int32(a.(*net.TCPAddr).Port)}}}
  189. case "unix", "unixgram", "unixpacket":
  190. return &channelzpb.Address{Address: &channelzpb.Address_UdsAddress_{UdsAddress: &channelzpb.Address_UdsAddress{Filename: a.String()}}}
  191. default:
  192. }
  193. return &channelzpb.Address{}
  194. }
  195. func socketMetricToProto(sm *channelz.SocketMetric) *channelzpb.Socket {
  196. s := &channelzpb.Socket{}
  197. s.Ref = &channelzpb.SocketRef{SocketId: sm.ID, Name: sm.RefName}
  198. s.Data = &channelzpb.SocketData{
  199. StreamsStarted: sm.SocketData.StreamsStarted,
  200. StreamsSucceeded: sm.SocketData.StreamsSucceeded,
  201. StreamsFailed: sm.SocketData.StreamsFailed,
  202. MessagesSent: sm.SocketData.MessagesSent,
  203. MessagesReceived: sm.SocketData.MessagesReceived,
  204. KeepAlivesSent: sm.SocketData.KeepAlivesSent,
  205. }
  206. if ts, err := ptypes.TimestampProto(sm.SocketData.LastLocalStreamCreatedTimestamp); err == nil {
  207. s.Data.LastLocalStreamCreatedTimestamp = ts
  208. }
  209. if ts, err := ptypes.TimestampProto(sm.SocketData.LastRemoteStreamCreatedTimestamp); err == nil {
  210. s.Data.LastRemoteStreamCreatedTimestamp = ts
  211. }
  212. if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageSentTimestamp); err == nil {
  213. s.Data.LastMessageSentTimestamp = ts
  214. }
  215. if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageReceivedTimestamp); err == nil {
  216. s.Data.LastMessageReceivedTimestamp = ts
  217. }
  218. s.Data.LocalFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.LocalFlowControlWindow}
  219. s.Data.RemoteFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.RemoteFlowControlWindow}
  220. if sm.SocketData.SocketOptions != nil {
  221. s.Data.Option = sockoptToProto(sm.SocketData.SocketOptions)
  222. }
  223. if sm.SocketData.Security != nil {
  224. s.Security = securityToProto(sm.SocketData.Security)
  225. }
  226. if sm.SocketData.LocalAddr != nil {
  227. s.Local = addrToProto(sm.SocketData.LocalAddr)
  228. }
  229. if sm.SocketData.RemoteAddr != nil {
  230. s.Remote = addrToProto(sm.SocketData.RemoteAddr)
  231. }
  232. s.RemoteName = sm.SocketData.RemoteName
  233. return s
  234. }
  235. func (s *serverImpl) GetTopChannels(ctx context.Context, req *channelzpb.GetTopChannelsRequest) (*channelzpb.GetTopChannelsResponse, error) {
  236. metrics, end := channelz.GetTopChannels(req.GetStartChannelId())
  237. resp := &channelzpb.GetTopChannelsResponse{}
  238. for _, m := range metrics {
  239. resp.Channel = append(resp.Channel, channelMetricToProto(m))
  240. }
  241. resp.End = end
  242. return resp, nil
  243. }
  244. func serverMetricToProto(sm *channelz.ServerMetric) *channelzpb.Server {
  245. s := &channelzpb.Server{}
  246. s.Ref = &channelzpb.ServerRef{ServerId: sm.ID, Name: sm.RefName}
  247. s.Data = &channelzpb.ServerData{
  248. CallsStarted: sm.ServerData.CallsStarted,
  249. CallsSucceeded: sm.ServerData.CallsSucceeded,
  250. CallsFailed: sm.ServerData.CallsFailed,
  251. }
  252. if ts, err := ptypes.TimestampProto(sm.ServerData.LastCallStartedTimestamp); err == nil {
  253. s.Data.LastCallStartedTimestamp = ts
  254. }
  255. sockets := make([]*channelzpb.SocketRef, 0, len(sm.ListenSockets))
  256. for id, ref := range sm.ListenSockets {
  257. sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
  258. }
  259. s.ListenSocket = sockets
  260. return s
  261. }
  262. func (s *serverImpl) GetServers(ctx context.Context, req *channelzpb.GetServersRequest) (*channelzpb.GetServersResponse, error) {
  263. metrics, end := channelz.GetServers(req.GetStartServerId())
  264. resp := &channelzpb.GetServersResponse{}
  265. for _, m := range metrics {
  266. resp.Server = append(resp.Server, serverMetricToProto(m))
  267. }
  268. resp.End = end
  269. return resp, nil
  270. }
  271. func (s *serverImpl) GetServerSockets(ctx context.Context, req *channelzpb.GetServerSocketsRequest) (*channelzpb.GetServerSocketsResponse, error) {
  272. metrics, end := channelz.GetServerSockets(req.GetServerId(), req.GetStartSocketId())
  273. resp := &channelzpb.GetServerSocketsResponse{}
  274. for _, m := range metrics {
  275. resp.SocketRef = append(resp.SocketRef, &channelzpb.SocketRef{SocketId: m.ID, Name: m.RefName})
  276. }
  277. resp.End = end
  278. return resp, nil
  279. }
  280. func (s *serverImpl) GetChannel(ctx context.Context, req *channelzpb.GetChannelRequest) (*channelzpb.GetChannelResponse, error) {
  281. var metric *channelz.ChannelMetric
  282. if metric = channelz.GetChannel(req.GetChannelId()); metric == nil {
  283. return &channelzpb.GetChannelResponse{}, nil
  284. }
  285. resp := &channelzpb.GetChannelResponse{Channel: channelMetricToProto(metric)}
  286. return resp, nil
  287. }
  288. func (s *serverImpl) GetSubchannel(ctx context.Context, req *channelzpb.GetSubchannelRequest) (*channelzpb.GetSubchannelResponse, error) {
  289. var metric *channelz.SubChannelMetric
  290. if metric = channelz.GetSubChannel(req.GetSubchannelId()); metric == nil {
  291. return &channelzpb.GetSubchannelResponse{}, nil
  292. }
  293. resp := &channelzpb.GetSubchannelResponse{Subchannel: subChannelMetricToProto(metric)}
  294. return resp, nil
  295. }
  296. func (s *serverImpl) GetSocket(ctx context.Context, req *channelzpb.GetSocketRequest) (*channelzpb.GetSocketResponse, error) {
  297. var metric *channelz.SocketMetric
  298. if metric = channelz.GetSocket(req.GetSocketId()); metric == nil {
  299. return &channelzpb.GetSocketResponse{}, nil
  300. }
  301. resp := &channelzpb.GetSocketResponse{Socket: socketMetricToProto(metric)}
  302. return resp, nil
  303. }
  304. func (s *serverImpl) GetServer(ctx context.Context, req *channelzpb.GetServerRequest) (*channelzpb.GetServerResponse, error) {
  305. return nil, status.Error(codes.Unimplemented, "GetServer not implemented")
  306. }