#!/usr/bin/env python # -*- coding:utf-8 -*- import json from threading import Lock import logging import datetime from os import path import time import shutil class Handle: def __init__(self, max_length, temp_file, official_dir, interval=5 * 60): """ :param max_length: Every 1000 rows of data are deposited in an official file. :param temp_file: Temporary file path. :param official_dir: Official folder Path. :param interval: Put temporary files into official files to upload at regular intervals. """ self.lock = Lock() # Data results collected by multiple plug-ins need to be queued to write to files. # The Q is the queue. self.Q = [] self.max_item_length = max_length self.temp_file = temp_file self.official_dir = official_dir self.interval = interval self.current_row = 0 def add_task(self, content): if not isinstance(content, dict): logging.warning('[HANDLE] add task content: %s but type is not dict', content) return self.lock.acquire() content = self.content_format(content) self.Q.append(content) self.lock.release() @staticmethod def content_format(content): if not content.__contains__('dev_ip'): content['dev_ip'] = '' if not content.__contains__('expr'): content['expr'] = 1 if not content.__contains__('dev_id'): content['dev_id'] = '' if not content.__contains__('port'): content['port'] = '' if not content.__contains__('type'): content['type'] = '' if not content.__contains__('schema'): content['schema'] = '' if not content.__contains__('login_name'): content['login_name'] = '' if not content.__contains__('pwd'): content['pwd'] = '' if not content.__contains__('kpi'): content['kpi'] = {} return content def to_file(self, name): try: shutil.copy(self.temp_file, name) except Exception as e: return e return None def listen(self): temp_file_mode = 'a+' temp_file_ptr = self.create_temp_file(temp_file_mode) beg = int(datetime.datetime.now().timestamp()) end = beg while True: self.sleep() end = int(datetime.datetime.now().timestamp()) self.lock.acquire() q = self.Q self.lock.release() if len(q) < 1: continue # Open temp file ptr. It might get None again, so execute continue one more time. if temp_file_ptr is None or temp_file_ptr.closed: temp_file_ptr = self.create_temp_file(temp_file_mode) continue for item in q: item_value = '' try: item_value = json.dumps(item) temp_file_ptr.write(item_value) self.current_row += 1 except Exception as e: logging.error("[HANDLE] write item(%s) error :%s", item_value, e) logging.debug('[HANDLE] listen and ') # If the currently recorded data line is greater than or equal to the largest data line, # the temporary file is written to the official file. # If the file has not been saved for more than three minutes, and the temporary file is # not empty, an official file is also saved. official_file = '' if self.current_row >= self.max_item_length or (end - beg >= 3 * 60 and self.current_row > 0): try: now = int(datetime.datetime.now().timestamp() * 1000) official_file = path.join(self.official_dir, str(now) + '.tok') msg = self.to_file(official_file) if msg is not None: logging.error('[HANDLE] to official file(%s) error: %s', official_file, msg) temp_file_mode = 'a+' continue temp_file_ptr.close() temp_file_mode = 'w+' self.current_row = 0 beg = int(datetime.datetime.now().timestamp()) except Exception as e: logging.error('[HANDLE] copy temp file(%s) to official file(%s) error: %s', self.temp_file, official_file, e) def create_temp_file(self, mode): try: f = open(self.temp_file, mode=mode) return f except Exception as e: logging.error('[HANDLE] create temp file(%s) error: %s', self.temp_file, e) return None @staticmethod def sleep(): time.sleep(1) class Uploader: def __init__(self): pass