# リモコン移動作業ロボット:ソースコード:CustomMjpeg.py
# (Remote-controlled mobile work robot: source code: CustomMjpeg.py)

import numpy as np
import cv2
import zlib
import time

import mmap
import gi

gi.require_version('Gst', '1.0')
from gi.repository import Gst,GObject
import signal
import time

# Initialise the GStreamer library once at import time; no argument list is
# passed to it.
Gst.init(None)
# The bare string below is never assigned or used; it serves as a note listing
# resolution / frame-rate lines in "GST_ARGUS:" format.
# NOTE(review): presumably these are the sensor modes reported by
# nvarguscamerasrc for the attached camera -- confirm against the device log.
"""
GST_ARGUS: 3264 x 2464 FR = 21.000000 fps 
GST_ARGUS: 3264 x 1848 FR = 28.000001 fps 
GST_ARGUS: 1920 x 1080 FR = 29.999999 fps 
GST_ARGUS: 1640 x 1232 FR = 29.999999 fps 
GST_ARGUS: 1280 x 720 FR = 59.999999 fps 
GST_ARGUS: 1280 x 720 FR = 120.000005 fps 
"""
class CustomMjpeg:
    """Capture JPEG-encoded frames from one camera through GStreamer.

    The constructor builds an ``nvarguscamerasrc ... ! nvjpegenc ! appsink``
    pipeline description and opens it with ``cv2.VideoCapture`` using the
    GStreamer backend.  ``get_jpeg_data`` pulls one buffer per call.
    """

    # Bookkeeping attributes.  None of them is read or written by the methods
    # of this class; they are kept for code elsewhere that may rely on them.
    # NOTE(review): ``key_data`` and ``data`` are mutable lists defined on the
    # class, so they are shared by all instances until reassigned.
    max_height = 0
    max_width = 0
    key_height = 0
    key_width = 0  # was a second ``key_height`` declaration in the original
    height = 0
    width = 0
    key_data = []
    data = []
    is_key = 0

    def __init__(self, sensorid,
                 width=3264, height=2464, framerate=21, min_width=None, min_height=None):
        """Open the capture pipeline for the camera ``sensorid``.

        Args:
            sensorid: Value used for ``nvarguscamerasrc sensor-id=``.
            width: Capture width requested from the camera source.
            height: Capture height requested from the camera source.
            framerate: Numerator of the requested ``framerate/1`` fraction.
            min_width: Width of the optional second caps stage.
            min_height: Height of the optional second caps stage.  The stage
                is added only when both ``min_width`` and ``min_height`` are
                given.
        """
        pipeline = self._build_pipeline(
            sensorid, width, height, framerate, min_width, min_height)
        # Create the GStreamer pipeline via OpenCV's GStreamer backend.
        self.video_capture = cv2.VideoCapture(pipeline, cv2.CAP_GSTREAMER)

    @staticmethod
    def _build_pipeline(sensorid, width, height, framerate,
                        min_width=None, min_height=None):
        """Return the GStreamer pipeline description for the given settings."""
        stages = [
            'nvarguscamerasrc sensor-id={}'.format(sensorid),
            'video/x-raw(memory:NVMM), width={}, height={}, '
            'format=(string)NV12, framerate=(fraction){}/1'.format(
                width, height, framerate),
            'nvvidconv',
        ]
        # The second caps stage needs both dimensions; with only one of them
        # the stage is skipped entirely.
        if min_width is not None and min_height is not None:
            stages.append(
                'video/x-raw(memory:NVMM), width=(int){}, height=(int){}'.format(
                    min_width, min_height))
            stages.append('nvvidconv')
        # Same element sequence as before: a further nvvidconv, then the JPEG
        # encoder feeding the appsink that OpenCV reads from.
        stages.extend(['nvvidconv', 'nvjpegenc', 'appsink'])
        return ' ! '.join(stages)

    def __del__(self):
        """Stop the pipeline by releasing the capture, if one was created."""
        # __del__ also runs when __init__ raised before ``video_capture`` was
        # assigned, so the attribute may be missing.
        capture = getattr(self, 'video_capture', None)
        if capture is not None:
            capture.release()

    def get_jpeg_data(self):
        """Read the next buffer from the pipeline.

        Returns:
            ``data[0]`` of the array returned by ``VideoCapture.read()``, or
            ``None`` when the read did not succeed.
            NOTE(review): presumably the array is a single row holding the
            encoded JPEG bytes -- confirm against the OpenCV build in use.
        """
        ret, data = self.video_capture.read()
        if not ret or data is None:
            return None
        return data[0]
        
if __name__ == "__main__":
    # Manual benchmark: read from two cameras in lock-step and print the
    # achieved loop rate (iterations per second) until interrupted.
    cj1 = CustomMjpeg(0, 1640, 1232, 30)
    cj2 = CustomMjpeg(1, 1640, 1232, 30)
    st = time.time()
    try:
        while True:
            cj1.get_jpeg_data()
            cj2.get_jpeg_data()
            now = time.time()
            elapsed = now - st
            # Two timestamps can coincide on a coarse clock; skip the sample
            # rather than divide by zero.
            if elapsed > 0:
                print(1 / elapsed)
            st = now
    except KeyboardInterrupt:
        # Ctrl-C is the intended way to stop the benchmark.
        pass