upload.py 7.9 KB


  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. import json
  4. from threading import Lock
  5. import logging
  6. import datetime
  7. import os
  8. import time
  9. import shutil
  10. import paramiko
  11. import scp
  12. default_max_length = 1000
  13. class Handle:
  14. def __init__(self, max_length, temp_file, official_dir, rubbish,
  15. host, port, user, password, remote_dir, interval=5 * 60):
  16. """
  17. :param max_length: Every 1000 rows of data are deposited in an official file.
  18. :param temp_file: Temporary file path.
  19. :param official_dir: Official folder Path.
  20. :param rubbish: A temporary file recycle bin that periodically empties the folder.
  21. :param host: Remote server address for uploading files.
  22. :param port: Remote server port for uploading files.
  23. :param user: Remote server username for uploading files.
  24. :param password: Remote server password for uploading files.
  25. :param remote_dir: Remote server folder for uploading files.
  26. :param interval: Put temporary files into official files to upload at regular intervals.
  27. """
  28. self.lock = Lock()
  29. # Data results collected by multiple plug-ins need to be queued to write to files.
  30. # The Q is the queue.
  31. self.Q = []
  32. self.max_item_length = max_length
  33. self.temp_file = temp_file
  34. self.official_dir = official_dir
  35. self.interval = interval
  36. self.current_row = 0
  37. self.rubbish = rubbish
  38. self.host = host
  39. self.port = port
  40. self.user = user
  41. self.password = password
  42. self.remote_dir = remote_dir
  43. def add_task(self, content):
  44. if not isinstance(content, dict):
  45. logging.warning('[HANDLE] add task content: %s but type is not dict', content)
  46. return
  47. self.lock.acquire()
  48. content = self.content_format(content)
  49. self.Q.append(content)
  50. self.lock.release()
  51. @staticmethod
  52. def content_format(content):
  53. if not content.__contains__('host'):
  54. content['host'] = ''
  55. if not content.__contains__('expr'):
  56. content['expr'] = 1
  57. if not content.__contains__('dev_id'):
  58. content['dev_id'] = ''
  59. if not content.__contains__('port'):
  60. content['port'] = ''
  61. if not content.__contains__('type'):
  62. content['type'] = ''
  63. if not content.__contains__('schema'):
  64. content['schema'] = ''
  65. if not content.__contains__('timestamp'):
  66. content['timestamp'] = int(datetime.datetime.now().timestamp() * 1000)
  67. if not content.__contains__('username'):
  68. content['username'] = ''
  69. # if not content.__contains__('pwd'):
  70. # content['pwd'] = ''
  71. if not content.__contains__('kpi'):
  72. content['kpi'] = {}
  73. return content
  74. def to_file(self, name):
  75. try:
  76. shutil.copy(self.temp_file, name)
  77. uploader = Uploader(self.host, self.port, self.user, self.password, self.official_dir, self.remote_dir)
  78. err = uploader.upload()
  79. if err is not None:
  80. logging.error('[HANDLE] upload file with '
  81. 'host: %s, port: %d, user: %s, password: %s, local: %s, error: %s',
  82. self.host, self.port, self.user, self.password, name, err)
  83. return err
  84. for f in os.listdir(self.official_dir):
  85. if os.path.isdir(f):
  86. continue
  87. shutil.move(name, self.rubbish)
  88. except Exception as e:
  89. return e
  90. return None
  91. def listen(self):
  92. temp_file_mode = 'a+'
  93. temp_file_ptr = self.create_temp_file(temp_file_mode)
  94. beg = int(datetime.datetime.now().timestamp())
  95. end = beg
  96. q = []
  97. while True:
  98. self.sleep()
  99. end = int(datetime.datetime.now().timestamp())
  100. self.lock.acquire()
  101. q = self.Q
  102. self.Q = []
  103. self.lock.release()
  104. if len(q) < 1:
  105. continue
  106. # Open temp file ptr. It might get None again, so execute continue one more time.
  107. if temp_file_ptr is None or temp_file_ptr.closed:
  108. temp_file_ptr = self.create_temp_file(temp_file_mode)
  109. # continue
  110. for item in q:
  111. item_value = ''
  112. try:
  113. item_value = json.dumps(item)
  114. temp_file_ptr.write(item_value + '\n')
  115. self.current_row += 1
  116. except Exception as e:
  117. logging.error("[HANDLE] write item(%s) error :%s", item_value, e)
  118. logging.debug('[HANDLE] listen and ')
  119. # If the currently recorded data line is greater than or equal to the largest data line,
  120. # the temporary file is written to the official file.
  121. # If the file has not been saved for more than three minutes, and the temporary file is
  122. # not empty, an official file is also saved.
  123. official_file = ''
  124. if self.current_row >= self.max_item_length or (end - beg >= 3 * 60 and self.current_row > 0):
  125. # logging.error("row: %d, max length: %d, end - beg: %d", self.current_row, self.max_item_length,end - beg)
  126. try:
  127. now = int(datetime.datetime.now().timestamp() * 1000)
  128. official_file = os.path.join(self.official_dir, str(now) + '.tok')
  129. temp_file_ptr.close()
  130. msg = self.to_file(official_file)
  131. if msg is not None:
  132. logging.error('[HANDLE] to official file(%s) error: %s', official_file, msg)
  133. temp_file_mode = 'a+'
  134. continue
  135. temp_file_mode = 'w+'
  136. self.current_row = 0
  137. beg = int(datetime.datetime.now().timestamp())
  138. except Exception as e:
  139. logging.error('[HANDLE] copy temp file(%s) to official file(%s) error: %s',
  140. self.temp_file, official_file, e)
  141. def stop(self):
  142. pass
  143. def create_temp_file(self, mode):
  144. try:
  145. f = open(self.temp_file, mode=mode)
  146. if mode == 'w+':
  147. f.seek(0)
  148. f.truncate()
  149. return f
  150. except Exception as e:
  151. logging.error('[HANDLE] create temp file(%s) error: %s', self.temp_file, e)
  152. return None
  153. @staticmethod
  154. def sleep():
  155. time.sleep(1)
  156. defaultHandler = Handle(default_max_length, '/tmp/temp.txt', '/data/elec-monitor', 'rubbish', '', '', '', '', '', )
  157. def add_task(content):
  158. defaultHandler.add_task(content)
  159. def listen():
  160. defaultHandler.listen()
  161. class Uploader:
  162. def __init__(self, host, port, username, password, local, remote):
  163. self.host = host
  164. self.port = port
  165. self.username = username
  166. self.password = password
  167. self.local = local
  168. self.remote = remote
  169. def upload(self):
  170. """
  171. Upload files or folders to remote servers via SCP
  172. :return:
  173. """
  174. try:
  175. ssh_client = paramiko.SSHClient()
  176. ssh_client.load_system_host_keys()
  177. ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  178. ssh_client.connect(self.host, self.port, self.username, self.password)
  179. scp_client = scp.SCPClient(ssh_client.get_transport())
  180. scp_client.put(self.local, self.remote, recursive=True)
  181. ssh_client.close()
  182. return None
  183. except Exception as e:
  184. logging.error('[UPLOAD] upload files with '
  185. 'host: %s, port: %d, username: %s, password: %s, local: %s, remote: %s error: %s',
  186. self.host, self.port, self.username, self.password, self.local, self.remote, e)
  187. return e