service.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. package service
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "strings"
  8. "go-common/app/admin/main/cache/conf"
  9. "go-common/app/admin/main/cache/dao"
  10. "go-common/app/admin/main/cache/model"
  11. "go-common/library/ecode"
  12. "github.com/BurntSushi/toml"
  13. )
  14. // Service struct
  15. type Service struct {
  16. c *conf.Config
  17. dao *dao.Dao
  18. opsMcs []*model.OpsCacheMemcache
  19. opsRds []*model.OpsCacheRedis
  20. }
  21. // New init
  22. func New(c *conf.Config) (s *Service) {
  23. s = &Service{
  24. c: c,
  25. dao: dao.New(c),
  26. }
  27. go s.loadOpsproc()
  28. return s
  29. }
  30. // Ping Service
  31. func (s *Service) Ping(c context.Context) (err error) {
  32. return
  33. }
  34. // Close Service
  35. func (s *Service) Close() {
  36. s.dao.Close()
  37. }
  38. func (s *Service) appids(c context.Context, cookie, appid string) (appids []string, err error) {
  39. msg, err := s.dao.Auth(c, cookie)
  40. if err != nil {
  41. err = ecode.AccessDenied
  42. return
  43. }
  44. tmp, ok := msg["token"]
  45. if !ok {
  46. err = ecode.NothingFound
  47. return
  48. }
  49. token, ok := tmp.(string)
  50. if !ok {
  51. err = ecode.NothingFound
  52. return
  53. }
  54. nodes, err := s.dao.Role(c, token)
  55. if err != nil {
  56. return
  57. }
  58. if appid == "" {
  59. for _, node := range nodes.Data {
  60. appids = append(appids, node.Path)
  61. }
  62. } else {
  63. for _, node := range nodes.Data {
  64. if appid == node.Path {
  65. appids = []string{appid}
  66. break
  67. }
  68. }
  69. }
  70. return
  71. }
  72. func (s *Service) treeid(c context.Context, cookie, appid string) (treeid int64, err error) {
  73. if appid == "" {
  74. err = ecode.AccessDenied
  75. return
  76. }
  77. msg, err := s.dao.Auth(c, cookie)
  78. if err != nil {
  79. err = ecode.AccessDenied
  80. return
  81. }
  82. tmp, ok := msg["token"]
  83. if !ok {
  84. err = ecode.NothingFound
  85. return
  86. }
  87. token, ok := tmp.(string)
  88. if !ok {
  89. err = ecode.NothingFound
  90. return
  91. }
  92. nodes, err := s.dao.Role(c, token)
  93. if err != nil {
  94. return
  95. }
  96. for _, node := range nodes.Data {
  97. if appid == node.Path {
  98. treeid = node.ID
  99. return
  100. }
  101. }
  102. return
  103. }
  104. // Clusters get clusters.
  105. func (s *Service) Clusters(c context.Context, req *model.ClusterReq) (resp *model.ClusterResp, err error) {
  106. appids, err := s.appids(c, req.Cookie, req.AppID)
  107. if err != nil {
  108. err = ecode.AccessDenied
  109. return
  110. }
  111. resp = new(model.ClusterResp)
  112. if len(appids) == 0 {
  113. return
  114. }
  115. if err = s.dao.DB.Where("appids in (?) AND zone=? AND type=?", appids, req.Zone, req.Type).Order("id").Offset((req.PN - 1) * req.PS).Limit(req.PS).Find(&resp.Clusters).Error; err != nil {
  116. return
  117. }
  118. var count int64
  119. s.dao.DB.Model(&model.Cluster{}).Where("appids in (?) AND zone=? AND type=?", appids, req.Zone, req.Type).Count(&count)
  120. resp.Total = count
  121. return
  122. }
  123. // AddCluster add new cluster.
  124. func (s *Service) AddCluster(c context.Context, req *model.AddClusterReq) (resp *model.EmpResp, err error) {
  125. cluster := &model.Cluster{
  126. Name: req.Name,
  127. Type: req.Type,
  128. AppID: req.AppID,
  129. Zone: req.Zone,
  130. HashMethod: req.HashMethod,
  131. HashDistribution: req.HashDistribution,
  132. HashTag: req.HashTag,
  133. DailTimeout: req.DailTimeout,
  134. ReadTimeout: req.ReadTimeout,
  135. WriteTimeout: req.WriteTimeout,
  136. NodeConn: req.NodeConn,
  137. ListenAddr: req.ListenAddr,
  138. ListenProto: req.ListenProto,
  139. PingFailLimit: req.PingFailLimit,
  140. PingAutoEject: req.PingAutoEject,
  141. }
  142. if req.ID == 0 {
  143. err = s.dao.DB.Create(cluster).Error
  144. } else {
  145. cluster.ID = req.ID
  146. s.dao.DB.Save(cluster)
  147. }
  148. return
  149. }
  150. // DelCluster del cluster of req id.
  151. func (s *Service) DelCluster(c context.Context, req *model.DelClusterReq) (resp *model.EmpResp, err error) {
  152. err = s.dao.DB.Exec("DELETE FROM cluster where id= ?", req.ID).Error
  153. if err != nil {
  154. return
  155. }
  156. err = s.dao.DB.Exec("DELETE FROM nodes where cid= ?", req.ID).Error
  157. return
  158. }
  159. // Cluster search cluster by appid or cluster name.
  160. func (s *Service) Cluster(c context.Context, req *model.ClusterReq) (resp []*model.Cluster, err error) {
  161. if req.Type != "" {
  162. err = s.dao.DB.Where("appids=? AND zone=? AND type=?", req.AppID, req.Zone, req.Type).Find(&resp).Error
  163. } else {
  164. err = s.dao.DB.Where("appids=? AND zone=?", req.AppID, req.Zone).Find(&resp).Error
  165. }
  166. if err != nil {
  167. return
  168. }
  169. for _, clu := range resp {
  170. err = s.dao.DB.Where("cid = ?", clu.ID).Find(&clu.Nodes).Error
  171. if err != nil {
  172. return nil, err
  173. }
  174. }
  175. return
  176. }
  177. // ModifyCluster add or del cluster nodes.
  178. func (s *Service) ModifyCluster(c context.Context, req *model.ModifyClusterReq) (resp *model.EmpResp, err error) {
  179. var nodes []*model.NodeDtl
  180. err = json.Unmarshal([]byte(req.Nodes), &nodes)
  181. if err != nil {
  182. return
  183. }
  184. var id = req.ID
  185. if req.Name != "" {
  186. var cluster = &model.Cluster{}
  187. err = s.dao.DB.Where("name = ?", req.Name).First(cluster).Error
  188. if err != nil {
  189. return
  190. }
  191. id = cluster.ID
  192. }
  193. if req.Action == 2 {
  194. var alias []string
  195. for _, ali := range nodes {
  196. alias = append(alias, ali.Alias)
  197. }
  198. //err = s.dao.DB.Delete(&nodes).Error
  199. err = s.dao.DB.Exec("DELETE FROM nodes WHERE alias in (?) ", strings.Join(alias, ",")).Error
  200. return
  201. } else if req.Action == 1 {
  202. // var nodes []*model.NodeDtl
  203. for _, node := range nodes {
  204. node.Cid = id
  205. err = s.dao.DB.Create(node).Error
  206. }
  207. return
  208. }
  209. return
  210. }
  211. // ClusterDtl get cluster detail about nodes info.
  212. func (s *Service) ClusterDtl(c context.Context, req *model.ClusterDtlReq) (resp *model.ClusterDtlResp, err error) {
  213. resp = new(model.ClusterDtlResp)
  214. err = s.dao.DB.Where("cid = ?", req.ID).Find(&resp.Nodes).Error
  215. // TODO(lintanghui):get node info
  216. return
  217. }
  218. // Toml return a toml file of cluster infos.
  219. func (s *Service) Toml(c context.Context, req *model.ClusterReq) (resp []byte, err error) {
  220. clusters, err := s.Cluster(c, req)
  221. if err != nil {
  222. return
  223. }
  224. for _, cluster := range clusters {
  225. for _, node := range cluster.Nodes {
  226. cluster.Servers = append(cluster.Servers, fmt.Sprintf("%s:%d %s", node.Addr, node.Weight, node.Alias))
  227. }
  228. }
  229. buf := bytes.NewBuffer(resp)
  230. t := struct {
  231. Clusters []*model.Cluster `toml:"clusters"`
  232. }{
  233. Clusters: clusters,
  234. }
  235. err = toml.NewEncoder(buf).Encode(t)
  236. resp = buf.Bytes()
  237. return
  238. }