#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Spool collected records to a temp file and ship finished files via SCP.

Producers hand dict records to :func:`add_task`.  :func:`listen` runs an
endless loop that drains the queue into a temporary file; once enough rows
have accumulated (or enough time has passed) the temporary file is copied
into the official folder, that folder is uploaded, and the uploaded files
are moved to the rubbish location.
"""
import datetime
import json
import logging
import os
import shutil
import time
from threading import Lock

import paramiko
import scp

default_max_length = 1000


class Handle:
    """Queue dict records, write them to a temp file and upload full files."""

    # A non-empty temp file older than this many seconds is flushed even if
    # it holds fewer than ``max_item_length`` rows.
    _FLUSH_AFTER_SECONDS = 3 * 60

    def __init__(self, max_length, temp_file, official_dir, rubbish, host, port, user,
                 password, remote_dir, interval=5 * 60):
        """
        :param max_length: Number of rows after which the temporary file is
            turned into an official file.
        :param temp_file: Temporary file path.
        :param official_dir: Official folder path.
        :param rubbish: Location that uploaded official files are moved to.
        :param host: Remote server address for uploading files.
        :param port: Remote server port for uploading files.
        :param user: Remote server username for uploading files.
        :param password: Remote server password for uploading files.
        :param remote_dir: Remote server folder for uploading files.
        :param interval: Stored on the instance.
            NOTE(review): nothing in this module reads it; ``listen`` uses a
            fixed three-minute window instead -- confirm which is intended.
        """
        self.lock = Lock()
        # Results collected by multiple plug-ins are queued here before being
        # written to the temp file; always access it while holding ``lock``.
        self.Q = []
        self.max_item_length = max_length
        self.temp_file = temp_file
        self.official_dir = official_dir
        self.interval = interval
        self.current_row = 0
        self.rubbish = rubbish
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.remote_dir = remote_dir

    def add_task(self, content):
        """Queue one record for writing.

        :param content: The record.  Anything that is not a ``dict`` is
            rejected with a warning.  The dict is completed in place by
            :meth:`content_format`.
        """
        if not isinstance(content, dict):
            logging.warning('[HANDLE] add task content: %s but type is not dict', content)
            return
        content = self.content_format(content)
        # ``with`` guarantees the lock is released even if append raises.
        with self.lock:
            self.Q.append(content)

    @staticmethod
    def content_format(content):
        """Fill in every missing standard field of *content* in place.

        Keys that are already present are left untouched.  Missing keys are
        added in a fixed order so the serialized field order stays stable.

        :param content: Record dict to complete.
        :return: The same dict object.
        """
        for key, default in (('host', ''), ('expr', 1), ('dev_id', ''),
                             ('port', ''), ('type', ''), ('schema', '')):
            content.setdefault(key, default)
        if 'timestamp' not in content:
            # Milliseconds since the epoch.
            content['timestamp'] = int(time.time() * 1000)
        content.setdefault('username', '')
        content.setdefault('kpi', {})
        return content

    def to_file(self, name):
        """Copy the temp file to *name*, upload the official folder, clean up.

        :param name: Path of the official file to create.
        :return: ``None`` on success, otherwise the error that occurred
            (the exception object, or whatever ``Uploader.upload`` returned).
        """
        try:
            shutil.copy(self.temp_file, name)
            uploader = Uploader(self.host, self.port, self.user, self.password,
                                self.official_dir, self.remote_dir)
            err = uploader.upload()
            if err is not None:
                # The password is deliberately not logged; ``%s`` is used for
                # the port because it is not guaranteed to be an int.
                logging.error('[HANDLE] upload file with '
                              'host: %s, port: %s, user: %s, local: %s, error: %s',
                              self.host, self.port, self.user, name, err)
                return err
            # The whole folder was uploaded, so every regular file in it can
            # go to the rubbish location.  Paths are joined with the folder:
            # bare names would be resolved against the working directory.
            for entry in os.listdir(self.official_dir):
                path = os.path.join(self.official_dir, entry)
                if os.path.isdir(path):
                    continue
                shutil.move(path, self.rubbish)
        except Exception as e:
            return e
        return None

    def listen(self):
        """Drain the queue into the temp file forever, flushing when due.

        Once per :meth:`sleep` period the queued records are appended to the
        temp file as one JSON document per line.  When ``current_row``
        reaches ``max_item_length``, or the temp file is non-empty and the
        flush window has elapsed, the file is handed to :meth:`to_file`.
        After a failed flush the temp file is reopened in append mode so the
        rows are kept for the next attempt; after a successful one it is
        reopened truncated.

        NOTE(review): the flush check is only reached when new records
        arrived in the current cycle, so an idle handler never performs the
        time-based flush.

        This method never returns.
        """
        temp_file_mode = 'a+'
        temp_file_ptr = self.create_temp_file(temp_file_mode)
        # Monotonic clock: elapsed time must not jump with wall-clock changes.
        beg = time.monotonic()
        while True:
            self.sleep()
            end = time.monotonic()
            with self.lock:
                q, self.Q = self.Q, []
            if not q:
                continue

            if temp_file_ptr is None or temp_file_ptr.closed:
                temp_file_ptr = self.create_temp_file(temp_file_mode)
            if temp_file_ptr is None:
                # The temp file could not be opened (already logged).  Put
                # the batch back at the head of the queue instead of losing
                # it, and retry on the next cycle.
                with self.lock:
                    self.Q[:0] = q
                continue

            for item in q:
                item_value = ''
                try:
                    item_value = json.dumps(item)
                    temp_file_ptr.write(item_value + '\n')
                    self.current_row += 1
                except Exception as e:
                    # Best effort: one bad record must not stop the loop.
                    logging.error("[HANDLE] write item(%s) error :%s", item_value, e)
            logging.debug('[HANDLE] listen and ')

            row_limit_hit = self.current_row >= self.max_item_length
            window_elapsed = end - beg >= self._FLUSH_AFTER_SECONDS and self.current_row > 0
            if not (row_limit_hit or window_elapsed):
                continue

            official_file = ''
            try:
                now = int(time.time() * 1000)
                official_file = os.path.join(self.official_dir, str(now) + '.tok')
                # Closing flushes buffered rows before the file is copied.
                temp_file_ptr.close()
                msg = self.to_file(official_file)
                if msg is not None:
                    logging.error('[HANDLE] to official file(%s) error: %s', official_file, msg)
                    temp_file_mode = 'a+'
                    continue
                temp_file_mode = 'w+'
                self.current_row = 0
                beg = time.monotonic()
            except Exception as e:
                logging.error('[HANDLE] copy temp file(%s) to official file(%s) error: %s',
                              self.temp_file, official_file, e)

    def stop(self):
        """Do nothing (placeholder)."""
        pass

    def create_temp_file(self, mode):
        """Open the temp file.

        :param mode: Mode passed to ``open``; ``'w+'`` truncates the file.
        :return: The open file object, or ``None`` if opening failed (the
            error is logged).
        """
        try:
            return open(self.temp_file, mode=mode)
        except Exception as e:
            logging.error('[HANDLE] create temp file(%s) error: %s', self.temp_file, e)
            return None

    @staticmethod
    def sleep():
        """Pause for one second between queue polls."""
        time.sleep(1)


defaultHandler = Handle(default_max_length, '/tmp/temp.txt', '/data/elec-monitor',
                        'rubbish', '', '', '', '', '', )


def add_task(content):
    """Queue *content* on the module-level default handler."""
    defaultHandler.add_task(content)


def listen():
    """Run the module-level default handler's endless listen loop."""
    defaultHandler.listen()


class Uploader:
    """Upload a local file or folder to a remote server over SCP."""

    def __init__(self, host, port, username, password, local, remote):
        """
        :param host: Remote server address.
        :param port: Remote server port.
        :param username: Remote server username.
        :param password: Remote server password.
        :param local: Local file or folder to upload.
        :param remote: Remote destination path.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.local = local
        self.remote = remote

    def upload(self):
        """
        Upload files or folders to remote servers via SCP

        :return: ``None`` on success, otherwise the exception that occurred
            (it is logged, not raised).
        """
        ssh_client = None
        try:
            ssh_client = paramiko.SSHClient()
            ssh_client.load_system_host_keys()
            # NOTE(security): AutoAddPolicy accepts unknown host keys without
            # verification; kept as in the original.
            ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh_client.connect(self.host, self.port, self.username, self.password)
            scp_client = scp.SCPClient(ssh_client.get_transport())
            scp_client.put(self.local, self.remote, recursive=True)
            return None
        except Exception as e:
            # The password is deliberately not logged; ``%s`` is used for the
            # port because it is not guaranteed to be an int.
            logging.error('[UPLOAD] upload files with '
                          'host: %s, port: %s, username: %s, local: %s, remote: %s error: %s',
                          self.host, self.port, self.username, self.local, self.remote, e)
            return e
        finally:
            # Close the connection on failure as well as on success.
            if ssh_client is not None:
                ssh_client.close()