resource.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package databus
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "time"
  7. "go-common/app/admin/main/aegis/model"
  8. "go-common/app/admin/main/aegis/model/resource"
  9. "go-common/library/conf/env"
  10. "go-common/library/log"
  11. "go-common/library/queue/databus"
  12. )
  13. //var
  14. var (
  15. ErrInfo = errors.New("error info")
  16. ErrNilAgent = errors.New("nil agent")
  17. _formtTime = "2006-01-02 15:04:05"
  18. agent *databus.Databus
  19. )
  20. //RscMsg .
  21. type RscMsg struct {
  22. Action string `json:"action"`
  23. BizID int64 `json:"business_id"`
  24. Raw json.RawMessage `json:"raw"`
  25. }
  26. //AddInfo info for add
  27. type AddInfo struct {
  28. BusinessID int64 `json:"business_id"`
  29. NetID int64 `json:"net_id"`
  30. OID string `json:"oid"`
  31. MID int64 `json:"mid"`
  32. Content string `json:"content"`
  33. Extra1 int64 `json:"extra1"`
  34. Extra2 int64 `json:"extra2"`
  35. Extra3 int64 `json:"extra3"`
  36. Extra4 int64 `json:"extra4"`
  37. Extra5 int64 `json:"extra5"`
  38. Extra6 int64 `json:"extra6"`
  39. Extra1s string `json:"extra1s"`
  40. Extra2s string `json:"extra2s"`
  41. Extra3s string `json:"extra3s"`
  42. Extra4s string `json:"extra4s"`
  43. MetaData string `json:"metadata"`
  44. ExtraTime1 time.Time
  45. OCtime time.Time
  46. Ptime time.Time
  47. }
  48. //UpdateInfo info for update
  49. type UpdateInfo = model.UpdateOption
  50. //CancelInfo info for cancel
  51. type CancelInfo = model.CancelOption
  52. //InitAegis .
  53. func InitAegis(c *databus.Config) {
  54. if c == nil {
  55. c = _defaultConfig
  56. if d, ok := _defaultAddrConfig[env.DeployEnv]; ok {
  57. c.Key = d.Key
  58. c.Secret = d.Secret
  59. c.Addr = d.Addr
  60. }
  61. }
  62. if agent != nil {
  63. agent.Close()
  64. }
  65. agent = databus.New(c)
  66. }
  67. //CloseAegis .
  68. func CloseAegis() {
  69. if agent != nil {
  70. agent.Close()
  71. }
  72. }
  73. //Add .
  74. func Add(m *AddInfo) (err error) {
  75. if agent == nil {
  76. return ErrNilAgent
  77. }
  78. if m == nil || m.BusinessID <= 0 || m.NetID <= 0 || len(m.OID) == 0 {
  79. return ErrInfo
  80. }
  81. opt := &model.AddOption{
  82. Resource: resource.Resource{
  83. BusinessID: m.BusinessID,
  84. OID: m.OID,
  85. MID: m.MID,
  86. Content: m.Content,
  87. Extra1: m.Extra1,
  88. Extra2: m.Extra2,
  89. Extra3: m.Extra3,
  90. Extra4: m.Extra4,
  91. Extra5: m.Extra5,
  92. Extra6: m.Extra6,
  93. Extra1s: m.Extra1s,
  94. Extra2s: m.Extra2s,
  95. Extra3s: m.Extra3s,
  96. Extra4s: m.Extra4s,
  97. ExtraTime1: m.ExtraTime1.Format(_formtTime),
  98. OCtime: m.OCtime.Format(_formtTime),
  99. Ptime: m.Ptime.Format(_formtTime),
  100. },
  101. NetID: m.NetID,
  102. }
  103. msg, err := formatMsg(opt, m.BusinessID, "add")
  104. if err != nil {
  105. return ErrInfo
  106. }
  107. return aegisSend(context.Background(), m.OID, msg)
  108. }
  109. //Update .
  110. func Update(m *UpdateInfo) (err error) {
  111. if agent == nil {
  112. return ErrNilAgent
  113. }
  114. if m == nil || m.BusinessID <= 0 || m.NetID <= 0 || len(m.OID) == 0 || len(m.Update) == 0 {
  115. return ErrInfo
  116. }
  117. msg, err := formatMsg(m, m.BusinessID, "update")
  118. if err != nil {
  119. return ErrInfo
  120. }
  121. return aegisSend(context.Background(), m.OID, msg)
  122. }
  123. //Cancel .
  124. func Cancel(m *CancelInfo) (err error) {
  125. if agent == nil {
  126. return ErrNilAgent
  127. }
  128. if m == nil || m.BusinessID <= 0 || len(m.Oids) == 0 {
  129. return ErrInfo
  130. }
  131. msg, err := formatMsg(m, m.BusinessID, "cancel")
  132. if err != nil {
  133. return ErrInfo
  134. }
  135. return aegisSend(context.Background(), m.Oids[0], msg)
  136. }
  137. //aegisSend .
  138. func aegisSend(c context.Context, key string, msg interface{}) (err error) {
  139. log.Info("start to send key(%s) msg(%+v)", key, msg)
  140. for retry := 0; retry < 3; retry++ {
  141. if err = agent.Send(c, key, msg); err == nil {
  142. break
  143. }
  144. }
  145. if err != nil {
  146. log.Error("s.aegisPub.Send(%s) error(%v) msg(%+v) ", key, err, msg)
  147. }
  148. return
  149. }
  150. func formatMsg(m interface{}, bizid int64, action string) (*RscMsg, error) {
  151. raw, err := json.Marshal(m)
  152. if err != nil {
  153. return nil, ErrInfo
  154. }
  155. msg := &RscMsg{
  156. Action: action,
  157. BizID: bizid,
  158. Raw: raw,
  159. }
  160. return msg, nil
  161. }