123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217 |
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import json
- from threading import Lock
- import logging
- import datetime
- import os
- import time
- import shutil
- import paramiko
- import scp
- default_max_length = 1000  # Row threshold handed to the module-level Handle as its max_length.
class Handle:
    """Queue rows from producers, spool them to a temp file and roll it into uploaded files.

    Producers call :meth:`add_task`; :meth:`listen` runs forever, draining the
    queue into the temp file and, once enough rows have accumulated (or enough
    time has passed), copying the temp file into the official folder and
    uploading that folder through ``Uploader``.
    """

    def __init__(self, max_length, temp_file, official_dir, rubbish,
                 host, port, user, password, remote_dir, interval=5 * 60):
        """
        :param max_length: Number of rows after which the temp file is rolled into an official file.
        :param temp_file: Temporary file path.
        :param official_dir: Official folder path.
        :param rubbish: Recycle-bin location that uploaded official files are moved to.
        :param host: Remote server address for uploading files.
        :param port: Remote server port for uploading files.
        :param user: Remote server username for uploading files.
        :param password: Remote server password for uploading files.
        :param remote_dir: Remote server folder for uploading files.
        :param interval: Intended period (seconds) between forced roll-overs.
        """
        self.lock = Lock()
        # Data results collected by multiple plug-ins need to be queued to write to files.
        # The Q is the queue.
        self.Q = []
        self.max_item_length = max_length
        self.temp_file = temp_file
        self.official_dir = official_dir
        # NOTE(review): stored but not read by this class; listen() uses a fixed
        # 3-minute window instead.
        self.interval = interval
        self.current_row = 0
        self.rubbish = rubbish
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.remote_dir = remote_dir

    def add_task(self, content):
        """Normalise ``content`` and append it to the queue.

        Non-dict input is logged and ignored.

        :param content: Row to queue; missing standard keys are filled in place.
        """
        if not isinstance(content, dict):
            logging.warning('[HANDLE] add task content: %s but type is not dict', content)
            return
        # `with` guarantees the lock is released even if formatting raises.
        with self.lock:
            self.Q.append(self.content_format(content))

    @staticmethod
    def content_format(content):
        """Fill in every standard key that ``content`` lacks and return the same dict.

        Keys are added in a fixed order so the serialised rows keep a stable layout.

        :param content: Row dict; it is mutated in place.
        :return: ``content`` itself.
        """
        for key, default in (('host', ''), ('expr', 1), ('dev_id', ''),
                             ('port', ''), ('type', ''), ('schema', '')):
            if key not in content:
                content[key] = default
        if 'timestamp' not in content:
            # Milliseconds since the epoch, taken at formatting time.
            content['timestamp'] = int(datetime.datetime.now().timestamp() * 1000)
        if 'username' not in content:
            content['username'] = ''
        if 'kpi' not in content:
            content['kpi'] = {}
        return content

    def to_file(self, name):
        """Copy the temp file to ``name``, upload the official folder, then retire uploaded files.

        :param name: Path of the official file to create from the temp file.
        :return: ``None`` on success, otherwise the exception that stopped the operation.
        """
        try:
            shutil.copy(self.temp_file, name)
            uploader = Uploader(self.host, self.port, self.user, self.password,
                                self.official_dir, self.remote_dir)
            err = uploader.upload()
            if err is not None:
                # The password is deliberately kept out of the log, and the port is
                # formatted with %s because it is not guaranteed to be an int.
                logging.error('[HANDLE] upload file with '
                              'host: %s, port: %s, user: %s, local: %s, error: %s',
                              self.host, self.port, self.user, name, err)
                return err
            # The whole official folder was uploaded, so every regular file in it
            # is retired; entries are joined with the folder so the directory test
            # does not depend on the current working directory.
            for entry in os.listdir(self.official_dir):
                path = os.path.join(self.official_dir, entry)
                if os.path.isdir(path):
                    continue
                shutil.move(path, self.rubbish)
            # Covers a `name` that lives outside the official folder.
            if os.path.isfile(name):
                shutil.move(name, self.rubbish)
        except Exception as e:
            return e
        return None

    def _drain_queue(self):
        """Take every queued row in one step, leaving the queue empty."""
        with self.lock:
            rows, self.Q = self.Q, []
        return rows

    def _write_rows(self, fp, rows):
        """Write each row to ``fp`` as one JSON line, counting the rows that succeed."""
        for item in rows:
            item_value = ''
            try:
                item_value = json.dumps(item)
                fp.write(item_value + '\n')
                self.current_row += 1
            except Exception as e:
                logging.error("[HANDLE] write item(%s) error :%s", item_value, e)

    def listen(self):
        """Run forever: spool queued rows to the temp file and roll it over when due.

        A roll-over happens when the row count reaches ``max_item_length``, or
        when at least three minutes have passed since the last one and the temp
        file holds rows.
        """
        temp_file_mode = 'a+'
        temp_file_ptr = self.create_temp_file(temp_file_mode)
        beg = int(datetime.datetime.now().timestamp())
        while True:
            self.sleep()
            end = int(datetime.datetime.now().timestamp())
            q = self._drain_queue()
            # NOTE(review): with nothing new queued the roll-over check below is
            # skipped, so the 3-minute flush only fires when fresh rows arrive.
            if not q:
                continue
            # Reopen the temp file if it was closed by a roll-over (or never opened).
            # If opening fails again the rows below are logged as write errors.
            if temp_file_ptr is None or temp_file_ptr.closed:
                temp_file_ptr = self.create_temp_file(temp_file_mode)
            self._write_rows(temp_file_ptr, q)
            logging.debug('[HANDLE] listen and ')
            # If the currently recorded data line is greater than or equal to the largest data line,
            # the temporary file is written to the official file.
            # If the file has not been saved for more than three minutes, and the temporary file is
            # not empty, an official file is also saved.
            row_limit_hit = self.current_row >= self.max_item_length
            window_elapsed = end - beg >= 3 * 60 and self.current_row > 0
            if not (row_limit_hit or window_elapsed):
                continue
            official_file = ''
            try:
                now = int(datetime.datetime.now().timestamp() * 1000)
                official_file = os.path.join(self.official_dir, str(now) + '.tok')
                # Closing flushes buffered rows so the copy sees the full content.
                temp_file_ptr.close()
                msg = self.to_file(official_file)
                if msg is not None:
                    logging.error('[HANDLE] to official file(%s) error: %s', official_file, msg)
                    # Keep the spooled rows: reopen in append mode and retry later.
                    temp_file_mode = 'a+'
                    continue
                # Rows were shipped, so the next open starts from an empty file.
                temp_file_mode = 'w+'
                self.current_row = 0
                beg = int(datetime.datetime.now().timestamp())
            except Exception as e:
                logging.error('[HANDLE] copy temp file(%s) to official file(%s) error: %s',
                              self.temp_file, official_file, e)

    def stop(self):
        """Placeholder; currently does nothing and does not end :meth:`listen`."""
        pass

    def create_temp_file(self, mode):
        """Open the temp file in ``mode``.

        :param mode: ``open`` mode string; ``listen`` passes ``'a+'`` or ``'w+'``.
        :return: The open file object, or ``None`` if opening failed (the error is logged).
        """
        try:
            # Mode 'w+' already truncates on open, so no explicit seek/truncate is needed.
            return open(self.temp_file, mode=mode)
        except Exception as e:
            logging.error('[HANDLE] create temp file(%s) error: %s', self.temp_file, e)
            return None

    @staticmethod
    def sleep():
        """Pause for one second between polls of the queue."""
        time.sleep(1)
# Module-wide handler used by the add_task()/listen() helpers below.
# NOTE(review): the upload settings (host, port, user, password, remote_dir) are
# empty strings here - presumably placeholders to be configured; confirm.
defaultHandler = Handle(
    max_length=default_max_length,
    temp_file='/tmp/temp.txt',
    official_dir='/data/elec-monitor',
    rubbish='rubbish',
    host='',
    port='',
    user='',
    password='',
    remote_dir='',
)
def add_task(content):
    """Queue ``content`` on the module-level default handler."""
    handler = defaultHandler
    handler.add_task(content)
def listen():
    """Run the module-level default handler's listen loop."""
    handler = defaultHandler
    handler.listen()
class Uploader:
    """Upload a local file or folder to a remote server over SSH/SCP."""

    def __init__(self, host, port, username, password, local, remote):
        """
        :param host: Remote server address.
        :param port: Remote server SSH port.
        :param username: Login name on the remote server.
        :param password: Login password on the remote server.
        :param local: Local file or folder to upload.
        :param remote: Destination path on the remote server.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.local = local
        self.remote = remote

    def upload(self):
        """
        Upload files or folders to remote servers via SCP.

        Failures are logged and returned rather than raised.

        :return: ``None`` on success, otherwise the exception that occurred.
        """
        ssh_client = None
        try:
            ssh_client = paramiko.SSHClient()
            ssh_client.load_system_host_keys()
            # NOTE(security): AutoAddPolicy accepts unknown host keys without
            # verification; kept for compatibility with existing deployments.
            ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh_client.connect(self.host, self.port, self.username, self.password)
            scp_client = scp.SCPClient(ssh_client.get_transport())
            scp_client.put(self.local, self.remote, recursive=True)
            return None
        except Exception as e:
            # The password is deliberately kept out of the log, and the port is
            # formatted with %s because it is not guaranteed to be an int.
            logging.error('[UPLOAD] upload files with '
                          'host: %s, port: %s, username: %s, local: %s, remote: %s error: %s',
                          self.host, self.port, self.username, self.local, self.remote, e)
            return e
        finally:
            # Close the connection on every path so a failed connect/put does not leak it.
            if ssh_client is not None:
                ssh_client.close()
|