Skip to content
Snippets Groups Projects
Commit 14d882fa authored by Mathieu Beligon's avatar Mathieu Beligon
Browse files

[DebugTargetPipeline] Add flow_debug function

parent 9024e4e9
No related branches found
No related tags found
No related merge requests found
from dataclasses import dataclass, field
from typing import List
from typing import Iterable, List
from injector import inject
......@@ -7,7 +7,7 @@ from polystar.models.image import Image
from polystar.target_pipeline.detected_objects.detected_armor import DetectedArmor
from polystar.target_pipeline.detected_objects.detected_robot import DetectedRobot
from polystar.target_pipeline.target_abc import TargetABC
from polystar.target_pipeline.target_pipeline import TargetPipeline, _assert_armors_detected
from polystar.target_pipeline.target_pipeline import TargetPipeline, assert_any_armors_detected
@dataclass
......@@ -26,10 +26,14 @@ class DebugTargetPipeline(TargetPipeline):
debug_info_: DebugInfo = field(init=False)
def flow_debug(self, images: Iterable[Image]):
for _ in self.flow_targets(images):
yield self.debug_info_
def _make_target_from_robots(self, image: Image, robots: List[DetectedRobot]) -> TargetABC:
self.debug_info_ = DebugInfo(image, robots)
self.debug_info_.validated_robots = self.robots_filters.filter(self.debug_info_.detected_robots)
_assert_armors_detected(self.debug_info_.validated_robots)
assert_any_armors_detected(self.debug_info_.validated_robots)
self.debug_info_.selected_armor = self.object_selector.select(self.debug_info_.validated_robots, image)
self.debug_info_.target = self.target_factory.from_object(self.debug_info_.selected_armor, image)
return self.debug_info_.target
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Iterable, List, Tuple
from typing import List
import numpy as np
from injector import inject
from polystar.models.box import Box
from polystar.models.image import Image
from polystar.models.label_map import LabelMap
from polystar.models.roco_object import ObjectType
from polystar.target_pipeline.detected_objects.detected_armor import DetectedArmor
from polystar.target_pipeline.detected_objects.detected_robot import DetectedRobot
from polystar.target_pipeline.detected_objects.objects_params import ObjectParams
from polystar.target_pipeline.objects_linker.objects_linker_abs import ObjectsLinkerABC
class ObjectsDetectorABC(ABC):
@abstractmethod
def detect(self, image: np.array) -> List[ObjectParams]:
pass
@inject
@dataclass
class RobotsDetector:
label_map: LabelMap
object_detector: ObjectsDetectorABC
objects_linker: ObjectsLinkerABC
def flow_robots(self, image_iterator: Iterable[Image]) -> Iterable[List[DetectedRobot]]:
for image in image_iterator:
yield image, self.detect_robots(image)
def detect_robots(self, image: Image) -> List[DetectedRobot]:
objects_params = self.object_detector.detect(image)
robots, armors = self.make_robots_and_armors(objects_params, image)
return list(self.objects_linker.link_armors_to_robots(robots, armors, image))
def make_robots_and_armors(
self, objects_params: List[ObjectParams], image: Image
) -> Tuple[List[DetectedRobot], List[DetectedArmor]]:
image_height, image_width, *_ = image.shape
robots, armors = [], []
for object_params in objects_params:
object_type = ObjectType(self.label_map.name_of(object_params.object_class_id))
box = Box.from_positions(
# TODO what about using relative coordinates ?
x1=int(object_params.xmin * image_width),
y1=int(object_params.ymin * image_height),
x2=int(object_params.xmax * image_width),
y2=int(object_params.ymax * image_height),
)
if object_type is ObjectType.ARMOR:
armors.append(DetectedArmor(object_type, box, object_params.score))
else:
robots.append(DetectedRobot(object_type, box, object_params.score))
return robots, armors
from dataclasses import dataclass
from typing import Iterable, List, Tuple
from injector import inject
from polystar.models.box import Box
from polystar.models.image import Image
from polystar.models.label_map import LabelMap
from polystar.models.roco_object import ObjectType
from polystar.target_pipeline.detected_objects.detected_armor import DetectedArmor
from polystar.target_pipeline.detected_objects.detected_robot import DetectedRobot
from polystar.target_pipeline.detected_objects.objects_params import ObjectParams
from polystar.target_pipeline.objects_detectors.objects_detector_abc import ObjectsDetectorABC
from polystar.target_pipeline.objects_linker.objects_linker_abs import ObjectsLinkerABC
@inject
@dataclass
class RobotsDetector:
label_map: LabelMap
object_detector: ObjectsDetectorABC
objects_linker: ObjectsLinkerABC
def flow_robots(self, images: Iterable[Image]) -> Iterable[List[DetectedRobot]]:
for image in images:
yield image, self.detect_robots(image)
def detect_robots(self, image: Image) -> List[DetectedRobot]:
objects_params = self.object_detector.detect(image)
robots, armors = self.make_robots_and_armors(objects_params, image)
return list(self.objects_linker.link_armors_to_robots(robots, armors, image))
def make_robots_and_armors(
self, objects_params: List[ObjectParams], image: Image
) -> Tuple[List[DetectedRobot], List[DetectedArmor]]:
image_height, image_width, *_ = image.shape
robots, armors = [], []
for object_params in objects_params:
object_type = ObjectType(self.label_map.name_of(object_params.object_class_id))
box = Box.from_positions(
# TODO what about using relative coordinates ?
x1=int(object_params.xmin * image_width),
y1=int(object_params.ymin * image_height),
x2=int(object_params.xmax * image_width),
y2=int(object_params.ymax * image_height),
)
if object_type is ObjectType.ARMOR:
armors.append(DetectedArmor(object_type, box, object_params.score))
else:
robots.append(DetectedRobot(object_type, box, object_params.score))
return robots, armors
......@@ -7,7 +7,7 @@ from polystar.filters.filter_abc import FilterABC
from polystar.models.image import Image
from polystar.target_pipeline.detected_objects.detected_robot import DetectedRobot
from polystar.target_pipeline.object_selectors.object_selector_abc import ObjectSelectorABC
from polystar.target_pipeline.objects_detectors.objects_detector_abc import RobotsDetector
from polystar.target_pipeline.objects_detectors.robots_detector import RobotsDetector
from polystar.target_pipeline.target_abc import TargetABC
from polystar.target_pipeline.target_factories.target_factory_abc import TargetFactoryABC
......@@ -24,25 +24,21 @@ class TargetPipeline:
object_selector: ObjectSelectorABC
target_factory: TargetFactoryABC
def flow_targets(self, image_iterator: Iterable[Image]) -> Iterable[Optional[TargetABC]]:
for image, robots in self.robots_detector.flow_robots(image_iterator):
def flow_targets(self, images: Iterable[Image]) -> Iterable[Optional[TargetABC]]:
for image, robots in self.robots_detector.flow_robots(images):
try:
yield self._make_target_from_robots(image, robots)
except NoTargetFoundException:
yield None
def predict_target(self, image: Image) -> TargetABC:
robots = self.robots_detector.detect_robots(image)
return self._make_target_from_robots(image, robots)
def _make_target_from_robots(self, image: Image, robots: List[DetectedRobot]) -> TargetABC:
robots = self.robots_filters.filter(robots)
_assert_armors_detected(robots)
assert_any_armors_detected(robots)
selected_armor = self.object_selector.select(robots, image)
target = self.target_factory.from_object(selected_armor, image)
return target
def _assert_armors_detected(robots):
def assert_any_armors_detected(robots):
if not any(robot.armors for robot in robots):
raise NoTargetFoundException()
......@@ -12,6 +12,9 @@ class MyThread(Thread):
def run(self) -> None:
self.running = True
self.loop()
def loop(self):
while self.running:
self.step()
......
......@@ -3,7 +3,6 @@ from injector import inject
from polystar.dependency_injection import make_injector
from polystar.target_pipeline.debug_pipeline import DebugTargetPipeline
from polystar.target_pipeline.objects_filters.armor_digit_filter import KeepArmorsDigitFilter
from polystar.target_pipeline.target_pipeline import NoTargetFoundException
from polystar.view.plt_results_viewer import PltResultViewer
from research.common.datasets.roco.roco_annotation_filters.roco_annotation_object_filter import (
ROCOAnnotationObjectFilter,
......@@ -15,18 +14,14 @@ from research.common.datasets.roco.zoo.roco_dataset_zoo import ROCODatasetsZoo
def demo_pipeline_on_images(pipeline: DebugTargetPipeline):
with PltResultViewer("Demo of tf model") as viewer:
for builder in ROCODatasetsZoo.DEFAULT_TEST_DATASETS:
for image in (
for debug_info in pipeline.flow_debug(
builder.to_images()
.filter_targets(ROCOAnnotationObjectFilter(KeepArmorsDigitFilter((1, 3, 4))))
.shuffle()
.cap(15)
.build_examples()
):
try:
pipeline.predict_target(image)
except NoTargetFoundException:
pass
viewer.display_debug_info(pipeline.debug_info_)
viewer.display_debug_info(debug_info)
if __name__ == "__main__":
......
......@@ -6,7 +6,7 @@ from polystar.communication.cs_link_abc import CSLinkABC
from polystar.communication.togglabe_cs_link import TogglableCSLink
from polystar.dependency_injection import make_injector
from polystar.frame_generators.frames_generator_abc import FrameGeneratorABC
from polystar.target_pipeline.debug_pipeline import DebugTargetPipeline
from polystar.target_pipeline.debug_pipeline import DebugInfo, DebugTargetPipeline
from polystar.target_pipeline.target_abc import SimpleTarget
from polystar.utils.fps import FPS
from polystar.utils.thread import MyThread
......@@ -24,9 +24,9 @@ class CameraPipelineDemo:
def run(self):
with CV2ResultViewer("Pipeline demo", key_callbacks={" ": self.cs_link.toggle}) as viewer:
for target in self.pipeline.flow_targets(self.webcam):
self._send_target(target)
self._display(viewer)
for debug_info in self.pipeline.flow_debug(self.webcam):
self._send_target(debug_info.target)
self._display(viewer, debug_info)
def _send_target(self, target: Optional[SimpleTarget]):
if target is not None:
......@@ -38,8 +38,8 @@ class CameraPipelineDemo:
self.persistence_last_detection -= 1
def _display(self, viewer: CV2ResultViewer):
viewer.add_debug_info(self.pipeline.debug_info_)
def _display(self, viewer: CV2ResultViewer, debug_info: DebugInfo):
viewer.add_debug_info(debug_info)
viewer.add_text(f"FPS: {self.fps.tick():.1f}", 10, 10, (0, 0, 0))
viewer.add_text("Communication: " + ("[ON]" if self.cs_link.is_on else "[OFF]"), 10, 30, (0, 0, 0))
viewer.display()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment