#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Tests for ``schedule.Schedule`` driven through a counting mock plugin."""
import unittest
from plugins import base
import uuid
import logging
from threading import Lock
import datetime
import schedule
import time

# Protects the two counters below, which MockPlugin.record() updates.
# NOTE(review): presumably needed because the scheduler runs plugins on
# several threads -- confirm against schedule.Schedule.
mock_lock = Lock()
fail_num = success_num = 0


class MockPlugin(base.Base):
    """Plugin stub that only counts successful and failed executions.

    The keyword arguments ``host``, ``port``, ``username`` and ``pwd`` are
    required (a missing one raises ``KeyError``); ``name`` is optional and
    defaults to ``'ipc'`` followed by ten hex characters of a fresh UUID.
    An ``error`` keyword argument makes ``exec()`` record a failure.
    """

    def __init__(self, *args, **kwargs):
        super(MockPlugin, self).__init__(*args, **kwargs)
        if 'name' in kwargs:
            self.name = kwargs['name']
        else:
            self.name = 'ipc' + uuid.uuid1().hex[0:10]
        self.host = kwargs['host']
        self.port = kwargs['port']
        self.user = kwargs['username']
        self.password = kwargs['pwd']
        self.args = args
        self.kwargs = kwargs

    # NOTE(review): __init__ assigns the instance attribute ``self.name``,
    # which shadows this method on instances, so ``plugin.name`` is the
    # string and ``plugin.name()`` is not callable. Left unchanged because
    # code outside this file may rely on either form.
    def name(self):
        return self.name

    def config_check(self, *args, **kwargs):
        """Return the value of the ``error`` keyword, or ``None`` if absent."""
        return kwargs.get('error')

    @staticmethod
    def record(is_success):
        """Increment ``success_num`` or ``fail_num`` while holding the lock."""
        global success_num, fail_num
        # ``with`` releases the lock even if the body raises.
        with mock_lock:
            if is_success:
                success_num += 1
            else:
                fail_num += 1

    def exec(self):
        """Check the stored kwargs and record one success or one failure."""
        err = self.config_check(**self.kwargs)
        if err is not None:
            logging.error('[MockPlugin] check config error: %s', err)
            self.record(False)
            return
        self.record(True)


class ScheduleTest(unittest.TestCase):
    """Feeds valid and malformed configs to the scheduler and checks counts."""

    def setUp(self):
        # Start every test method from zero, even if an earlier test failed
        # before it reached its own reset() call.
        self.reset()

    @staticmethod
    def _new_config():
        """Return a config skeleton whose ``id`` is the current timestamp."""
        return {'id': int(datetime.datetime.now().timestamp())}

    @staticmethod
    def _valid_item(error=None):
        """Build one complete item; a non-None ``error`` adds an ``error`` key."""
        item = {
            'host': 'local.pc',
            'port': 2000,
            'dev_id': uuid.uuid1(),
            'type': 'mock_plugin',
            'schema': 'onvif',
            'username': 'admin',
            'pwd': '123456',
            'expr': 10,
            'kpi': {
                'key1': 1
            }
        }
        if error is not None:
            item['error'] = error
        return item

    @staticmethod
    def generate_valid_config(success_count, fail_count):
        """Return a config with ``success_count`` clean items followed by
        ``fail_count`` items carrying ``'error': 'data error'``."""
        config = ScheduleTest._new_config()
        data = [ScheduleTest._valid_item() for _ in range(success_count)]
        data.extend(
            ScheduleTest._valid_item('data error') for _ in range(fail_count)
        )
        config['data'] = data
        return config

    @staticmethod
    def generate_invalid_config(count):
        """Return a config of ``count`` items that each omit one key.

        The omitted key rotates through ``keys``. When present, ``port`` is
        ``2000`` and ``type`` is ``'invalid_type'``; every other key maps to
        its own name.
        """
        config = ScheduleTest._new_config()
        keys = ['host', 'port', 'dev_id', 'type', 'schema', 'username', 'pwd']
        data = []
        for count_index in range(count):
            skipped = keys[count_index % len(keys)]
            item = {}
            for key in keys:
                if key == skipped:
                    continue
                # One if/elif chain: the original used two separate ``if``
                # statements, so the trailing ``else`` overwrote the numeric
                # port with the string 'port'.
                if key == 'port':
                    item[key] = 2000
                elif key == 'type':
                    item[key] = 'invalid_type'
                else:
                    item[key] = key
            data.append(item)
        config['data'] = data
        return config

    @staticmethod
    def generate_non_object(count):
        """Return a config whose ``data`` is the integers ``0..count-1``."""
        config = ScheduleTest._new_config()
        config['data'] = list(range(count))
        return config

    @staticmethod
    def reset():
        """Set both module-level counters back to zero."""
        global success_num, fail_num
        success_num = 0
        fail_num = 0

    def _deliver_and_check(self, config, expected_success, expected_fail):
        """Run one scheduler over ``config`` and assert the resulting counters.

        The counters are reset afterwards so the next scenario starts clean.
        """
        scheduler = schedule.Schedule(config)
        scheduler.plugins = {
            'mock_plugin': MockPlugin
        }
        scheduler.deliver()
        # NOTE(review): presumably deliver() runs plugins asynchronously and
        # this wait lets them finish -- confirm against schedule.Schedule.
        time.sleep(1)
        self.assertEqual(expected_success, success_num)
        self.assertEqual(expected_fail, fail_num)
        self.reset()

    def test_schedule_serial(self):
        """Run the scenarios one after another and check the counters."""
        # config is None
        self._deliver_and_check(None, 0, 0)

        # config have no field data
        self._deliver_and_check(self._new_config(), 0, 0)

        # config.data is not list
        self._deliver_and_check({
            'id': int(datetime.datetime.now().timestamp()),
            'data': 10
        }, 0, 0)

        # valid config, 10 success, 10 fail.
        self._deliver_and_check(self.generate_valid_config(10, 10), 10, 10)

        # invalid config, 0 success, 0 fail.
        # As the config is invalid, the plugin won't be executed.
        self._deliver_and_check(self.generate_invalid_config(10), 0, 0)

        # non-object config, 0 success, 0 fail.
        self._deliver_and_check(self.generate_non_object(10), 0, 0)

        # invalid plugin type, 5 success, 0 fail
        config = self.generate_valid_config(10, 0)
        # The slice holds the same dict objects, so they are mutated in place.
        for item in config['data'][:5]:
            item['type'] = 'mock_type'
            del item['expr']
            del item['kpi']
        self._deliver_and_check(config, 5, 0)

        # multi config, 10 success, 10 fail
        config = self.generate_valid_config(10, 10)
        invalid_config = self.generate_invalid_config(10)
        non_object_config = self.generate_non_object(10)
        invalid_type_data = self.generate_valid_config(10, 0)['data']
        for item in invalid_type_data:
            item['type'] = 'mock_type'
        config_data = (
            [None]
            + config['data']
            + invalid_config['data']
            + non_object_config['data']
            + invalid_type_data
        )
        config['data'] = config_data
        # check config
        self.assertEqual(len(config_data), 51)
        self._deliver_and_check(config, 10, 10)

    def test_schedule_concurrent(self):
        """Placeholder: no concurrent scenario is implemented yet."""
        pass


if __name__ == '__main__':
    unittest.main()