client.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570
  1. package v1
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "encoding/hex"
  6. "encoding/json"
  7. "fmt"
  8. "time"
  9. "go-common/app/infra/config/model"
  10. "go-common/library/database/sql"
  11. "go-common/library/ecode"
  12. "go-common/library/log"
  13. xtime "go-common/library/time"
  14. )
  // Format templates for the string keys built by the helpers in this file.
  // Each one is consumed by exactly one helper via fmt.Sprintf.

  // _buildVerKey: service, build version, env (see buildVerKey).
  const _buildVerKey = "%s_%s_%s"

  // _pushKey: service, host, env (see pushKey).
  const _pushKey = "%s_%s_%s"

  // _cacheKey: service, build version, numeric version, env (see cacheKey).
  const _cacheKey = "%s_%s_%d_%s"

  // _cacheKey2: same fields as _cacheKey with a "_2" suffix (see cacheKey2).
  const _cacheKey2 = "%s_%s_%d_%s_2"

  // _fileKey: file name, numeric version (see fileKey).
  const _fileKey = "%s_%d"
  // Audit-log texts passed to dao.InsertLog by AddConfigs, CopyConfigs and
  // UpdateConfigs. The *Business strings name the operation; the *Info
  // strings are fmt templates filled with the affected version number(s).
  var (
  	// AddConfigs: operation name and "added version: %d" template.
  	addBusiness = "前端接口api添加配置版本"
  	addInfo     = "添加版本:%d"
  )

  var (
  	// CopyConfigs: operation name and "copied version: %d, new version: %d" template.
  	copyBusiness = "前端接口api拷贝配置版本"
  	copyInfo     = "拷贝版本:%d,新的版本:%d"
  )

  var (
  	// UpdateConfigs: operation name and "updated version: %d" template.
  	updateBusiness = "前端接口api更新配置"
  	updateInfo     = "更新版本:%d"
  )
  30. // PushKey push sub id
  31. func pushKey(svr, host, env string) string {
  32. return fmt.Sprintf(_pushKey, svr, host, env)
  33. }
  34. // buildVerKey version mapping key
  35. func buildVerKey(svr, bver, env string) string {
  36. return fmt.Sprintf(_buildVerKey, svr, bver, env)
  37. }
  38. // cacheKey config cache key
  39. func cacheKey(svr, bver, env string, ver int64) string {
  40. return fmt.Sprintf(_cacheKey, svr, bver, ver, env)
  41. }
  42. // cacheKey config cache key
  43. func cacheKey2(svr, bver, env string, ver int64) string {
  44. return fmt.Sprintf(_cacheKey2, svr, bver, ver, env)
  45. }
  46. // fileKey
  47. func fileKey(filename string, ver int64) string {
  48. return fmt.Sprintf(_fileKey, filename, ver)
  49. }
  // tokenKey builds the key of the in-memory token cache: "<svr>_<env>".
  func tokenKey(svr, env string) string {
  	return svr + "_" + env
  }
  54. // genConfig generate config
  55. func genConfig(ver int64, cs []*model.NSValue) (conf *model.Content, err error) {
  56. var b []byte
  57. data := make(map[string]string)
  58. for _, c := range cs {
  59. data[c.Name] = c.Config
  60. }
  61. if b, err = json.Marshal(data); err != nil {
  62. return
  63. }
  64. mb := md5.Sum(b)
  65. conf = &model.Content{
  66. Version: ver,
  67. Md5: hex.EncodeToString(mb[:]),
  68. Content: string(b),
  69. }
  70. return
  71. }
  72. // genConfig2 generate config
  73. func genConfig2(ver int64, cs []*model.NSValue, ns map[int64]string) (conf *model.Content, err error) {
  74. var (
  75. b []byte
  76. v string
  77. ok bool
  78. s *model.Namespace
  79. )
  80. nsc := make(map[string]*model.Namespace)
  81. for _, c := range cs {
  82. if v, ok = ns[c.NamespaceID]; !ok && c.NamespaceID != 0 {
  83. continue
  84. }
  85. if s, ok = nsc[v]; !ok {
  86. s = &model.Namespace{Name: v, Data: map[string]string{}}
  87. nsc[v] = s
  88. }
  89. s.Data[c.Name] = c.Config
  90. }
  91. if b, err = json.Marshal(nsc); err != nil {
  92. return
  93. }
  94. mb := md5.Sum(b)
  95. conf = &model.Content{
  96. Version: ver,
  97. Md5: hex.EncodeToString(mb[:]),
  98. Content: string(b),
  99. }
  100. return
  101. }
  102. // Push version to clients & generate config caches
  103. func (s *Service) Push(c context.Context, svr *model.Service) (err error) {
  104. var (
  105. hosts []*model.Host
  106. values []*model.NSValue
  107. conf *model.Content
  108. conf2 *model.Content
  109. namespaces map[int64]string
  110. )
  111. if values, err = s.dao.Values(c, svr.Version); err != nil {
  112. return
  113. }
  114. if namespaces, err = s.dao.Namespaces(c, svr.Version); err != nil {
  115. return
  116. }
  117. if len(values) == 0 {
  118. err = fmt.Errorf("config values is empty. svr:%s, host:%s, buildVer:%s, ver:%d", svr.Name, svr.Host, svr.BuildVersion, svr.Version)
  119. log.Error("%v", err)
  120. return
  121. }
  122. // compatible old version sdk
  123. if conf, err = genConfig(svr.Version, values); err != nil {
  124. log.Error("get config value:%s error(%v) ", values, err)
  125. return
  126. }
  127. cacheKey := cacheKey(svr.Name, svr.BuildVersion, svr.Env, svr.Version)
  128. if err = s.dao.SetFile(cacheKey, conf); err != nil {
  129. log.Error("set confCashe error. svr:%s, buildVer:%s, ver:%d", svr.Name, svr.BuildVersion, svr.Env)
  130. err = nil
  131. }
  132. if conf2, err = genConfig2(svr.Version, values, namespaces); err != nil {
  133. log.Error("get config2 value:%s error(%v) ", values, err)
  134. return
  135. }
  136. cacheKey2 := cacheKey2(svr.Name, svr.BuildVersion, svr.Env, svr.Version)
  137. if err = s.dao.SetFile(cacheKey2, conf2); err != nil {
  138. log.Error("set confCashe2 error. svr:%s, buildVer:%s, ver:%d", svr.Name, svr.BuildVersion, svr.Env)
  139. err = nil
  140. }
  141. s.setVersion(svr.Name, svr.BuildVersion, svr.Env, svr.Version)
  142. // push hosts
  143. if hosts, err = s.dao.Hosts(c, svr.Name, svr.Env); err != nil {
  144. log.Error("get hosts error. svr:%s, buildVer:%s, ver:%d", svr.Name, svr.BuildVersion, svr.Version)
  145. err = nil
  146. return
  147. }
  148. for _, h := range hosts {
  149. if h.State == model.HostOnline {
  150. pushKey := pushKey(h.Service, h.Name, svr.Env)
  151. if ok := s.pubEvent(pushKey, &model.Version{Version: conf.Version}); ok {
  152. log.Info("s.events.Pub(%s, %d) ok: %t", pushKey, conf.Version, ok)
  153. }
  154. }
  155. }
  156. return
  157. }
  158. // Config return config content.
  159. func (s *Service) Config(c context.Context, svr *model.Service) (conf *model.Content, err error) {
  160. var values []*model.NSValue
  161. if err = s.appAuth(c, svr.Name, svr.Env, svr.Token); err != nil {
  162. return
  163. }
  164. cacheName := cacheKey(svr.Name, svr.BuildVersion, svr.Env, svr.Version)
  165. if conf, err = s.dao.File(cacheName); err == nil {
  166. return
  167. }
  168. if values, err = s.dao.Values(c, svr.Version); err != nil {
  169. return
  170. }
  171. if len(values) == 0 {
  172. err = fmt.Errorf("config values is empty. svr:%s, host:%s, buildVer:%s, ver:%d", svr.Name, svr.Host, svr.BuildVersion, svr.Version)
  173. log.Error("%v", err)
  174. return
  175. }
  176. if conf, err = genConfig(svr.Version, values); err != nil {
  177. log.Error("get config value:%s error(%v) ", values, err)
  178. return
  179. }
  180. if err = s.dao.SetFile(cacheName, conf); err != nil {
  181. err = nil
  182. }
  183. return
  184. }
  185. // Config2 return config content.
  186. func (s *Service) Config2(c context.Context, svr *model.Service) (conf *model.Content, err error) {
  187. var (
  188. values []*model.NSValue
  189. namespaces map[int64]string
  190. )
  191. if err = s.appAuth(c, svr.Name, svr.Env, svr.Token); err != nil {
  192. return
  193. }
  194. cacheName := cacheKey2(svr.Name, svr.BuildVersion, svr.Env, svr.Version)
  195. if conf, err = s.dao.File(cacheName); err == nil {
  196. return
  197. }
  198. if namespaces, err = s.dao.Namespaces(c, svr.Version); err != nil {
  199. return
  200. }
  201. if values, err = s.dao.Values(c, svr.Version); err != nil {
  202. return
  203. }
  204. if len(values) == 0 {
  205. err = fmt.Errorf("config values is empty. svr:%s, host:%s, buildVer:%s, ver:%d", svr.Name, svr.Host, svr.BuildVersion, svr.Version)
  206. log.Error("%v", err)
  207. return
  208. }
  209. if conf, err = genConfig2(svr.Version, values, namespaces); err != nil {
  210. log.Error("get config value:(%s) error(%v) ", values, err)
  211. return
  212. }
  213. if err = s.dao.SetFile(cacheName, conf); err != nil {
  214. err = nil
  215. }
  216. return
  217. }
  218. // File get one file content.
  219. func (s *Service) File(c context.Context, svr *model.Service) (val string, err error) {
  220. var (
  221. curVer int64
  222. ok bool
  223. )
  224. if err = s.appAuth(c, svr.Name, svr.Env, svr.Token); err != nil {
  225. return
  226. }
  227. if svr.Version != model.UnknownVersion {
  228. curVer = svr.Version
  229. } else {
  230. curVer, ok = s.version(svr.Name, svr.BuildVersion, svr.Env)
  231. if !ok {
  232. if curVer, err = s.dao.BuildVersion(c, svr.Name, svr.BuildVersion, svr.Env); err != nil {
  233. log.Error("BuildVersion(%v) error(%v)", svr, err)
  234. return
  235. }
  236. s.setVersion(svr.Name, svr.BuildVersion, svr.Env, curVer)
  237. }
  238. }
  239. fKey := fileKey(svr.File, curVer)
  240. if val, err = s.dao.FileStr(fKey); err == nil {
  241. return
  242. }
  243. if val, err = s.dao.Value(c, svr.File, curVer); err != nil {
  244. log.Error("Value(%v) error(%v)", svr.File, err)
  245. return
  246. }
  247. s.dao.SetFileStr(fKey, val)
  248. return
  249. }
  250. // CheckVersion check client version.
  251. func (s *Service) CheckVersion(c context.Context, rhost *model.Host, env, token string) (evt chan *model.Version, err error) {
  252. var (
  253. curVer int64
  254. )
  255. if err = s.appAuth(c, rhost.Service, env, token); err != nil {
  256. return
  257. }
  258. // set heartbeat
  259. rhost.HeartbeatTime = xtime.Time(time.Now().Unix())
  260. if err = s.dao.SetHost(c, rhost, rhost.Service, env); err != nil {
  261. err = nil
  262. }
  263. evt = make(chan *model.Version, 1)
  264. if rhost.Appoint > 0 {
  265. if rhost.Appoint != rhost.ConfigVersion {
  266. evt <- &model.Version{Version: rhost.Appoint}
  267. }
  268. return
  269. }
  270. // get current version, return if has new config version
  271. if curVer, err = s.curVer(c, rhost.Service, rhost.BuildVersion, env); err != nil {
  272. return
  273. }
  274. if curVer == model.UnknownVersion {
  275. err = ecode.NothingFound
  276. return
  277. }
  278. if curVer != rhost.ConfigVersion {
  279. evt <- &model.Version{Version: curVer}
  280. return
  281. }
  282. pushKey := pushKey(rhost.Service, rhost.Name, env)
  283. s.eLock.Lock()
  284. s.events[pushKey] = evt
  285. s.eLock.Unlock()
  286. return
  287. }
  288. // AppAuth check app is auth
  289. func (s *Service) appAuth(c context.Context, svr, env, token string) (err error) {
  290. var (
  291. dbToken string
  292. ok bool
  293. tokenKey = tokenKey(svr, env)
  294. )
  295. s.eLock.RLock()
  296. dbToken, ok = s.token[tokenKey]
  297. s.eLock.RUnlock()
  298. if !ok {
  299. if dbToken, err = s.dao.Token(c, svr, env); err != nil {
  300. log.Error("Token(%v,%v) error(%v)", svr, env, err)
  301. return
  302. }
  303. s.SetToken(c, svr, env, dbToken)
  304. }
  305. if dbToken != token {
  306. err = ecode.AccessDenied
  307. }
  308. return
  309. }
  310. // SetToken update Token
  311. func (s *Service) SetToken(c context.Context, svr, env, token string) {
  312. tokenKey := tokenKey(svr, env)
  313. s.eLock.Lock()
  314. s.token[tokenKey] = token
  315. s.eLock.Unlock()
  316. }
  317. // Hosts return client hosts.
  318. func (s *Service) Hosts(c context.Context, svr, env string) (hosts []*model.Host, err error) {
  319. return s.dao.Hosts(c, svr, env)
  320. }
  321. // VersionSuccess return client versions which configuration is complete
  322. func (s *Service) VersionSuccess(c context.Context, svr, env, bver string) (versions *model.Versions, err error) {
  323. var (
  324. vers []*model.ReVer
  325. ver int64
  326. )
  327. if vers, err = s.dao.Versions(c, svr, env, model.ConfigEnd); err != nil {
  328. log.Error("Versions(%v,%v,%v) error(%v)", svr, env, bver, err)
  329. return
  330. }
  331. if ver, err = s.dao.BuildVersion(c, svr, bver, env); err != nil {
  332. log.Error("BuildVersion(%v) error(%v)", svr, err)
  333. return
  334. }
  335. versions = &model.Versions{
  336. Version: vers,
  337. DefVer: ver,
  338. }
  339. return
  340. }
  341. // VersionIng return client versions which configuration is creating
  342. func (s *Service) VersionIng(c context.Context, svr, env string) (vers []int64, err error) {
  343. var (
  344. res []*model.ReVer
  345. )
  346. if res, err = s.dao.Versions(c, svr, env, model.ConfigIng); err != nil {
  347. log.Error("Versions(%v,%v) error(%v)", svr, env, err)
  348. return
  349. }
  350. vers = make([]int64, 0)
  351. for _, reVer := range res {
  352. vers = append(vers, reVer.Version)
  353. }
  354. return
  355. }
  356. // Builds all builds
  357. func (s *Service) Builds(c context.Context, svr, env string) (builds []string, err error) {
  358. return s.dao.Builds(c, svr, env)
  359. }
  360. // AddConfigs insert config into db.
  361. func (s *Service) AddConfigs(c context.Context, svr, env, token, user string, data map[string]string) (err error) {
  362. var (
  363. svrID int64
  364. ver int64
  365. )
  366. if err = s.appAuth(c, svr, env, token); err != nil {
  367. return
  368. }
  369. if svrID, err = s.dao.ServiceID(c, svr, env); err != nil {
  370. return
  371. }
  372. var tx *sql.Tx
  373. if tx, err = s.dao.BeginTran(c); err != nil {
  374. log.Error("begin tran error(%v)", err)
  375. return
  376. }
  377. if ver, err = s.dao.TxInsertVer(tx, svrID, user); err != nil {
  378. tx.Rollback()
  379. return
  380. }
  381. if len(data) != 0 {
  382. if err = s.dao.TxInsertValues(c, tx, ver, user, data); err != nil {
  383. tx.Rollback()
  384. return
  385. }
  386. }
  387. if err = tx.Commit(); err != nil {
  388. log.Error("tx.Commit error(%v)", err)
  389. return
  390. }
  391. s.dao.InsertLog(c, user, addBusiness, fmt.Sprintf(addInfo, ver))
  392. return
  393. }
  394. // CopyConfigs copy config in newVer.
  395. func (s *Service) CopyConfigs(c context.Context, svr, env, token, user string, build string) (ver int64, err error) {
  396. var (
  397. svrID int64
  398. curVer int64
  399. values []*model.NSValue
  400. )
  401. if err = s.appAuth(c, svr, env, token); err != nil {
  402. return
  403. }
  404. if curVer, err = s.curVer(c, svr, build, env); err != nil {
  405. return
  406. }
  407. if values, err = s.dao.Values(c, curVer); err != nil {
  408. return
  409. }
  410. data := make(map[string]string)
  411. for _, c := range values {
  412. data[c.Name] = c.Config
  413. }
  414. if svrID, err = s.dao.ServiceID(c, svr, env); err != nil {
  415. return
  416. }
  417. var tx *sql.Tx
  418. if tx, err = s.dao.BeginTran(c); err != nil {
  419. log.Error("begin tran error(%v)", err)
  420. return
  421. }
  422. if ver, err = s.dao.TxInsertVer(tx, svrID, user); err != nil {
  423. tx.Rollback()
  424. return
  425. }
  426. if err = s.dao.TxInsertValues(c, tx, ver, user, data); err != nil {
  427. tx.Rollback()
  428. return
  429. }
  430. if err = tx.Commit(); err != nil {
  431. log.Error("tx.Commit error(%v)", err)
  432. return
  433. }
  434. s.dao.InsertLog(c, user, copyBusiness, fmt.Sprintf(copyInfo, curVer, ver))
  435. return
  436. }
  437. // UpdateConfigs update config.
  438. func (s *Service) UpdateConfigs(c context.Context, svr, env, token, user string, ver int64, data map[string]string) (err error) {
  439. var (
  440. values []*model.NSValue
  441. addData = make(map[string]string)
  442. udata = make(map[string]string)
  443. )
  444. if err = s.appAuth(c, svr, env, token); err != nil {
  445. return
  446. }
  447. if len(data) == 0 {
  448. return
  449. }
  450. if values, err = s.dao.Values(c, ver); err != nil {
  451. return
  452. }
  453. if len(values) == 0 {
  454. return ecode.NothingFound
  455. }
  456. oldData := make(map[string]string)
  457. for _, c := range values {
  458. oldData[c.Name] = c.Config
  459. }
  460. for k, v := range data {
  461. if _, ok := oldData[k]; ok {
  462. udata[k] = v
  463. } else {
  464. addData[k] = v
  465. }
  466. }
  467. var tx *sql.Tx
  468. if tx, err = s.dao.BeginTran(c); err != nil {
  469. log.Error("begin tran error(%v)", err)
  470. return
  471. }
  472. if len(addData) != 0 {
  473. if err = s.dao.TxInsertValues(c, tx, ver, user, addData); err != nil {
  474. tx.Rollback()
  475. return
  476. }
  477. }
  478. if len(udata) != 0 {
  479. if err = s.dao.TxUpdateValues(tx, ver, user, udata); err != nil {
  480. tx.Rollback()
  481. return
  482. }
  483. }
  484. if err = tx.Commit(); err != nil {
  485. log.Error("tx.Commit error(%v)", err)
  486. return
  487. }
  488. s.dao.InsertLog(c, user, updateBusiness, fmt.Sprintf(updateInfo, ver))
  489. return
  490. }
  491. func (s *Service) version(svr, bver, env string) (ver int64, ok bool) {
  492. verKey := buildVerKey(svr, bver, env)
  493. s.vLock.RLock()
  494. ver, ok = s.versions[verKey]
  495. s.vLock.RUnlock()
  496. return
  497. }
  498. func (s *Service) setVersion(svr, bver, env string, ver int64) {
  499. verKey := buildVerKey(svr, bver, env)
  500. s.vLock.Lock()
  501. s.versions[verKey] = ver
  502. s.vLock.Unlock()
  503. }
  504. // ClearHost clear service hosts.
  505. func (s *Service) ClearHost(c context.Context, svr, env string) (err error) {
  506. return s.dao.ClearHost(c, svr, env)
  507. }
  508. // pubEvent publish a event to chan.
  509. func (s *Service) pubEvent(key string, evt *model.Version) (ok bool) {
  510. s.eLock.RLock()
  511. c, ok := s.events[key]
  512. s.eLock.RUnlock()
  513. if ok {
  514. c <- evt
  515. }
  516. return
  517. }
  518. // Unsub unsub a event.
  519. func (s *Service) Unsub(svr, host, env string) {
  520. key := pushKey(svr, host, env)
  521. s.eLock.Lock()
  522. delete(s.events, key)
  523. s.eLock.Unlock()
  524. }
  525. func (s *Service) curVer(c context.Context, svr, build, env string) (ver int64, err error) {
  526. var ok bool
  527. // get current version, return if has new config version
  528. ver, ok = s.version(svr, build, env)
  529. if !ok {
  530. if ver, err = s.dao.BuildVersion(c, svr, build, env); err != nil {
  531. return
  532. }
  533. s.setVersion(svr, build, env, ver)
  534. }
  535. return
  536. }