diff --git a/src/polystar/target_pipeline/debug_pipeline.py b/src/polystar/target_pipeline/debug_pipeline.py index b3302d68a9bf8558af773f532c732dd7f41416e9..7250fe70ea19b6bbfc3f65e4a4a93a7e096feec4 100644 --- a/src/polystar/target_pipeline/debug_pipeline.py +++ b/src/polystar/target_pipeline/debug_pipeline.py @@ -1,5 +1,5 @@ from dataclasses import dataclass, field -from typing import List +from typing import Iterable, List from injector import inject @@ -7,7 +7,7 @@ from polystar.models.image import Image from polystar.target_pipeline.detected_objects.detected_armor import DetectedArmor from polystar.target_pipeline.detected_objects.detected_robot import DetectedRobot from polystar.target_pipeline.target_abc import TargetABC -from polystar.target_pipeline.target_pipeline import TargetPipeline, _assert_armors_detected +from polystar.target_pipeline.target_pipeline import TargetPipeline, assert_any_armors_detected @dataclass @@ -26,10 +26,14 @@ class DebugTargetPipeline(TargetPipeline): debug_info_: DebugInfo = field(init=False) + def flow_debug(self, images: Iterable[Image]): + for _ in self.flow_targets(images): + yield self.debug_info_ + def _make_target_from_robots(self, image: Image, robots: List[DetectedRobot]) -> TargetABC: self.debug_info_ = DebugInfo(image, robots) self.debug_info_.validated_robots = self.robots_filters.filter(self.debug_info_.detected_robots) - _assert_armors_detected(self.debug_info_.validated_robots) + assert_any_armors_detected(self.debug_info_.validated_robots) self.debug_info_.selected_armor = self.object_selector.select(self.debug_info_.validated_robots, image) self.debug_info_.target = self.target_factory.from_object(self.debug_info_.selected_armor, image) return self.debug_info_.target diff --git a/src/polystar/target_pipeline/objects_detectors/objects_detector_abc.py b/src/polystar/target_pipeline/objects_detectors/objects_detector_abc.py index 298398f05918ceebcc6338f6b50b672d0fcfd500..a5b5e38664b38a37cb453081875426adf9db92c3 100644 --- a/src/polystar/target_pipeline/objects_detectors/objects_detector_abc.py +++ b/src/polystar/target_pipeline/objects_detectors/objects_detector_abc.py @@ -1,59 +1,12 @@ from abc import ABC, abstractmethod -from dataclasses import dataclass -from typing import Iterable, List, Tuple +from typing import List import numpy as np -from injector import inject -from polystar.models.box import Box -from polystar.models.image import Image -from polystar.models.label_map import LabelMap -from polystar.models.roco_object import ObjectType -from polystar.target_pipeline.detected_objects.detected_armor import DetectedArmor -from polystar.target_pipeline.detected_objects.detected_robot import DetectedRobot from polystar.target_pipeline.detected_objects.objects_params import ObjectParams -from polystar.target_pipeline.objects_linker.objects_linker_abs import ObjectsLinkerABC class ObjectsDetectorABC(ABC): @abstractmethod def detect(self, image: np.array) -> List[ObjectParams]: pass - - -@inject -@dataclass -class RobotsDetector: - label_map: LabelMap - object_detector: ObjectsDetectorABC - objects_linker: ObjectsLinkerABC - - def flow_robots(self, image_iterator: Iterable[Image]) -> Iterable[List[DetectedRobot]]: - for image in image_iterator: - yield image, self.detect_robots(image) - - def detect_robots(self, image: Image) -> List[DetectedRobot]: - objects_params = self.object_detector.detect(image) - robots, armors = self.make_robots_and_armors(objects_params, image) - return list(self.objects_linker.link_armors_to_robots(robots, armors, image)) - - def make_robots_and_armors( - self, objects_params: List[ObjectParams], image: Image - ) -> Tuple[List[DetectedRobot], List[DetectedArmor]]: - image_height, image_width, *_ = image.shape - - robots, armors = [], [] - for object_params in objects_params: - object_type = ObjectType(self.label_map.name_of(object_params.object_class_id)) - box = Box.from_positions( - # TODO what about using relative coordinates ? - x1=int(object_params.xmin * image_width), - y1=int(object_params.ymin * image_height), - x2=int(object_params.xmax * image_width), - y2=int(object_params.ymax * image_height), - ) - if object_type is ObjectType.ARMOR: - armors.append(DetectedArmor(object_type, box, object_params.score)) - else: - robots.append(DetectedRobot(object_type, box, object_params.score)) - return robots, armors diff --git a/src/polystar/target_pipeline/objects_detectors/robots_detector.py b/src/polystar/target_pipeline/objects_detectors/robots_detector.py new file mode 100644 index 0000000000000000000000000000000000000000..c78878bf0e663d158c50a1a16ee3c037be5afd76 --- /dev/null +++ b/src/polystar/target_pipeline/objects_detectors/robots_detector.py @@ -0,0 +1,52 @@ +from dataclasses import dataclass +from typing import Iterable, List, Tuple + +from injector import inject + +from polystar.models.box import Box +from polystar.models.image import Image +from polystar.models.label_map import LabelMap +from polystar.models.roco_object import ObjectType +from polystar.target_pipeline.detected_objects.detected_armor import DetectedArmor +from polystar.target_pipeline.detected_objects.detected_robot import DetectedRobot +from polystar.target_pipeline.detected_objects.objects_params import ObjectParams +from polystar.target_pipeline.objects_detectors.objects_detector_abc import ObjectsDetectorABC +from polystar.target_pipeline.objects_linker.objects_linker_abs import ObjectsLinkerABC + + +@inject +@dataclass +class RobotsDetector: + label_map: LabelMap + object_detector: ObjectsDetectorABC + objects_linker: ObjectsLinkerABC + + def flow_robots(self, images: Iterable[Image]) -> Iterable[List[DetectedRobot]]: + for image in images: + yield image, self.detect_robots(image) + + def detect_robots(self, image: Image) -> List[DetectedRobot]: + objects_params = self.object_detector.detect(image) + robots, armors = self.make_robots_and_armors(objects_params, image) + return list(self.objects_linker.link_armors_to_robots(robots, armors, image)) + + def make_robots_and_armors( + self, objects_params: List[ObjectParams], image: Image + ) -> Tuple[List[DetectedRobot], List[DetectedArmor]]: + image_height, image_width, *_ = image.shape + + robots, armors = [], [] + for object_params in objects_params: + object_type = ObjectType(self.label_map.name_of(object_params.object_class_id)) + box = Box.from_positions( + # TODO what about using relative coordinates ? + x1=int(object_params.xmin * image_width), + y1=int(object_params.ymin * image_height), + x2=int(object_params.xmax * image_width), + y2=int(object_params.ymax * image_height), + ) + if object_type is ObjectType.ARMOR: + armors.append(DetectedArmor(object_type, box, object_params.score)) + else: + robots.append(DetectedRobot(object_type, box, object_params.score)) + return robots, armors diff --git a/src/polystar/target_pipeline/target_pipeline.py b/src/polystar/target_pipeline/target_pipeline.py index b166286de0e83f8d722d0c3b9d539307bf89bc5c..e4cf84e7fae50a6cc801abcc45573c0adb718a7e 100644 --- a/src/polystar/target_pipeline/target_pipeline.py +++ b/src/polystar/target_pipeline/target_pipeline.py @@ -7,7 +7,7 @@ from polystar.filters.filter_abc import FilterABC from polystar.models.image import Image from polystar.target_pipeline.detected_objects.detected_robot import DetectedRobot from polystar.target_pipeline.object_selectors.object_selector_abc import ObjectSelectorABC -from polystar.target_pipeline.objects_detectors.objects_detector_abc import RobotsDetector +from polystar.target_pipeline.objects_detectors.robots_detector import RobotsDetector from polystar.target_pipeline.target_abc import TargetABC from polystar.target_pipeline.target_factories.target_factory_abc import TargetFactoryABC @@ -24,25 +24,21 @@ class TargetPipeline: object_selector: ObjectSelectorABC target_factory: TargetFactoryABC - def flow_targets(self, image_iterator: Iterable[Image]) -> Iterable[Optional[TargetABC]]: - for image, robots in self.robots_detector.flow_robots(image_iterator): + def flow_targets(self, images: Iterable[Image]) -> Iterable[Optional[TargetABC]]: + for image, robots in self.robots_detector.flow_robots(images): try: yield self._make_target_from_robots(image, robots) except NoTargetFoundException: yield None - def predict_target(self, image: Image) -> TargetABC: - robots = self.robots_detector.detect_robots(image) - return self._make_target_from_robots(image, robots) - def _make_target_from_robots(self, image: Image, robots: List[DetectedRobot]) -> TargetABC: robots = self.robots_filters.filter(robots) - _assert_armors_detected(robots) + assert_any_armors_detected(robots) selected_armor = self.object_selector.select(robots, image) target = self.target_factory.from_object(selected_armor, image) return target -def _assert_armors_detected(robots): +def assert_any_armors_detected(robots): if not any(robot.armors for robot in robots): raise NoTargetFoundException() diff --git a/src/polystar/utils/thread.py b/src/polystar/utils/thread.py index ed556d64ef9ed10fe059af8c76645ad1cadafd80..581d06aab4defe54adaf4abaddfab517ae227a07 100644 --- a/src/polystar/utils/thread.py +++ b/src/polystar/utils/thread.py @@ -12,6 +12,9 @@ class MyThread(Thread): def run(self) -> None: self.running = True + self.loop() + + def loop(self): while self.running: self.step() diff --git a/src/research/scripts/demo_pipeline.py b/src/research/scripts/demo_pipeline.py index 0ed50788901b7015d1c21391f902665dde3cc3cb..4b2d7209f3e8b1e6f8ea8ad977f4d91e64e4719e 100644 --- a/src/research/scripts/demo_pipeline.py +++ b/src/research/scripts/demo_pipeline.py @@ -3,7 +3,6 @@ from injector import inject from polystar.dependency_injection import make_injector from polystar.target_pipeline.debug_pipeline import DebugTargetPipeline from polystar.target_pipeline.objects_filters.armor_digit_filter import KeepArmorsDigitFilter -from polystar.target_pipeline.target_pipeline import NoTargetFoundException from polystar.view.plt_results_viewer import PltResultViewer from research.common.datasets.roco.roco_annotation_filters.roco_annotation_object_filter import ( ROCOAnnotationObjectFilter, @@ -15,18 +14,14 @@ from research.common.datasets.roco.zoo.roco_dataset_zoo import ROCODatasetsZoo def demo_pipeline_on_images(pipeline: DebugTargetPipeline): with PltResultViewer("Demo of tf model") as viewer: for builder in ROCODatasetsZoo.DEFAULT_TEST_DATASETS: - for image in ( + for debug_info in pipeline.flow_debug( builder.to_images() .filter_targets(ROCOAnnotationObjectFilter(KeepArmorsDigitFilter((1, 3, 4)))) .shuffle() .cap(15) .build_examples() ): - try: - pipeline.predict_target(image) - except NoTargetFoundException: - pass - viewer.display_debug_info(pipeline.debug_info_) + viewer.display_debug_info(debug_info) if __name__ == "__main__": diff --git a/src/research/scripts/demo_pipeline_camera.py b/src/research/scripts/demo_pipeline_camera.py index 5a035b5fd7bde4bee6549ed02c8704f9dd13dbd9..7cb78c7b6cf2a34ab72f46831d01f055bd0090da 100644 --- a/src/research/scripts/demo_pipeline_camera.py +++ b/src/research/scripts/demo_pipeline_camera.py @@ -6,7 +6,7 @@ from polystar.communication.cs_link_abc import CSLinkABC from polystar.communication.togglabe_cs_link import TogglableCSLink from polystar.dependency_injection import make_injector from polystar.frame_generators.frames_generator_abc import FrameGeneratorABC -from polystar.target_pipeline.debug_pipeline import DebugTargetPipeline +from polystar.target_pipeline.debug_pipeline import DebugInfo, DebugTargetPipeline from polystar.target_pipeline.target_abc import SimpleTarget from polystar.utils.fps import FPS from polystar.utils.thread import MyThread @@ -24,9 +24,9 @@ class CameraPipelineDemo: def run(self): with CV2ResultViewer("Pipeline demo", key_callbacks={" ": self.cs_link.toggle}) as viewer: - for target in self.pipeline.flow_targets(self.webcam): - self._send_target(target) - self._display(viewer) + for debug_info in self.pipeline.flow_debug(self.webcam): + self._send_target(debug_info.target) + self._display(viewer, debug_info) def _send_target(self, target: Optional[SimpleTarget]): if target is not None: @@ -38,8 +38,8 @@ class CameraPipelineDemo: self.persistence_last_detection -= 1 - def _display(self, viewer: CV2ResultViewer): - viewer.add_debug_info(self.pipeline.debug_info_) + def _display(self, viewer: CV2ResultViewer, debug_info: DebugInfo): + viewer.add_debug_info(debug_info) viewer.add_text(f"FPS: {self.fps.tick():.1f}", 10, 10, (0, 0, 0)) viewer.add_text("Communication: " + ("[ON]" if self.cs_link.is_on else "[OFF]"), 10, 30, (0, 0, 0)) viewer.display()