upload.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. import json
  4. from threading import Lock
  5. import logging
  6. import datetime
  7. from os import path
  8. import time
  9. import shutil
  10. import paramiko
  11. import scp
  12. default_max_length = 1000
  13. class Handle:
  14. def __init__(self, max_length, temp_file, official_dir, interval=5 * 60):
  15. """
  16. :param max_length: Every 1000 rows of data are deposited in an official file.
  17. :param temp_file: Temporary file path.
  18. :param official_dir: Official folder Path.
  19. :param interval: Put temporary files into official files to upload at regular intervals.
  20. """
  21. self.lock = Lock()
  22. # Data results collected by multiple plug-ins need to be queued to write to files.
  23. # The Q is the queue.
  24. self.Q = []
  25. self.max_item_length = max_length
  26. self.temp_file = temp_file
  27. self.official_dir = official_dir
  28. self.interval = interval
  29. self.current_row = 0
  30. def add_task(self, content):
  31. if not isinstance(content, dict):
  32. logging.warning('[HANDLE] add task content: %s but type is not dict', content)
  33. return
  34. self.lock.acquire()
  35. content = self.content_format(content)
  36. self.Q.append(content)
  37. self.lock.release()
  38. @staticmethod
  39. def content_format(content):
  40. if not content.__contains__('dev_ip'):
  41. content['dev_ip'] = ''
  42. if not content.__contains__('expr'):
  43. content['expr'] = 1
  44. if not content.__contains__('dev_id'):
  45. content['dev_id'] = ''
  46. if not content.__contains__('port'):
  47. content['port'] = ''
  48. if not content.__contains__('type'):
  49. content['type'] = ''
  50. if not content.__contains__('schema'):
  51. content['schema'] = ''
  52. if not content.__contains__('timestamp'):
  53. content['timestamp'] = int(datetime.datetime.now().timestamp() * 1000)
  54. if not content.__contains__('login_name'):
  55. content['login_name'] = ''
  56. # if not content.__contains__('pwd'):
  57. # content['pwd'] = ''
  58. if not content.__contains__('kpi'):
  59. content['kpi'] = {}
  60. return content
  61. def to_file(self, name):
  62. try:
  63. shutil.copy(self.temp_file, name)
  64. except Exception as e:
  65. return e
  66. return None
  67. def listen(self):
  68. temp_file_mode = 'a+'
  69. temp_file_ptr = self.create_temp_file(temp_file_mode)
  70. beg = int(datetime.datetime.now().timestamp())
  71. end = beg
  72. q = []
  73. while True:
  74. self.sleep()
  75. end = int(datetime.datetime.now().timestamp())
  76. self.lock.acquire()
  77. q = self.Q
  78. self.Q = []
  79. self.lock.release()
  80. if len(q) < 1:
  81. continue
  82. # Open temp file ptr. It might get None again, so execute continue one more time.
  83. if temp_file_ptr is None or temp_file_ptr.closed:
  84. temp_file_ptr = self.create_temp_file(temp_file_mode)
  85. # continue
  86. for item in q:
  87. item_value = ''
  88. try:
  89. item_value = json.dumps(item)
  90. temp_file_ptr.write(item_value + '\n')
  91. self.current_row += 1
  92. except Exception as e:
  93. logging.error("[HANDLE] write item(%s) error :%s", item_value, e)
  94. logging.debug('[HANDLE] listen and ')
  95. # If the currently recorded data line is greater than or equal to the largest data line,
  96. # the temporary file is written to the official file.
  97. # If the file has not been saved for more than three minutes, and the temporary file is
  98. # not empty, an official file is also saved.
  99. official_file = ''
  100. if self.current_row >= self.max_item_length or (end - beg >= 3 * 60 and self.current_row > 0):
  101. # logging.error("row: %d, max length: %d, end - beg: %d", self.current_row, self.max_item_length,end - beg)
  102. try:
  103. now = int(datetime.datetime.now().timestamp() * 1000)
  104. official_file = path.join(self.official_dir, str(now) + '.tok')
  105. temp_file_ptr.close()
  106. msg = self.to_file(official_file)
  107. if msg is not None:
  108. logging.error('[HANDLE] to official file(%s) error: %s', official_file, msg)
  109. temp_file_mode = 'a+'
  110. continue
  111. temp_file_mode = 'w+'
  112. self.current_row = 0
  113. beg = int(datetime.datetime.now().timestamp())
  114. except Exception as e:
  115. logging.error('[HANDLE] copy temp file(%s) to official file(%s) error: %s',
  116. self.temp_file, official_file, e)
  117. def stop(self):
  118. pass
  119. def create_temp_file(self, mode):
  120. try:
  121. f = open(self.temp_file, mode=mode)
  122. if mode == 'w+':
  123. f.seek(0)
  124. f.truncate()
  125. return f
  126. except Exception as e:
  127. logging.error('[HANDLE] create temp file(%s) error: %s', self.temp_file, e)
  128. return None
  129. @staticmethod
  130. def sleep():
  131. time.sleep(1)
  132. defaultHandler = Handle(default_max_length, '/tmp/temp.txt', '/data/elec-monitor')
  133. def add_task(content):
  134. defaultHandler.add_task(content)
  135. def listen():
  136. defaultHandler.listen()
  137. class Uploader:
  138. def __init__(self, host, port, username, password, local, remote):
  139. self.host = host
  140. self.port = port
  141. self.username = username
  142. self.password = password
  143. self.local = local
  144. self.remote = remote
  145. def upload(self):
  146. try:
  147. ssh_client = paramiko.SSHClient()
  148. ssh_client.load_system_host_keys()
  149. ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  150. ssh_client.connect(self.host, self.port, self.username, self.password)
  151. scp_client = scp.SCPClient(ssh_client.get_transport())
  152. scp_client.put(self.local, self.remote, recursive=True)
  153. return None
  154. except Exception as e:
  155. logging.error('[UPLOAD] upload files with '
  156. 'host: %s, port: %d, username: %s, password: %s, local: %s, remote: %s error: %s',
  157. self.host, self.port, self.username, self.password, self.local, self.remote, e)
  158. return e