From 2c7501108a7bcdba13dc694a09655a699a2788ae Mon Sep 17 00:00:00 2001
From: Mathieu Beligon <mathieu@feedly.com>
Date: Thu, 4 Mar 2021 19:48:40 -0500
Subject: [PATCH] [CV2FrameGenerator] Simplify

---
 src/polystar/dependency_injection.py          | 12 ++---
 .../camera_frame_generator.py                 | 40 +++++-----------
 .../cv2_frame_generator_abc.py                | 47 ++++++++++---------
 .../fps_video_frame_generator.py              | 13 ++---
 .../frame_generators/video_frame_generator.py | 34 +++++++-------
 5 files changed, 65 insertions(+), 81 deletions(-)

diff --git a/src/polystar/dependency_injection.py b/src/polystar/dependency_injection.py
index 6cb199a..90911c1 100644
--- a/src/polystar/dependency_injection.py
+++ b/src/polystar/dependency_injection.py
@@ -9,7 +9,8 @@ from polystar.communication.board_a import BoardA
 from polystar.communication.cs_link_abc import CSLinkABC
 from polystar.communication.screen import Screen
 from polystar.constants import LABEL_MAP_PATH
-from polystar.frame_generators.camera_frame_generator import RaspiV2CameraFrameGenerator, WebcamFrameGenerator
+from polystar.frame_generators.camera_frame_generator import make_csi_camera_frame_generator
+from polystar.frame_generators.cv2_frame_generator_abc import CV2FrameGenerator
 from polystar.frame_generators.frames_generator_abc import FrameGeneratorABC
 from polystar.models.camera import Camera
 from polystar.models.label_map import LabelMap
@@ -22,10 +23,7 @@ from polystar.target_pipeline.detected_objects.detected_objects_factory import D
 from polystar.target_pipeline.object_selectors.closest_object_selector import ClosestObjectSelector
 from polystar.target_pipeline.object_selectors.object_selector_abc import ObjectSelectorABC
 from polystar.target_pipeline.objects_detectors.objects_detector_abc import ObjectsDetectorABC
-from polystar.target_pipeline.objects_filters.confidence_object_filter import (
-    ConfidenceObjectsFilter,
-    RobotArmorConfidenceObjectsFilter,
-)
+from polystar.target_pipeline.objects_filters.confidence_object_filter import RobotArmorConfidenceObjectsFilter
 from polystar.target_pipeline.objects_filters.objects_filter_abc import ObjectsFilterABC
 from polystar.target_pipeline.objects_linker.objects_linker_abs import ObjectsLinkerABC
 from polystar.target_pipeline.objects_linker.simple_objects_linker import SimpleObjectsLinker
@@ -113,5 +111,5 @@ class CommonModule(Module):
     @singleton
     def provide_webcam(self) -> FrameGeneratorABC:
         if self.settings.is_prod:
-            return RaspiV2CameraFrameGenerator(1_280, 720)
-        return WebcamFrameGenerator()
+            return make_csi_camera_frame_generator(1_280, 720)
+        return CV2FrameGenerator()
diff --git a/src/polystar/frame_generators/camera_frame_generator.py b/src/polystar/frame_generators/camera_frame_generator.py
index eb4ed9e..b9e9c83 100644
--- a/src/polystar/frame_generators/camera_frame_generator.py
+++ b/src/polystar/frame_generators/camera_frame_generator.py
@@ -1,31 +1,17 @@
-from dataclasses import dataclass
-from typing import Any, Iterable
-
 import cv2
 
-from polystar.frame_generators.cv2_frame_generator_abc import CV2FrameGeneratorABC
-
-
-@dataclass
-class RaspiV2CameraFrameGenerator(CV2FrameGeneratorABC):
-    width: int
-    height: int
-
-    def _capture_params(self) -> Iterable[Any]:
-        return (
-            "nvarguscamerasrc ! "
-            "video/x-raw(memory:NVMM), "
-            f"width=(int){self.width}, height=(int){self.height}, "
-            "format=(string)NV12, framerate=60/1 ! "
-            "nvvidconv flip-method=0 ! "
-            f"video/x-raw, width=(int){self.width}, height=(int){self.height}, "
-            "format=(string)BGRx ! "
-            "videoconvert ! appsink drop=true sync=false",
-            cv2.CAP_GSTREAMER,
-        )
+from polystar.frame_generators.cv2_frame_generator_abc import CV2FrameGenerator
 
 
-@dataclass
-class WebcamFrameGenerator(CV2FrameGeneratorABC):
-    def _capture_params(self) -> Iterable[Any]:
-        return (0,)
+def make_csi_camera_frame_generator(width: int, height: int) -> CV2FrameGenerator:
+    return CV2FrameGenerator(
+        "nvarguscamerasrc ! "
+        "video/x-raw(memory:NVMM), "
+        f"width=(int){width}, height=(int){height}, "
+        "format=(string)NV12, framerate=60/1 ! "
+        "nvvidconv flip-method=0 ! "
+        f"video/x-raw, width=(int){width}, height=(int){height}, "
+        "format=(string)BGRx ! "
+        "videoconvert ! appsink drop=true sync=false",
+        cv2.CAP_GSTREAMER,
+    )
diff --git a/src/polystar/frame_generators/cv2_frame_generator_abc.py b/src/polystar/frame_generators/cv2_frame_generator_abc.py
index 167ecce..6ffaab5 100644
--- a/src/polystar/frame_generators/cv2_frame_generator_abc.py
+++ b/src/polystar/frame_generators/cv2_frame_generator_abc.py
@@ -1,30 +1,33 @@
-from abc import ABC, abstractmethod
-from dataclasses import dataclass
 from typing import Any, Iterable, Iterator
 
-import cv2
+from cv2.cv2 import CAP_PROP_BUFFERSIZE, VideoCapture
 
 from polystar.frame_generators.frames_generator_abc import FrameGeneratorABC
 from polystar.models.image import Image
 
 
-@dataclass
-class CV2FrameGeneratorABC(FrameGeneratorABC, ABC):
+class CV2FrameGenerator(FrameGeneratorABC):
+    def __init__(self, *capture_params: Any):
+        self.capture_params = capture_params or (0,)
+
     def __iter__(self) -> Iterator[Image]:
-        _cap = self._open()
-        while 1:
-            is_open, frame = _cap.read()
-            if not is_open:
-                break
-            yield frame
-        _cap.release()
-
-    def _open(self) -> cv2.VideoCapture:
-        _cap = cv2.VideoCapture(*self._capture_params())
-        _cap.set(cv2.CAP_PROP_BUFFERSIZE, 0)
-        assert _cap.isOpened()
-        return _cap
-
-    @abstractmethod
-    def _capture_params(self) -> Iterable[Any]:
-        pass
+        return CV2Capture(self.capture_params)
+
+
+class CV2Capture(Iterator[Image]):
+    def __init__(self, capture_params: Iterable):
+        self._cap = VideoCapture(*capture_params)
+        assert self._cap.isOpened()
+        self._cap.set(CAP_PROP_BUFFERSIZE, 0)
+
+    def __next__(self) -> Image:
+        success, frame = self._cap.read()
+
+        if success:
+            return frame
+
+        raise StopIteration()
+
+    def __del__(self):
+        if self._cap.isOpened():
+            self._cap.release()
diff --git a/src/polystar/frame_generators/fps_video_frame_generator.py b/src/polystar/frame_generators/fps_video_frame_generator.py
index 926d6b6..83889e4 100644
--- a/src/polystar/frame_generators/fps_video_frame_generator.py
+++ b/src/polystar/frame_generators/fps_video_frame_generator.py
@@ -1,17 +1,14 @@
-from dataclasses import dataclass
-from typing import Iterator
+from pathlib import Path
+from typing import Iterator, Optional
 
 from polystar.frame_generators.video_frame_generator import VideoFrameGenerator
 from polystar.models.image import Image
 
 
-@dataclass
 class FPSVideoFrameGenerator(VideoFrameGenerator):
-
-    desired_fps: int
-
-    def __post_init__(self):
-        self.frame_rate: int = self._video_fps // self.desired_fps
+    def __init__(self, video_path: Path, desired_fps: int, offset_seconds: Optional[int] = None):
+        super().__init__(video_path, offset_seconds)
+        self.frame_rate: int = self._video_fps // desired_fps
 
     def __iter__(self) -> Iterator[Image]:
         for i, frame in enumerate(super().__iter__(), -1):
diff --git a/src/polystar/frame_generators/video_frame_generator.py b/src/polystar/frame_generators/video_frame_generator.py
index 001eb0f..1d9220b 100644
--- a/src/polystar/frame_generators/video_frame_generator.py
+++ b/src/polystar/frame_generators/video_frame_generator.py
@@ -1,24 +1,24 @@
-import cv2
-
-from dataclasses import dataclass
 from pathlib import Path
-from typing import Any, Iterable, Optional
+from typing import Iterable, Iterator, Optional
 
 import ffmpeg
 from cv2.cv2 import CAP_PROP_POS_FRAMES
 from memoized_property import memoized_property
 
-from polystar.frame_generators.cv2_frame_generator_abc import CV2FrameGeneratorABC
-
+from polystar.frame_generators.cv2_frame_generator_abc import CV2Capture, CV2FrameGenerator
+from polystar.models.image import Image
 
-@dataclass
-class VideoFrameGenerator(CV2FrameGeneratorABC):
 
-    video_path: Path
-    offset_seconds: Optional[int]
+class VideoFrameGenerator(CV2FrameGenerator):
+    def __init__(self, video_path: Path, offset_seconds: Optional[int] = None):
+        super().__init__(str(video_path))
+        self.offset_seconds = offset_seconds
+        self.video_path = video_path
 
-    def _capture_params(self) -> Iterable[Any]:
-        return (str(self.video_path),)
+    def __iter__(self) -> Iterator[Image]:
+        if self.offset_seconds:
+            return CV2CaptureWithOffset(self.capture_params, self._video_fps * self.offset_seconds - 2)
+        return CV2Capture(self.capture_params)
 
     @memoized_property
     def _video_fps(self) -> int:
@@ -29,8 +29,8 @@ class VideoFrameGenerator(CV2FrameGeneratorABC):
             return round(eval(stream_info["avg_frame_rate"]))
         raise ValueError(f"No fps found for video {self.video_path.name}")
 
-    def _open(self) -> cv2.VideoCapture:
-        _cap = super()._open()
-        if self.offset_seconds:
-            _cap.set(CAP_PROP_POS_FRAMES, self._video_fps * self.offset_seconds - 2)
-        return _cap
+
+class CV2CaptureWithOffset(CV2Capture):
+    def __init__(self, capture_params: Iterable, offset_frames: int):
+        super().__init__(capture_params)
+        self._cap.set(CAP_PROP_POS_FRAMES, offset_frames)
-- 
GitLab