config_attr.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. package dao
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "go-common/app/job/main/search/model"
  10. "go-common/library/database/sql"
  11. "go-common/library/log"
  12. )
  13. const (
  14. _getAttrsSQL = "SELECT appid,db_name,es_name,table_prefix,table_format,index_prefix,index_version,index_format,index_type,index_id,index_mapping, " +
  15. "data_index_suffix,review_num,review_time,sleep,size,business,data_fields,data_extra,sql_by_id,sql_by_mtime,sql_by_idmtime,databus_info,databus_index_id FROM digger_app WHERE appid=?"
  16. )
  17. type attr struct {
  18. d *Dao
  19. appID string
  20. attrs *model.Attrs
  21. }
  22. func newAttr(d *Dao, appID string) (ar *attr) {
  23. ar = &attr{
  24. d: d,
  25. appID: appID,
  26. attrs: new(model.Attrs),
  27. }
  28. if err := ar.initAttrs(); err != nil {
  29. //fmt.Println("strace:init>", err)
  30. log.Error("d.initAttrs error (%v)", err)
  31. }
  32. return
  33. }
  34. func (ar *attr) initAttrs() (err error) {
  35. var sqlAttrs *model.SQLAttrs
  36. for {
  37. if sqlAttrs, err = ar.getSQLAttrs(context.TODO()); err != nil || sqlAttrs == nil {
  38. log.Error("d.Attrs error (%v)", err)
  39. time.Sleep(time.Second * 3)
  40. continue
  41. }
  42. break
  43. }
  44. // attr-src
  45. ar.attrs.Business = sqlAttrs.Business
  46. ar.attrs.AppID = sqlAttrs.AppID
  47. ar.attrs.DBName = sqlAttrs.DBName
  48. ar.attrs.ESName = sqlAttrs.ESName
  49. ar.attrs.DtbName = sqlAttrs.DtbName
  50. // attr-table
  51. if err = ar.parseTable(sqlAttrs); err != nil {
  52. err = fmt.Errorf("parseTable appid(%s) err(%v)", ar.appID, err)
  53. return
  54. }
  55. // attr-index
  56. if err = ar.parseIndex(sqlAttrs); err != nil {
  57. err = fmt.Errorf("parseIndex appid(%s) err(%v)", ar.appID, err)
  58. return
  59. }
  60. // attr-datasql
  61. if err = ar.parseDataSQL(sqlAttrs); err != nil {
  62. err = fmt.Errorf("parseDataSQL appid(%s) err(%v)", ar.appID, err)
  63. return
  64. }
  65. // attr-sql
  66. // attr-data_extra
  67. if err = ar.parseExtraData(sqlAttrs); err != nil {
  68. err = fmt.Errorf("parseExtraData appid(%s) err(%v)", ar.appID, err)
  69. return
  70. }
  71. // attr-databus
  72. if err = ar.parseDatabus(sqlAttrs); err != nil {
  73. err = fmt.Errorf("parseDatabus appid(%s) err(%v)", ar.appID, err)
  74. return
  75. }
  76. // attr-other
  77. ar.attrs.Other = &model.AttrOther{
  78. ReviewNum: sqlAttrs.ReviewNum,
  79. ReviewTime: sqlAttrs.ReviewTime,
  80. Sleep: sqlAttrs.Sleep,
  81. Size: sqlAttrs.Size,
  82. }
  83. return
  84. }
  85. func (ar *attr) getSQLAttrs(c context.Context) (res *model.SQLAttrs, err error) {
  86. res = new(model.SQLAttrs)
  87. row := ar.d.SearchDB.QueryRow(c, _getAttrsSQL, ar.appID)
  88. //fmt.Println("appID", ar.appID)
  89. if err = row.Scan(&res.AppID, &res.DBName, &res.ESName, &res.TablePrefix, &res.TableFormat, &res.IndexAliasPrefix, &res.IndexVersion, &res.IndexFormat, &res.IndexType, &res.IndexID, &res.IndexMapping,
  90. &res.DataIndexSuffix, &res.ReviewNum, &res.ReviewTime, &res.Sleep, &res.Size, &res.Business, &res.DataFields, &res.DataExtraInfo, &res.SQLByID, &res.SQLByMTime, &res.SQLByIDMTime, &res.DatabusInfo, &res.DatabusIndexID); err != nil {
  91. if err == sql.ErrNoRows {
  92. err = nil
  93. res = nil
  94. }
  95. }
  96. return
  97. }
  98. func (ar *attr) parseTable(sqlAttrs *model.SQLAttrs) (err error) {
  99. table := new(model.AttrTable)
  100. table.TablePrefix = sqlAttrs.TablePrefix
  101. table.TableFormat = sqlAttrs.TableFormat
  102. tableFormat := strings.Split(table.TableFormat, ",")
  103. if len(tableFormat) != 5 {
  104. err = fmt.Errorf("wrong tableForamt(%s)", tableFormat)
  105. return
  106. }
  107. if table.TableSplit = tableFormat[0]; table.TableSplit != "single" {
  108. if table.TableFrom, err = strconv.Atoi(tableFormat[1]); err != nil {
  109. return
  110. }
  111. if table.TableTo, err = strconv.Atoi(tableFormat[2]); err != nil {
  112. return
  113. }
  114. }
  115. table.TableZero = tableFormat[3]
  116. table.TableFixed = (tableFormat[4] == "fixed")
  117. ar.attrs.Table = table
  118. return
  119. }
  120. func (ar *attr) parseIndex(sqlAttrs *model.SQLAttrs) (err error) {
  121. index := new(model.AttrIndex)
  122. index.IndexAliasPrefix = sqlAttrs.IndexAliasPrefix
  123. index.IndexEntityPrefix = sqlAttrs.IndexAliasPrefix + sqlAttrs.IndexVersion
  124. index.IndexFormat = sqlAttrs.IndexFormat
  125. index.IndexType = sqlAttrs.IndexType
  126. index.IndexID = sqlAttrs.IndexID
  127. index.IndexMapping = sqlAttrs.IndexMapping
  128. indexFormat := strings.Split(index.IndexFormat, ",")
  129. if len(indexFormat) != 5 {
  130. err = fmt.Errorf("wrong indexFormat(%s)", indexFormat)
  131. return
  132. }
  133. if index.IndexID == "base" {
  134. err = fmt.Errorf("indexID Prohibition 'base' (%s)", indexFormat)
  135. return
  136. }
  137. if index.IndexSplit = indexFormat[0]; index.IndexSplit != "single" {
  138. if index.IndexFrom, err = strconv.Atoi(indexFormat[1]); err != nil {
  139. return
  140. }
  141. if index.IndexTo, err = strconv.Atoi(indexFormat[2]); err != nil {
  142. return
  143. }
  144. }
  145. index.IndexZero = indexFormat[3]
  146. index.IndexFixed = (indexFormat[4] == "fixed")
  147. ar.attrs.Index = index
  148. return
  149. }
  150. func (ar *attr) parseDataSQL(sqlAttrs *model.SQLAttrs) (err error) {
  151. dataSQL := new(model.AttrDataSQL)
  152. dataSQL.DataIndexFormatFields = make(map[string]string)
  153. dataSQL.DataDtbFields = make(map[string][]string)
  154. dataSQL.DataFieldsV2 = make(map[string]model.AttrDataFields)
  155. dataSQL.DataIndexSuffix = sqlAttrs.DataIndexSuffix
  156. dataSQL.DataFields = sqlAttrs.DataFields
  157. dataSQL.DataExtraInfo = sqlAttrs.DataExtraInfo
  158. if dataSQL.DataFields == "" {
  159. return
  160. }
  161. p := []model.AttrDataFields{} //DataFieldsV2
  162. sqlFields := []string{}
  163. if e := json.Unmarshal([]byte(dataSQL.DataFields), &p); e != nil {
  164. fields := strings.Split(dataSQL.DataFields, ",")
  165. for _, v := range fields {
  166. exp := strings.Split(v, ":")
  167. indexFieldName := exp[0]
  168. dataSQL.DataIndexFields = append(dataSQL.DataIndexFields, indexFieldName)
  169. sqlFields = append(sqlFields, exp[1])
  170. dataSQL.DataIndexFormatFields[indexFieldName] = exp[2]
  171. if exp[3] == "n" {
  172. dataSQL.DataIndexRemoveFields = append(dataSQL.DataIndexRemoveFields, indexFieldName)
  173. }
  174. }
  175. } else {
  176. // json方式
  177. for _, v := range p {
  178. dataSQL.DataFieldsV2[v.ESField] = v
  179. dataSQL.DataIndexFields = append(dataSQL.DataIndexFields, v.ESField)
  180. sqlFields = append(sqlFields, v.SQL)
  181. dataSQL.DataIndexFormatFields[v.ESField] = v.Expect
  182. if v.Stored == "n" {
  183. dataSQL.DataIndexRemoveFields = append(dataSQL.DataIndexRemoveFields, v.ESField)
  184. }
  185. if v.InDtb == "y" {
  186. dataSQL.DataDtbFields[v.Field] = append(dataSQL.DataDtbFields[v.Field], v.ESField)
  187. }
  188. }
  189. }
  190. //fmt.Println(dataSQL.DataDtbFields)
  191. //sqlFields顺序和attr.DataIndexFields要一致
  192. if (len(sqlFields) != len(dataSQL.DataIndexFields)) && (len(sqlFields) == 0 || len(dataSQL.DataIndexFields) == 0) {
  193. log.Error("sqlFields and attr.DataIndexFields are different")
  194. return
  195. }
  196. dataSQL.SQLFields = strings.Join(sqlFields, ",")
  197. if ar.attrs.Table.TableSplit == "single" {
  198. dataSQL.SQLByID = fmt.Sprintf(sqlAttrs.SQLByID, dataSQL.SQLFields)
  199. dataSQL.SQLByMTime = fmt.Sprintf(sqlAttrs.SQLByMTime, dataSQL.SQLFields)
  200. dataSQL.SQLByIDMTime = fmt.Sprintf(sqlAttrs.SQLByIDMTime, dataSQL.SQLFields)
  201. } else {
  202. dataSQL.SQLByID = sqlAttrs.SQLByID
  203. dataSQL.SQLByMTime = sqlAttrs.SQLByMTime
  204. dataSQL.SQLByIDMTime = sqlAttrs.SQLByIDMTime
  205. }
  206. ar.attrs.DataSQL = dataSQL
  207. return
  208. }
  209. func (ar *attr) parseExtraData(sqlAttrs *model.SQLAttrs) (err error) {
  210. if sqlAttrs.DataExtraInfo != "" {
  211. err = json.Unmarshal([]byte(sqlAttrs.DataExtraInfo), &ar.attrs.DataExtras)
  212. }
  213. // append all format field from extra data
  214. for _, v := range ar.attrs.DataExtras {
  215. if v.FieldsStr == "" {
  216. continue
  217. }
  218. fields := strings.Split(v.FieldsStr, ",")
  219. for _, v := range fields {
  220. exp := strings.Split(v, ":")
  221. ar.attrs.DataSQL.DataIndexFormatFields[exp[0]] = exp[2]
  222. }
  223. }
  224. return
  225. }
  226. func (ar *attr) parseDatabus(sqlAttrs *model.SQLAttrs) (err error) {
  227. dtb := new(model.AttrDatabus)
  228. if sqlAttrs.DatabusInfo != "" {
  229. databusInfo := strings.Split(sqlAttrs.DatabusInfo, ",")
  230. if len(databusInfo) != 3 {
  231. err = fmt.Errorf("wrong databusInfo(%s)", databusInfo)
  232. return
  233. }
  234. dtb.Databus = databusInfo[0]
  235. if dtb.AggCount, err = strconv.Atoi(databusInfo[1]); err != nil {
  236. return
  237. }
  238. if dtb.Ticker, err = strconv.Atoi(databusInfo[2]); err != nil {
  239. return
  240. }
  241. }
  242. if sqlAttrs.DatabusIndexID != "" {
  243. databusIndexID := strings.Split(sqlAttrs.DatabusIndexID, ":")
  244. if len(databusIndexID) != 2 {
  245. err = fmt.Errorf("wrong databusIndexID(%s)", databusIndexID)
  246. return
  247. }
  248. dtb.PrimaryID = databusIndexID[0]
  249. dtb.RelatedID = databusIndexID[1]
  250. }
  251. ar.attrs.Databus = dtb
  252. return
  253. }