Jetson nano を使用してカメラデータ取得とjpegデータ送信:ソースコード:CustomMjpeg.py

import numpy as np

import cv2

import zlib

import time

import mmap

import gi

gi.require_version(‘Gst’, ‘1.0’)

from gi.repository import Gst,GObject

import signal

import time

Gst.init(None)

“””

GST_ARGUS: 3264 x 2464 FR = 21.000000 fps

GST_ARGUS: 3264 x 1848 FR = 28.000001 fps

GST_ARGUS: 1920 x 1080 FR = 29.999999 fps

GST_ARGUS: 1640 x 1232 FR = 29.999999 fps

GST_ARGUS: 1280 x 720 FR = 59.999999 fps

GST_ARGUS: 1280 x 720 FR = 120.000005 fps

“””

class CustomMjpeg:

    max_height = 0

    max_width = 0

    key_height = 0

    key_height = 0

    height = 0

    width = 0

    key_data = []

    data = []

    is_key = 0

    def __init__(self, sensorid,

                 width=3264, height=2464, framerate=21, min_width=None , min_height=None):

        BASE_GST_STR = ‘nvarguscamerasrc sensor-id=’ + str(sensorid) + ” “

        BASE_GST_STR += ‘! video/x-raw(memory:NVMM), width=’ + str(width) +\

                        ‘, height=’ + str(height) +\

                        ‘, format=(string)NV12, framerate=(fraction)’\

                         + str(framerate) + ‘/1\

                         ! nvvidconv’

        if None is min_width or None is min_height:

            pass    

        else:

            BASE_GST_STR += ‘ ! video/x-raw(memory:NVMM)\

                             , width=(int)’ + str(min_width) +\

                            ‘, height=(int)’ + str(min_height) +\

                            ‘ ! nvjpegenc ! appsink ‘

        BASE_GST_STR += ‘! nvvidconv !  nvjpegenc ! appsink ‘

#                        ! nvvidconv !  nvjpegenc ! shmsink socket-path=’+str(self.shm_file_path)

#                        ! nvvidconv !  nvjpegenc ! shmsink socket-path=’+str(self.shm_file_path) + ‘sync=false async=false’

        # GStreamerパイプラインを作成

        self.video_capture = cv2.VideoCapture(BASE_GST_STR, cv2.CAP_GSTREAMER)

    def __del__(self):

        # パイプラインを停止

        self.video_capture.release()

        pass

    def get_jpeg_data(self):

        ret,data = self.video_capture.read()

        # image = cv2.imdecode(data[0],cv2.IMREAD_COLOR)

        if ret:

            return data[0]

        else:

            return None    

if __name__ == “__main__”:

    cj1 = CustomMjpeg(0,1640,1232,30)

    cj2 = CustomMjpeg(1,1640,1232,30)

    st = time.time()

    while True:

        cj1.get_jpeg_data()

        cj2.get_jpeg_data()

        print(1/(time.time()-st))

        st = time.time()

    loop = GObject.MainLoop()

    try:

        loop.run()

    except KeyboardInterrupt:

        pass