ipc.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. import logging
  4. import math
  5. import cv2
  6. from PIL import Image, ImageStat
  7. from plugins import base
  8. from onvif import ONVIFCamera
  9. import datetime
  10. import zeep
  11. import uuid
  12. import upload
  13. def zeep_pythonvalue(self, xmlvalue):
  14. return xmlvalue
  15. zeep.xsd.simple.AnySimpleType.pythonvalue = zeep_pythonvalue
  16. class IPC(base.Base):
  17. def __init__(self, *args, **kwargs):
  18. super(IPC, self).__init__(*args, **kwargs)
  19. if kwargs.__contains__('name'):
  20. self.name = kwargs['name']
  21. else:
  22. self.name = 'ipc' + uuid.uuid1().hex[0:10]
  23. self.host = kwargs['host']
  24. self.port = kwargs['port']
  25. self.user = kwargs['user']
  26. self.password = kwargs['password']
  27. self.image_dir = kwargs['image_dir']
  28. self.wsdl_dir = 'resource/wsdl'
  29. if kwargs.__contains__('wsdl_dir') and kwargs['wsdl_dir'] != "":
  30. self.wsdl_dir = kwargs['wsdl_dir']
  31. self.kpi = kwargs['kpi']
  32. def name(self):
  33. return self.name
  34. def config_check(self, *args, **kwargs):
  35. pass
  36. def exec(self):
  37. image_name = self.snapshot()
  38. analyze_result = self.analyze()
  39. data = {'data': analyze_result, 'image': image_name}
  40. # upload.add_task(data)
  41. return
  42. def snapshot(self):
  43. camera = ONVIFCamera(self.host, self.port, self.user, self.password, self.wsdl_dir)
  44. media = camera.create_media_service()
  45. media_profile = media.GetProfiles()[0]
  46. profiles = media.GetProfiles()
  47. if len(profiles) < 1:
  48. return
  49. token = profiles[0]['token']
  50. snapshot_detail = media.GetSnapshotUri(token)
  51. snapshot_url = snapshot_detail['url']
  52. image_name = str(int(datetime.datetime.now().timestamp() * 1000)) + ".png"
  53. # download image via image url
  54. # urlretrieve(snapshot_url, path.join(self.image_dir, image_name))
  55. return image_name
  56. def analyze(self, path):
  57. if len(self.kpi) < 1:
  58. return
  59. kpi_result = {}
  60. if self.kpi.__contains__('img_cast'):
  61. kpi_result['img_cast'] = self.get_img_cast(path)
  62. if self.kpi.__contains__('img_ybcast'):
  63. kpi_result['img_ybcast'] = self.get_img_ybcast(path)
  64. if self.kpi.__contains__('img_light'):
  65. kpi_result['img_light'] = self.get_img_light(path)
  66. if self.kpi.__contains__('img_rgcast'):
  67. kpi_result['img_rgcast'] = self.get_img_rgcast(path)
  68. if self.kpi.__contains__('img_lightcast'):
  69. kpi_result['img_lightcast'] = self.get_img_lightcast(path)
  70. if self.kpi.__contains__('img_clear'):
  71. kpi_result['img_clear'] = self.get_img_clear(path)
  72. if self.kpi.__contains__('img_quality'):
  73. kpi_result['img_quality'] = self.get_img_quality(path)
  74. return kpi_result
  75. # https://blog.csdn.net/sparrowwf/article/details/86595162
  76. @staticmethod
  77. def get_img_cast(path):
  78. image = cv2.imread(path)
  79. img = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
  80. l_channel, a_channel, b_channel = cv2.split(img)
  81. h, w, _ = img.shape
  82. da = a_channel.sum() / (h * w) - 128
  83. db = b_channel.sum() / (h * w) - 128
  84. histA = [0] * 256
  85. histB = [0] * 256
  86. for i in range(h):
  87. for j in range(w):
  88. ta = a_channel[i][j]
  89. tb = b_channel[i][j]
  90. histA[ta] += 1
  91. histB[tb] += 1
  92. msqA = 0
  93. msqB = 0
  94. for y in range(256):
  95. msqA += float(abs(y - 128 - da)) * histA[y] / (w * h)
  96. msqB += float(abs(y - 128 - db)) * histB[y] / (w * h)
  97. import math
  98. result = math.sqrt(da * da + db * db) / math.sqrt(msqA * msqA + msqB * msqB)
  99. return result
  100. @staticmethod
  101. def get_img_ybcast(path):
  102. return 1
  103. # https://stackoverflow.com/questions/3490727/what-are-some-methods-to-analyze-image-brightness-using-python
  104. @staticmethod
  105. def get_img_light(path):
  106. try:
  107. im = Image.open(path)
  108. stat = ImageStat.Stat(im)
  109. r, g, b = stat.mean
  110. return math.sqrt(0.241 * (r ** 2) + 0.691 * (g ** 2) + 0.068 * (b ** 2))
  111. except Exception as e:
  112. logging.error('[IPC] calculate image light with file: %s error: %s', path, e)
  113. return 0
  114. @staticmethod
  115. def get_img_rgcast(path):
  116. return 1
  117. @staticmethod
  118. def get_img_lightcast(path):
  119. return 1
  120. # https://blog.csdn.net/luolinll1212/article/details/84107066
  121. @staticmethod
  122. def get_img_clear(path):
  123. image = cv2.imread(path)
  124. img2gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
  125. result = cv2.Laplacian(img2gray, cv2.CV_64F).var()
  126. return result
  127. @staticmethod
  128. def get_img_quality(path):
  129. return 1