import numpy as np
import cv2
import zlib
import time
import mmap
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst,GObject
import signal
import time
Gst.init(None)
"""
GST_ARGUS: 3264 x 2464 FR = 21.000000 fps
GST_ARGUS: 3264 x 1848 FR = 28.000001 fps
GST_ARGUS: 1920 x 1080 FR = 29.999999 fps
GST_ARGUS: 1640 x 1232 FR = 29.999999 fps
GST_ARGUS: 1280 x 720 FR = 59.999999 fps
GST_ARGUS: 1280 x 720 FR = 120.000005 fps
"""
class CustomMjpeg:
max_height = 0
max_width = 0
key_height = 0
key_height = 0
height = 0
width = 0
key_data = []
data = []
is_key = 0
def __init__(self, sensorid,
width=3264, height=2464, framerate=21, min_width=None , min_height=None):
BASE_GST_STR = 'nvarguscamerasrc sensor-id=' + str(sensorid) + " "
BASE_GST_STR += '! video/x-raw(memory:NVMM), width=' + str(width) +\
', height=' + str(height) +\
', format=(string)NV12, framerate=(fraction)'\
+ str(framerate) + '/1\
! nvvidconv'
if None is min_width or None is min_height:
pass
else:
BASE_GST_STR += ' ! video/x-raw(memory:NVMM)\
, width=(int)' + str(min_width) +\
', height=(int)' + str(min_height) +\
'! nvvidconv'
#' ! nvjpegenc ! appsink '
BASE_GST_STR += '! nvvidconv ! nvjpegenc ! appsink '
# ! nvvidconv ! nvjpegenc ! shmsink socket-path='+str(self.shm_file_path)
# ! nvvidconv ! nvjpegenc ! shmsink socket-path='+str(self.shm_file_path) + 'sync=false async=false'
# GStreamerパイプラインを作成
self.video_capture = cv2.VideoCapture(BASE_GST_STR, cv2.CAP_GSTREAMER)
# パイプラインのバスを取得
# 共有メモリからデータを読み込むコールバックを登録
# self.bus.add_signal_watch()
# self.bus.connect("message::element", self.__read_jpeg_data)
# パイプラインを再生
def __del__(self):
# パイプラインを停止
self.video_capture.release()
pass
def get_jpeg_data(self):
# print("step1")
ret,data = self.video_capture.read()
#image = cv2.imdecode(data[0],cv2.IMREAD_COLOR)
# print(len(data[0]))
if ret:
return data[0]
else:
return None
if __name__ == "__main__":
cj1 = CustomMjpeg(0,1640,1232,30)
cj2 = CustomMjpeg(1,1640,1232,30)
st = time.time()
while True:
cj1.get_jpeg_data()
cj2.get_jpeg_data()
print(1/(time.time()-st))
st = time.time()
loop = GObject.MainLoop()
try:
loop.run()
except KeyboardInterrupt:
pass