diff --git a/src/polystar/dependency_injection.py b/src/polystar/dependency_injection.py index 6cb199adc8d5c980233015f2b9bbedc99e6ca0fe..90911c13aab14b2ea1e725f8e0c836afe7e5a163 100644 --- a/src/polystar/dependency_injection.py +++ b/src/polystar/dependency_injection.py @@ -9,7 +9,8 @@ from polystar.communication.board_a import BoardA from polystar.communication.cs_link_abc import CSLinkABC from polystar.communication.screen import Screen from polystar.constants import LABEL_MAP_PATH -from polystar.frame_generators.camera_frame_generator import RaspiV2CameraFrameGenerator, WebcamFrameGenerator +from polystar.frame_generators.camera_frame_generator import make_csi_camera_frame_generator +from polystar.frame_generators.cv2_frame_generator_abc import CV2FrameGenerator from polystar.frame_generators.frames_generator_abc import FrameGeneratorABC from polystar.models.camera import Camera from polystar.models.label_map import LabelMap @@ -22,10 +23,7 @@ from polystar.target_pipeline.detected_objects.detected_objects_factory import D from polystar.target_pipeline.object_selectors.closest_object_selector import ClosestObjectSelector from polystar.target_pipeline.object_selectors.object_selector_abc import ObjectSelectorABC from polystar.target_pipeline.objects_detectors.objects_detector_abc import ObjectsDetectorABC -from polystar.target_pipeline.objects_filters.confidence_object_filter import ( - ConfidenceObjectsFilter, - RobotArmorConfidenceObjectsFilter, -) +from polystar.target_pipeline.objects_filters.confidence_object_filter import RobotArmorConfidenceObjectsFilter from polystar.target_pipeline.objects_filters.objects_filter_abc import ObjectsFilterABC from polystar.target_pipeline.objects_linker.objects_linker_abs import ObjectsLinkerABC from polystar.target_pipeline.objects_linker.simple_objects_linker import SimpleObjectsLinker @@ -113,5 +111,5 @@ class CommonModule(Module): @singleton def provide_webcam(self) -> FrameGeneratorABC: if self.settings.is_prod: - return RaspiV2CameraFrameGenerator(1_280, 720) - return WebcamFrameGenerator() + return make_csi_camera_frame_generator(1_280, 720) + return CV2FrameGenerator() diff --git a/src/polystar/frame_generators/camera_frame_generator.py b/src/polystar/frame_generators/camera_frame_generator.py index eb4ed9e51606c83b166a31d158ec7513c079d83d..b9e9c83c2f928d936e7089153f725a6fe713a180 100644 --- a/src/polystar/frame_generators/camera_frame_generator.py +++ b/src/polystar/frame_generators/camera_frame_generator.py @@ -1,31 +1,17 @@ -from dataclasses import dataclass -from typing import Any, Iterable - import cv2 -from polystar.frame_generators.cv2_frame_generator_abc import CV2FrameGeneratorABC - - -@dataclass -class RaspiV2CameraFrameGenerator(CV2FrameGeneratorABC): - width: int - height: int - - def _capture_params(self) -> Iterable[Any]: - return ( - "nvarguscamerasrc ! " - "video/x-raw(memory:NVMM), " - f"width=(int){self.width}, height=(int){self.height}, " - "format=(string)NV12, framerate=60/1 ! " - "nvvidconv flip-method=0 ! " - f"video/x-raw, width=(int){self.width}, height=(int){self.height}, " - "format=(string)BGRx ! " - "videoconvert ! appsink drop=true sync=false", - cv2.CAP_GSTREAMER, - ) +from polystar.frame_generators.cv2_frame_generator_abc import CV2FrameGenerator -@dataclass -class WebcamFrameGenerator(CV2FrameGeneratorABC): - def _capture_params(self) -> Iterable[Any]: - return (0,) +def make_csi_camera_frame_generator(width: int, height: int) -> CV2FrameGenerator: + return CV2FrameGenerator( + "nvarguscamerasrc ! " + "video/x-raw(memory:NVMM), " + f"width=(int){width}, height=(int){height}, " + "format=(string)NV12, framerate=60/1 ! " + "nvvidconv flip-method=0 ! " + f"video/x-raw, width=(int){width}, height=(int){height}, " + "format=(string)BGRx ! " + "videoconvert ! appsink drop=true sync=false", + cv2.CAP_GSTREAMER, + ) diff --git a/src/polystar/frame_generators/cv2_frame_generator_abc.py b/src/polystar/frame_generators/cv2_frame_generator_abc.py index 167eccef483c973d726f3f747cdcac58dc2477d2..6ffaab57877540f2ca2616cec8c37b1db9a0568e 100644 --- a/src/polystar/frame_generators/cv2_frame_generator_abc.py +++ b/src/polystar/frame_generators/cv2_frame_generator_abc.py @@ -1,30 +1,33 @@ -from abc import ABC, abstractmethod -from dataclasses import dataclass from typing import Any, Iterable, Iterator -import cv2 +from cv2.cv2 import CAP_PROP_BUFFERSIZE, VideoCapture from polystar.frame_generators.frames_generator_abc import FrameGeneratorABC from polystar.models.image import Image -@dataclass -class CV2FrameGeneratorABC(FrameGeneratorABC, ABC): +class CV2FrameGenerator(FrameGeneratorABC): + def __init__(self, *capture_params: Any): + self.capture_params = capture_params or (0,) + def __iter__(self) -> Iterator[Image]: - _cap = self._open() - while 1: - is_open, frame = _cap.read() - if not is_open: - break - yield frame - _cap.release() - - def _open(self) -> cv2.VideoCapture: - _cap = cv2.VideoCapture(*self._capture_params()) - _cap.set(cv2.CAP_PROP_BUFFERSIZE, 0) - assert _cap.isOpened() - return _cap - - @abstractmethod - def _capture_params(self) -> Iterable[Any]: - pass + return CV2Capture(self.capture_params) + + +class CV2Capture(Iterator[Image]): + def __init__(self, capture_params: Iterable): + self._cap = VideoCapture(*capture_params) + assert self._cap.isOpened() + self._cap.set(CAP_PROP_BUFFERSIZE, 0) + + def __next__(self) -> Image: + success, frame = self._cap.read() + + if success: + return frame + + raise StopIteration() + + def __del__(self): + if self._cap.isOpened(): + self._cap.release() diff --git a/src/polystar/frame_generators/fps_video_frame_generator.py b/src/polystar/frame_generators/fps_video_frame_generator.py index 926d6b6d4955b2393b62ab9251bc04fedd5e7125..83889e41597f2be4422c2ef32de01c77714ce750 100644 --- a/src/polystar/frame_generators/fps_video_frame_generator.py +++ b/src/polystar/frame_generators/fps_video_frame_generator.py @@ -1,17 +1,14 @@ -from dataclasses import dataclass -from typing import Iterator +from pathlib import Path +from typing import Iterator, Optional from polystar.frame_generators.video_frame_generator import VideoFrameGenerator from polystar.models.image import Image -@dataclass class FPSVideoFrameGenerator(VideoFrameGenerator): - - desired_fps: int - - def __post_init__(self): - self.frame_rate: int = self._video_fps // self.desired_fps + def __init__(self, video_path: Path, desired_fps: int, offset_seconds: Optional[int] = None): + super().__init__(video_path, offset_seconds) + self.frame_rate: int = self._video_fps // desired_fps def __iter__(self) -> Iterator[Image]: for i, frame in enumerate(super().__iter__(), -1): diff --git a/src/polystar/frame_generators/video_frame_generator.py b/src/polystar/frame_generators/video_frame_generator.py index 001eb0fe6cb7a5f15d0d0422dbf1f673b51f14cf..1d9220b736ba128ddb62a8eebff1fb963dcb42f0 100644 --- a/src/polystar/frame_generators/video_frame_generator.py +++ b/src/polystar/frame_generators/video_frame_generator.py @@ -1,24 +1,24 @@ -import cv2 - -from dataclasses import dataclass from pathlib import Path -from typing import Any, Iterable, Optional +from typing import Iterable, Iterator, Optional import ffmpeg from cv2.cv2 import CAP_PROP_POS_FRAMES from memoized_property import memoized_property -from polystar.frame_generators.cv2_frame_generator_abc import CV2FrameGeneratorABC - +from polystar.frame_generators.cv2_frame_generator_abc import CV2Capture, CV2FrameGenerator +from polystar.models.image import Image -@dataclass -class VideoFrameGenerator(CV2FrameGeneratorABC): - video_path: Path - offset_seconds: Optional[int] +class VideoFrameGenerator(CV2FrameGenerator): + def __init__(self, video_path: Path, offset_seconds: Optional[int] = None): + super().__init__(str(video_path)) + self.offset_seconds = offset_seconds + self.video_path = video_path - def _capture_params(self) -> Iterable[Any]: - return (str(self.video_path),) + def __iter__(self) -> Iterator[Image]: + if self.offset_seconds: + return CV2CaptureWithOffset(self.capture_params, self._video_fps * self.offset_seconds - 2) + return CV2Capture(self.capture_params) @memoized_property def _video_fps(self) -> int: @@ -29,8 +29,8 @@ class VideoFrameGenerator(CV2FrameGeneratorABC): return round(eval(stream_info["avg_frame_rate"])) raise ValueError(f"No fps found for video {self.video_path.name}") - def _open(self) -> cv2.VideoCapture: - _cap = super()._open() - if self.offset_seconds: - _cap.set(CAP_PROP_POS_FRAMES, self._video_fps * self.offset_seconds - 2) - return _cap + +class CV2CaptureWithOffset(CV2Capture): + def __init__(self, capture_params: Iterable, offset_frames: int): + super().__init__(capture_params) + self._cap.set(CAP_PROP_POS_FRAMES, offset_frames)