client.go 17 KB


  1. package v2
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "encoding/hex"
  6. "encoding/json"
  7. "fmt"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "go-common/app/infra/config/model"
  12. "go-common/library/ecode"
  13. "go-common/library/log"
  14. xtime "go-common/library/time"
  15. )
  16. const (
  17. _buildVerKey = "%d_%s"
  18. _pushKey = "%s_%s"
  19. _cacheKey = "%s_%d"
  20. _pushForceKey = "%s_%s_%s"
  21. _tagForceKey = "%d_%s_%d_tagforce"
  22. _tagIDkey = "%d_%s_tagID"
  23. _lastForceKey = "%d_%s_lastForce"
  24. )
  25. func lastForceKey(svr int64, bver string) string {
  26. return fmt.Sprintf(_tagIDkey, svr, bver)
  27. }
  28. func tagIDkey(svr int64, bver string) string {
  29. return fmt.Sprintf(_tagIDkey, svr, bver)
  30. }
  31. // tagForceKey...
  32. func tagForceKey(svr int64, bver string, tagID int64) string {
  33. return fmt.Sprintf(_tagForceKey, svr, bver, tagID)
  34. }
  35. // pushForceKey push
  36. func pushForceKey(svr, host, ip string) string {
  37. return fmt.Sprintf(_pushForceKey, svr, host, ip)
  38. }
  39. // PushKey push sub id
  40. func pushKey(svr, host string) string {
  41. return fmt.Sprintf(_pushKey, svr, host)
  42. }
  43. // buildVerKey version mapping key
  44. func buildVerKey(svr int64, bver string) string {
  45. return fmt.Sprintf(_buildVerKey, svr, bver)
  46. }
  47. // cacheKey config cache key
  48. func cacheKey(svr string, ver int64) string {
  49. return fmt.Sprintf(_cacheKey, svr, ver)
  50. }
  51. // genConfig generate config
  52. func genConfig(ver int64, values []*model.Value) (conf *model.Content, err error) {
  53. var b []byte
  54. if b, err = json.Marshal(values); err != nil {
  55. return
  56. }
  57. mb := md5.Sum(b)
  58. conf = &model.Content{
  59. Version: ver,
  60. Md5: hex.EncodeToString(mb[:]),
  61. Content: string(b),
  62. }
  63. return
  64. }
  65. // Push version to clients & generate config caches
  66. func (s *Service) Push(c context.Context, svr *model.Service) (err error) {
  67. var (
  68. hosts []*model.Host
  69. values []*model.Value
  70. conf *model.Content
  71. app *model.App
  72. ids []int64
  73. force int8
  74. )
  75. if ids, err = s.dao.ConfIDs(svr.Version); err != nil {
  76. return
  77. }
  78. if values, err = s.dao.ConfigsByIDs(ids); err != nil {
  79. return
  80. }
  81. if len(values) == 0 {
  82. err = fmt.Errorf("config values is empty. svr:%s, host:%s, buildVer:%s, ver:%d", svr.Name, svr.Host, svr.BuildVersion, svr.Version)
  83. log.Error("%v", err)
  84. return
  85. }
  86. // compatible old version sdk
  87. if conf, err = genConfig(svr.Version, values); err != nil {
  88. log.Error("get config value:%s error(%v) ", values, err)
  89. return
  90. }
  91. cacheName := cacheKey(svr.Name, svr.Version)
  92. if err = s.dao.SetFile(cacheName, conf); err != nil {
  93. return
  94. }
  95. if app, err = s.app(svr.Name); err != nil {
  96. return
  97. }
  98. tag := s.setTag(app.ID, svr.BuildVersion, &cacheTag{Tag: svr.Version, ConfIDs: ids})
  99. s.setTagID(app.ID, tag.C.Tag, svr.BuildVersion)
  100. if force, err = s.dao.TagForce(svr.Version); err != nil {
  101. return
  102. }
  103. if force == 1 {
  104. s.setLFVForces(svr.Version, app.ID, svr.BuildVersion)
  105. }
  106. // push hosts
  107. if hosts, err = s.dao.Hosts(c, svr.Name); err != nil {
  108. log.Error("get hosts error. svr:%s, buildVer:%s, ver:%d", svr.Name, svr.BuildVersion, svr.Version)
  109. err = nil
  110. return
  111. }
  112. for _, h := range hosts {
  113. if h.State == model.HostOnline {
  114. pushKey := pushKey(h.Service, h.Name)
  115. if ok := s.pubEvent(pushKey, &model.Diff{Version: svr.Version, Diffs: tag.diff()}); ok {
  116. log.Info("s.events.Pub(%s, %d) ok: %t", pushKey, conf.Version, ok)
  117. }
  118. }
  119. }
  120. return
  121. }
  122. // Config return config content.
  123. func (s *Service) Config(c context.Context, svrName, token string, ver int64, ids []int64) (conf *model.Content, err error) {
  124. var (
  125. values, all []*model.Value
  126. )
  127. if _, err = s.appAuth(c, svrName, token); err != nil {
  128. return
  129. }
  130. cacheName := cacheKey(svrName, ver)
  131. if conf, err = s.dao.File(cacheName); err == nil {
  132. if len(ids) == 0 {
  133. return
  134. }
  135. if err = json.Unmarshal([]byte(conf.Content), &all); err != nil {
  136. return
  137. }
  138. } else {
  139. if all, err = s.values(ver); err != nil {
  140. return
  141. }
  142. if conf, err = genConfig(ver, all); err != nil {
  143. return
  144. }
  145. cacheName := cacheKey(svrName, ver)
  146. if err = s.dao.SetFile(cacheName, conf); err != nil {
  147. return
  148. }
  149. if len(ids) == 0 {
  150. return
  151. }
  152. }
  153. if len(all) == 0 {
  154. err = ecode.NothingFound
  155. return
  156. }
  157. for _, v := range all {
  158. for _, id := range ids {
  159. if v.ConfigID == id {
  160. values = append(values, v)
  161. }
  162. }
  163. }
  164. if len(values) == 0 {
  165. log.Error("Config(%s,%v) error", svrName, ids)
  166. err = ecode.NothingFound
  167. return
  168. }
  169. if conf, err = genConfig(ver, values); err != nil {
  170. log.Error("get config value:%s error(%v) ", values, err)
  171. return
  172. }
  173. return
  174. }
  175. // File get one file content.
  176. func (s *Service) File(c context.Context, svr *model.Service) (val string, err error) {
  177. var (
  178. curVer, appID int64
  179. all []*model.Value
  180. conf *model.Content
  181. tag *curTag
  182. )
  183. if appID, err = s.appAuth(c, svr.Name, svr.Token); err != nil {
  184. return
  185. }
  186. if svr.Version != model.UnknownVersion {
  187. curVer = svr.Version
  188. } else {
  189. // get current version, return if has new config version
  190. if tag, err = s.curTag(appID, svr.BuildVersion); err != nil {
  191. return
  192. }
  193. curVer = tag.cur()
  194. }
  195. cacheName := cacheKey(svr.Name, svr.Version)
  196. if conf, err = s.dao.File(cacheName); err == nil {
  197. if err = json.Unmarshal([]byte(conf.Content), &all); err != nil {
  198. return
  199. }
  200. } else {
  201. if all, err = s.values(curVer); err != nil {
  202. return
  203. }
  204. if conf, err = genConfig(curVer, all); err != nil {
  205. return
  206. }
  207. cacheName := cacheKey(svr.Name, curVer)
  208. if err = s.dao.SetFile(cacheName, conf); err != nil {
  209. return
  210. }
  211. }
  212. for _, v := range all {
  213. if v.Name == svr.File {
  214. val = v.Config
  215. break
  216. }
  217. }
  218. return
  219. }
  220. // CheckVersion check client version.
  221. func (s *Service) CheckVersion(c context.Context, rhost *model.Host, token string) (evt chan *model.Diff, err error) {
  222. var (
  223. appID, tagID int64
  224. tag *curTag
  225. ForceVersion int64
  226. tagForce int8
  227. lastForce int64
  228. )
  229. if appID, err = s.appAuth(c, rhost.Service, token); err != nil {
  230. return
  231. }
  232. // set heartbeat
  233. rhost.HeartbeatTime = xtime.Time(time.Now().Unix())
  234. s.dao.SetHost(c, rhost, rhost.Service)
  235. evt = make(chan *model.Diff, 1)
  236. //请开始你的表演
  237. tagForce, err = s.tagForce(appID, rhost.BuildVersion)
  238. if err != nil {
  239. return
  240. }
  241. rhost.Force = tagForce
  242. s.dao.SetHost(c, rhost, rhost.Service)
  243. if rhost.ConfigVersion > 0 {
  244. // force has a higher priority than appoint
  245. if ForceVersion, err = s.curForce(rhost, appID); err != nil {
  246. return
  247. }
  248. if ForceVersion > 0 {
  249. rhost.ForceVersion = ForceVersion
  250. s.dao.SetHost(c, rhost, rhost.Service)
  251. if ForceVersion != rhost.ConfigVersion {
  252. evt <- &model.Diff{Version: ForceVersion}
  253. }
  254. return
  255. }
  256. if lastForce, err = s.lastForce(appID, rhost.ConfigVersion, rhost.BuildVersion); err != nil {
  257. return
  258. }
  259. if rhost.ConfigVersion <= lastForce {
  260. if rhost.ConfigVersion != lastForce {
  261. evt <- &model.Diff{Version: lastForce}
  262. }
  263. return
  264. }
  265. }
  266. if rhost.Appoint > 0 {
  267. if rhost.Appoint != rhost.ConfigVersion {
  268. evt <- &model.Diff{Version: rhost.Appoint}
  269. }
  270. return
  271. }
  272. //结束表演
  273. // get current version, return if has new config version
  274. if tag, err = s.curTag(appID, rhost.BuildVersion); err != nil {
  275. return
  276. }
  277. if tagID = tag.cur(); tagID == 0 {
  278. err = ecode.NothingFound
  279. return
  280. }
  281. if tagID != rhost.ConfigVersion {
  282. ver := &model.Diff{Version: tagID}
  283. if rhost.ConfigVersion == tag.old() {
  284. ver.Diffs = tag.diff()
  285. }
  286. evt <- ver
  287. return
  288. }
  289. pushKey := pushKey(rhost.Service, rhost.Name)
  290. s.eLock.Lock()
  291. s.events[pushKey] = evt
  292. s.eLock.Unlock()
  293. return
  294. }
  295. // values get configs by tag id.
  296. func (s *Service) values(tagID int64) (values []*model.Value, err error) {
  297. var (
  298. ids []int64
  299. )
  300. if ids, err = s.dao.ConfIDs(tagID); err != nil {
  301. return
  302. }
  303. values, err = s.dao.ConfigsByIDs(ids)
  304. return
  305. }
  306. // appAuth check app is auth
  307. func (s *Service) appAuth(c context.Context, svr, token string) (appID int64, err error) {
  308. app, err := s.app(svr)
  309. if err != nil {
  310. return
  311. }
  312. if app.Token != token {
  313. err = ecode.AccessDenied
  314. return
  315. }
  316. appID = app.ID
  317. return
  318. }
  319. // Hosts return client hosts.
  320. func (s *Service) Hosts(c context.Context, svr string) (hosts []*model.Host, err error) {
  321. return s.dao.Hosts(c, svr)
  322. }
  323. // Builds all builds
  324. func (s *Service) Builds(c context.Context, svr string) (builds []string, err error) {
  325. var (
  326. app *model.App
  327. )
  328. if app, err = s.app(svr); err != nil {
  329. return
  330. }
  331. return s.dao.BuildsByAppID(app.ID)
  332. }
  333. // VersionSuccess return client versions which configuration is complete
  334. func (s *Service) VersionSuccess(c context.Context, svr, bver string) (versions *model.Versions, err error) {
  335. var (
  336. vers []*model.ReVer
  337. ver int64
  338. app *model.App
  339. )
  340. if app, err = s.app(svr); err != nil {
  341. return
  342. }
  343. if ver, err = s.dao.TagID(app.ID, bver); err != nil {
  344. log.Error("BuildVersion(%v) error(%v)", svr, err)
  345. return
  346. }
  347. if vers, err = s.dao.Tags(app.ID); err != nil {
  348. return
  349. }
  350. versions = &model.Versions{
  351. Version: vers,
  352. DefVer: ver,
  353. }
  354. return
  355. }
  356. // ClearHost clear service hosts.
  357. func (s *Service) ClearHost(c context.Context, svr string) (err error) {
  358. return s.dao.ClearHost(c, svr)
  359. }
  360. // pubEvent publish a event to chan.
  361. func (s *Service) pubEvent(key string, evt *model.Diff) (ok bool) {
  362. s.eLock.RLock()
  363. c, ok := s.events[key]
  364. s.eLock.RUnlock()
  365. if ok {
  366. c <- evt
  367. }
  368. return
  369. }
  370. // Unsub unsub a event.
  371. func (s *Service) Unsub(svr, host string) {
  372. key := pushKey(svr, host)
  373. s.eLock.Lock()
  374. delete(s.events, key)
  375. s.eLock.Unlock()
  376. }
  377. func (s *Service) curTag(appID int64, build string) (tag *curTag, err error) {
  378. var (
  379. ok bool
  380. tagID int64
  381. ids []int64
  382. tagForce int8
  383. )
  384. // get current version, return if has new config version
  385. verKey := buildVerKey(appID, build)
  386. s.tLock.RLock()
  387. tag, ok = s.tags[verKey]
  388. s.tLock.RUnlock()
  389. if !ok {
  390. if tagID, err = s.dao.TagID(appID, build); err != nil {
  391. return
  392. }
  393. if ids, err = s.dao.ConfIDs(tagID); err != nil {
  394. return
  395. }
  396. if tagForce, err = s.dao.TagForce(tagID); err != nil {
  397. return
  398. }
  399. tag = s.setTag(appID, build, &cacheTag{Tag: tagID, ConfIDs: ids, Force: tagForce})
  400. }
  401. return
  402. }
  403. func (s *Service) setTag(appID int64, bver string, cTag *cacheTag) (nTag *curTag) {
  404. var (
  405. oTag *curTag
  406. ok bool
  407. )
  408. verKey := buildVerKey(appID, bver)
  409. nTag = &curTag{C: cTag}
  410. s.tLock.Lock()
  411. oTag, ok = s.tags[verKey]
  412. if ok && oTag.C != nil {
  413. nTag.O = oTag.C
  414. }
  415. s.tags[verKey] = nTag
  416. s.tLock.Unlock()
  417. return
  418. }
  419. func (s *Service) app(svr string) (app *model.App, err error) {
  420. var (
  421. ok bool
  422. treeID int64
  423. )
  424. s.aLock.RLock()
  425. app, ok = s.apps[svr]
  426. s.aLock.RUnlock()
  427. if !ok {
  428. arrs := strings.Split(svr, "_")
  429. if len(arrs) != 3 {
  430. err = ecode.RequestErr
  431. return
  432. }
  433. if treeID, err = strconv.ParseInt(arrs[0], 10, 64); err != nil {
  434. return
  435. }
  436. if app, err = s.dao.AppByTree(arrs[2], arrs[1], treeID); err != nil {
  437. log.Error("Token(%v) error(%v)", svr, err)
  438. return
  439. }
  440. s.aLock.Lock()
  441. s.apps[svr] = app
  442. s.aLock.Unlock()
  443. }
  444. return app, nil
  445. }
  446. //SetToken set token.
  447. func (s *Service) SetToken(svr, token string) (err error) {
  448. var (
  449. ok bool
  450. treeID int64
  451. app, nApp *model.App
  452. )
  453. s.aLock.RLock()
  454. app, ok = s.apps[svr]
  455. s.aLock.RUnlock()
  456. if ok {
  457. nApp = &model.App{ID: app.ID, Name: app.Name, Token: token}
  458. } else {
  459. arrs := strings.Split(svr, "_")
  460. if len(arrs) != 3 {
  461. err = ecode.RequestErr
  462. return
  463. }
  464. if treeID, err = strconv.ParseInt(arrs[0], 10, 64); err != nil {
  465. return
  466. }
  467. if nApp, err = s.dao.AppByTree(arrs[2], arrs[1], treeID); err != nil {
  468. log.Error("Token(%v) error(%v)", svr, err)
  469. return
  470. }
  471. }
  472. s.aLock.Lock()
  473. s.apps[svr] = nApp
  474. s.aLock.Unlock()
  475. return
  476. }
  477. //TmpBuilds get builds.
  478. func (s *Service) TmpBuilds(svr, env string) (builds []string, err error) {
  479. var (
  480. apps []*model.DBApp
  481. appIDs []int64
  482. )
  483. switch env {
  484. case "10":
  485. env = "dev"
  486. case "11":
  487. env = "fat1"
  488. case "13":
  489. env = "uat"
  490. case "14":
  491. env = "pre"
  492. case "3":
  493. env = "prod"
  494. default:
  495. }
  496. if apps, err = s.dao.AppsByNameEnv(svr, env); err != nil {
  497. return
  498. }
  499. if len(apps) == 0 {
  500. err = ecode.NothingFound
  501. return
  502. }
  503. for _, app := range apps {
  504. appIDs = append(appIDs, app.ID)
  505. }
  506. return s.dao.BuildsByAppIDs(appIDs)
  507. }
  508. // AppService ...
  509. func (s *Service) AppService(zone, env, token string) (service string, err error) {
  510. var (
  511. ok bool
  512. key string
  513. app *model.App
  514. )
  515. key = fmt.Sprintf("%s_%s_%s", zone, env, token)
  516. s.bLock.RLock()
  517. app, ok = s.services[key]
  518. s.bLock.RUnlock()
  519. if !ok {
  520. app, err = s.dao.AppGet(zone, env, token)
  521. if err != nil {
  522. log.Error("AppService error(%v)", err)
  523. return
  524. }
  525. s.bLock.Lock()
  526. s.services[key] = app
  527. s.bLock.Unlock()
  528. }
  529. service = fmt.Sprintf("%d_%s_%s", app.TreeID, app.Env, app.Zone)
  530. return
  531. }
  532. // Force version to clients & generate config caches
  533. func (s *Service) Force(c context.Context, svr *model.Service, hostnames map[string]string, sType int8) (err error) {
  534. var (
  535. values []*model.Value
  536. ids []int64
  537. )
  538. if sType == 1 { // push 1 clear other
  539. if ids, err = s.dao.ConfIDs(svr.Version); err != nil {
  540. return
  541. }
  542. if values, err = s.dao.ConfigsByIDs(ids); err != nil {
  543. return
  544. }
  545. if len(values) == 0 {
  546. err = fmt.Errorf("config values is empty. svr:%s, host:%s, buildVer:%s, ver:%d", svr.Name, svr.Host, svr.BuildVersion, svr.Version)
  547. log.Error("%v", err)
  548. return
  549. }
  550. }
  551. for key, val := range hostnames {
  552. pushForceKey := pushForceKey(svr.Name, key, val)
  553. s.pubForce(pushForceKey, svr.Version)
  554. log.Info("s.events.Pub(%s, %d)", pushForceKey, svr.Version)
  555. }
  556. return
  557. }
  558. func (s *Service) curForce(rhost *model.Host, appID int64) (version int64, err error) {
  559. var (
  560. ok bool
  561. )
  562. // get force version
  563. pushForceKey := pushForceKey(rhost.Service, rhost.Name, rhost.IP)
  564. s.fLock.RLock()
  565. version, ok = s.forces[pushForceKey]
  566. s.fLock.RUnlock()
  567. if !ok {
  568. if version, err = s.dao.Force(appID, rhost.Name); err != nil {
  569. return
  570. }
  571. s.fLock.Lock()
  572. s.forces[pushForceKey] = version
  573. s.fLock.Unlock()
  574. }
  575. return
  576. }
  577. // pubEvent publish a forces.
  578. func (s *Service) pubForce(key string, version int64) {
  579. s.fLock.Lock()
  580. s.forces[key] = version
  581. s.fLock.Unlock()
  582. }
  583. func (s *Service) tagForce(appID int64, build string) (force int8, err error) {
  584. var (
  585. ok bool
  586. tagID int64
  587. )
  588. key := tagIDkey(appID, build)
  589. s.tagIDLock.RLock()
  590. tagID, ok = s.tagID[key]
  591. s.tagIDLock.RUnlock()
  592. if !ok {
  593. if tagID, err = s.dao.TagID(appID, build); err != nil {
  594. return
  595. }
  596. s.setTagID(appID, tagID, build)
  597. }
  598. verKey := tagForceKey(appID, build, tagID)
  599. s.tfLock.RLock()
  600. force, ok = s.forceType[verKey]
  601. s.tfLock.RUnlock()
  602. if !ok {
  603. if force, err = s.dao.TagForce(tagID); err != nil {
  604. return
  605. }
  606. s.tfLock.Lock()
  607. s.forceType[verKey] = force
  608. s.tfLock.Unlock()
  609. }
  610. return
  611. }
  612. func (s *Service) setTagID(appID, tagID int64, build string) {
  613. key := tagIDkey(appID, build)
  614. s.tagIDLock.Lock()
  615. s.tagID[key] = tagID
  616. s.tagIDLock.Unlock()
  617. }
  618. func (s *Service) lastForce(appID, version int64, build string) (lastForce int64, err error) {
  619. var ok bool
  620. var buildID int64
  621. key := lastForceKey(appID, build)
  622. s.lfvLock.RLock()
  623. lastForce, ok = s.lfvforces[key]
  624. s.lfvLock.RUnlock()
  625. if !ok {
  626. if buildID, err = s.dao.BuildID(appID, build); err != nil {
  627. return
  628. }
  629. if lastForce, err = s.dao.LastForce(appID, buildID); err != nil {
  630. return
  631. }
  632. s.lfvLock.Lock()
  633. s.lfvforces[key] = lastForce
  634. s.lfvLock.Unlock()
  635. }
  636. return
  637. }
  638. func (s *Service) setLFVForces(lastForce, appID int64, build string) {
  639. key := lastForceKey(appID, build)
  640. s.lfvLock.Lock()
  641. s.lfvforces[key] = lastForce
  642. s.lfvLock.Unlock()
  643. }
  644. //CheckLatest ...
  645. func (s *Service) CheckLatest(c context.Context, rhost *model.Host, token string) (ver int64, err error) {
  646. var (
  647. appID, tagID int64
  648. tag *curTag
  649. )
  650. if appID, err = s.appAuth(c, rhost.Service, token); err != nil {
  651. return
  652. }
  653. // get current version, return if has new config version
  654. if tag, err = s.curTag(appID, rhost.BuildVersion); err != nil {
  655. return
  656. }
  657. if tagID = tag.cur(); tagID == 0 {
  658. err = ecode.NothingFound
  659. return
  660. }
  661. ver = tagID
  662. return
  663. }
  664. // ConfigCheck return config content.
  665. func (s *Service) ConfigCheck(c context.Context, svrName, token string, ver int64, ids []int64) (conf *model.Content, err error) {
  666. var (
  667. values, all []*model.Value
  668. appID int64
  669. tagAll *model.DBTag
  670. )
  671. if appID, err = s.appAuth(c, svrName, token); err != nil {
  672. return
  673. }
  674. if tagAll, err = s.dao.TagAll(ver); err != nil {
  675. return
  676. }
  677. if appID != tagAll.AppID {
  678. err = ecode.RequestErr
  679. return
  680. }
  681. cacheName := cacheKey(svrName, ver)
  682. if conf, err = s.dao.File(cacheName); err == nil {
  683. if len(ids) == 0 {
  684. return
  685. }
  686. if err = json.Unmarshal([]byte(conf.Content), &all); err != nil {
  687. return
  688. }
  689. } else {
  690. if all, err = s.values(ver); err != nil {
  691. return
  692. }
  693. if conf, err = genConfig(ver, all); err != nil {
  694. return
  695. }
  696. cacheName := cacheKey(svrName, ver)
  697. if err = s.dao.SetFile(cacheName, conf); err != nil {
  698. return
  699. }
  700. if len(ids) == 0 {
  701. return
  702. }
  703. }
  704. if len(all) == 0 {
  705. err = ecode.NothingFound
  706. return
  707. }
  708. for _, v := range all {
  709. for _, id := range ids {
  710. if v.ConfigID == id {
  711. values = append(values, v)
  712. }
  713. }
  714. }
  715. if len(values) == 0 {
  716. log.Error("Config(%s,%v) error", svrName, ids)
  717. err = ecode.NothingFound
  718. return
  719. }
  720. if conf, err = genConfig(ver, values); err != nil {
  721. log.Error("get config value:%s error(%v) ", values, err)
  722. return
  723. }
  724. return
  725. }