Skip to content
Snippets Groups Projects
Commit 5ed42492 authored by Mathieu Beligon
Browse files

[common] (filters) add abc, and add FilteredDatasets

parent 0ce1a865
No related branches found
No related tags found
No related merge requests found
Showing
with 198 additions and 97 deletions
from typing import Iterable
from polystar.common.filters.filter_abc import FilterABC, T
class ExcludeFilter(FilterABC[T]):
    """Filter that rejects every example found in a given collection."""

    def __init__(self, to_remove: Iterable[T]):
        # Materialized as a set so each membership test is a hash lookup.
        self.to_remove = {*to_remove}

    def validate_single(self, example: T) -> bool:
        """Return True when `example` is not one of the excluded values."""
        is_excluded = example in self.to_remove
        return not is_excluded
from abc import ABC, abstractmethod
from typing import Generic, Iterable, List, Tuple, TypeVar
T = TypeVar("T")


class FilterABC(Generic[T], ABC):
    """Base class for filters deciding, example by example, what is kept.

    Subclasses implement `validate_single`; the other methods are derived
    from it. "Siblings" are extra lists aligned with `examples` (same order)
    that are filtered with the same keep/reject decisions.

    Every returned list is a freshly built list, never one of the input
    objects, whatever the proportion of valid examples.
    """

    def filter(self, examples: List[T]) -> List[T]:
        """Return the examples accepted by the filter."""
        return self.filter_with_siblings(examples)[0]

    def filter_with_siblings(self, examples: List[T], *siblings: List) -> Tuple[List[T], ...]:
        """Return the accepted examples followed by the matching items of each sibling."""
        _rejected, accepted = self.split_with_siblings(examples, *siblings)
        return accepted

    def split(self, examples: List[T]) -> Tuple[List[T], List[T]]:
        """Return `(rejected_examples, accepted_examples)`."""
        rejected, accepted = self.split_with_siblings(examples)
        return rejected[0], accepted[0]

    def split_with_siblings(
        self, examples: List[T], *siblings: List
    ) -> Tuple[Tuple[List[T], ...], Tuple[List[T], ...]]:
        """Split the examples and their siblings into rejected and accepted groups.

        Returns:
            `(rejected, accepted)`; each is a tuple holding one list for
            `examples` followed by one list per sibling. A group with no
            element is a tuple of empty lists.
        """
        are_valid = self.validate(examples)
        return (
            _filter_with_siblings_from_preds(are_valid, examples, *siblings, expected_value=False),
            _filter_with_siblings_from_preds(are_valid, examples, *siblings, expected_value=True),
        )

    def validate(self, examples: List[T]) -> List[bool]:
        """Return, for each example, the result of `validate_single`."""
        return [self.validate_single(example) for example in examples]

    @abstractmethod
    def validate_single(self, example: T) -> bool:
        """Return True when `example` must be kept."""


def _filter_with_siblings_from_preds(
    are_valid: List[bool], examples: List[T], *siblings: List, expected_value: bool = True
) -> Tuple[List[T], ...]:
    """Keep the rows whose prediction truthiness equals `expected_value`.

    Returns one list for `examples` followed by one list per sibling.
    """
    kept_rows = [
        row
        for is_valid, *row in zip(are_valid, examples, *siblings)
        # bool() so that truthy non-bool predictions (e.g. ints) are not
        # dropped from both the accepted and the rejected group.
        if bool(is_valid) == expected_value
    ]
    if not kept_rows:
        # zip(*[]) yields nothing at all, so the empty columns are built explicitly.
        return tuple([] for _ in range(len(siblings) + 1))
    return _format_res(zip(*kept_rows))


def _format_res(res: Iterable[Iterable[T]]) -> Tuple[List[T], ...]:
    """Turn an iterable of columns into a tuple of lists."""
    return tuple(map(list, res))
from typing import List
from polystar.common.filters.filter_abc import FilterABC, T
class IntersectionFilter(FilterABC[T]):
    """Filter accepting an example only when every wrapped filter accepts it."""

    def __init__(self, filters: List[FilterABC[T]]):
        self.filters = filters
        assert self.filters, "IntersectionFilter requires at least one filter"

    def validate_single(self, example: T) -> bool:
        """Return True when all the wrapped filters validate `example`."""
        # The wrapped filters are iterated, not the example itself.
        return all(f.validate_single(example) for f in self.filters)
from typing import Iterable
from polystar.common.filters.filter_abc import FilterABC, T
class KeepFilter(FilterABC[T]):
    """Filter that accepts only the examples found in a given collection."""

    def __init__(self, to_keep: Iterable[T]):
        # Materialized as a set so each membership test is a hash lookup.
        self.to_keep = {*to_keep}

    def validate_single(self, example: T) -> bool:
        """Return True when `example` is one of the values to keep."""
        is_kept = example in self.to_keep
        return is_kept
from typing import List
from polystar.common.filters.filter_abc import FilterABC, T
class UnionFilter(FilterABC[T]):
    """Filter accepting an example as soon as one wrapped filter accepts it."""

    def __init__(self, filters: List[FilterABC[T]]):
        self.filters = filters
        assert self.filters, "UnionFilter requires at least one filter"

    def validate_single(self, example: T) -> bool:
        """Return True when at least one wrapped filter validates `example`."""
        # The wrapped filters are iterated, not the example itself.
        return any(f.validate_single(example) for f in self.filters)
......@@ -11,13 +11,16 @@ def load_image(image_path: Path, conversion: int = cv2.COLOR_BGR2RGB) -> Image:
return cv2.cvtColor(cv2.imread(str(image_path), cv2.IMREAD_UNCHANGED), conversion)
def save_image(image: Image, image_path: Path, conversion: int = cv2.COLOR_RGB2BGR):
    """Write `image` to `image_path`, creating the parent directories if needed.

    The image goes through `cv2.cvtColor(image, conversion)` before being written.
    """
    target_dir = image_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    converted = cv2.cvtColor(image, conversion)
    cv2.imwrite(str(image_path), converted)
def load_images(images: Iterable[Path], conversion: int = cv2.COLOR_BGR2RGB) -> Iterable[Image]:
    """Lazily load each image file of `images` through `load_image`.

    Nothing is read until the returned iterable is consumed.
    """
    loaded = (load_image(image_path, conversion) for image_path in images)
    return loaded
def load_images_in_directory(
    directory: Path, pattern: str = "*", conversion: int = cv2.COLOR_BGR2RGB
) -> Iterable[Image]:
    """Lazily load the images of `directory` whose path matches the glob `pattern`.

    Delegates to `load_images`, so a single code path does the loading.
    """
    return load_images(directory.glob(pattern), conversion)
def save_image(image: Image, image_path: Path, conversion: int = cv2.COLOR_RGB2BGR):
    """Write `image` to `image_path`, creating the parent directories if needed.

    The image goes through `cv2.cvtColor(image, conversion)` before being written.
    """
    target_dir = image_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    converted = cv2.cvtColor(image, conversion)
    cv2.imwrite(str(image_path), converted)
from polystar.common.filters.filter_abc import FilterABC
from research.common.datasets.dataset import Dataset, ExampleT, TargetT
from research.common.datasets.simple_dataset import SimpleDataset
class FilteredTargetsDataset(SimpleDataset[ExampleT, TargetT]):
    """Dataset keeping only the entries whose target passes `targets_filter`."""

    def __init__(self, dataset: Dataset[ExampleT, TargetT], targets_filter: FilterABC[TargetT]):
        # Targets go first: they are what the filter validates; examples and
        # names follow as siblings so they stay aligned with the kept targets.
        kept = targets_filter.filter_with_siblings(
            list(dataset.targets), list(dataset.examples), list(dataset.names)
        )
        kept_targets, kept_examples, kept_names = kept
        super().__init__(kept_examples, kept_targets, kept_names, dataset.name)
class FilteredExamplesDataset(SimpleDataset[ExampleT, TargetT]):
    """Dataset keeping only the entries whose example passes `examples_filter`."""

    def __init__(self, dataset: Dataset[ExampleT, TargetT], examples_filter: FilterABC[ExampleT]):
        # Examples go first: they are what the filter validates; targets and
        # names follow as siblings so they stay aligned with the kept examples.
        kept_examples, kept_targets, kept_names = examples_filter.filter_with_siblings(
            list(dataset.examples), list(dataset.targets), list(dataset.names)
        )
        super().__init__(kept_examples, kept_targets, kept_names, dataset.name)
......@@ -25,12 +25,16 @@ class ImageFileDataset(LazyDataset[Path, TargetT], ABC):
pass
def open(self) -> ImageDataset:
    """Return the dataset obtained by loading each image file of this dataset.

    Delegates to `open_file_dataset`, which applies `load_image` to the examples.
    """
    return open_file_dataset(self)
def __len__(self):
    """Return the number of entries of `self.image_files`, counted with `ilen`."""
    n_files = ilen(self.image_files)
    return n_files
def open_file_dataset(dataset: Dataset[Path, TargetT]) -> ImageDataset:
    """Turn a dataset of image paths into a dataset of images.

    Each example is transformed with `load_image`.
    """
    image_dataset = dataset.transform_examples(load_image)
    return image_dataset
class ImageDirectoryDataset(ImageFileDataset[TargetT], ABC):
def __init__(self, images_dir: Path, name: str, extension: str = "jpg"):
super().__init__(name)
......
......@@ -7,7 +7,7 @@ from typing import Any, Dict, Iterable, List, Sequence, Tuple
import numpy as np
from memoized_property import memoized_property
from polystar.common.image_pipeline.image_pipeline import ImagePipeline
from polystar.common.models.image import Image, load_image
from polystar.common.models.image import Image, load_images
from research.common.datasets.roco.directory_roco_dataset import \
DirectoryROCODataset
from research.robots_at_robots.dataset.armor_value_dataset import \
......@@ -86,10 +86,11 @@ class ImagePipelineEvaluator:
def load_datasets(
    roco_datasets: List[DirectoryROCODataset], image_dataset_generator: ArmorValueDatasetGenerator,
) -> Tuple[List[Path], List[Image], List[Any], List[int]]:
    """Build one dataset from several ROCO datasets and load its images.

    Returns:
        `(paths, images, targets, dataset_sizes)`, where `dataset_sizes`
        holds the length of each entry of the generated dataset's `datasets`.
    """
    dataset = image_dataset_generator.from_roco_datasets(roco_datasets)
    # Sizes are measured on the generated sub-datasets, not on the ROCO inputs.
    dataset_sizes = [len(d) for d in dataset.datasets]
    paths, targets = list(dataset.examples), list(dataset.targets)
    images = list(load_images(paths))
    return paths, images, targets, dataset_sizes
from unittest import TestCase
from polystar.common.filters.filter_abc import FilterABC
class OddFilter(FilterABC[int]):
    """Test fixture filter; despite its name, it accepts the even integers."""

    def validate_single(self, n: int) -> bool:
        remainder = n % 2
        return not remainder
class TestFilterABC(TestCase):
    """Exercises the methods FilterABC derives from `validate_single`, using OddFilter."""

    def test_filter(self):
        kept = OddFilter().filter([1, 2, 3, 4, 5, 6])

        self.assertEqual([2, 4, 6], kept)

    def test_filter_with_siblings(self):
        numbers = [1, 2, 3, 4, 5, 6]
        names = ["a", "b", "c", "d", "e", "f"]
        squares = [1, 4, 9, 16, 25, 36]

        kept_numbers, kept_names, kept_squares = OddFilter().filter_with_siblings(numbers, names, squares)

        self.assertEqual([2, 4, 6], kept_numbers)
        self.assertEqual(["b", "d", "f"], kept_names)
        self.assertEqual([4, 16, 36], kept_squares)

    def test_split(self):
        halves = OddFilter().split([1, 2, 3, 4, 5, 6])

        self.assertEqual(([1, 3, 5], [2, 4, 6]), halves)

    def test_split_with_siblings(self):
        numbers = [1, 2, 3, 4, 5, 6]
        names = ["a", "b", "c", "d", "e", "f"]
        squares = [1, 4, 9, 16, 25, 36]

        # The rejected group comes first, the accepted group second.
        rejected, accepted = OddFilter().split_with_siblings(numbers, names, squares)
        numbers_neg, names_neg, squares_neg = rejected
        numbers_pos, names_pos, squares_pos = accepted

        self.assertEqual([2, 4, 6], numbers_pos)
        self.assertEqual(["b", "d", "f"], names_pos)
        self.assertEqual([4, 16, 36], squares_pos)

        self.assertEqual([1, 3, 5], numbers_neg)
        self.assertEqual(["a", "c", "e"], names_neg)
        self.assertEqual([1, 9, 25], squares_neg)

    def test_validate(self):
        verdicts = OddFilter().validate([1, 2, 3, 4, 5, 6])

        self.assertEqual([False, True, False, True, False, True], verdicts)
from pathlib import Path
from polystar.common.models.object import Armor
from research.robots_at_robots.dataset.armor_image_dataset_factory import ArmorImageDatasetGenerator
class ArmorColorDatasetGenerator(ArmorImageDatasetGenerator[str]):
    """Armor image dataset generator whose labels are the armor color names."""

    task_name: str = "colors"

    def _label_from_armor_info(self, armor: Armor, k: int, path: Path) -> str:
        """Return the name of the armor's color; `k` and `path` are unused."""
        color = armor.color
        return color.name
from pathlib import Path
from typing import Set
from polystar.common.models.object import Armor
from research.robots_at_robots.dataset.armor_image_dataset_factory import ArmorImageDatasetGenerator
class ArmorDigitDatasetGenerator(ArmorImageDatasetGenerator[int]):
    """Armor image dataset generator whose labels are the armor numbers.

    Only the labels contained in `acceptable_digits` are considered valid.
    """

    task_name: str = "digits"

    def __init__(self, acceptable_digits: Set[int]):
        self.acceptable_digits = acceptable_digits

    def _label_from_armor_info(self, armor: Armor, k: int, path: Path) -> int:
        """Return the armor's number; `k` and `path` are unused."""
        return armor.number

    def _label_from_str(self, label: str) -> int:
        """Parse a label read back as text into an int."""
        digit = int(label)
        return digit

    def _valid_label(self, label: int) -> bool:
        """Return True when `label` is one of the acceptable digits."""
        is_acceptable = label in self.acceptable_digits
        return is_acceptable
import json
from abc import abstractmethod
from pathlib import Path
from typing import Iterable, Tuple, TypeVar
import cv2
from polystar.common.models.image import Image
from polystar.common.models.object import Armor
from polystar.common.utils.time import create_time_id
from research.common.dataset.directory_roco_dataset import DirectoryROCODataset
from research.common.image_pipeline_evaluation.image_dataset_generator import ImageDatasetGenerator
from research.robots_at_robots.dataset.armor_dataset_factory import ArmorDatasetFactory
T = TypeVar("T")
class ArmorImageDatasetGenerator(ImageDatasetGenerator[T]):
    """Generates labelled armor images from a ROCO dataset, cached on disk.

    The armor crops are written once under `<dataset_path>/<task_name>/`, with
    the label encoded as the last "-"-separated field of the file name. A
    ".lock" file written after the crops marks the directory as complete.
    """

    task_name: str

    def from_roco_dataset(self, dataset: DirectoryROCODataset) -> Iterable[Tuple[Path, T]]:
        """Return the `(image_path, label)` pairs of `dataset`, creating the crops if needed."""
        if not (dataset.dataset_path / self.task_name / ".lock").exists():
            self._create_labelized_armor_images_from_roco(dataset)
        return self._get_images_paths_and_labels(dataset)

    def _create_labelized_armor_images_from_roco(self, dataset: DirectoryROCODataset) -> None:
        """Write one jpg per armor of `dataset`, then the ".lock" marker file."""
        dset_path = dataset.dataset_path / self.task_name
        dset_path.mkdir(exist_ok=True)

        for armor_img, armor, k, path in ArmorDatasetFactory.from_dataset(dataset):
            label = self._label_from_armor_info(armor, k, path)
            cv2.imwrite(str(dset_path / f"{path.stem}-{k}-{label}.jpg"), cv2.cvtColor(armor_img, cv2.COLOR_RGB2BGR))

        # Written last, so the marker only exists once the loop above has completed.
        (dset_path / ".lock").write_text(json.dumps({"version": "0.0", "date": create_time_id()}))

    def _get_images_paths_and_labels(self, dataset: DirectoryROCODataset) -> Iterable[Tuple[Path, T]]:
        """Lazily yield `(image_path, label)` for each jpg whose label is valid."""
        # Each label is parsed once, then reused for both the check and the result.
        paths_and_labels = (
            (image_path, self._label_from_filepath(image_path))
            for image_path in (dataset.dataset_path / self.task_name).glob("*.jpg")
        )
        return (
            (image_path, label) for image_path, label in paths_and_labels if self._valid_label(label)
        )

    def _label_from_filepath(self, image_path: Path) -> T:
        """Extract the label from the last "-"-separated field of the file stem."""
        return self._label_from_str(image_path.stem.split("-")[-1])

    @abstractmethod
    def _label_from_armor_info(self, armor: Armor, k: int, path: Path) -> T:
        """Return the label of `armor`; it ends up in the crop's file name."""

    def _valid_label(self, label: T) -> bool:
        """Return True when `label` must be kept; every label is kept by default."""
        return True

    def _label_from_str(self, label: str) -> T:
        """Convert the text read from a file name into a label; identity by default."""
        return label
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment